001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import org.jspecify.annotations.NonNull;
020import org.jspecify.annotations.Nullable;
021
022import javax.annotation.concurrent.NotThreadSafe;
023import java.time.Duration;
024import java.util.Optional;
025import java.util.concurrent.ExecutorService;
026import java.util.function.Consumer;
027import java.util.function.Supplier;
028
029import static java.util.Objects.requireNonNull;
030
031/**
032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality.
033 * <p>
034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}.
035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>.
036 * <p>
037 * For example:
038 * <pre>{@code // Set up our SSE server
039 * SseServer sseServer = SseServer.fromPort(8081);
040 *
041 * // Wire the SSE server into our config
042 * SokletConfig config = SokletConfig.withSseServer(sseServer)
043 *   .build();
044 *
045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources
046 *
047 * // Run the app
048 * try (Soklet soklet = Soklet.fromConfig(config)) {
049 *   soklet.start();
050 *   System.out.println("Soklet started, press [enter] to exit");
051 *   soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY);
052 * }}</pre>
053 * <p>
054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
055 *
056 * @author <a href="https://www.revetkn.com">Mark Allen</a>
057 */
058public interface SseServer extends AutoCloseable {
059        /**
060         * Starts the SSE server, which makes it able to accept requests from clients.
061         * <p>
062         * If the server is already started, no action is taken.
063         * <p>
064         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
065         */
066        void start();
067
068        /**
069         * Stops the SSE server, which makes it unable to accept requests from clients.
070         * <p>
071         * If the server is already stopped, no action is taken.
072         * <p>
073         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
074         */
075        void stop();
076
077        /**
078         * Is this SSE server started (that is, able to handle requests from clients)?
079         *
080         * @return {@code true} if the server is started, {@code false} otherwise
081         */
082        @NonNull
083        Boolean isStarted();
084
085        /**
086         * {@link AutoCloseable}-enabled synonym for {@link #stop()}.
087         * <p>
088         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
089         *
090         * @throws Exception if an exception occurs while stopping the server
091         */
092        @Override
093        default void close() throws Exception {
094                stop();
095        }
096
097        /**
098         * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients.
099         * <p>
100         * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process).  Soklet is responsible for the creation and management of {@link SseBroadcaster} instances.
101         * <p>
102         * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference.
103         * <p>
104         * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation.
105         *
106         * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster
107         * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available
108         */
109        @NonNull
110        Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath);
111
112        /**
113         * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}.
114         * <p>
115         * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong>
116         *
117         * @param sokletConfig   configuration for the Soklet instance that controls this server
118         * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client
119         */
120        void initialize(@NonNull SokletConfig sokletConfig,
121                                                                        @NonNull RequestHandler requestHandler);
122
123        /**
124         * Request/response processing contract for {@link SseServer} implementations.
125         * <p>
126         * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}.
127         * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth.
128         * <p>
129         * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong>
130         *
131         * @author <a href="https://www.revetkn.com">Mark Allen</a>
132         */
133        @FunctionalInterface
134        interface RequestHandler {
135                /**
136                 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response.
137                 * <p>
138                 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong>
139                 * <p>
140                 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials.
141                 * That is the extent of the request-handling logic performed here.  The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}.
142                 * <p>
143                 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic.
144                 * <p>
145                 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client.
146                 *
147                 * @param request               a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data
148                 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client
149                 */
150                void handleRequest(@NonNull Request request,
151                                                                                         @NonNull Consumer<HttpRequestResult> requestResultConsumer);
152        }
153
154        /**
155         * Acquires a builder for {@link SseServer} instances.
156         *
157         * @param port the port number on which the server should listen
158         * @return the builder
159         */
160        @NonNull
161        static Builder withPort(@NonNull Integer port) {
162                requireNonNull(port);
163                return new Builder(port);
164        }
165
166        /**
167         * Creates a {@link SseServer} configured with the given port and default settings.
168         *
169         * @param port the port number on which the server should listen
170         * @return a {@link SseServer} instance
171         */
172        @NonNull
173        static SseServer fromPort(@NonNull Integer port) {
174                return withPort(port).build();
175        }
176
177        /**
178         * Builder used to construct a standard implementation of {@link SseServer}.
179         * <p>
180         * This class is intended for use by a single thread.
181         *
182         * @author <a href="https://www.revetkn.com">Mark Allen</a>
183         */
184        @NotThreadSafe
185        final class Builder {
186                @NonNull
187                Integer port;
188                @Nullable
189                String host;
190                @Nullable
191                Duration requestHeaderTimeout;
192                @Nullable
193                Duration requestHandlerTimeout;
194                @Nullable
195                Integer requestHandlerConcurrency;
196                @Nullable
197                Integer requestHandlerQueueCapacity;
198                @Nullable
199                Duration writeTimeout;
200                @Nullable
201                Duration shutdownTimeout;
202                @Nullable
203                Duration heartbeatInterval;
204                @Nullable
205                Integer maximumRequestSizeInBytes;
206                @Nullable
207                Integer maximumHeaderCount;
208                @Nullable
209                Integer maximumHeadersSizeInBytes;
210                @Nullable
211                Integer maximumRequestTargetLengthInBytes;
212                @Nullable
213                Integer requestReadBufferSizeInBytes;
214                @Nullable
215                Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
216                @Nullable
217                Integer concurrentConnectionLimit;
218                @Nullable
219                Integer broadcasterCacheCapacity;
220                @Nullable
221                Integer resourcePathCacheCapacity;
222                @Nullable
223                Integer connectionQueueCapacity;
224                @Nullable
225                Boolean verifyConnectionOnceEstablished;
226                @Nullable
227                IdGenerator<?> idGenerator;
228
229                protected Builder(@NonNull Integer port) {
230                        requireNonNull(port);
231                        this.port = port;
232                }
233
234                @NonNull
235                public Builder port(@NonNull Integer port) {
236                        requireNonNull(port);
237                        this.port = port;
238                        return this;
239                }
240
241                @NonNull
242                public Builder host(@Nullable String host) {
243                        this.host = host;
244                        return this;
245                }
246
247                /**
248                 * Sets the maximum duration for reading the SSE handshake request line and headers.
249                 * <p>
250                 * If this value is not specified, Soklet uses the server default.
251                 *
252                 * @param requestHeaderTimeout the request header timeout, or {@code null} for the default
253                 * @return this builder
254                 */
255                @NonNull
256                public Builder requestHeaderTimeout(@Nullable Duration requestHeaderTimeout) {
257                        this.requestHeaderTimeout = requestHeaderTimeout;
258                        return this;
259                }
260
261                @NonNull
262                public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) {
263                        this.requestHandlerTimeout = requestHandlerTimeout;
264                        return this;
265                }
266
267                @NonNull
268                public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) {
269                        this.requestHandlerConcurrency = requestHandlerConcurrency;
270                        return this;
271                }
272
273                @NonNull
274                public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) {
275                        this.requestHandlerQueueCapacity = requestHandlerQueueCapacity;
276                        return this;
277                }
278
279                /**
280                 * Sets the transport write timeout for established SSE streams.
281                 * <p>
282                 * If this value is not specified, Soklet uses the server default. Use
283                 * {@link Duration#ZERO} to disable SSE stream write timeouts.
284                 *
285                 * @param writeTimeout the write timeout, or {@code null} for the default
286                 * @return this builder
287                 */
288                @NonNull
289                public Builder writeTimeout(@Nullable Duration writeTimeout) {
290                        this.writeTimeout = writeTimeout;
291                        return this;
292                }
293
294                @NonNull
295                public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) {
296                        this.shutdownTimeout = shutdownTimeout;
297                        return this;
298                }
299
300                @NonNull
301                public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) {
302                        this.heartbeatInterval = heartbeatInterval;
303                        return this;
304                }
305
306                /**
307                 * Sets the maximum accepted SSE handshake request size in bytes.
308                 * <p>
309                 * This limit applies to the whole received handshake request, including request line
310                 * and headers. Established SSE stream writes are governed by the write timeout and
311                 * connection queue capacity settings instead.
312                 *
313                 * @param maximumRequestSizeInBytes the maximum handshake request size, or {@code null} for the default
314                 * @return this builder
315                 */
316                @NonNull
317                public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) {
318                        this.maximumRequestSizeInBytes = maximumRequestSizeInBytes;
319                        return this;
320                }
321
322                /**
323                 * Sets the maximum number of HTTP header fields accepted in one SSE handshake.
324                 *
325                 * @param maximumHeaderCount the maximum header count, or {@code null} for the default
326                 * @return this builder
327                 */
328                @NonNull
329                public Builder maximumHeaderCount(@Nullable Integer maximumHeaderCount) {
330                        this.maximumHeaderCount = maximumHeaderCount;
331                        return this;
332                }
333
334                /**
335                 * Sets the maximum accepted SSE handshake header-section size in bytes.
336                 * <p>
337                 * This limit applies to the header bytes after the request line, including
338                 * header-field line endings and the terminating blank line.
339                 *
340                 * @param maximumHeadersSizeInBytes the maximum headers size, or {@code null} for the default
341                 * @return this builder
342                 */
343                @NonNull
344                public Builder maximumHeadersSizeInBytes(@Nullable Integer maximumHeadersSizeInBytes) {
345                        this.maximumHeadersSizeInBytes = maximumHeadersSizeInBytes;
346                        return this;
347                }
348
349                /**
350                 * Sets the maximum SSE handshake request-target length accepted in bytes.
351                 *
352                 * @param maximumRequestTargetLengthInBytes the maximum request-target length, or {@code null} for the default
353                 * @return this builder
354                 */
355                @NonNull
356                public Builder maximumRequestTargetLengthInBytes(@Nullable Integer maximumRequestTargetLengthInBytes) {
357                        this.maximumRequestTargetLengthInBytes = maximumRequestTargetLengthInBytes;
358                        return this;
359                }
360
361                @NonNull
362                public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) {
363                        this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes;
364                        return this;
365                }
366
367                @NonNull
368                public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) {
369                        this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier;
370                        return this;
371                }
372
373                @NonNull
374                public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) {
375                        this.concurrentConnectionLimit = concurrentConnectionLimit;
376                        return this;
377                }
378
379                @NonNull
380                public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) {
381                        this.broadcasterCacheCapacity = broadcasterCacheCapacity;
382                        return this;
383                }
384
385                @NonNull
386                public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) {
387                        this.resourcePathCacheCapacity = resourcePathCacheCapacity;
388                        return this;
389                }
390
391                @NonNull
392                public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) {
393                        this.connectionQueueCapacity = connectionQueueCapacity;
394                        return this;
395                }
396
397                @NonNull
398                public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) {
399                        this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished;
400                        return this;
401                }
402
403                @NonNull
404                public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) {
405                        this.idGenerator = idGenerator;
406                        return this;
407                }
408
409                @NonNull
410                public SseServer build() {
411                        return new DefaultSseServer(this);
412                }
413        }
414}