001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import com.soklet.ServerSentEventRequestResult.HandshakeAccepted; 020import com.soklet.ServerSentEventRequestResult.HandshakeRejected; 021import com.soklet.annotation.ServerSentEventSource; 022import com.soklet.internal.spring.LinkedCaseInsensitiveMap; 023import org.jspecify.annotations.NonNull; 024import org.jspecify.annotations.Nullable; 025 026import javax.annotation.concurrent.ThreadSafe; 027import java.io.BufferedReader; 028import java.io.IOException; 029import java.io.InputStreamReader; 030import java.lang.reflect.InvocationTargetException; 031import java.nio.charset.Charset; 032import java.nio.charset.StandardCharsets; 033import java.time.Duration; 034import java.time.Instant; 035import java.time.ZoneId; 036import java.time.format.DateTimeFormatter; 037import java.util.ArrayList; 038import java.util.Collections; 039import java.util.EnumSet; 040import java.util.HashMap; 041import java.util.LinkedHashMap; 042import java.util.List; 043import java.util.Locale; 044import java.util.Map; 045import java.util.Map.Entry; 046import java.util.Objects; 047import java.util.Optional; 048import java.util.Set; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.CopyOnWriteArraySet; 051import java.util.concurrent.CountDownLatch; 052import java.util.concurrent.atomic.AtomicBoolean; 053import java.util.concurrent.atomic.AtomicReference; 054import java.util.concurrent.locks.ReentrantLock; 055import java.util.function.BiConsumer; 056import java.util.function.Consumer; 057import java.util.function.Function; 058import java.util.stream.Collectors; 059 060import static com.soklet.Utilities.emptyByteArray; 061import static java.lang.String.format; 062import static java.util.Objects.requireNonNull; 063 064/** 065 * Soklet's main class - manages a {@link Server} (and optionally a {@link ServerSentEventServer}) using the provided system configuration. 066 * <p> 067 * <pre>{@code // Use out-of-the-box defaults 068 * SokletConfig config = SokletConfig.withServer( 069 * Server.fromPort(8080) 070 * ).build(); 071 * 072 * try (Soklet soklet = Soklet.fromConfig(config)) { 073 * soklet.start(); 074 * System.out.println("Soklet started, press [enter] to exit"); 075 * soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY); 076 * }}</pre> 077 * <p> 078 * Soklet also offers an off-network {@link Simulator} concept via {@link #runSimulator(SokletConfig, Consumer)}, useful for integration testing. 079 * <p> 080 * Given a <em>Resource Method</em>... 081 * <pre>{@code public class HelloResource { 082 * @GET("/hello") 083 * public String hello(@QueryParameter String name) { 084 * return String.format("Hello, %s", name); 085 * } 086 * }}</pre> 087 * ...we might test it like this: 088 * <pre>{@code @Test 089 * public void integrationTest() { 090 * // Just use your app's existing configuration 091 * SokletConfig config = obtainMySokletConfig(); 092 * 093 * // Instead of running on a real HTTP server that listens on a port, 094 * // a non-network Simulator is provided against which you can 095 * // issue requests and receive responses. 096 * Soklet.runSimulator(config, (simulator -> { 097 * // Construct a request 098 * Request request = Request.withPath(HttpMethod.GET, "/hello") 099 * .queryParameters(Map.of("name", Set.of("Mark"))) 100 * .build(); 101 * 102 * // Perform the request and get a handle to the response 103 * RequestResult result = simulator.performRequest(request); 104 * MarshaledResponse marshaledResponse = result.getMarshaledResponse(); 105 * 106 * // Verify status code 107 * Integer expectedCode = 200; 108 * Integer actualCode = marshaledResponse.getStatusCode(); 109 * assertEquals(expectedCode, actualCode, "Bad status code"); 110 * 111 * // Verify response body 112 * marshaledResponse.getBody().ifPresentOrElse(body -> { 113 * String expectedBody = "Hello, Mark"; 114 * String actualBody = new String(body, StandardCharsets.UTF_8); 115 * assertEquals(expectedBody, actualBody, "Bad response body"); 116 * }, () -> { 117 * Assertions.fail("No response body"); 118 * }); 119 * })); 120 * }}</pre> 121 * <p> 122 * The {@link Simulator} also supports Server-Sent Events. 123 * <p> 124 * Integration testing documentation is available at <a href="https://www.soklet.com/docs/testing">https://www.soklet.com/docs/testing</a>. 125 * 126 * @author <a href="https://www.revetkn.com">Mark Allen</a> 127 */ 128@ThreadSafe 129public final class Soklet implements AutoCloseable { 130 @NonNull 131 private static final Map<@NonNull String, @NonNull Set<@NonNull String>> DEFAULT_ACCEPTED_HANDSHAKE_HEADERS; 132 133 static { 134 // Generally speaking, we always want these headers for SSE streaming responses. 135 // Users can override if they think necessary 136 LinkedCaseInsensitiveMap<Set<String>> defaultAcceptedHandshakeHeaders = new LinkedCaseInsensitiveMap<>(4); 137 defaultAcceptedHandshakeHeaders.put("Content-Type", Set.of("text/event-stream; charset=UTF-8")); 138 defaultAcceptedHandshakeHeaders.put("Cache-Control", Set.of("no-cache", "no-transform")); 139 defaultAcceptedHandshakeHeaders.put("Connection", Set.of("keep-alive")); 140 defaultAcceptedHandshakeHeaders.put("X-Accel-Buffering", Set.of("no")); 141 142 DEFAULT_ACCEPTED_HANDSHAKE_HEADERS = Collections.unmodifiableMap(defaultAcceptedHandshakeHeaders); 143 } 144 145 /** 146 * Acquires a Soklet instance with the given configuration. 147 * 148 * @param sokletConfig configuration that drives the Soklet system 149 * @return a Soklet instance 150 */ 151 @NonNull 152public static Soklet fromConfig(@NonNull SokletConfig sokletConfig) { 153 requireNonNull(sokletConfig); 154 return new Soklet(sokletConfig); 155 } 156 157 @NonNull 158 private final SokletConfig sokletConfig; 159 @NonNull 160 private final ReentrantLock lock; 161 @NonNull 162 private final AtomicReference<CountDownLatch> awaitShutdownLatchReference; 163 164 /** 165 * Creates a Soklet instance with the given configuration. 166 * 167 * @param sokletConfig configuration that drives the Soklet system 168 */ 169 private Soklet(@NonNull SokletConfig sokletConfig) { 170 requireNonNull(sokletConfig); 171 172 this.sokletConfig = sokletConfig; 173 this.lock = new ReentrantLock(); 174 this.awaitShutdownLatchReference = new AtomicReference<>(new CountDownLatch(1)); 175 176 // Fail fast in the event that Soklet appears misconfigured 177 if (sokletConfig.getResourceMethodResolver().getResourceMethods().size() == 0) 178 throw new IllegalStateException(format("No Soklet Resource Methods were found. Please ensure your %s is configured correctly. " 179 + "See https://www.soklet.com/docs/request-handling#resource-method-resolution for details.", ResourceMethodResolver.class.getSimpleName())); 180 181 // SSE misconfiguration check: @ServerSentEventSource resource methods are declared, but not ServerSentEventServer exists 182 boolean hasSseResourceMethods = sokletConfig.getResourceMethodResolver().getResourceMethods().stream() 183 .anyMatch(resourceMethod -> resourceMethod.isServerSentEventSource()); 184 185 if (hasSseResourceMethods && sokletConfig.getServerSentEventServer().isEmpty()) 186 throw new IllegalStateException(format("Resource Methods annotated with @%s were found, but no %s is configured. See https://www.soklet.com/docs/server-sent-events for details.", 187 ServerSentEventSource.class.getSimpleName(), ServerSentEventServer.class.getSimpleName())); 188 189 MetricsCollector metricsCollector = sokletConfig.getMetricsCollector(); 190 191 if (metricsCollector instanceof DefaultMetricsCollector defaultMetricsCollector) { 192 try { 193 defaultMetricsCollector.initialize(sokletConfig); 194 } catch (Throwable t) { 195 sokletConfig.getLifecycleObserver().didReceiveLogEvent( 196 LogEvent.with(LogEventType.METRICS_COLLECTOR_FAILED, 197 format("An exception occurred while initializing %s", metricsCollector.getClass().getSimpleName())) 198 .throwable(t) 199 .build()); 200 } 201 } 202 203 // Use a layer of indirection here so the Soklet type does not need to directly implement the `RequestHandler` interface. 204 // Reasoning: the `handleRequest` method for Soklet should not be public, which might lead to accidental invocation by users. 205 // That method should only be called by the managed `Server` instance. 206 Soklet soklet = this; 207 208 sokletConfig.getServer().initialize(getSokletConfig(), (request, marshaledResponseConsumer) -> { 209 // Delegate to Soklet's internal request handling method 210 soklet.handleRequest(request, ServerType.STANDARD_HTTP, marshaledResponseConsumer); 211 }); 212 213 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 214 215 if (serverSentEventServer != null) 216 serverSentEventServer.initialize(sokletConfig, (request, marshaledResponseConsumer) -> { 217 // Delegate to Soklet's internal request handling method 218 soklet.handleRequest(request, ServerType.SERVER_SENT_EVENT, marshaledResponseConsumer); 219 }); 220 } 221 222 /** 223 * Starts the managed server instance[s]. 224 * <p> 225 * If the managed server[s] are already started, this is a no-op. 226 */ 227 public void start() { 228 getLock().lock(); 229 230 try { 231 if (isStarted()) 232 return; 233 234 getAwaitShutdownLatchReference().set(new CountDownLatch(1)); 235 236 SokletConfig sokletConfig = getSokletConfig(); 237 LifecycleObserver lifecycleObserver = sokletConfig.getLifecycleObserver(); 238 239 // 1. Notify global intent to start 240 lifecycleObserver.willStartSoklet(this); 241 242 try { 243 Server server = sokletConfig.getServer(); 244 245 // 2. Attempt to start Main Server 246 lifecycleObserver.willStartServer(server); 247 try { 248 server.start(); 249 lifecycleObserver.didStartServer(server); 250 } catch (Throwable t) { 251 lifecycleObserver.didFailToStartServer(server, t); 252 throw t; // Rethrow to trigger outer catch block 253 } 254 255 // 3. Attempt to start SSE Server (if present) 256 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 257 258 if (serverSentEventServer != null) { 259 lifecycleObserver.willStartServerSentEventServer(serverSentEventServer); 260 try { 261 serverSentEventServer.start(); 262 lifecycleObserver.didStartServerSentEventServer(serverSentEventServer); 263 } catch (Throwable t) { 264 lifecycleObserver.didFailToStartServerSentEventServer(serverSentEventServer, t); 265 throw t; // Rethrow to trigger outer catch block 266 } 267 } 268 269 // 4. Global success 270 lifecycleObserver.didStartSoklet(this); 271 } catch (Throwable t) { 272 // 5. Global failure 273 lifecycleObserver.didFailToStartSoklet(this, t); 274 275 // Ensure the exception bubbles up so the application knows startup failed 276 if (t instanceof RuntimeException) 277 throw (RuntimeException) t; 278 279 throw new RuntimeException(t); 280 } 281 } finally { 282 getLock().unlock(); 283 } 284 } 285 286 /** 287 * Stops the managed server instance[s]. 288 * <p> 289 * If the managed server[s] are already stopped, this is a no-op. 290 */ 291 public void stop() { 292 getLock().lock(); 293 294 try { 295 if (isStarted()) { 296 SokletConfig sokletConfig = getSokletConfig(); 297 LifecycleObserver lifecycleObserver = sokletConfig.getLifecycleObserver(); 298 299 // 1. Notify global intent to stop 300 lifecycleObserver.willStopSoklet(this); 301 302 Throwable firstEncounteredException = null; 303 Server server = sokletConfig.getServer(); 304 305 // 2. Attempt to stop Main Server 306 if (server.isStarted()) { 307 lifecycleObserver.willStopServer(server); 308 try { 309 server.stop(); 310 lifecycleObserver.didStopServer(server); 311 } catch (Throwable t) { 312 firstEncounteredException = t; 313 lifecycleObserver.didFailToStopServer(server, t); 314 } 315 } 316 317 // 3. Attempt to stop SSE Server 318 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 319 320 if (serverSentEventServer != null && serverSentEventServer.isStarted()) { 321 lifecycleObserver.willStopServerSentEventServer(serverSentEventServer); 322 try { 323 serverSentEventServer.stop(); 324 lifecycleObserver.didStopServerSentEventServer(serverSentEventServer); 325 } catch (Throwable t) { 326 if (firstEncounteredException == null) 327 firstEncounteredException = t; 328 329 lifecycleObserver.didFailToStopServerSentEventServer(serverSentEventServer, t); 330 } 331 } 332 333 // 4. Global completion (Success or Failure) 334 if (firstEncounteredException == null) 335 lifecycleObserver.didStopSoklet(this); 336 else 337 lifecycleObserver.didFailToStopSoklet(this, firstEncounteredException); 338 } 339 } finally { 340 try { 341 getAwaitShutdownLatchReference().get().countDown(); 342 } finally { 343 getLock().unlock(); 344 } 345 } 346 } 347 348 /** 349 * Blocks the current thread until JVM shutdown ({@code SIGTERM/SIGINT/System.exit(...)} and so forth), <strong>or</strong> if one of the provided {@code shutdownTriggers} occurs. 350 * <p> 351 * This method will automatically invoke this instance's {@link #stop()} method once it becomes unblocked. 352 * <p> 353 * <strong>Notes regarding {@link ShutdownTrigger#ENTER_KEY}:</strong> 354 * <ul> 355 * <li>It will invoke {@link #stop()} on <i>all</i> Soklet instances, as stdin is process-wide</li> 356 * <li>It is only supported for environments with an interactive TTY and will be ignored if none exists (e.g. running in a Docker container) - Soklet will detect this and fire {@link LifecycleObserver#didReceiveLogEvent(LogEvent)} with an event of type {@link LogEventType#CONFIGURATION_UNSUPPORTED}</li> 357 * </ul> 358 * 359 * @param shutdownTriggers additional trigger[s] which signal that shutdown should occur, e.g. {@link ShutdownTrigger#ENTER_KEY} for "enter key pressed" 360 * @throws InterruptedException if the current thread has its interrupted status set on entry to this method, or is interrupted while waiting 361 */ 362 public void awaitShutdown(@Nullable ShutdownTrigger... shutdownTriggers) throws InterruptedException { 363 Thread shutdownHook = null; 364 boolean registeredEnterKeyShutdownTrigger = false; 365 Set<ShutdownTrigger> shutdownTriggersAsSet = shutdownTriggers == null || shutdownTriggers.length == 0 ? Set.of() : EnumSet.copyOf(Set.of(shutdownTriggers)); 366 367 try { 368 // Optionally listen for enter key 369 if (shutdownTriggersAsSet.contains(ShutdownTrigger.ENTER_KEY)) { 370 registeredEnterKeyShutdownTrigger = KeypressManager.register(this); // returns false if stdin unusable/disabled 371 372 if (!registeredEnterKeyShutdownTrigger) { 373 LogEvent logEvent = LogEvent.with( 374 LogEventType.CONFIGURATION_UNSUPPORTED, 375 format("Ignoring request for %s.%s - it is unsupported in this environment (no interactive TTY detected)", ShutdownTrigger.class.getSimpleName(), ShutdownTrigger.ENTER_KEY.name()) 376 ).build(); 377 378 getSokletConfig().getLifecycleObserver().didReceiveLogEvent(logEvent); 379 } 380 } 381 382 // Always register a shutdown hook 383 shutdownHook = new Thread(() -> { 384 try { 385 stop(); 386 } catch (Throwable ignored) { 387 // Nothing to do 388 } 389 }, "soklet-shutdown-hook"); 390 391 Runtime.getRuntime().addShutdownHook(shutdownHook); 392 393 // Wait until "close" finishes 394 getAwaitShutdownLatchReference().get().await(); 395 } finally { 396 if (registeredEnterKeyShutdownTrigger) 397 KeypressManager.unregister(this); 398 399 try { 400 Runtime.getRuntime().removeShutdownHook(shutdownHook); 401 } catch (IllegalStateException ignored) { 402 // JVM shutting down 403 } 404 } 405 } 406 407 /** 408 * Handles "awaitShutdown" for {@link ShutdownTrigger#ENTER_KEY} by listening to stdin - all Soklet instances are terminated on keypress. 409 */ 410 @ThreadSafe 411 private static final class KeypressManager { 412 @NonNull 413 private static final Set<@NonNull Soklet> SOKLET_REGISTRY; 414 @NonNull 415 private static final AtomicBoolean LISTENER_STARTED; 416 417 static { 418 SOKLET_REGISTRY = new CopyOnWriteArraySet<>(); 419 LISTENER_STARTED = new AtomicBoolean(false); 420 } 421 422 /** 423 * Register a Soklet for Enter-to-stop support. Returns true iff a listener is (or was already) active. 424 * If System.in is not usable (or disabled), returns false and does nothing. 425 */ 426 @NonNull 427 synchronized static Boolean register(@NonNull Soklet soklet) { 428 requireNonNull(soklet); 429 430 // If stdin is not readable (e.g., container with no TTY), don't start a listener. 431 if (!canReadFromStdin()) 432 return false; 433 434 SOKLET_REGISTRY.add(soklet); 435 436 // Start a single process-wide listener once. 437 if (LISTENER_STARTED.compareAndSet(false, true)) { 438 Thread thread = new Thread(KeypressManager::runLoop, "soklet-keypress-shutdown-listener"); 439 thread.setDaemon(true); // never block JVM exit 440 thread.start(); 441 } 442 443 return true; 444 } 445 446 synchronized static void unregister(@NonNull Soklet soklet) { 447 SOKLET_REGISTRY.remove(soklet); 448 // We intentionally keep the listener alive; it's daemon and cheap. 449 // If stdin hits EOF, the listener exits on its own. 450 } 451 452 /** 453 * Heuristic: if System.in is present and calling available() doesn't throw, 454 * treat it as readable. Works even in IDEs where System.console() is null. 455 */ 456 @NonNull 457 private static Boolean canReadFromStdin() { 458 if (System.in == null) 459 return false; 460 461 try { 462 // available() >= 0 means stream is open; 0 means no buffered data (that’s fine). 463 return System.in.available() >= 0; 464 } catch (IOException e) { 465 return false; 466 } 467 } 468 469 /** 470 * Single blocking read on stdin. On any line (or EOF), stop all registered servers. 471 */ 472 private static void runLoop() { 473 try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8))) { 474 // Blocks until newline or EOF; EOF (null) happens with /dev/null or closed pipe. 475 bufferedReader.readLine(); 476 stopAllSoklets(); 477 } catch (Throwable ignored) { 478 // If stdin is closed mid-run, just exit quietly. 479 } 480 } 481 482 synchronized private static void stopAllSoklets() { 483 // Either a line or EOF → stop everything that’s currently registered. 484 for (Soklet soklet : SOKLET_REGISTRY) { 485 try { 486 soklet.stop(); 487 } catch (Throwable ignored) { 488 // Nothing to do 489 } 490 } 491 } 492 493 private KeypressManager() {} 494 } 495 496 /** 497 * Nonpublic "informal" implementation of {@link com.soklet.Server.RequestHandler} so Soklet does not need to expose {@code handleRequest} publicly. 498 * Reasoning: users of this library should never call {@code handleRequest} directly - it should only be invoked in response to events 499 * provided by a {@link Server} or {@link ServerSentEventServer} implementation. 500 */ 501 protected void handleRequest(@NonNull Request request, 502 @NonNull ServerType serverType, 503 @NonNull Consumer<RequestResult> requestResultConsumer) { 504 requireNonNull(request); 505 requireNonNull(serverType); 506 requireNonNull(requestResultConsumer); 507 508 Instant processingStarted = Instant.now(); 509 510 SokletConfig sokletConfig = getSokletConfig(); 511 ResourceMethodResolver resourceMethodResolver = sokletConfig.getResourceMethodResolver(); 512 ResponseMarshaler responseMarshaler = sokletConfig.getResponseMarshaler(); 513 LifecycleObserver lifecycleObserver = sokletConfig.getLifecycleObserver(); 514 RequestInterceptor requestInterceptor = sokletConfig.getRequestInterceptor(); 515 MetricsCollector metricsCollector = sokletConfig.getMetricsCollector(); 516 517 // Holders to permit mutable effectively-final variables 518 AtomicReference<MarshaledResponse> marshaledResponseHolder = new AtomicReference<>(); 519 AtomicReference<Throwable> resourceMethodResolutionExceptionHolder = new AtomicReference<>(); 520 AtomicReference<Request> requestHolder = new AtomicReference<>(request); 521 AtomicReference<ResourceMethod> resourceMethodHolder = new AtomicReference<>(); 522 AtomicReference<RequestResult> requestResultHolder = new AtomicReference<>(); 523 524 // Holders to permit mutable effectively-final state tracking 525 AtomicBoolean willStartResponseWritingCompleted = new AtomicBoolean(false); 526 AtomicBoolean didFinishResponseWritingCompleted = new AtomicBoolean(false); 527 AtomicBoolean didFinishRequestHandlingCompleted = new AtomicBoolean(false); 528 AtomicBoolean didInvokeWrapRequestConsumer = new AtomicBoolean(false); 529 530 List<Throwable> throwables = new ArrayList<>(10); 531 532 Consumer<LogEvent> safelyLog = (logEvent -> { 533 try { 534 lifecycleObserver.didReceiveLogEvent(logEvent); 535 } catch (Throwable throwable) { 536 throwable.printStackTrace(); 537 throwables.add(throwable); 538 } 539 }); 540 541 BiConsumer<String, Consumer<MetricsCollector>> safelyCollectMetrics = (message, metricsInvocation) -> { 542 if (metricsCollector == null) 543 return; 544 545 try { 546 metricsInvocation.accept(metricsCollector); 547 } catch (Throwable throwable) { 548 safelyLog.accept(LogEvent.with(LogEventType.METRICS_COLLECTOR_FAILED, message) 549 .throwable(throwable) 550 .request(requestHolder.get()) 551 .resourceMethod(resourceMethodHolder.get()) 552 .marshaledResponse(marshaledResponseHolder.get()) 553 .build()); 554 } 555 }; 556 557 requestHolder.set(request); 558 559 try { 560 requestInterceptor.wrapRequest(serverType, request, (wrappedRequest) -> { 561 didInvokeWrapRequestConsumer.set(true); 562 requestHolder.set(wrappedRequest); 563 564 try { 565 // Resolve after wrapping so path/method rewrites affect routing. 566 resourceMethodHolder.set(resourceMethodResolver.resourceMethodForRequest(requestHolder.get(), serverType).orElse(null)); 567 resourceMethodResolutionExceptionHolder.set(null); 568 } catch (Throwable t) { 569 safelyLog.accept(LogEvent.with(LogEventType.RESOURCE_METHOD_RESOLUTION_FAILED, "Unable to resolve Resource Method") 570 .throwable(t) 571 .request(requestHolder.get()) 572 .build()); 573 574 // If an exception occurs here, keep track of it - we will surface them after letting LifecycleObserver 575 // see that a request has come in. 576 throwables.add(t); 577 resourceMethodResolutionExceptionHolder.set(t); 578 resourceMethodHolder.set(null); 579 } 580 581 try { 582 lifecycleObserver.didStartRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get()); 583 } catch (Throwable t) { 584 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_START_REQUEST_HANDLING_FAILED, 585 format("An exception occurred while invoking %s::didStartRequestHandling", 586 LifecycleObserver.class.getSimpleName())) 587 .throwable(t) 588 .request(requestHolder.get()) 589 .resourceMethod(resourceMethodHolder.get()) 590 .build()); 591 592 throwables.add(t); 593 } 594 595 safelyCollectMetrics.accept( 596 format("An exception occurred while invoking %s::didStartRequestHandling", MetricsCollector.class.getSimpleName()), 597 (metricsInvocation) -> metricsInvocation.didStartRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get())); 598 599 try { 600 AtomicBoolean didInvokeMarshaledResponseConsumer = new AtomicBoolean(false); 601 602 requestInterceptor.interceptRequest(serverType, requestHolder.get(), resourceMethodHolder.get(), (interceptorRequest) -> { 603 requestHolder.set(interceptorRequest); 604 605 try { 606 if (resourceMethodResolutionExceptionHolder.get() != null) 607 throw resourceMethodResolutionExceptionHolder.get(); 608 609 RequestResult requestResult = toRequestResult(requestHolder.get(), resourceMethodHolder.get(), serverType); 610 requestResultHolder.set(requestResult); 611 612 MarshaledResponse originalMarshaledResponse = requestResult.getMarshaledResponse(); 613 MarshaledResponse updatedMarshaledResponse = requestResult.getMarshaledResponse(); 614 615 // A few special cases that are "global" in that they can affect all requests and 616 // need to happen after marshaling the response... 617 618 // 1. Customize response for HEAD (e.g. remove body, set Content-Length header) 619 updatedMarshaledResponse = applyHeadResponseIfApplicable(requestHolder.get(), updatedMarshaledResponse); 620 621 // 2. Apply other standard response customizations (CORS, Content-Length) 622 // Note that we don't want to write Content-Length for SSE "accepted" handshakes 623 HandshakeResult handshakeResult = requestResult.getHandshakeResult().orElse(null); 624 boolean suppressContentLength = handshakeResult != null && handshakeResult instanceof HandshakeResult.Accepted; 625 626 updatedMarshaledResponse = applyCommonPropertiesToMarshaledResponse(requestHolder.get(), updatedMarshaledResponse, suppressContentLength); 627 628 // Update our result holder with the modified response if necessary 629 if (originalMarshaledResponse != updatedMarshaledResponse) { 630 marshaledResponseHolder.set(updatedMarshaledResponse); 631 requestResultHolder.set(requestResult.copy() 632 .marshaledResponse(updatedMarshaledResponse) 633 .finish()); 634 } 635 636 return updatedMarshaledResponse; 637 } catch (Throwable t) { 638 if (!Objects.equals(t, resourceMethodResolutionExceptionHolder.get())) { 639 throwables.add(t); 640 641 safelyLog.accept(LogEvent.with(LogEventType.REQUEST_PROCESSING_FAILED, 642 "An exception occurred while processing request") 643 .throwable(t) 644 .request(requestHolder.get()) 645 .resourceMethod(resourceMethodHolder.get()) 646 .build()); 647 } 648 649 // Unhappy path. Try to use configuration's exception response marshaler... 650 try { 651 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 652 marshaledResponse = applyCommonPropertiesToMarshaledResponse(requestHolder.get(), marshaledResponse); 653 marshaledResponseHolder.set(marshaledResponse); 654 655 return marshaledResponse; 656 } catch (Throwable t2) { 657 throwables.add(t2); 658 659 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 660 format("An exception occurred while trying to write an exception response for %s", t)) 661 .throwable(t2) 662 .request(requestHolder.get()) 663 .resourceMethod(resourceMethodHolder.get()) 664 .build()); 665 666 // The configuration's exception response marshaler failed - provide a failsafe response to recover 667 return provideFailsafeMarshaledResponse(requestHolder.get(), t2); 668 } 669 } 670 }, (interceptorMarshaledResponse) -> { 671 requireNonNull(interceptorMarshaledResponse); 672 didInvokeMarshaledResponseConsumer.set(true); 673 marshaledResponseHolder.set(interceptorMarshaledResponse); 674 }); 675 676 if (!didInvokeMarshaledResponseConsumer.get()) { 677 requestResultHolder.set(null); 678 throw new IllegalStateException(format("%s::interceptRequest must call responseWriter", RequestInterceptor.class.getSimpleName())); 679 } 680 } catch (Throwable t) { 681 throwables.add(t); 682 683 try { 684 // In the event that an error occurs during processing of a RequestInterceptor method, for example 685 safelyLog.accept(LogEvent.with(LogEventType.REQUEST_INTERCEPTOR_INTERCEPT_REQUEST_FAILED, 686 format("An exception occurred while invoking %s::interceptRequest", RequestInterceptor.class.getSimpleName())) 687 .throwable(t) 688 .request(requestHolder.get()) 689 .resourceMethod(resourceMethodHolder.get()) 690 .build()); 691 692 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 693 marshaledResponse = applyCommonPropertiesToMarshaledResponse(requestHolder.get(), marshaledResponse); 694 marshaledResponseHolder.set(marshaledResponse); 695 } catch (Throwable t2) { 696 throwables.add(t2); 697 698 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 699 format("An exception occurred while invoking %s::forThrowable when trying to write an exception response for %s", ResponseMarshaler.class.getSimpleName(), t)) 700 .throwable(t2) 701 .request(requestHolder.get()) 702 .resourceMethod(resourceMethodHolder.get()) 703 .build()); 704 705 marshaledResponseHolder.set(provideFailsafeMarshaledResponse(requestHolder.get(), t2)); 706 } 707 } finally { 708 try { 709 try { 710 lifecycleObserver.willWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get()); 711 } finally { 712 willStartResponseWritingCompleted.set(true); 713 } 714 715 safelyCollectMetrics.accept( 716 format("An exception occurred while invoking %s::willWriteResponse", MetricsCollector.class.getSimpleName()), 717 (metricsInvocation) -> metricsInvocation.willWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get())); 718 719 Instant responseWriteStarted = Instant.now(); 720 721 try { 722 RequestResult requestResult = requestResultHolder.get(); 723 724 if (requestResult != null) 725 requestResultConsumer.accept(requestResult); 726 else 727 requestResultConsumer.accept(RequestResult.withMarshaledResponse(marshaledResponseHolder.get()) 728 .resourceMethod(resourceMethodHolder.get()) 729 .build()); 730 731 Instant responseWriteFinished = Instant.now(); 732 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 733 734 try { 735 lifecycleObserver.didWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration); 736 } catch (Throwable t) { 737 throwables.add(t); 738 739 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_WRITE_RESPONSE_FAILED, 740 format("An exception occurred while invoking %s::didWriteResponse", 741 LifecycleObserver.class.getSimpleName())) 742 .throwable(t) 743 .request(requestHolder.get()) 744 .resourceMethod(resourceMethodHolder.get()) 745 .marshaledResponse(marshaledResponseHolder.get()) 746 .build()); 747 } finally { 748 didFinishResponseWritingCompleted.set(true); 749 } 750 751 safelyCollectMetrics.accept( 752 format("An exception occurred while invoking %s::didWriteResponse", MetricsCollector.class.getSimpleName()), 753 (metricsInvocation) -> metricsInvocation.didWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), 754 marshaledResponseHolder.get(), responseWriteDuration)); 755 } catch (Throwable t) { 756 throwables.add(t); 757 758 Instant responseWriteFinished = Instant.now(); 759 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 760 761 try { 762 lifecycleObserver.didFailToWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, t); 763 } catch (Throwable t2) { 764 throwables.add(t2); 765 766 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_WRITE_RESPONSE_FAILED, 767 format("An exception occurred while invoking %s::didFailToWriteResponse", 768 LifecycleObserver.class.getSimpleName())) 769 .throwable(t2) 770 .request(requestHolder.get()) 771 .resourceMethod(resourceMethodHolder.get()) 772 .marshaledResponse(marshaledResponseHolder.get()) 773 .build()); 774 } 775 776 safelyCollectMetrics.accept( 777 format("An exception occurred while invoking %s::didFailToWriteResponse", MetricsCollector.class.getSimpleName()), 778 (metricsInvocation) -> metricsInvocation.didFailToWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), 779 marshaledResponseHolder.get(), responseWriteDuration, t)); 780 } 781 } finally { 782 Duration processingDuration = Duration.between(processingStarted, Instant.now()); 783 784 safelyCollectMetrics.accept( 785 format("An exception occurred while invoking %s::didFinishRequestHandling", MetricsCollector.class.getSimpleName()), 786 (metricsInvocation) -> metricsInvocation.didFinishRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables))); 787 788 try { 789 lifecycleObserver.didFinishRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables)); 790 } catch (Throwable t) { 791 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_FINISH_REQUEST_HANDLING_FAILED, 792 format("An exception occurred while invoking %s::didFinishRequestHandling", 793 LifecycleObserver.class.getSimpleName())) 794 .throwable(t) 795 .request(requestHolder.get()) 796 .resourceMethod(resourceMethodHolder.get()) 797 .marshaledResponse(marshaledResponseHolder.get()) 798 .build()); 799 } finally { 800 didFinishRequestHandlingCompleted.set(true); 801 } 802 } 803 } 804 }); 805 806 if (!didInvokeWrapRequestConsumer.get()) 807 throw new IllegalStateException(format("%s::wrapRequest must call requestProcessor", RequestInterceptor.class.getSimpleName())); 808 } catch (Throwable t) { 809 // If an error occurred during request wrapping, it's possible a response was never written/communicated back to LifecycleObserver. 810 // Detect that here and inform LifecycleObserver accordingly. 811 safelyLog.accept(LogEvent.with(LogEventType.REQUEST_INTERCEPTOR_WRAP_REQUEST_FAILED, 812 format("An exception occurred while invoking %s::wrapRequest", 813 RequestInterceptor.class.getSimpleName())) 814 .throwable(t) 815 .request(requestHolder.get()) 816 .resourceMethod(resourceMethodHolder.get()) 817 .marshaledResponse(marshaledResponseHolder.get()) 818 .build()); 819 820 // If we don't have a response, let the marshaler try to make one for the exception. 821 // If that fails, use the failsafe. 822 if (marshaledResponseHolder.get() == null) { 823 try { 824 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 825 marshaledResponse = applyCommonPropertiesToMarshaledResponse(requestHolder.get(), marshaledResponse); 826 marshaledResponseHolder.set(marshaledResponse); 827 } catch (Throwable t2) { 828 throwables.add(t2); 829 830 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 831 format("An exception occurred during request wrapping while invoking %s::forThrowable", 832 ResponseMarshaler.class.getSimpleName())) 833 .throwable(t2) 834 .request(requestHolder.get()) 835 .resourceMethod(resourceMethodHolder.get()) 836 .marshaledResponse(marshaledResponseHolder.get()) 837 .build()); 838 839 marshaledResponseHolder.set(provideFailsafeMarshaledResponse(requestHolder.get(), t)); 840 } 841 } 842 843 if (!willStartResponseWritingCompleted.get()) { 844 try { 845 lifecycleObserver.willWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get()); 846 } catch (Throwable t2) { 847 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_WILL_WRITE_RESPONSE_FAILED, 848 format("An exception occurred while invoking %s::willWriteResponse", 849 LifecycleObserver.class.getSimpleName())) 850 .throwable(t2) 851 .request(requestHolder.get()) 852 .resourceMethod(resourceMethodHolder.get()) 853 .marshaledResponse(marshaledResponseHolder.get()) 854 .build()); 855 } 856 857 safelyCollectMetrics.accept( 858 format("An exception occurred while invoking %s::willWriteResponse", MetricsCollector.class.getSimpleName()), 859 (metricsInvocation) -> metricsInvocation.willWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get())); 860 } 861 862 try { 863 Instant responseWriteStarted = Instant.now(); 864 865 if (!didFinishResponseWritingCompleted.get()) { 866 try { 867 RequestResult requestResult = requestResultHolder.get(); 868 869 if (requestResult != null) 870 requestResultConsumer.accept(requestResult); 871 else 872 requestResultConsumer.accept(RequestResult.withMarshaledResponse(marshaledResponseHolder.get()) 873 .resourceMethod(resourceMethodHolder.get()) 874 .build()); 875 876 Instant responseWriteFinished = Instant.now(); 877 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 878 879 try { 880 lifecycleObserver.didWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration); 881 } catch (Throwable t2) { 882 throwables.add(t2); 883 884 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_WRITE_RESPONSE_FAILED, 885 format("An exception occurred while invoking %s::didWriteResponse", 886 LifecycleObserver.class.getSimpleName())) 887 .throwable(t2) 888 .request(requestHolder.get()) 889 .resourceMethod(resourceMethodHolder.get()) 890 .marshaledResponse(marshaledResponseHolder.get()) 891 .build()); 892 } 893 894 safelyCollectMetrics.accept( 895 format("An exception occurred while invoking %s::didWriteResponse", MetricsCollector.class.getSimpleName()), 896 (metricsInvocation) -> metricsInvocation.didWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), 897 marshaledResponseHolder.get(), responseWriteDuration)); 898 } catch (Throwable t2) { 899 throwables.add(t2); 900 901 Instant responseWriteFinished = Instant.now(); 902 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 903 904 try { 905 lifecycleObserver.didFailToWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, t); 906 } catch (Throwable t3) { 907 throwables.add(t3); 908 909 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_WRITE_RESPONSE_FAILED, 910 format("An exception occurred while invoking %s::didFailToWriteResponse", 911 LifecycleObserver.class.getSimpleName())) 912 .throwable(t3) 913 .request(requestHolder.get()) 914 .resourceMethod(resourceMethodHolder.get()) 915 .marshaledResponse(marshaledResponseHolder.get()) 916 .build()); 917 } 918 919 safelyCollectMetrics.accept( 920 format("An exception occurred while invoking %s::didFailToWriteResponse", MetricsCollector.class.getSimpleName()), 921 (metricsInvocation) -> metricsInvocation.didFailToWriteResponse(serverType, requestHolder.get(), resourceMethodHolder.get(), 922 marshaledResponseHolder.get(), responseWriteDuration, t)); 923 } 924 } 925 } finally { 926 if (!didFinishRequestHandlingCompleted.get()) { 927 Duration processingDuration = Duration.between(processingStarted, Instant.now()); 928 929 safelyCollectMetrics.accept( 930 format("An exception occurred while invoking %s::didFinishRequestHandling", MetricsCollector.class.getSimpleName()), 931 (metricsInvocation) -> metricsInvocation.didFinishRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables))); 932 933 try { 934 lifecycleObserver.didFinishRequestHandling(serverType, requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables)); 935 } catch (Throwable t2) { 936 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_OBSERVER_DID_FINISH_REQUEST_HANDLING_FAILED, 937 format("An exception occurred while invoking %s::didFinishRequestHandling", 938 LifecycleObserver.class.getSimpleName())) 939 .throwable(t2) 940 .request(requestHolder.get()) 941 .resourceMethod(resourceMethodHolder.get()) 942 .marshaledResponse(marshaledResponseHolder.get()) 943 .build()); 944 } 945 } 946 } 947 } 948 } 949 950 @NonNull 951 protected RequestResult toRequestResult(@NonNull Request request, 952 @Nullable ResourceMethod resourceMethod, 953 @NonNull ServerType serverType) throws Throwable { 954 requireNonNull(request); 955 requireNonNull(serverType); 956 957 ResourceMethodParameterProvider resourceMethodParameterProvider = getSokletConfig().getResourceMethodParameterProvider(); 958 InstanceProvider instanceProvider = getSokletConfig().getInstanceProvider(); 959 CorsAuthorizer corsAuthorizer = getSokletConfig().getCorsAuthorizer(); 960 ResourceMethodResolver resourceMethodResolver = getSokletConfig().getResourceMethodResolver(); 961 ResponseMarshaler responseMarshaler = getSokletConfig().getResponseMarshaler(); 962 CorsPreflight corsPreflight = request.getCorsPreflight().orElse(null); 963 964 // Special short-circuit for big requests 965 if (request.isContentTooLarge()) 966 return RequestResult.withMarshaledResponse(responseMarshaler.forContentTooLarge(request, resourceMethodResolver.resourceMethodForRequest(request, serverType).orElse(null))) 967 .resourceMethod(resourceMethod) 968 .build(); 969 970 // Special short-circuit for OPTIONS * 971 if (request.getResourcePath() == ResourcePath.OPTIONS_SPLAT_RESOURCE_PATH) 972 return RequestResult.withMarshaledResponse(responseMarshaler.forOptionsSplat(request)).build(); 973 974 // No resource method was found for this HTTP method and path. 975 if (resourceMethod == null) { 976 // If this was an OPTIONS request, do special processing. 977 // If not, figure out if we should return a 404 or 405. 978 if (request.getHttpMethod() == HttpMethod.OPTIONS) { 979 // See what methods are available to us for this request's path 980 Map<HttpMethod, ResourceMethod> matchingResourceMethodsByHttpMethod = resolveMatchingResourceMethodsByHttpMethod(request, resourceMethodResolver, serverType); 981 982 // Special handling for CORS preflight requests, if needed 983 if (corsPreflight != null) { 984 // Let configuration function determine if we should authorize this request. 985 // Discard any OPTIONS references - see https://stackoverflow.com/a/68529748 986 Map<HttpMethod, ResourceMethod> nonOptionsMatchingResourceMethodsByHttpMethod = matchingResourceMethodsByHttpMethod.entrySet().stream() 987 .filter(entry -> entry.getKey() != HttpMethod.OPTIONS) 988 .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); 989 990 CorsPreflightResponse corsPreflightResponse = corsAuthorizer.authorizePreflight(request, corsPreflight, nonOptionsMatchingResourceMethodsByHttpMethod).orElse(null); 991 992 // Allow or reject CORS depending on what the function said to do 993 if (corsPreflightResponse != null) { 994 // Allow 995 MarshaledResponse marshaledResponse = responseMarshaler.forCorsPreflightAllowed(request, corsPreflight, corsPreflightResponse); 996 997 return RequestResult.withMarshaledResponse(marshaledResponse) 998 .corsPreflightResponse(corsPreflightResponse) 999 .resourceMethod(resourceMethod) 1000 .build(); 1001 } 1002 1003 // Reject 1004 return RequestResult.withMarshaledResponse(responseMarshaler.forCorsPreflightRejected(request, corsPreflight)) 1005 .resourceMethod(resourceMethod) 1006 .build(); 1007 } else { 1008 // Just a normal OPTIONS response (non-CORS-preflight). 1009 // If there's a matching OPTIONS resource method for this OPTIONS request, then invoke it. 1010 ResourceMethod optionsResourceMethod = matchingResourceMethodsByHttpMethod.get(HttpMethod.OPTIONS); 1011 1012 if (optionsResourceMethod != null) { 1013 resourceMethod = optionsResourceMethod; 1014 } else { 1015 Set<HttpMethod> allowedHttpMethods = allowedHttpMethodsForResponse(matchingResourceMethodsByHttpMethod, true); 1016 1017 return RequestResult.withMarshaledResponse(responseMarshaler.forOptions(request, allowedHttpMethods)) 1018 .resourceMethod(resourceMethod) 1019 .build(); 1020 } 1021 } 1022 } else if (request.getHttpMethod() == HttpMethod.HEAD) { 1023 // If there's a matching GET resource method for this HEAD request, then invoke it 1024 Request headGetRequest = request.copy().httpMethod(HttpMethod.GET).finish(); 1025 ResourceMethod headGetResourceMethod = resourceMethodResolver.resourceMethodForRequest(headGetRequest, serverType).orElse(null); 1026 1027 if (headGetResourceMethod != null) 1028 resourceMethod = headGetResourceMethod; 1029 else 1030 return RequestResult.withMarshaledResponse(responseMarshaler.forNotFound(request)) 1031 .resourceMethod(resourceMethod) 1032 .build(); 1033 } else { 1034 // Not an OPTIONS request, so it's possible we have a 405. See if other HTTP methods match... 1035 Map<HttpMethod, ResourceMethod> otherMatchingResourceMethodsByHttpMethod = resolveMatchingResourceMethodsByHttpMethod(request, resourceMethodResolver, serverType); 1036 1037 Set<HttpMethod> matchingNonOptionsHttpMethods = otherMatchingResourceMethodsByHttpMethod.keySet().stream() 1038 .filter(httpMethod -> httpMethod != HttpMethod.OPTIONS) 1039 .collect(Collectors.toSet()); 1040 1041 if (matchingNonOptionsHttpMethods.size() > 0) { 1042 // ...if some do, it's a 405 1043 Set<HttpMethod> allowedHttpMethods = allowedHttpMethodsForResponse(otherMatchingResourceMethodsByHttpMethod, true); 1044 return RequestResult.withMarshaledResponse(responseMarshaler.forMethodNotAllowed(request, allowedHttpMethods)) 1045 .resourceMethod(resourceMethod) 1046 .build(); 1047 } else { 1048 // no matching resource method found, it's a 404 1049 return RequestResult.withMarshaledResponse(responseMarshaler.forNotFound(request)) 1050 .resourceMethod(resourceMethod) 1051 .build(); 1052 } 1053 } 1054 } 1055 1056 // Found a resource method - happy path. 1057 // 1. Get an instance of the resource class 1058 // 2. Get values to pass to the resource method on the resource class 1059 // 3. Invoke the resource method and use its return value to drive a response 1060 Class<?> resourceClass = resourceMethod.getMethod().getDeclaringClass(); 1061 Object resourceClassInstance; 1062 1063 try { 1064 resourceClassInstance = instanceProvider.provide(resourceClass); 1065 } catch (Exception e) { 1066 throw new IllegalArgumentException(format("Unable to acquire an instance of %s", resourceClass.getName()), e); 1067 } 1068 1069 List<Object> parameterValues = resourceMethodParameterProvider.parameterValuesForResourceMethod(request, resourceMethod); 1070 1071 Object responseObject; 1072 1073 try { 1074 responseObject = resourceMethod.getMethod().invoke(resourceClassInstance, parameterValues.toArray()); 1075 } catch (InvocationTargetException e) { 1076 if (e.getTargetException() != null) 1077 throw e.getTargetException(); 1078 1079 throw e; 1080 } 1081 1082 // Unwrap the Optional<T>, if one exists. We do not recurse deeper than one level 1083 if (responseObject instanceof Optional<?>) 1084 responseObject = ((Optional<?>) responseObject).orElse(null); 1085 1086 Response response; 1087 HandshakeResult handshakeResult = null; 1088 1089 // If null/void return, it's a 204 1090 // If it's a MarshaledResponse object, no marshaling + return it immediately - caller knows exactly what it wants to write. 1091 // If it's a Response object, use as is. 1092 // If it's a non-Response type of object, assume it's the response body and wrap in a Response. 1093 if (responseObject == null) { 1094 response = Response.withStatusCode(204).build(); 1095 } else if (responseObject instanceof MarshaledResponse) { 1096 MarshaledResponse marshaledResponse = (MarshaledResponse) responseObject; 1097 enforceBodylessStatusCode(marshaledResponse.getStatusCode(), marshaledResponse.getBody().isPresent()); 1098 1099 return RequestResult.withMarshaledResponse(marshaledResponse) 1100 .resourceMethod(resourceMethod) 1101 .build(); 1102 } else if (responseObject instanceof Response) { 1103 response = (Response) responseObject; 1104 } else if (responseObject instanceof HandshakeResult.Accepted accepted) { // SSE "accepted" handshake 1105 return RequestResult.withMarshaledResponse(toMarshaledResponse(accepted)) 1106 .resourceMethod(resourceMethod) 1107 .handshakeResult(accepted) 1108 .build(); 1109 } else if (responseObject instanceof HandshakeResult.Rejected rejected) { // SSE "rejected" handshake 1110 response = rejected.getResponse(); 1111 handshakeResult = rejected; 1112 } else { 1113 response = Response.withStatusCode(200).body(responseObject).build(); 1114 } 1115 1116 enforceBodylessStatusCode(response.getStatusCode(), response.getBody().isPresent()); 1117 1118 MarshaledResponse marshaledResponse = responseMarshaler.forResourceMethod(request, response, resourceMethod); 1119 1120 enforceBodylessStatusCode(marshaledResponse.getStatusCode(), marshaledResponse.getBody().isPresent()); 1121 1122 return RequestResult.withMarshaledResponse(marshaledResponse) 1123 .response(response) 1124 .resourceMethod(resourceMethod) 1125 .handshakeResult(handshakeResult) 1126 .build(); 1127 } 1128 1129 @NonNull 1130 private MarshaledResponse toMarshaledResponse(HandshakeResult.@NonNull Accepted accepted) { 1131 requireNonNull(accepted); 1132 1133 Map<String, Set<String>> headers = accepted.getHeaders(); 1134 LinkedCaseInsensitiveMap<Set<String>> finalHeaders = new LinkedCaseInsensitiveMap<>(DEFAULT_ACCEPTED_HANDSHAKE_HEADERS.size() + headers.size()); 1135 1136 // Start with defaults 1137 for (Map.Entry<String, Set<String>> e : DEFAULT_ACCEPTED_HANDSHAKE_HEADERS.entrySet()) 1138 finalHeaders.put(e.getKey(), e.getValue()); // values already unmodifiable 1139 1140 // Overlay user-supplied headers (prefer user values on key collision) 1141 for (Map.Entry<String, Set<String>> e : headers.entrySet()) { 1142 // Defensively copy so callers can't mutate after construction 1143 Set<String> values = e.getValue() == null ? Set.of() : Set.copyOf(e.getValue()); 1144 finalHeaders.put(e.getKey(), values); 1145 } 1146 1147 return MarshaledResponse.withStatusCode(200) 1148 .headers(finalHeaders) 1149 .cookies(accepted.getCookies()) 1150 .build(); 1151 } 1152 1153 private static void enforceBodylessStatusCode(@NonNull Integer statusCode, 1154 @NonNull Boolean hasBody) { 1155 requireNonNull(statusCode); 1156 requireNonNull(hasBody); 1157 1158 if (hasBody && isBodylessStatusCode(statusCode)) 1159 throw new IllegalStateException(format("HTTP status code %d must not include a response body", statusCode)); 1160 } 1161 1162 private static boolean isBodylessStatusCode(@NonNull Integer statusCode) { 1163 requireNonNull(statusCode); 1164 return (statusCode >= 100 && statusCode < 200) || statusCode == 204 || statusCode == 304; 1165 } 1166 1167 @NonNull 1168 protected MarshaledResponse applyHeadResponseIfApplicable(@NonNull Request request, 1169 @NonNull MarshaledResponse marshaledResponse) { 1170 if (request.getHttpMethod() != HttpMethod.HEAD) 1171 return marshaledResponse; 1172 1173 return getSokletConfig().getResponseMarshaler().forHead(request, marshaledResponse); 1174 } 1175 1176 // Hat tip to Aslan Parçası and GrayStar 1177 @NonNull 1178 protected MarshaledResponse applyCommonPropertiesToMarshaledResponse(@NonNull Request request, 1179 @NonNull MarshaledResponse marshaledResponse) { 1180 requireNonNull(request); 1181 requireNonNull(marshaledResponse); 1182 1183 return applyCommonPropertiesToMarshaledResponse(request, marshaledResponse, false); 1184 } 1185 1186 @NonNull 1187 protected MarshaledResponse applyCommonPropertiesToMarshaledResponse(@NonNull Request request, 1188 @NonNull MarshaledResponse marshaledResponse, 1189 @NonNull Boolean suppressContentLength) { 1190 requireNonNull(request); 1191 requireNonNull(marshaledResponse); 1192 requireNonNull(suppressContentLength); 1193 1194 // Don't write Content-Length for an accepted SSE Handshake, for example 1195 if (!suppressContentLength) 1196 marshaledResponse = applyContentLengthIfApplicable(request, marshaledResponse); 1197 1198 // If the Date header is missing, add it using our cached provider 1199 if (!marshaledResponse.getHeaders().containsKey("Date")) 1200 marshaledResponse = marshaledResponse.copy() 1201 .headers(headers -> headers.put("Date", Set.of(CachedHttpDate.getCurrentValue()))) 1202 .finish(); 1203 1204 marshaledResponse = applyCorsResponseIfApplicable(request, marshaledResponse); 1205 1206 return marshaledResponse; 1207 } 1208 1209 @NonNull 1210 protected MarshaledResponse applyContentLengthIfApplicable(@NonNull Request request, 1211 @NonNull MarshaledResponse marshaledResponse) { 1212 requireNonNull(request); 1213 requireNonNull(marshaledResponse); 1214 1215 Set<String> normalizedHeaderNames = marshaledResponse.getHeaders().keySet().stream() 1216 .map(headerName -> headerName.toLowerCase(Locale.US)) 1217 .collect(Collectors.toSet()); 1218 1219 // If Content-Length is already specified, don't do anything 1220 if (normalizedHeaderNames.contains("content-length")) 1221 return marshaledResponse; 1222 1223 // If Content-Length is not specified, specify as the number of bytes in the body 1224 return marshaledResponse.copy() 1225 .headers((mutableHeaders) -> { 1226 String contentLengthHeaderValue = String.valueOf(marshaledResponse.getBody().orElse(emptyByteArray()).length); 1227 mutableHeaders.put("Content-Length", Set.of(contentLengthHeaderValue)); 1228 }).finish(); 1229 } 1230 1231 @NonNull 1232 protected MarshaledResponse applyCorsResponseIfApplicable(@NonNull Request request, 1233 @NonNull MarshaledResponse marshaledResponse) { 1234 requireNonNull(request); 1235 requireNonNull(marshaledResponse); 1236 1237 Cors cors = request.getCors().orElse(null); 1238 1239 // If non-CORS request, nothing further to do (note that CORS preflight was handled earlier) 1240 if (cors == null) 1241 return marshaledResponse; 1242 1243 CorsAuthorizer corsAuthorizer = getSokletConfig().getCorsAuthorizer(); 1244 1245 // Does the authorizer say we are authorized? 1246 CorsResponse corsResponse = corsAuthorizer.authorize(request, cors).orElse(null); 1247 1248 // Not authorized - don't apply CORS headers to the response 1249 if (corsResponse == null) 1250 return marshaledResponse; 1251 1252 // Authorized - OK, let's apply the headers to the response 1253 return getSokletConfig().getResponseMarshaler().forCorsAllowed(request, cors, corsResponse, marshaledResponse); 1254 } 1255 1256 @NonNull 1257 protected Map<@NonNull HttpMethod, @NonNull ResourceMethod> resolveMatchingResourceMethodsByHttpMethod(@NonNull Request request, 1258 @NonNull ResourceMethodResolver resourceMethodResolver, 1259 @NonNull ServerType serverType) { 1260 requireNonNull(request); 1261 requireNonNull(resourceMethodResolver); 1262 requireNonNull(serverType); 1263 1264 // Special handling for OPTIONS * 1265 if (request.getResourcePath() == ResourcePath.OPTIONS_SPLAT_RESOURCE_PATH) 1266 return new LinkedHashMap<>(); 1267 1268 Map<HttpMethod, ResourceMethod> matchingResourceMethodsByHttpMethod = new LinkedHashMap<>(HttpMethod.values().length); 1269 1270 for (HttpMethod httpMethod : HttpMethod.values()) { 1271 // Make a quick copy of the request to see if other paths match 1272 Request otherRequest = Request.withPath(httpMethod, request.getPath()).build(); 1273 ResourceMethod resourceMethod = resourceMethodResolver.resourceMethodForRequest(otherRequest, serverType).orElse(null); 1274 1275 if (resourceMethod != null) 1276 matchingResourceMethodsByHttpMethod.put(httpMethod, resourceMethod); 1277 } 1278 1279 return matchingResourceMethodsByHttpMethod; 1280 } 1281 1282 @NonNull 1283 private static Set<@NonNull HttpMethod> allowedHttpMethodsForResponse(@NonNull Map<@NonNull HttpMethod, @NonNull ResourceMethod> matchingResourceMethodsByHttpMethod, 1284 @NonNull Boolean includeOptions) { 1285 requireNonNull(matchingResourceMethodsByHttpMethod); 1286 requireNonNull(includeOptions); 1287 1288 Set<HttpMethod> allowedHttpMethods = EnumSet.noneOf(HttpMethod.class); 1289 allowedHttpMethods.addAll(matchingResourceMethodsByHttpMethod.keySet()); 1290 1291 if (includeOptions) 1292 allowedHttpMethods.add(HttpMethod.OPTIONS); 1293 1294 if (matchingResourceMethodsByHttpMethod.containsKey(HttpMethod.GET) || matchingResourceMethodsByHttpMethod.containsKey(HttpMethod.HEAD)) 1295 allowedHttpMethods.add(HttpMethod.HEAD); 1296 1297 return allowedHttpMethods; 1298 } 1299 1300 @NonNull 1301 protected MarshaledResponse provideFailsafeMarshaledResponse(@NonNull Request request, 1302 @NonNull Throwable throwable) { 1303 requireNonNull(request); 1304 requireNonNull(throwable); 1305 1306 Integer statusCode = 500; 1307 Charset charset = StandardCharsets.UTF_8; 1308 1309 return MarshaledResponse.withStatusCode(statusCode) 1310 .headers(Map.of("Content-Type", Set.of(format("text/plain; charset=%s", charset.name())))) 1311 .body(format("HTTP %d: %s", statusCode, StatusCode.fromStatusCode(statusCode).get().getReasonPhrase()).getBytes(charset)) 1312 .build(); 1313 } 1314 1315 /** 1316 * Synonym for {@link #stop()}. 1317 */ 1318 @Override 1319 public void close() { 1320 stop(); 1321 } 1322 1323 /** 1324 * Is either the managed {@link Server} or {@link ServerSentEventServer} started? 1325 * 1326 * @return {@code true} if at least one is started, {@code false} otherwise 1327 */ 1328 @NonNull 1329 public Boolean isStarted() { 1330 getLock().lock(); 1331 1332 try { 1333 if (getSokletConfig().getServer().isStarted()) 1334 return true; 1335 1336 ServerSentEventServer serverSentEventServer = getSokletConfig().getServerSentEventServer().orElse(null); 1337 return serverSentEventServer != null && serverSentEventServer.isStarted(); 1338 } finally { 1339 getLock().unlock(); 1340 } 1341 } 1342 1343 /** 1344 * Runs Soklet with special non-network "simulator" implementations of {@link Server} and {@link ServerSentEventServer} - useful for integration testing. 1345 * <p> 1346 * See <a href="https://www.soklet.com/docs/testing">https://www.soklet.com/docs/testing</a> for how to write these tests. 1347 * 1348 * @param sokletConfig configuration that drives the Soklet system 1349 * @param simulatorConsumer code to execute within the context of the simulator 1350 */ 1351 public static void runSimulator(@NonNull SokletConfig sokletConfig, 1352 @NonNull Consumer<Simulator> simulatorConsumer) { 1353 requireNonNull(sokletConfig); 1354 requireNonNull(simulatorConsumer); 1355 1356 // Create Soklet instance - this initializes the REAL implementations through proxies 1357 Soklet soklet = Soklet.fromConfig(sokletConfig); 1358 1359 // Extract proxies (they're guaranteed to be proxies now) 1360 ServerProxy serverProxy = (ServerProxy) sokletConfig.getServer(); 1361 ServerSentEventServerProxy serverSentEventServerProxy = sokletConfig.getServerSentEventServer() 1362 .map(s -> (ServerSentEventServerProxy) s) 1363 .orElse(null); 1364 1365 // Create mock implementations 1366 MockServer mockServer = new MockServer(); 1367 MockServerSentEventServer mockServerSentEventServer = new MockServerSentEventServer(); 1368 1369 // Switch proxies to simulator mode 1370 serverProxy.enableSimulatorMode(mockServer); 1371 1372 if (serverSentEventServerProxy != null) 1373 serverSentEventServerProxy.enableSimulatorMode(mockServerSentEventServer); 1374 1375 try { 1376 // Initialize mocks with request handlers that delegate to Soklet's processing 1377 mockServer.initialize(sokletConfig, (request, marshaledResponseConsumer) -> { 1378 // Delegate to Soklet's internal request handling 1379 soklet.handleRequest(request, ServerType.STANDARD_HTTP, marshaledResponseConsumer); 1380 }); 1381 1382 if (mockServerSentEventServer != null) 1383 mockServerSentEventServer.initialize(sokletConfig, (request, marshaledResponseConsumer) -> { 1384 // Delegate to Soklet's internal request handling for SSE 1385 soklet.handleRequest(request, ServerType.SERVER_SENT_EVENT, marshaledResponseConsumer); 1386 }); 1387 1388 // Create and provide simulator 1389 Simulator simulator = new DefaultSimulator(mockServer, mockServerSentEventServer); 1390 simulatorConsumer.accept(simulator); 1391 } finally { 1392 // Always restore to real implementations 1393 serverProxy.disableSimulatorMode(); 1394 1395 if (serverSentEventServerProxy != null) 1396 serverSentEventServerProxy.disableSimulatorMode(); 1397 } 1398 } 1399 1400 @NonNull 1401 protected SokletConfig getSokletConfig() { 1402 return this.sokletConfig; 1403 } 1404 1405 @NonNull 1406 protected ReentrantLock getLock() { 1407 return this.lock; 1408 } 1409 1410 @NonNull 1411 protected AtomicReference<CountDownLatch> getAwaitShutdownLatchReference() { 1412 return this.awaitShutdownLatchReference; 1413 } 1414 1415 @ThreadSafe 1416 static class DefaultSimulator implements Simulator { 1417 @Nullable 1418 private MockServer server; 1419 @Nullable 1420 private MockServerSentEventServer serverSentEventServer; 1421 1422 public DefaultSimulator(@NonNull MockServer server, 1423 @Nullable MockServerSentEventServer serverSentEventServer) { 1424 requireNonNull(server); 1425 1426 this.server = server; 1427 this.serverSentEventServer = serverSentEventServer; 1428 } 1429 1430 @NonNull 1431 @Override 1432 public RequestResult performRequest(@NonNull Request request) { 1433 AtomicReference<RequestResult> requestResultHolder = new AtomicReference<>(); 1434 Server.RequestHandler requestHandler = getServer().getRequestHandler().orElse(null); 1435 1436 if (requestHandler == null) 1437 throw new IllegalStateException("You must register a request handler prior to simulating requests"); 1438 1439 requestHandler.handleRequest(request, (requestResult -> { 1440 requestResultHolder.set(requestResult); 1441 })); 1442 1443 return requestResultHolder.get(); 1444 } 1445 1446 @NonNull 1447 @Override 1448 public ServerSentEventRequestResult performServerSentEventRequest(@NonNull Request request) { 1449 MockServerSentEventServer serverSentEventServer = getServerSentEventServer().orElse(null); 1450 1451 if (serverSentEventServer == null) 1452 throw new IllegalStateException(format("You must specify a %s in your %s to simulate Server-Sent Event requests", 1453 ServerSentEventServer.class.getSimpleName(), SokletConfig.class.getSimpleName())); 1454 1455 AtomicReference<RequestResult> requestResultHolder = new AtomicReference<>(); 1456 ServerSentEventServer.RequestHandler requestHandler = serverSentEventServer.getRequestHandler().orElse(null); 1457 1458 if (requestHandler == null) 1459 throw new IllegalStateException("You must register a request handler prior to simulating SSE Event Source requests"); 1460 1461 requestHandler.handleRequest(request, (requestResult -> { 1462 requestResultHolder.set(requestResult); 1463 })); 1464 1465 RequestResult requestResult = requestResultHolder.get(); 1466 HandshakeResult handshakeResult = requestResult.getHandshakeResult().orElse(null); 1467 1468 if (handshakeResult == null) 1469 return new ServerSentEventRequestResult.RequestFailed(requestResult); 1470 1471 if (handshakeResult instanceof HandshakeResult.Accepted acceptedHandshake) { 1472 Consumer<ServerSentEventUnicaster> clientInitializer = acceptedHandshake.getClientInitializer().orElse(null); 1473 1474 // Create a synthetic logical response using values from the accepted handshake 1475 if (requestResult.getResponse().isEmpty()) 1476 requestResult = requestResult.copy() 1477 .response(Response.withStatusCode(200) 1478 .headers(acceptedHandshake.getHeaders()) 1479 .cookies(acceptedHandshake.getCookies()) 1480 .build()) 1481 .finish(); 1482 1483 HandshakeAccepted handshakeAccepted = new HandshakeAccepted(acceptedHandshake, request.getResourcePath(), requestResult, this, clientInitializer); 1484 return handshakeAccepted; 1485 } 1486 1487 if (handshakeResult instanceof HandshakeResult.Rejected rejectedHandshake) 1488 return new HandshakeRejected(rejectedHandshake, requestResult); 1489 1490 throw new IllegalStateException(format("Encountered unexpected %s: %s", HandshakeResult.class.getSimpleName(), handshakeResult)); 1491 } 1492 1493 @NonNull 1494 @Override 1495 public Simulator onBroadcastError(@Nullable Consumer<Throwable> onBroadcastError) { 1496 MockServerSentEventServer serverSentEventServer = getServerSentEventServer().orElse(null); 1497 1498 if (serverSentEventServer != null) 1499 serverSentEventServer.onBroadcastError(onBroadcastError); 1500 1501 return this; 1502 } 1503 1504 @NonNull 1505 @Override 1506 public Simulator onUnicastError(@Nullable Consumer<Throwable> onUnicastError) { 1507 MockServerSentEventServer serverSentEventServer = getServerSentEventServer().orElse(null); 1508 1509 if (serverSentEventServer != null) 1510 serverSentEventServer.onUnicastError(onUnicastError); 1511 1512 return this; 1513 } 1514 1515 @NonNull 1516 MockServer getServer() { 1517 return this.server; 1518 } 1519 1520 @NonNull 1521 Optional<MockServerSentEventServer> getServerSentEventServer() { 1522 return Optional.ofNullable(this.serverSentEventServer); 1523 } 1524 } 1525 1526 /** 1527 * Mock server that doesn't touch the network at all, useful for testing. 1528 * 1529 * @author <a href="https://www.revetkn.com">Mark Allen</a> 1530 */ 1531 @ThreadSafe 1532 static class MockServer implements Server { 1533 @Nullable 1534 private SokletConfig sokletConfig; 1535 private Server.@Nullable RequestHandler requestHandler; 1536 1537 @Override 1538 public void start() { 1539 // No-op 1540 } 1541 1542 @Override 1543 public void stop() { 1544 // No-op 1545 } 1546 1547 @NonNull 1548 @Override 1549 public Boolean isStarted() { 1550 return true; 1551 } 1552 1553 @Override 1554 public void initialize(@NonNull SokletConfig sokletConfig, 1555 @NonNull RequestHandler requestHandler) { 1556 requireNonNull(sokletConfig); 1557 requireNonNull(requestHandler); 1558 1559 this.requestHandler = requestHandler; 1560 } 1561 1562 @NonNull 1563 protected Optional<SokletConfig> getSokletConfig() { 1564 return Optional.ofNullable(this.sokletConfig); 1565 } 1566 1567 @NonNull 1568 protected Optional<RequestHandler> getRequestHandler() { 1569 return Optional.ofNullable(this.requestHandler); 1570 } 1571 } 1572 1573 /** 1574 * Mock Server-Sent Event unicaster that doesn't touch the network at all, useful for testing. 1575 */ 1576 @ThreadSafe 1577 static class MockServerSentEventUnicaster implements ServerSentEventUnicaster { 1578 @NonNull 1579 private final ResourcePath resourcePath; 1580 @NonNull 1581 private final Consumer<ServerSentEvent> eventConsumer; 1582 @NonNull 1583 private final Consumer<ServerSentEventComment> commentConsumer; 1584 @NonNull 1585 private final AtomicReference<Consumer<Throwable>> unicastErrorHandler; 1586 1587 public MockServerSentEventUnicaster(@NonNull ResourcePath resourcePath, 1588 @NonNull Consumer<ServerSentEvent> eventConsumer, 1589 @NonNull Consumer<ServerSentEventComment> commentConsumer, 1590 @NonNull AtomicReference<Consumer<Throwable>> unicastErrorHandler) { 1591 requireNonNull(resourcePath); 1592 requireNonNull(eventConsumer); 1593 requireNonNull(commentConsumer); 1594 requireNonNull(unicastErrorHandler); 1595 1596 this.resourcePath = resourcePath; 1597 this.eventConsumer = eventConsumer; 1598 this.commentConsumer = commentConsumer; 1599 this.unicastErrorHandler = unicastErrorHandler; 1600 } 1601 1602 @Override 1603 public void unicastEvent(@NonNull ServerSentEvent serverSentEvent) { 1604 requireNonNull(serverSentEvent); 1605 try { 1606 getEventConsumer().accept(serverSentEvent); 1607 } catch (Throwable throwable) { 1608 handleUnicastError(throwable); 1609 } 1610 } 1611 1612 @Override 1613 public void unicastComment(@NonNull ServerSentEventComment serverSentEventComment) { 1614 requireNonNull(serverSentEventComment); 1615 try { 1616 getCommentConsumer().accept(serverSentEventComment); 1617 } catch (Throwable throwable) { 1618 handleUnicastError(throwable); 1619 } 1620 } 1621 1622 @NonNull 1623 @Override 1624 public ResourcePath getResourcePath() { 1625 return this.resourcePath; 1626 } 1627 1628 @NonNull 1629 protected Consumer<ServerSentEvent> getEventConsumer() { 1630 return this.eventConsumer; 1631 } 1632 1633 @NonNull 1634 protected Consumer<ServerSentEventComment> getCommentConsumer() { 1635 return this.commentConsumer; 1636 } 1637 1638 protected void handleUnicastError(@NonNull Throwable throwable) { 1639 requireNonNull(throwable); 1640 Consumer<Throwable> handler = this.unicastErrorHandler.get(); 1641 1642 if (handler != null) { 1643 try { 1644 handler.accept(throwable); 1645 return; 1646 } catch (Throwable ignored) { 1647 // Fall through to default behavior 1648 } 1649 } 1650 1651 throwable.printStackTrace(); 1652 } 1653 } 1654 1655 /** 1656 * Mock Server-Sent Event broadcaster that doesn't touch the network at all, useful for testing. 1657 */ 1658 @ThreadSafe 1659 static class MockServerSentEventBroadcaster implements ServerSentEventBroadcaster { 1660 // ConcurrentHashMap doesn't allow null values, so we use a sentinel if context is null 1661 private static final Object NULL_CONTEXT_SENTINEL; 1662 1663 static { 1664 NULL_CONTEXT_SENTINEL = new Object(); 1665 } 1666 1667 @NonNull 1668 private final ResourcePath resourcePath; 1669 // Maps the Consumer (Listener) to its Context object (e.g. Locale) 1670 @NonNull 1671 private final Map<@NonNull Consumer<ServerSentEvent>, @NonNull Object> eventConsumers; 1672 // Same goes for comments 1673 @NonNull 1674 private final Map<@NonNull Consumer<ServerSentEventComment>, @NonNull Object> commentConsumers; 1675 @NonNull 1676 private final AtomicReference<Consumer<Throwable>> broadcastErrorHandler; 1677 1678 public MockServerSentEventBroadcaster(@NonNull ResourcePath resourcePath, 1679 @NonNull AtomicReference<Consumer<Throwable>> broadcastErrorHandler) { 1680 requireNonNull(resourcePath); 1681 requireNonNull(broadcastErrorHandler); 1682 1683 this.resourcePath = resourcePath; 1684 this.eventConsumers = new ConcurrentHashMap<>(); 1685 this.commentConsumers = new ConcurrentHashMap<>(); 1686 this.broadcastErrorHandler = broadcastErrorHandler; 1687 } 1688 1689 @NonNull 1690 @Override 1691 public ResourcePath getResourcePath() { 1692 return this.resourcePath; 1693 } 1694 1695 @NonNull 1696 @Override 1697 public Long getClientCount() { 1698 return Long.valueOf(getEventConsumers().size() + getCommentConsumers().size()); 1699 } 1700 1701 @Override 1702 public void broadcastEvent(@NonNull ServerSentEvent serverSentEvent) { 1703 requireNonNull(serverSentEvent); 1704 1705 for (Consumer<ServerSentEvent> eventConsumer : getEventConsumers().keySet()) { 1706 try { 1707 eventConsumer.accept(serverSentEvent); 1708 } catch (Throwable throwable) { 1709 handleBroadcastError(throwable); 1710 } 1711 } 1712 } 1713 1714 @Override 1715 public void broadcastComment(@NonNull ServerSentEventComment serverSentEventComment) { 1716 requireNonNull(serverSentEventComment); 1717 1718 for (Consumer<ServerSentEventComment> commentConsumer : getCommentConsumers().keySet()) { 1719 try { 1720 commentConsumer.accept(serverSentEventComment); 1721 } catch (Throwable throwable) { 1722 handleBroadcastError(throwable); 1723 } 1724 } 1725 } 1726 1727 @Override 1728 public <T> void broadcastEvent( 1729 @NonNull Function<Object, T> keySelector, 1730 @NonNull Function<T, ServerSentEvent> eventProvider 1731 ) { 1732 requireNonNull(keySelector); 1733 requireNonNull(eventProvider); 1734 1735 // 1. Create a temporary cache for this specific broadcast operation. 1736 // This ensures we only run the expensive 'eventProvider' once per unique key. 1737 Map<T, ServerSentEvent> payloadCache = new HashMap<>(); 1738 1739 this.getEventConsumers().forEach((consumer, context) -> { 1740 try { 1741 // 2. Derive the key from the subscriber's context 1742 T key = keySelector.apply(context); 1743 1744 // 3. Memoize: Generate the payload if we haven't seen this key yet, otherwise reuse it 1745 ServerSentEvent event = payloadCache.computeIfAbsent(key, eventProvider); 1746 1747 // 4. Dispatch 1748 consumer.accept(event); 1749 } catch (Throwable throwable) { 1750 handleBroadcastError(throwable); 1751 } 1752 }); 1753 } 1754 1755 @Override 1756 public <T> void broadcastComment( 1757 @NonNull Function<Object, T> keySelector, 1758 @NonNull Function<T, ServerSentEventComment> commentProvider 1759 ) { 1760 requireNonNull(keySelector); 1761 requireNonNull(commentProvider); 1762 1763 // 1. Create temporary cache 1764 Map<T, ServerSentEventComment> commentCache = new HashMap<>(); 1765 1766 this.getCommentConsumers().forEach((consumer, context) -> { 1767 try { 1768 // 2. Derive key 1769 T key = keySelector.apply(context); 1770 1771 // 3. Memoize 1772 ServerSentEventComment comment = commentCache.computeIfAbsent(key, commentProvider); 1773 1774 // 4. Dispatch 1775 consumer.accept(comment); 1776 } catch (Throwable throwable) { 1777 handleBroadcastError(throwable); 1778 } 1779 }); 1780 } 1781 1782 @NonNull 1783 public Boolean registerEventConsumer(@NonNull Consumer<ServerSentEvent> eventConsumer) { 1784 return registerEventConsumer(eventConsumer, null); 1785 } 1786 1787 /** 1788 * Registers a consumer with an associated context, simulating a client with specific traits. 1789 */ 1790 @NonNull 1791 public Boolean registerEventConsumer(@NonNull Consumer<ServerSentEvent> eventConsumer, @Nullable Object context) { 1792 requireNonNull(eventConsumer); 1793 // map.put returns null if the key was new, which conceptually matches "add" returning true 1794 return this.getEventConsumers().put(eventConsumer, context == null ? NULL_CONTEXT_SENTINEL : context) == null; 1795 } 1796 1797 @NonNull 1798 public Boolean unregisterEventConsumer(@NonNull Consumer<ServerSentEvent> eventConsumer) { 1799 requireNonNull(eventConsumer); 1800 return this.getEventConsumers().remove(eventConsumer) != null; 1801 } 1802 1803 @NonNull 1804 public Boolean registerCommentConsumer(@NonNull Consumer<ServerSentEventComment> commentConsumer) { 1805 return registerCommentConsumer(commentConsumer, null); 1806 } 1807 1808 /** 1809 * Registers a consumer with an associated context, simulating a client with specific traits. 1810 */ 1811 @NonNull 1812 public Boolean registerCommentConsumer(@NonNull Consumer<ServerSentEventComment> commentConsumer, @Nullable Object context) { 1813 requireNonNull(commentConsumer); 1814 return this.getCommentConsumers().put(commentConsumer, context == null ? NULL_CONTEXT_SENTINEL : context) == null; 1815 } 1816 1817 @NonNull 1818 public Boolean unregisterCommentConsumer(@NonNull Consumer<ServerSentEventComment> commentConsumer) { 1819 requireNonNull(commentConsumer); 1820 return this.getCommentConsumers().remove(commentConsumer) != null; 1821 } 1822 1823 @NonNull 1824 protected Map<@NonNull Consumer<ServerSentEvent>, @NonNull Object> getEventConsumers() { 1825 return this.eventConsumers; 1826 } 1827 1828 @NonNull 1829 protected Map<@NonNull Consumer<ServerSentEventComment>, @NonNull Object> getCommentConsumers() { 1830 return this.commentConsumers; 1831 } 1832 1833 protected void handleBroadcastError(@NonNull Throwable throwable) { 1834 requireNonNull(throwable); 1835 Consumer<Throwable> handler = this.broadcastErrorHandler.get(); 1836 1837 if (handler != null) { 1838 try { 1839 handler.accept(throwable); 1840 return; 1841 } catch (Throwable ignored) { 1842 // Fall through to default behavior 1843 } 1844 } 1845 1846 throwable.printStackTrace(); 1847 } 1848 } 1849 1850 /** 1851 * Mock Server-Sent Event server that doesn't touch the network at all, useful for testing. 1852 * 1853 * @author <a href="https://www.revetkn.com">Mark Allen</a> 1854 */ 1855 @ThreadSafe 1856 static class MockServerSentEventServer implements ServerSentEventServer { 1857 @Nullable 1858 private SokletConfig sokletConfig; 1859 private ServerSentEventServer.@Nullable RequestHandler requestHandler; 1860 @NonNull 1861 private final ConcurrentHashMap<@NonNull ResourcePath, @NonNull MockServerSentEventBroadcaster> broadcastersByResourcePath; 1862 @NonNull 1863 private final AtomicReference<Consumer<Throwable>> broadcastErrorHandler; 1864 @NonNull 1865 private final AtomicReference<Consumer<Throwable>> unicastErrorHandler; 1866 1867 public MockServerSentEventServer() { 1868 this.broadcastersByResourcePath = new ConcurrentHashMap<>(); 1869 this.broadcastErrorHandler = new AtomicReference<>(); 1870 this.unicastErrorHandler = new AtomicReference<>(); 1871 } 1872 1873 @Override 1874 public void start() { 1875 // No-op 1876 } 1877 1878 @Override 1879 public void stop() { 1880 // No-op 1881 } 1882 1883 @NonNull 1884 @Override 1885 public Boolean isStarted() { 1886 return true; 1887 } 1888 1889 @NonNull 1890 @Override 1891 public Optional<? extends ServerSentEventBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath) { 1892 if (resourcePath == null) 1893 return Optional.empty(); 1894 1895 MockServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath() 1896 .computeIfAbsent(resourcePath, rp -> new MockServerSentEventBroadcaster(rp, broadcastErrorHandler)); 1897 1898 return Optional.of(broadcaster); 1899 } 1900 1901 public void registerEventConsumer(@NonNull ResourcePath resourcePath, 1902 @NonNull Consumer<ServerSentEvent> eventConsumer) { 1903 registerEventConsumer(resourcePath, eventConsumer, null); 1904 } 1905 1906 public void registerEventConsumer(@NonNull ResourcePath resourcePath, 1907 @NonNull Consumer<ServerSentEvent> eventConsumer, 1908 @Nullable Object context) { 1909 requireNonNull(resourcePath); 1910 requireNonNull(eventConsumer); 1911 1912 MockServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath() 1913 .computeIfAbsent(resourcePath, rp -> new MockServerSentEventBroadcaster(rp, broadcastErrorHandler)); 1914 1915 broadcaster.registerEventConsumer(eventConsumer, context); 1916 } 1917 1918 @NonNull 1919 public Boolean unregisterEventConsumer(@NonNull ResourcePath resourcePath, 1920 @NonNull Consumer<ServerSentEvent> eventConsumer) { 1921 requireNonNull(resourcePath); 1922 requireNonNull(eventConsumer); 1923 1924 MockServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath().get(resourcePath); 1925 1926 if (broadcaster == null) 1927 return false; 1928 1929 return broadcaster.unregisterEventConsumer(eventConsumer); 1930 } 1931 1932 public void registerCommentConsumer(@NonNull ResourcePath resourcePath, 1933 @NonNull Consumer<ServerSentEventComment> commentConsumer) { 1934 registerCommentConsumer(resourcePath, commentConsumer, null); 1935 } 1936 1937 public void registerCommentConsumer(@NonNull ResourcePath resourcePath, 1938 @NonNull Consumer<ServerSentEventComment> commentConsumer, 1939 @Nullable Object context) { 1940 requireNonNull(resourcePath); 1941 requireNonNull(commentConsumer); 1942 1943 MockServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath() 1944 .computeIfAbsent(resourcePath, rp -> new MockServerSentEventBroadcaster(rp, broadcastErrorHandler)); 1945 1946 broadcaster.registerCommentConsumer(commentConsumer, context); 1947 } 1948 1949 @NonNull 1950 public Boolean unregisterCommentConsumer(@NonNull ResourcePath resourcePath, 1951 @NonNull Consumer<ServerSentEventComment> commentConsumer) { 1952 requireNonNull(resourcePath); 1953 requireNonNull(commentConsumer); 1954 1955 MockServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath().get(resourcePath); 1956 1957 if (broadcaster == null) 1958 return false; 1959 1960 return broadcaster.unregisterCommentConsumer(commentConsumer); 1961 } 1962 1963 @Override 1964 public void initialize(@NonNull SokletConfig sokletConfig, 1965 ServerSentEventServer.@NonNull RequestHandler requestHandler) { 1966 requireNonNull(sokletConfig); 1967 requireNonNull(requestHandler); 1968 1969 this.sokletConfig = sokletConfig; 1970 this.requestHandler = requestHandler; 1971 } 1972 1973 public void onBroadcastError(@Nullable Consumer<Throwable> onBroadcastError) { 1974 this.broadcastErrorHandler.set(onBroadcastError); 1975 } 1976 1977 public void onUnicastError(@Nullable Consumer<Throwable> onUnicastError) { 1978 this.unicastErrorHandler.set(onUnicastError); 1979 } 1980 1981 @NonNull 1982 protected Optional<SokletConfig> getSokletConfig() { 1983 return Optional.ofNullable(this.sokletConfig); 1984 } 1985 1986 @NonNull 1987 protected Optional<ServerSentEventServer.RequestHandler> getRequestHandler() { 1988 return Optional.ofNullable(this.requestHandler); 1989 } 1990 1991 @NonNull 1992 protected ConcurrentHashMap<@NonNull ResourcePath, @NonNull MockServerSentEventBroadcaster> getBroadcastersByResourcePath() { 1993 return this.broadcastersByResourcePath; 1994 } 1995 1996 @NonNull 1997 protected AtomicReference<Consumer<Throwable>> getUnicastErrorHandler() { 1998 return this.unicastErrorHandler; 1999 } 2000 } 2001 2002 /** 2003 * Efficiently provides the current time in RFC 1123 HTTP-date format. 2004 * Updates once per second to avoid the overhead of formatting dates on every request. 2005 */ 2006 @ThreadSafe 2007 private static final class CachedHttpDate { 2008 @NonNull 2009 private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH) 2010 .withZone(ZoneId.of("GMT")); 2011 @NonNull 2012 private static volatile String CURRENT_VALUE = FORMATTER.format(Instant.now()); 2013 2014 static { 2015 Thread t = new Thread(() -> { 2016 while (true) { 2017 try { 2018 Thread.sleep(1000); 2019 CURRENT_VALUE = FORMATTER.format(Instant.now()); 2020 } catch (InterruptedException e) { 2021 break; // Allow thread to die on JVM shutdown 2022 } 2023 } 2024 }, "soklet-date-header-value-updater"); 2025 t.setDaemon(true); 2026 t.start(); 2027 } 2028 2029 @NonNull 2030 static String getCurrentValue() { 2031 return CURRENT_VALUE; 2032 } 2033 } 2034}