001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import org.jspecify.annotations.NonNull; 020import org.jspecify.annotations.Nullable; 021 022import javax.annotation.concurrent.NotThreadSafe; 023import java.time.Duration; 024import java.util.Optional; 025import java.util.concurrent.ExecutorService; 026import java.util.function.Consumer; 027import java.util.function.Supplier; 028 029import static java.util.Objects.requireNonNull; 030 031/** 032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality. 033 * <p> 034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}. 035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>. 036 * <p> 037 * For example: 038 * <pre>{@code // Set up our SSE server 039 * SseServer sseServer = SseServer.fromPort(8081); 040 * 041 * // Wire the SSE server into our config 042 * SokletConfig config = SokletConfig.withSseServer(sseServer) 043 * .build(); 044 * 045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources 046 * 047 * // Run the app 048 * try (Soklet soklet = Soklet.fromConfig(config)) { 049 * soklet.start(); 050 * System.out.println("Soklet started, press [enter] to exit"); 051 * soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY); 052 * }}</pre> 053 * <p> 054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 055 * 056 * @author <a href="https://www.revetkn.com">Mark Allen</a> 057 */ 058public interface SseServer extends AutoCloseable { 059 /** 060 * Starts the SSE server, which makes it able to accept requests from clients. 061 * <p> 062 * If the server is already started, no action is taken. 063 * <p> 064 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 065 */ 066 void start(); 067 068 /** 069 * Stops the SSE server, which makes it unable to accept requests from clients. 070 * <p> 071 * If the server is already stopped, no action is taken. 072 * <p> 073 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 074 */ 075 void stop(); 076 077 /** 078 * Is this SSE server started (that is, able to handle requests from clients)? 079 * 080 * @return {@code true} if the server is started, {@code false} otherwise 081 */ 082 @NonNull 083 Boolean isStarted(); 084 085 /** 086 * {@link AutoCloseable}-enabled synonym for {@link #stop()}. 087 * <p> 088 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 089 * 090 * @throws Exception if an exception occurs while stopping the server 091 */ 092 @Override 093 default void close() throws Exception { 094 stop(); 095 } 096 097 /** 098 * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients. 099 * <p> 100 * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process). Soklet is responsible for the creation and management of {@link SseBroadcaster} instances. 101 * <p> 102 * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference. 103 * <p> 104 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 105 * 106 * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster 107 * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available 108 */ 109 @NonNull 110 Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath); 111 112 /** 113 * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}. 114 * <p> 115 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 116 * 117 * @param sokletConfig configuration for the Soklet instance that controls this server 118 * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client 119 */ 120 void initialize(@NonNull SokletConfig sokletConfig, 121 @NonNull RequestHandler requestHandler); 122 123 /** 124 * Request/response processing contract for {@link SseServer} implementations. 125 * <p> 126 * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}. 127 * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth. 128 * <p> 129 * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong> 130 * 131 * @author <a href="https://www.revetkn.com">Mark Allen</a> 132 */ 133 @FunctionalInterface 134 interface RequestHandler { 135 /** 136 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response. 137 * <p> 138 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong> 139 * <p> 140 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials. 141 * That is the extent of the request-handling logic performed here. The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}. 142 * <p> 143 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic. 144 * <p> 145 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client. 146 * 147 * @param request a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data 148 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client 149 */ 150 void handleRequest(@NonNull Request request, 151 @NonNull Consumer<HttpRequestResult> requestResultConsumer); 152 } 153 154 /** 155 * Acquires a builder for {@link SseServer} instances. 156 * 157 * @param port the port number on which the server should listen 158 * @return the builder 159 */ 160 @NonNull 161 static Builder withPort(@NonNull Integer port) { 162 requireNonNull(port); 163 return new Builder(port); 164 } 165 166 /** 167 * Creates a {@link SseServer} configured with the given port and default settings. 168 * 169 * @param port the port number on which the server should listen 170 * @return a {@link SseServer} instance 171 */ 172 @NonNull 173 static SseServer fromPort(@NonNull Integer port) { 174 return withPort(port).build(); 175 } 176 177 /** 178 * Builder used to construct a standard implementation of {@link SseServer}. 179 * <p> 180 * This class is intended for use by a single thread. 181 * 182 * @author <a href="https://www.revetkn.com">Mark Allen</a> 183 */ 184 @NotThreadSafe 185 final class Builder { 186 @NonNull 187 Integer port; 188 @Nullable 189 String host; 190 @Nullable 191 Duration requestHeaderTimeout; 192 @Nullable 193 Duration requestHandlerTimeout; 194 @Nullable 195 Integer requestHandlerConcurrency; 196 @Nullable 197 Integer requestHandlerQueueCapacity; 198 @Nullable 199 Duration writeTimeout; 200 @Nullable 201 Duration shutdownTimeout; 202 @Nullable 203 Duration heartbeatInterval; 204 @Nullable 205 Integer maximumRequestSizeInBytes; 206 @Nullable 207 Integer maximumHeaderCount; 208 @Nullable 209 Integer maximumHeadersSizeInBytes; 210 @Nullable 211 Integer maximumRequestTargetLengthInBytes; 212 @Nullable 213 Integer requestReadBufferSizeInBytes; 214 @Nullable 215 Supplier<ExecutorService> requestHandlerExecutorServiceSupplier; 216 @Nullable 217 Integer concurrentConnectionLimit; 218 @Nullable 219 Integer broadcasterCacheCapacity; 220 @Nullable 221 Integer resourcePathCacheCapacity; 222 @Nullable 223 Integer connectionQueueCapacity; 224 @Nullable 225 Boolean verifyConnectionOnceEstablished; 226 @Nullable 227 IdGenerator<?> idGenerator; 228 229 protected Builder(@NonNull Integer port) { 230 requireNonNull(port); 231 this.port = port; 232 } 233 234 @NonNull 235 public Builder port(@NonNull Integer port) { 236 requireNonNull(port); 237 this.port = port; 238 return this; 239 } 240 241 @NonNull 242 public Builder host(@Nullable String host) { 243 this.host = host; 244 return this; 245 } 246 247 /** 248 * Sets the maximum duration for reading the SSE handshake request line and headers. 249 * <p> 250 * If this value is not specified, Soklet uses the server default. 251 * 252 * @param requestHeaderTimeout the request header timeout, or {@code null} for the default 253 * @return this builder 254 */ 255 @NonNull 256 public Builder requestHeaderTimeout(@Nullable Duration requestHeaderTimeout) { 257 this.requestHeaderTimeout = requestHeaderTimeout; 258 return this; 259 } 260 261 @NonNull 262 public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) { 263 this.requestHandlerTimeout = requestHandlerTimeout; 264 return this; 265 } 266 267 @NonNull 268 public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) { 269 this.requestHandlerConcurrency = requestHandlerConcurrency; 270 return this; 271 } 272 273 @NonNull 274 public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) { 275 this.requestHandlerQueueCapacity = requestHandlerQueueCapacity; 276 return this; 277 } 278 279 /** 280 * Sets the transport write timeout for established SSE streams. 281 * <p> 282 * If this value is not specified, Soklet uses the server default. Use 283 * {@link Duration#ZERO} to disable SSE stream write timeouts. 284 * 285 * @param writeTimeout the write timeout, or {@code null} for the default 286 * @return this builder 287 */ 288 @NonNull 289 public Builder writeTimeout(@Nullable Duration writeTimeout) { 290 this.writeTimeout = writeTimeout; 291 return this; 292 } 293 294 @NonNull 295 public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) { 296 this.shutdownTimeout = shutdownTimeout; 297 return this; 298 } 299 300 @NonNull 301 public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) { 302 this.heartbeatInterval = heartbeatInterval; 303 return this; 304 } 305 306 /** 307 * Sets the maximum accepted SSE handshake request size in bytes. 308 * <p> 309 * This limit applies to the whole received handshake request, including request line 310 * and headers. Established SSE stream writes are governed by the write timeout and 311 * connection queue capacity settings instead. 312 * 313 * @param maximumRequestSizeInBytes the maximum handshake request size, or {@code null} for the default 314 * @return this builder 315 */ 316 @NonNull 317 public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) { 318 this.maximumRequestSizeInBytes = maximumRequestSizeInBytes; 319 return this; 320 } 321 322 /** 323 * Sets the maximum number of HTTP header fields accepted in one SSE handshake. 324 * 325 * @param maximumHeaderCount the maximum header count, or {@code null} for the default 326 * @return this builder 327 */ 328 @NonNull 329 public Builder maximumHeaderCount(@Nullable Integer maximumHeaderCount) { 330 this.maximumHeaderCount = maximumHeaderCount; 331 return this; 332 } 333 334 /** 335 * Sets the maximum accepted SSE handshake header-section size in bytes. 336 * <p> 337 * This limit applies to the header bytes after the request line, including 338 * header-field line endings and the terminating blank line. 339 * 340 * @param maximumHeadersSizeInBytes the maximum headers size, or {@code null} for the default 341 * @return this builder 342 */ 343 @NonNull 344 public Builder maximumHeadersSizeInBytes(@Nullable Integer maximumHeadersSizeInBytes) { 345 this.maximumHeadersSizeInBytes = maximumHeadersSizeInBytes; 346 return this; 347 } 348 349 /** 350 * Sets the maximum SSE handshake request-target length accepted in bytes. 351 * 352 * @param maximumRequestTargetLengthInBytes the maximum request-target length, or {@code null} for the default 353 * @return this builder 354 */ 355 @NonNull 356 public Builder maximumRequestTargetLengthInBytes(@Nullable Integer maximumRequestTargetLengthInBytes) { 357 this.maximumRequestTargetLengthInBytes = maximumRequestTargetLengthInBytes; 358 return this; 359 } 360 361 @NonNull 362 public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) { 363 this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes; 364 return this; 365 } 366 367 @NonNull 368 public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) { 369 this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier; 370 return this; 371 } 372 373 @NonNull 374 public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) { 375 this.concurrentConnectionLimit = concurrentConnectionLimit; 376 return this; 377 } 378 379 @NonNull 380 public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) { 381 this.broadcasterCacheCapacity = broadcasterCacheCapacity; 382 return this; 383 } 384 385 @NonNull 386 public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) { 387 this.resourcePathCacheCapacity = resourcePathCacheCapacity; 388 return this; 389 } 390 391 @NonNull 392 public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) { 393 this.connectionQueueCapacity = connectionQueueCapacity; 394 return this; 395 } 396 397 @NonNull 398 public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) { 399 this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished; 400 return this; 401 } 402 403 @NonNull 404 public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) { 405 this.idGenerator = idGenerator; 406 return this; 407 } 408 409 @NonNull 410 public SseServer build() { 411 return new DefaultSseServer(this); 412 } 413 } 414}