001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import org.jspecify.annotations.NonNull; 020import org.jspecify.annotations.Nullable; 021 022import javax.annotation.concurrent.NotThreadSafe; 023import java.time.Duration; 024import java.util.Optional; 025import java.util.concurrent.ExecutorService; 026import java.util.function.Consumer; 027import java.util.function.Supplier; 028 029import static java.util.Objects.requireNonNull; 030 031/** 032 * A special HTTP server whose only purpose is to provide <a href="https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events">Server-Sent Event</a> functionality. 033 * <p> 034 * A Soklet application which supports Server-Sent Events will be configured with a {@link SseServer}. 035 * A regular {@link HttpServer} is only required if the same application also serves ordinary HTTP <em>Resource Methods</em>. 036 * <p> 037 * For example: 038 * <pre>{@code // Set up our SSE server 039 * SseServer sseServer = SseServer.fromPort(8081); 040 * 041 * // Wire the SSE server into our config 042 * SokletConfig config = SokletConfig.withSseServer(sseServer) 043 * .build(); 044 * 045 * // Add .httpServer(HttpServer.fromPort(8080)) if you also serve ordinary HTTP resources 046 * 047 * // Run the app 048 * try (Soklet soklet = Soklet.fromConfig(config)) { 049 * soklet.start(); 050 * System.out.println("Soklet started, press [enter] to exit"); 051 * soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY); 052 * }}</pre> 053 * <p> 054 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 055 * 056 * @author <a href="https://www.revetkn.com">Mark Allen</a> 057 */ 058public interface SseServer extends AutoCloseable { 059 /** 060 * Starts the SSE server, which makes it able to accept requests from clients. 061 * <p> 062 * If the server is already started, no action is taken. 063 * <p> 064 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 065 */ 066 void start(); 067 068 /** 069 * Stops the SSE server, which makes it unable to accept requests from clients. 070 * <p> 071 * If the server is already stopped, no action is taken. 072 * <p> 073 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 074 */ 075 void stop(); 076 077 /** 078 * Is this SSE server started (that is, able to handle requests from clients)? 079 * 080 * @return {@code true} if the server is started, {@code false} otherwise 081 */ 082 @NonNull 083 Boolean isStarted(); 084 085 /** 086 * {@link AutoCloseable}-enabled synonym for {@link #stop()}. 087 * <p> 088 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 089 * 090 * @throws Exception if an exception occurs while stopping the server 091 */ 092 @Override 093 default void close() throws Exception { 094 stop(); 095 } 096 097 /** 098 * Given a {@link ResourcePath} that corresponds to a <em>Resource Method</em> annotated with {@link com.soklet.annotation.SseEventSource}, acquire a {@link SseBroadcaster} which is capable of "pushing" messages to all connected Server-Sent Event clients. 099 * <p> 100 * When using the default {@link SseServer}, Soklet guarantees exactly one {@link SseBroadcaster} instance exists per {@link ResourcePath} (within the same JVM process). Soklet is responsible for the creation and management of {@link SseBroadcaster} instances. 101 * <p> 102 * Your code should not hold long-lived references to {@link SseBroadcaster} instances (e.g. in a cache or instance variables) - the recommended usage pattern is to invoke {@link #acquireBroadcaster(ResourcePath)} every time you need a broadcaster reference. 103 * <p> 104 * See <a href="https://www.soklet.com/docs/server-sent-events">https://www.soklet.com/docs/server-sent-events</a> for detailed documentation. 105 * 106 * @param resourcePath the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em> for which to acquire a broadcaster 107 * @return a broadcaster for the given {@link ResourcePath}, or {@link Optional#empty()} if there is no broadcaster available 108 */ 109 @NonNull 110 Optional<? extends SseBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath); 111 112 /** 113 * The {@link com.soklet.Soklet} instance which manages this {@link SseServer} will invoke this method exactly once at initialization time - this allows {@link com.soklet.Soklet} to "talk" to your {@link SseServer}. 114 * <p> 115 * <strong>This method is designed for internal use by {@link com.soklet.Soklet} only and should not be invoked elsewhere.</strong> 116 * 117 * @param sokletConfig configuration for the Soklet instance that controls this server 118 * @param requestHandler a {@link com.soklet.Soklet}-internal request handler which takes a {@link SseServer}-provided request as input and supplies a {@link MarshaledResponse} as output for the {@link SseServer} to write back to the client 119 */ 120 void initialize(@NonNull SokletConfig sokletConfig, 121 @NonNull RequestHandler requestHandler); 122 123 /** 124 * Request/response processing contract for {@link SseServer} implementations. 125 * <p> 126 * This is used internally by {@link com.soklet.Soklet} instances to "talk" to a {@link SseServer} via {@link SseServer#initialize(SokletConfig, RequestHandler)}. 127 * It's the responsibility of the {@link SseServer} to implement HTTP mechanics: read bytes from the request, write bytes to the response, and so forth. 128 * <p> 129 * <strong>Most Soklet applications will use Soklet's default {@link SseServer} implementation and therefore do not need to implement this interface directly.</strong> 130 * 131 * @author <a href="https://www.revetkn.com">Mark Allen</a> 132 */ 133 @FunctionalInterface 134 interface RequestHandler { 135 /** 136 * Callback to be invoked by a {@link SseServer} implementation after it has received a Server-Sent Event Source HTTP request but prior to writing initial data to the HTTP response. 137 * <p> 138 * <strong>Note: this method is only invoked during the initial request "handshake" - it is not called for subsequent Server-Sent Event stream writes performed via {@link SseBroadcaster#broadcastEvent(SseEvent)} invocations.</strong> 139 * <p> 140 * For example, when a Server-Sent Event Source HTTP request is received, you might immediately write an HTTP 200 OK response if all looks good, or reject with a 401 due to invalid credentials. 141 * That is the extent of the request-handling logic performed here. The Server-Sent Event stream then remains open and can be written to via {@link SseBroadcaster#broadcastEvent(SseEvent)}. 142 * <p> 143 * The {@link SseServer} is responsible for converting its internal request representation into a {@link Request}, which a {@link com.soklet.Soklet} instance consumes and performs Soklet application request processing logic. 144 * <p> 145 * The {@link com.soklet.Soklet} instance will generate a {@link MarshaledResponse} for the request, which it "hands back" to the {@link SseServer} to be sent over the wire to the client. 146 * 147 * @param request a Soklet {@link Request} representation of the {@link SseServer}'s internal HTTP request data 148 * @param requestResultConsumer invoked by {@link com.soklet.Soklet} when it's time for the {@link SseServer} to write HTTP response data to the client 149 */ 150 void handleRequest(@NonNull Request request, 151 @NonNull Consumer<HttpRequestResult> requestResultConsumer); 152 } 153 154 /** 155 * Acquires a builder for {@link SseServer} instances. 156 * 157 * @param port the port number on which the server should listen 158 * @return the builder 159 */ 160 @NonNull 161 static Builder withPort(@NonNull Integer port) { 162 requireNonNull(port); 163 return new Builder(port); 164 } 165 166 /** 167 * Creates a {@link SseServer} configured with the given port and default settings. 168 * 169 * @param port the port number on which the server should listen 170 * @return a {@link SseServer} instance 171 */ 172 @NonNull 173 static SseServer fromPort(@NonNull Integer port) { 174 return withPort(port).build(); 175 } 176 177 /** 178 * Builder used to construct a standard implementation of {@link SseServer}. 179 * <p> 180 * This class is intended for use by a single thread. 181 * 182 * @author <a href="https://www.revetkn.com">Mark Allen</a> 183 */ 184 @NotThreadSafe 185 final class Builder { 186 @NonNull 187 Integer port; 188 @Nullable 189 String host; 190 @Nullable 191 Duration requestTimeout; 192 @Nullable 193 Duration requestHandlerTimeout; 194 @Nullable 195 Integer requestHandlerConcurrency; 196 @Nullable 197 Integer requestHandlerQueueCapacity; 198 @Nullable 199 Duration writeTimeout; 200 @Nullable 201 Duration shutdownTimeout; 202 @Nullable 203 Duration heartbeatInterval; 204 @Nullable 205 Integer maximumRequestSizeInBytes; 206 @Nullable 207 Integer requestReadBufferSizeInBytes; 208 @Nullable 209 Supplier<ExecutorService> requestHandlerExecutorServiceSupplier; 210 @Nullable 211 Integer concurrentConnectionLimit; 212 @Nullable 213 Integer broadcasterCacheCapacity; 214 @Nullable 215 Integer resourcePathCacheCapacity; 216 @Nullable 217 Integer connectionQueueCapacity; 218 @Nullable 219 Boolean verifyConnectionOnceEstablished; 220 @Nullable 221 IdGenerator<?> idGenerator; 222 223 @NonNull 224 protected Builder(@NonNull Integer port) { 225 requireNonNull(port); 226 this.port = port; 227 } 228 229 @NonNull 230 public Builder port(@NonNull Integer port) { 231 requireNonNull(port); 232 this.port = port; 233 return this; 234 } 235 236 @NonNull 237 public Builder host(@Nullable String host) { 238 this.host = host; 239 return this; 240 } 241 242 @NonNull 243 public Builder requestTimeout(@Nullable Duration requestTimeout) { 244 this.requestTimeout = requestTimeout; 245 return this; 246 } 247 248 @NonNull 249 public Builder requestHandlerTimeout(@Nullable Duration requestHandlerTimeout) { 250 this.requestHandlerTimeout = requestHandlerTimeout; 251 return this; 252 } 253 254 @NonNull 255 public Builder requestHandlerConcurrency(@Nullable Integer requestHandlerConcurrency) { 256 this.requestHandlerConcurrency = requestHandlerConcurrency; 257 return this; 258 } 259 260 @NonNull 261 public Builder requestHandlerQueueCapacity(@Nullable Integer requestHandlerQueueCapacity) { 262 this.requestHandlerQueueCapacity = requestHandlerQueueCapacity; 263 return this; 264 } 265 266 @NonNull 267 public Builder writeTimeout(@Nullable Duration writeTimeout) { 268 this.writeTimeout = writeTimeout; 269 return this; 270 } 271 272 @NonNull 273 public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) { 274 this.shutdownTimeout = shutdownTimeout; 275 return this; 276 } 277 278 @NonNull 279 public Builder heartbeatInterval(@Nullable Duration heartbeatInterval) { 280 this.heartbeatInterval = heartbeatInterval; 281 return this; 282 } 283 284 @NonNull 285 public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) { 286 this.maximumRequestSizeInBytes = maximumRequestSizeInBytes; 287 return this; 288 } 289 290 @NonNull 291 public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) { 292 this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes; 293 return this; 294 } 295 296 @NonNull 297 public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) { 298 this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier; 299 return this; 300 } 301 302 @NonNull 303 public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) { 304 this.concurrentConnectionLimit = concurrentConnectionLimit; 305 return this; 306 } 307 308 @NonNull 309 public Builder broadcasterCacheCapacity(@Nullable Integer broadcasterCacheCapacity) { 310 this.broadcasterCacheCapacity = broadcasterCacheCapacity; 311 return this; 312 } 313 314 @NonNull 315 public Builder resourcePathCacheCapacity(@Nullable Integer resourcePathCacheCapacity) { 316 this.resourcePathCacheCapacity = resourcePathCacheCapacity; 317 return this; 318 } 319 320 @NonNull 321 public Builder connectionQueueCapacity(@Nullable Integer connectionQueueCapacity) { 322 this.connectionQueueCapacity = connectionQueueCapacity; 323 return this; 324 } 325 326 @NonNull 327 public Builder verifyConnectionOnceEstablished(@Nullable Boolean verifyConnectionOnceEstablished) { 328 this.verifyConnectionOnceEstablished = verifyConnectionOnceEstablished; 329 return this; 330 } 331 332 @NonNull 333 public Builder idGenerator(@Nullable IdGenerator<?> idGenerator) { 334 this.idGenerator = idGenerator; 335 return this; 336 } 337 338 @NonNull 339 public SseServer build() { 340 return new DefaultSseServer(this); 341 } 342 } 343}