001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import org.jspecify.annotations.NonNull; 020import org.jspecify.annotations.Nullable; 021 022import javax.annotation.concurrent.NotThreadSafe; 023import java.time.Duration; 024import java.util.Optional; 025import java.util.concurrent.ExecutorService; 026import java.util.function.Consumer; 027import java.util.function.Supplier; 028 029import static java.util.Objects.requireNonNull; 030 031/** 032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality. 033 * <p> 034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}. 035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>. 036 * <p> 037 * For example: 038 * <pre>{@code // Set up our SSE server 039 * SseServer sseServer = SseServer.fromPort(8081); 040 * 041 * // Wire the SSE server into our config 042 * SokletConfig config = SokletConfig.withSseServer(sseServer) 043 * .build(); 044 * 045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources 046 * 047 * // Run the app 048 * try (Soklet soklet = Soklet.fromConfig(config)) { 049 * soklet.start(); 050 * System.out.println("Soklet started, press [enter] to exit"); 051 * soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY); 052 * }}</pre> 053 * <p> 054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 055 * 056 * @author <a href="https://www.revetkn.com">Mark Allen</a> 057 */ 058public interface SseServer extends AutoCloseable { 059 /** 060 * Starts the SSE server, which makes it able to accept requests from clients. 061 * <p> 062 * If the server is already started, no action is taken. 063 * <p> 064 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 065 */ 066 void start(); 067 068 /** 069 * Stops the SSE server, which makes it unable to accept requests from clients. 070 * <p> 071 * If the server is already stopped, no action is taken. 072 * <p> 073 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 074 */ 075 void stop(); 076 077 /** 078 * Is this SSE server started (that is, able to handle requests from clients)? 079 * 080 * @return {@code true} if the server is started, {@code false} otherwise 081 */ 082 @NonNull 083 Boolean isStarted(); 084 085 /** 086 * {@link AutoCloseable}-enabled synonym for {@link #stop()}. 087 * <p> 088 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 089 * 090 * @throws Exception if an exception occurs while stopping the server 091 */ 092 @Override 093 default void close() throws Exception { 094 stop(); 095 } 096 097 /** 098 * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients. 099 * <p> 100 * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process). Soklet is responsible for the creation and management of {@link SseBroadcaster} instances. 101 * <p> 102 * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference. 103 * <p> 104 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 105 * 106 * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster 107 * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available 108 */ 109 @NonNull 110 Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath); 111 112 /** 113 * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}. 114 * <p> 115 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 116 * 117 * @param sokletConfig configuration for the Soklet instance that controls this server 118 * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client 119 */ 120 void initialize(@NonNull SokletConfig sokletConfig, 121 @NonNull RequestHandler requestHandler); 122 123 /** 124 * Request/response processing contract for {@link SseServer} implementations. 125 * <p> 126 * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}. 127 * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth. 128 * <p> 129 * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong> 130 * 131 * @author <a href="https://www.revetkn.com">Mark Allen</a> 132 */ 133 @FunctionalInterface 134 interface RequestHandler { 135 /** 136 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response. 137 * <p> 138 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong> 139 * <p> 140 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials. 141 * That is the extent of the request-handling logic performed here. The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}. 142 * <p> 143 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic. 144 * <p> 145 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client. 146 * 147 * @param request a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data 148 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client 149 */ 150 void handleRequest(@NonNull Request request, 151 @NonNull Consumer<HttpRequestResult> requestResultConsumer); 152 } 153 154 /** 155 * Acquires a builder for {@link SseServer} instances. 156 * 157 * @param port the port number on which the server should listen 158 * @return the builder 159 */ 160 @NonNull 161 static Builder withPort(@NonNull Integer port) { 162 requireNonNull(port); 163 return new Builder(port); 164 } 165 166 /** 167 * Creates a {@link SseServer} configured with the given port and default settings. 168 * 169 * @param port the port number on which the server should listen 170 * @return a {@link SseServer} instance 171 */ 172 @NonNull 173 static SseServer fromPort(@NonNull Integer port) { 174 return withPort(port).build(); 175 } 176 177 /** 178 * Builder used to construct a standard implementation of {@link SseServer}. 179 * <p> 180 * This class is intended for use by a single thread. 181 * 182 * @author <a href="https://www.revetkn.com">Mark Allen</a> 183 */ 184 @NotThreadSafe 185 final class Builder { 186 @NonNull 187 Integer port; 188 @Nullable 189 String host; 190 @Nullable 191 Duration requestHeaderTimeout; 192 @Nullable 193 Duration requestHandlerTimeout; 194 @Nullable 195 Integer requestHandlerConcurrency; 196 @Nullable 197 Integer requestHandlerQueueCapacity; 198 @Nullable 199 Duration writeTimeout; 200 @Nullable 201 Duration shutdownTimeout; 202 @Nullable 203 Duration heartbeatInterval; 204 @Nullable 205 Integer maximumRequestSizeInBytes; 206 @Nullable 207 Integer maximumHeaderCount; 208 @Nullable 209 Integer maximumRequestTargetLengthInBytes; 210 @Nullable 211 Integer requestReadBufferSizeInBytes; 212 @Nullable 213 Supplier<ExecutorService> requestHandlerExecutorServiceSupplier; 214 @Nullable 215 Integer concurrentConnectionLimit; 216 @Nullable 217 Integer broadcasterCacheCapacity; 218 @Nullable 219 Integer resourcePathCacheCapacity; 220 @Nullable 221 Integer connectionQueueCapacity; 222 @Nullable 223 Boolean verifyConnectionOnceEstablished; 224 @Nullable 225 IdGenerator<?> idGenerator; 226 227 @NonNull 228 protected Builder(@NonNull Integer port) { 229 requireNonNull(port); 230 this.port = port; 231 } 232 233 @NonNull 234 public Builder port(@NonNull Integer port) { 235 requireNonNull(port); 236 this.port = port; 237 return this; 238 } 239 240 @NonNull 241 public Builder host(@Nullable String host) { 242 this.host = host; 243 return this; 244 } 245 246 /** 247 * Sets the maximum duration for reading the SSE handshake request line and headers. 248 * <p> 249 * If this value is not specified, Soklet uses the server default. 250 * 251 * @param requestHeaderTimeout the request header timeout, or {@code null} for the default 252 * @return this builder 253 */ 254 @NonNull 255 public Builder requestHeaderTimeout(@Nullable Duration requestHeaderTimeout) { 256 this.requestHeaderTimeout = requestHeaderTimeout; 257 return this; 258 } 259 260 @NonNull 261 public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) { 262 this.requestHandlerTimeout = requestHandlerTimeout; 263 return this; 264 } 265 266 @NonNull 267 public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) { 268 this.requestHandlerConcurrency = requestHandlerConcurrency; 269 return this; 270 } 271 272 @NonNull 273 public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) { 274 this.requestHandlerQueueCapacity = requestHandlerQueueCapacity; 275 return this; 276 } 277 278 @NonNull 279 public Builder writeTimeout(@Nullable Duration writeTimeout) { 280 this.writeTimeout = writeTimeout; 281 return this; 282 } 283 284 @NonNull 285 public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) { 286 this.shutdownTimeout = shutdownTimeout; 287 return this; 288 } 289 290 @NonNull 291 public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) { 292 this.heartbeatInterval = heartbeatInterval; 293 return this; 294 } 295 296 /** 297 * Sets the maximum accepted SSE handshake request size in bytes. 298 * <p> 299 * This limit applies to the whole received handshake request, including request line 300 * and headers. Established SSE stream writes are governed by the write timeout and 301 * connection queue capacity settings instead. 302 * 303 * @param maximumRequestSizeInBytes the maximum handshake request size, or {@code null} for the default 304 * @return this builder 305 */ 306 @NonNull 307 public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) { 308 this.maximumRequestSizeInBytes = maximumRequestSizeInBytes; 309 return this; 310 } 311 312 /** 313 * Sets the maximum number of HTTP header fields accepted in one SSE handshake. 314 * 315 * @param maximumHeaderCount the maximum header count, or {@code null} for the default 316 * @return this builder 317 */ 318 @NonNull 319 public Builder maximumHeaderCount(@Nullable Integer maximumHeaderCount) { 320 this.maximumHeaderCount = maximumHeaderCount; 321 return this; 322 } 323 324 /** 325 * Sets the maximum SSE handshake request-target length accepted in bytes. 326 * 327 * @param maximumRequestTargetLengthInBytes the maximum request-target length, or {@code null} for the default 328 * @return this builder 329 */ 330 @NonNull 331 public Builder maximumRequestTargetLengthInBytes(@Nullable Integer maximumRequestTargetLengthInBytes) { 332 this.maximumRequestTargetLengthInBytes = maximumRequestTargetLengthInBytes; 333 return this; 334 } 335 336 @NonNull 337 public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) { 338 this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes; 339 return this; 340 } 341 342 @NonNull 343 public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) { 344 this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier; 345 return this; 346 } 347 348 @NonNull 349 public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) { 350 this.concurrentConnectionLimit = concurrentConnectionLimit; 351 return this; 352 } 353 354 @NonNull 355 public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) { 356 this.broadcasterCacheCapacity = broadcasterCacheCapacity; 357 return this; 358 } 359 360 @NonNull 361 public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) { 362 this.resourcePathCacheCapacity = resourcePathCacheCapacity; 363 return this; 364 } 365 366 @NonNull 367 public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) { 368 this.connectionQueueCapacity = connectionQueueCapacity; 369 return this; 370 } 371 372 @NonNull 373 public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) { 374 this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished; 375 return this; 376 } 377 378 @NonNull 379 public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) { 380 this.idGenerator = idGenerator; 381 return this; 382 } 383 384 @NonNull 385 public SseServer build() { 386 return new DefaultSseServer(this); 387 } 388 } 389}