001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import org.jspecify.annotations.NonNull;
020import org.jspecify.annotations.Nullable;
021
022import javax.annotation.concurrent.NotThreadSafe;
023import java.time.Duration;
024import java.util.Optional;
025import java.util.concurrent.ExecutorService;
026import java.util.function.Consumer;
027import java.util.function.Supplier;
028
029import static java.util.Objects.requireNonNull;
030
031/**
032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality.
033 * <p>
034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}.
035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>.
036 * <p>
037 * For example:
038 * <pre>{@code // Set up our SSE server
039 * SseServer sseServer = SseServer.fromPort(8081);
040 *
041 * // Wire the SSE server into our config
042 * SokletConfig config = SokletConfig.withSseServer(sseServer)
043 *   .build();
044 *
045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources
046 *
047 * // Run the app
048 * try (Soklet soklet = Soklet.fromConfig(config)) {
049 *   soklet.start();
050 *   System.out.println("Soklet started, press [enter] to exit");
051 *   soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY);
052 * }}</pre>
053 * <p>
054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
055 *
056 * @author <a href="https://www.revetkn.com">Mark Allen</a>
057 */
058public interface SseServer extends AutoCloseable {
059        /**
060         * Starts the SSE server, which makes it able to accept requests from clients.
061         * <p>
062         * If the server is already started, no action is taken.
063         * <p>
064         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
065         */
066        void start();
067
068        /**
069         * Stops the SSE server, which makes it unable to accept requests from clients.
070         * <p>
071         * If the server is already stopped, no action is taken.
072         * <p>
073         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
074         */
075        void stop();
076
077        /**
078         * Is this SSE server started (that is, able to handle requests from clients)?
079         *
080         * @return {@code true} if the server is started, {@code false} otherwise
081         */
082        @NonNull
083        Boolean isStarted();
084
085        /**
086         * {@link AutoCloseable}-enabled synonym for {@link #stop()}.
087         * <p>
088         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
089         *
090         * @throws Exception if an exception occurs while stopping the server
091         */
092        @Override
093        default void close() throws Exception {
094                stop();
095        }
096
097        /**
098         * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients.
099         * <p>
100         * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process).  Soklet is responsible for the creation and management of {@link SseBroadcaster} instances.
101         * <p>
102         * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference.
103         * <p>
104         * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
105         *
106         * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster
107         * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available
108         */
109        @NonNull
110        Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath);
111
112        /**
113         * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}.
114         * <p>
115         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
116         *
117         * @param sokletConfig   configuration for the Soklet instance that controls this server
118         * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client
119         */
120        void initialize(@NonNull SokletConfig sokletConfig,
121                                                                        @NonNull RequestHandler requestHandler);
122
123        /**
124         * Request/response processing contract for {@link SseServer} implementations.
125         * <p>
126         * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}.
127         * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth.
128         * <p>
129         * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong>
130         *
131         * @author <a href="https://www.revetkn.com">Mark Allen</a>
132         */
133        @FunctionalInterface
134        interface RequestHandler {
135                /**
136                 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response.
137                 * <p>
138                 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong>
139                 * <p>
140                 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials.
141                 * That is the extent of the request-handling logic performed here.  The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}.
142                 * <p>
143                 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic.
144                 * <p>
145                 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client.
146                 *
147                 * @param request               a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data
148                 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client
149                 */
150                void handleRequest(@NonNull Request request,
151                                                                                         @NonNull Consumer<HttpRequestResult> requestResultConsumer);
152        }
153
154        /**
155         * Acquires a builder for {@link SseServer} instances.
156         *
157         * @param port the port number on which the server should listen
158         * @return the builder
159         */
160        @NonNull
161        static Builder withPort(@NonNull Integer port) {
162                requireNonNull(port);
163                return new Builder(port);
164        }
165
166        /**
167         * Creates a {@link SseServer} configured with the given port and default settings.
168         *
169         * @param port the port number on which the server should listen
170         * @return a {@link SseServer} instance
171         */
172        @NonNull
173        static SseServer fromPort(@NonNull Integer port) {
174                return withPort(port).build();
175        }
176
177        /**
178         * Builder used to construct a standard implementation of {@link SseServer}.
179         * <p>
180         * This class is intended for use by a single thread.
181         *
182         * @author <a href="https://www.revetkn.com">Mark Allen</a>
183         */
184        @NotThreadSafe
185        final class Builder {
186                @NonNull
187                Integer port;
188                @Nullable
189                String host;
190                @Nullable
191                Duration requestHeaderTimeout;
192                @Nullable
193                Duration requestHandlerTimeout;
194                @Nullable
195                Integer requestHandlerConcurrency;
196                @Nullable
197                Integer requestHandlerQueueCapacity;
198                @Nullable
199                Duration writeTimeout;
200                @Nullable
201                Duration shutdownTimeout;
202                @Nullable
203                Duration heartbeatInterval;
204                @Nullable
205                Integer maximumRequestSizeInBytes;
206                @Nullable
207                Integer maximumHeaderCount;
208                @Nullable
209                Integer maximumRequestTargetLengthInBytes;
210                @Nullable
211                Integer requestReadBufferSizeInBytes;
212                @Nullable
213                Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
214                @Nullable
215                Integer concurrentConnectionLimit;
216                @Nullable
217                Integer broadcasterCacheCapacity;
218                @Nullable
219                Integer resourcePathCacheCapacity;
220                @Nullable
221                Integer connectionQueueCapacity;
222                @Nullable
223                Boolean verifyConnectionOnceEstablished;
224                @Nullable
225                IdGenerator<?> idGenerator;
226
227                @NonNull
228                protected Builder(@NonNull Integer port) {
229                        requireNonNull(port);
230                        this.port = port;
231                }
232
233                @NonNull
234                public Builder port(@NonNull Integer port) {
235                        requireNonNull(port);
236                        this.port = port;
237                        return this;
238                }
239
240                @NonNull
241                public Builder host(@Nullable String host) {
242                        this.host = host;
243                        return this;
244                }
245
246                /**
247                 * Sets the maximum duration for reading the SSE handshake request line and headers.
248                 * <p>
249                 * If this value is not specified, Soklet uses the server default.
250                 *
251                 * @param requestHeaderTimeout the request header timeout, or {@code null} for the default
252                 * @return this builder
253                 */
254                @NonNull
255                public Builder requestHeaderTimeout(@Nullable Duration requestHeaderTimeout) {
256                        this.requestHeaderTimeout = requestHeaderTimeout;
257                        return this;
258                }
259
260                @NonNull
261                public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) {
262                        this.requestHandlerTimeout = requestHandlerTimeout;
263                        return this;
264                }
265
266                @NonNull
267                public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) {
268                        this.requestHandlerConcurrency = requestHandlerConcurrency;
269                        return this;
270                }
271
272                @NonNull
273                public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) {
274                        this.requestHandlerQueueCapacity = requestHandlerQueueCapacity;
275                        return this;
276                }
277
278                @NonNull
279                public Builder writeTimeout(@Nullable Duration writeTimeout) {
280                        this.writeTimeout = writeTimeout;
281                        return this;
282                }
283
284                @NonNull
285                public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) {
286                        this.shutdownTimeout = shutdownTimeout;
287                        return this;
288                }
289
290                @NonNull
291                public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) {
292                        this.heartbeatInterval = heartbeatInterval;
293                        return this;
294                }
295
296                /**
297                 * Sets the maximum accepted SSE handshake request size in bytes.
298                 * <p>
299                 * This limit applies to the whole received handshake request, including request line
300                 * and headers. Established SSE stream writes are governed by the write timeout and
301                 * connection queue capacity settings instead.
302                 *
303                 * @param maximumRequestSizeInBytes the maximum handshake request size, or {@code null} for the default
304                 * @return this builder
305                 */
306                @NonNull
307                public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) {
308                        this.maximumRequestSizeInBytes = maximumRequestSizeInBytes;
309                        return this;
310                }
311
312                /**
313                 * Sets the maximum number of HTTP header fields accepted in one SSE handshake.
314                 *
315                 * @param maximumHeaderCount the maximum header count, or {@code null} for the default
316                 * @return this builder
317                 */
318                @NonNull
319                public Builder maximumHeaderCount(@Nullable Integer maximumHeaderCount) {
320                        this.maximumHeaderCount = maximumHeaderCount;
321                        return this;
322                }
323
324                /**
325                 * Sets the maximum SSE handshake request-target length accepted in bytes.
326                 *
327                 * @param maximumRequestTargetLengthInBytes the maximum request-target length, or {@code null} for the default
328                 * @return this builder
329                 */
330                @NonNull
331                public Builder maximumRequestTargetLengthInBytes(@Nullable Integer maximumRequestTargetLengthInBytes) {
332                        this.maximumRequestTargetLengthInBytes = maximumRequestTargetLengthInBytes;
333                        return this;
334                }
335
336                @NonNull
337                public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) {
338                        this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes;
339                        return this;
340                }
341
342                @NonNull
343                public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) {
344                        this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier;
345                        return this;
346                }
347
348                @NonNull
349                public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) {
350                        this.concurrentConnectionLimit = concurrentConnectionLimit;
351                        return this;
352                }
353
354                @NonNull
355                public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) {
356                        this.broadcasterCacheCapacity = broadcasterCacheCapacity;
357                        return this;
358                }
359
360                @NonNull
361                public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) {
362                        this.resourcePathCacheCapacity = resourcePathCacheCapacity;
363                        return this;
364                }
365
366                @NonNull
367                public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) {
368                        this.connectionQueueCapacity = connectionQueueCapacity;
369                        return this;
370                }
371
372                @NonNull
373                public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) {
374                        this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished;
375                        return this;
376                }
377
378                @NonNull
379                public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) {
380                        this.idGenerator = idGenerator;
381                        return this;
382                }
383
384                @NonNull
385                public SseServer build() {
386                        return new DefaultSseServer(this);
387                }
388        }
389}