001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import org.jspecify.annotations.NonNull;
020import org.jspecify.annotations.Nullable;
021
022import javax.annotation.concurrent.NotThreadSafe;
023import java.time.Duration;
024import java.util.Optional;
025import java.util.concurrent.ExecutorService;
026import java.util.function.Consumer;
027import java.util.function.Supplier;
028
029import static java.util.Objects.requireNonNull;
030
031/**
032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality.
033 * <p>
034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}.
035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>.
036 * <p>
037 * For example:
038 * <pre>{@code // Set up our SSE server
039 * SseServer sseServer = SseServer.fromPort(8081);
040 *
041 * // Wire the SSE server into our config
042 * SokletConfig config = SokletConfig.withSseServer(sseServer)
043 *   .build();
044 *
045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources
046 *
047 * // Run the app
048 * try (Soklet soklet = Soklet.fromConfig(config)) {
049 *   soklet.start();
050 *   System.out.println("Soklet started, press [enter] to exit");
051 *   soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY);
052 * }}</pre>
053 * <p>
054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
055 *
056 * @author <a href="https://www.revetkn.com">Mark Allen</a>
057 */
058public interface SseServer extends AutoCloseable {
059        /**
060         * Starts the SSE server, which makes it able to accept requests from clients.
061         * <p>
062         * If the server is already started, no action is taken.
063         * <p>
064         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
065         */
066        void start();
067
068        /**
069         * Stops the SSE server, which makes it unable to accept requests from clients.
070         * <p>
071         * If the server is already stopped, no action is taken.
072         * <p>
073         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
074         */
075        void stop();
076
077        /**
078         * Is this SSE server started (that is, able to handle requests from clients)?
079         *
080         * @return {@code true} if the server is started, {@code false} otherwise
081         */
082        @NonNull
083        Boolean isStarted();
084
085        /**
086         * {@link AutoCloseable}-enabled synonym for {@link #stop()}.
087         * <p>
088         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
089         *
090         * @throws Exception if an exception occurs while stopping the server
091         */
092        @Override
093        default void close() throws Exception {
094                stop();
095        }
096
097        /**
098         * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients.
099         * <p>
100         * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process).  Soklet is responsible for the creation and management of {@link SseBroadcaster} instances.
101         * <p>
102         * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference.
103         * <p>
104         * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
105         *
106         * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster
107         * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available
108         */
109        @NonNull
110        Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath);
111
112        /**
113         * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}.
114         * <p>
115         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
116         *
117         * @param sokletConfig   configuration for the Soklet instance that controls this server
118         * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client
119         */
120        void initialize(@NonNull SokletConfig sokletConfig,
121                                                                        @NonNull RequestHandler requestHandler);
122
123        /**
124         * Request/response processing contract for {@link SseServer} implementations.
125         * <p>
126         * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}.
127         * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth.
128         * <p>
129         * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong>
130         *
131         * @author <a href="https://www.revetkn.com">Mark Allen</a>
132         */
133        @FunctionalInterface
134        interface RequestHandler {
135                /**
136                 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response.
137                 * <p>
138                 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong>
139                 * <p>
140                 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials.
141                 * That is the extent of the request-handling logic performed here.  The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}.
142                 * <p>
143                 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic.
144                 * <p>
145                 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client.
146                 *
147                 * @param request               a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data
148                 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client
149                 */
150                void handleRequest(@NonNull Request request,
151                                                                                         @NonNull Consumer<HttpRequestResult> requestResultConsumer);
152        }
153
154        /**
155         * Acquires a builder for {@link SseServer} instances.
156         *
157         * @param port the port number on which the server should listen
158         * @return the builder
159         */
160        @NonNull
161        static Builder withPort(@NonNull Integer port) {
162                requireNonNull(port);
163                return new Builder(port);
164        }
165
166        /**
167         * Creates a {@link SseServer} configured with the given port and default settings.
168         *
169         * @param port the port number on which the server should listen
170         * @return a {@link SseServer} instance
171         */
172        @NonNull
173        static SseServer fromPort(@NonNull Integer port) {
174                return withPort(port).build();
175        }
176
177        /**
178         * Builder used to construct a standard implementation of {@link SseServer}.
179         * <p>
180         * This class is intended for use by a single thread.
181         *
182         * @author <a href="https://www.revetkn.com">Mark Allen</a>
183         */
184        @NotThreadSafe
185        final class Builder {
186                @NonNull
187                Integer port;
188                @Nullable
189                String host;
190                @Nullable
191                Duration requestTimeout;
192                @Nullable
193                Duration requestHandlerTimeout;
194                @Nullable
195                Integer requestHandlerConcurrency;
196                @Nullable
197                Integer requestHandlerQueueCapacity;
198                @Nullable
199                Duration writeTimeout;
200                @Nullable
201                Duration shutdownTimeout;
202                @Nullable
203                Duration heartbeatInterval;
204                @Nullable
205                Integer maximumRequestSizeInBytes;
206                @Nullable
207                Integer requestReadBufferSizeInBytes;
208                @Nullable
209                Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
210                @Nullable
211                Integer concurrentConnectionLimit;
212                @Nullable
213                Integer broadcasterCacheCapacity;
214                @Nullable
215                Integer resourcePathCacheCapacity;
216                @Nullable
217                Integer connectionQueueCapacity;
218                @Nullable
219                Boolean verifyConnectionOnceEstablished;
220                @Nullable
221                IdGenerator<?> idGenerator;
222
223                @NonNull
224                protected Builder(@NonNull Integer port) {
225                        requireNonNull(port);
226                        this.port = port;
227                }
228
229                @NonNull
230                public Builder port(@NonNull Integer port) {
231                        requireNonNull(port);
232                        this.port = port;
233                        return this;
234                }
235
236                @NonNull
237                public Builder host(@Nullable String host) {
238                        this.host = host;
239                        return this;
240                }
241
242                @NonNull
243                public Builder requestTimeout(@Nullable Duration requestTimeout) {
244                        this.requestTimeout = requestTimeout;
245                        return this;
246                }
247
248                @NonNull
249                public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) {
250                        this.requestHandlerTimeout = requestHandlerTimeout;
251                        return this;
252                }
253
254                @NonNull
255                public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) {
256                        this.requestHandlerConcurrency = requestHandlerConcurrency;
257                        return this;
258                }
259
260                @NonNull
261                public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) {
262                        this.requestHandlerQueueCapacity = requestHandlerQueueCapacity;
263                        return this;
264                }
265
266                @NonNull
267                public Builder writeTimeout(@Nullable Duration writeTimeout) {
268                        this.writeTimeout = writeTimeout;
269                        return this;
270                }
271
272                @NonNull
273                public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) {
274                        this.shutdownTimeout = shutdownTimeout;
275                        return this;
276                }
277
278                @NonNull
279                public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) {
280                        this.heartbeatInterval = heartbeatInterval;
281                        return this;
282                }
283
284                @NonNull
285                public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) {
286                        this.maximumRequestSizeInBytes = maximumRequestSizeInBytes;
287                        return this;
288                }
289
290                @NonNull
291                public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) {
292                        this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes;
293                        return this;
294                }
295
296                @NonNull
297                public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) {
298                        this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier;
299                        return this;
300                }
301
302                @NonNull
303                public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) {
304                        this.concurrentConnectionLimit = concurrentConnectionLimit;
305                        return this;
306                }
307
308                @NonNull
309                public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) {
310                        this.broadcasterCacheCapacity = broadcasterCacheCapacity;
311                        return this;
312                }
313
314                @NonNull
315                public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) {
316                        this.resourcePathCacheCapacity = resourcePathCacheCapacity;
317                        return this;
318                }
319
320                @NonNull
321                public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) {
322                        this.connectionQueueCapacity = connectionQueueCapacity;
323                        return this;
324                }
325
326                @NonNull
327                public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) {
328                        this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished;
329                        return this;
330                }
331
332                @NonNull
333                public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) {
334                        this.idGenerator = idGenerator;
335                        return this;
336                }
337
338                @NonNull
339                public SseServer build() {
340                        return new DefaultSseServer(this);
341                }
342        }
343}