001/* 002 * Copyright 2022-2025 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import com.soklet.annotation.Resource; 020 021import javax.annotation.Nonnull; 022import javax.annotation.Nullable; 023import javax.annotation.concurrent.ThreadSafe; 024import java.io.BufferedReader; 025import java.io.IOException; 026import java.io.InputStreamReader; 027import java.lang.reflect.InvocationTargetException; 028import java.nio.charset.Charset; 029import java.nio.charset.StandardCharsets; 030import java.time.Duration; 031import java.time.Instant; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.EnumSet; 035import java.util.LinkedHashMap; 036import java.util.List; 037import java.util.Locale; 038import java.util.Map; 039import java.util.Map.Entry; 040import java.util.Objects; 041import java.util.Optional; 042import java.util.Set; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.CopyOnWriteArraySet; 045import java.util.concurrent.CountDownLatch; 046import java.util.concurrent.atomic.AtomicBoolean; 047import java.util.concurrent.atomic.AtomicReference; 048import java.util.concurrent.locks.ReentrantLock; 049import java.util.function.Consumer; 050import java.util.stream.Collectors; 051 052import static com.soklet.Utilities.emptyByteArray; 053import static java.lang.String.format; 054import static 
java.util.Objects.requireNonNull; 055 056/** 057 * Soklet's main class - manages a {@link Server} (and optionally a {@link ServerSentEventServer}) using the provided system configuration. 058 * <p> 059 * <pre>{@code // Use out-of-the-box defaults 060 * SokletConfig config = SokletConfig.withServer( 061 * Server.withPort(8080).build() 062 * ).build(); 063 * 064 * try (Soklet soklet = Soklet.withConfig(config)) { 065 * soklet.start(); 066 * System.out.println("Soklet started, press [enter] to exit"); 067 * soklet.awaitShutdown(ShutdownTrigger.ENTER_KEY); 068 * }}</pre> 069 * 070 * @author <a href="https://www.revetkn.com">Mark Allen</a> 071 */ 072@ThreadSafe 073public final class Soklet implements AutoCloseable { 074 @Nonnull 075 private final SokletConfig sokletConfig; 076 @Nonnull 077 private final ReentrantLock lock; 078 @Nonnull 079 private final AtomicReference<CountDownLatch> awaitShutdownLatchReference; 080 081 /** 082 * Acquires a Soklet instance with the given configuration. 083 * 084 * @param sokletConfig configuration that drives the Soklet system 085 * @return a Soklet instance 086 */ 087 @Nonnull 088 public static Soklet withConfig(@Nonnull SokletConfig sokletConfig) { 089 requireNonNull(sokletConfig); 090 return new Soklet(sokletConfig); 091 } 092 093 /** 094 * Creates a Soklet instance with the given configuration. 
095 * 096 * @param sokletConfig configuration that drives the Soklet system 097 */ 098 private Soklet(@Nonnull SokletConfig sokletConfig) { 099 requireNonNull(sokletConfig); 100 101 this.sokletConfig = sokletConfig; 102 this.lock = new ReentrantLock(); 103 this.awaitShutdownLatchReference = new AtomicReference<>(new CountDownLatch(1)); 104 105 // Fail fast in the event that Soklet appears misconfigured 106 if (sokletConfig.getResourceMethodResolver().getResourceMethods().size() == 0) 107 throw new IllegalArgumentException(format("No classes annotated with @%s were found.", Resource.class.getSimpleName())); 108 109 // Use a layer of indirection here so the Soklet type does not need to directly implement the `RequestHandler` interface. 110 // Reasoning: the `handleRequest` method for Soklet should not be public, which might lead to accidental invocation by users. 111 // That method should only be called by the managed `Server` instance. 112 Soklet soklet = this; 113 114 sokletConfig.getServer().initialize(getSokletConfig(), (request, marshaledResponseConsumer) -> { 115 // Delegate to Soklet's internal request handling method 116 soklet.handleRequest(request, marshaledResponseConsumer); 117 }); 118 119 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 120 121 if (serverSentEventServer != null) 122 serverSentEventServer.initialize(sokletConfig, (request, marshaledResponseConsumer) -> { 123 // Delegate to Soklet's internal request handling method 124 soklet.handleRequest(request, marshaledResponseConsumer); 125 }); 126 } 127 128 /** 129 * Starts the managed server instance[s]. 130 * <p> 131 * If the managed server[s] are already started, this is a no-op. 
132 */ 133 public void start() { 134 getLock().lock(); 135 136 try { 137 if (isStarted()) 138 return; 139 140 getAwaitShutdownLatchReference().set(new CountDownLatch(1)); 141 142 SokletConfig sokletConfig = getSokletConfig(); 143 LifecycleInterceptor lifecycleInterceptor = sokletConfig.getLifecycleInterceptor(); 144 Server server = sokletConfig.getServer(); 145 146 lifecycleInterceptor.willStartServer(server); 147 server.start(); 148 lifecycleInterceptor.didStartServer(server); 149 150 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 151 152 if (serverSentEventServer != null) { 153 lifecycleInterceptor.willStartServerSentEventServer(serverSentEventServer); 154 serverSentEventServer.start(); 155 lifecycleInterceptor.didStartServerSentEventServer(serverSentEventServer); 156 } 157 } finally { 158 getLock().unlock(); 159 } 160 } 161 162 /** 163 * Stops the managed server instance[s]. 164 * <p> 165 * If the managed server[s] are already stopped, this is a no-op. 
166 */ 167 public void stop() { 168 getLock().lock(); 169 170 try { 171 if (isStarted()) { 172 SokletConfig sokletConfig = getSokletConfig(); 173 LifecycleInterceptor lifecycleInterceptor = sokletConfig.getLifecycleInterceptor(); 174 Server server = sokletConfig.getServer(); 175 176 if (server.isStarted()) { 177 lifecycleInterceptor.willStopServer(server); 178 server.stop(); 179 lifecycleInterceptor.didStopServer(server); 180 } 181 182 ServerSentEventServer serverSentEventServer = sokletConfig.getServerSentEventServer().orElse(null); 183 184 if (serverSentEventServer != null && serverSentEventServer.isStarted()) { 185 lifecycleInterceptor.willStopServerSentEventServer(serverSentEventServer); 186 serverSentEventServer.stop(); 187 lifecycleInterceptor.didStopServerSentEventServer(serverSentEventServer); 188 } 189 } 190 } finally { 191 try { 192 getAwaitShutdownLatchReference().get().countDown(); 193 } finally { 194 getLock().unlock(); 195 } 196 } 197 } 198 199 /** 200 * Blocks the current thread until JVM shutdown ({@code SIGTERM/SIGINT/System.exit(...)} and so forth), <strong>or</strong> if one of the provided {@code shutdownTriggers} occurs. 201 * <p> 202 * This method will automatically invoke this instance's {@link #stop()} method once it becomes unblocked. 203 * <p> 204 * <strong>Notes regarding {@link ShutdownTrigger#ENTER_KEY}:</strong> 205 * <ul> 206 * <li>It will invoke {@link #stop()} on <i>all</i> Soklet instances, as stdin is process-wide</li> 207 * <li>It is only supported for environments with an interactive TTY and will be ignored if none exists (e.g. running in a Docker container) - Soklet will detect this and fire {@link LifecycleInterceptor#didReceiveLogEvent(LogEvent)} with an event of type {@link LogEventType#CONFIGURATION_UNSUPPORTED}</li> 208 * </ul> 209 * 210 * @param shutdownTriggers additional trigger[s] which signal that shutdown should occur, e.g. 
{@link ShutdownTrigger#ENTER_KEY} for "enter key pressed" 211 * @throws InterruptedException if the current thread has its interrupted status set on entry to this method, or is interrupted while waiting 212 */ 213 public void awaitShutdown(@Nullable ShutdownTrigger... shutdownTriggers) throws InterruptedException { 214 Thread shutdownHook = null; 215 boolean registeredEnterKeyShutdownTrigger = false; 216 Set<ShutdownTrigger> shutdownTriggersAsSet = shutdownTriggers == null ? Set.of() : EnumSet.copyOf(Set.of(shutdownTriggers)); 217 218 try { 219 // Optionally listen for enter key 220 if (shutdownTriggersAsSet.contains(ShutdownTrigger.ENTER_KEY)) { 221 registeredEnterKeyShutdownTrigger = KeypressManager.register(this); // returns false if stdin unusable/disabled 222 223 if (!registeredEnterKeyShutdownTrigger) { 224 LogEvent logEvent = LogEvent.with( 225 LogEventType.CONFIGURATION_UNSUPPORTED, 226 format("Ignoring request for %s.%s - it is unsupported in this environment (no interactive TTY detected)", ShutdownTrigger.class.getSimpleName(), ShutdownTrigger.ENTER_KEY.name()) 227 ).build(); 228 229 getSokletConfig().getLifecycleInterceptor().didReceiveLogEvent(logEvent); 230 } 231 } 232 233 // Always register a shutdown hook 234 shutdownHook = new Thread(() -> { 235 try { 236 stop(); 237 } catch (Throwable ignored) { 238 // Nothing to do 239 } 240 }, "soklet-shutdown-hook"); 241 242 Runtime.getRuntime().addShutdownHook(shutdownHook); 243 244 // Wait until "close" finishes 245 getAwaitShutdownLatchReference().get().await(); 246 } finally { 247 if (registeredEnterKeyShutdownTrigger) 248 KeypressManager.unregister(this); 249 250 try { 251 Runtime.getRuntime().removeShutdownHook(shutdownHook); 252 } catch (IllegalStateException ignored) { 253 // JVM shutting down 254 } 255 } 256 } 257 258 /** 259 * Handles "awaitShutdown" for {@link ShutdownTrigger#ENTER_KEY} by listening to stdin - all Soklet instances are terminated on keypress. 
260 */ 261 @ThreadSafe 262 private static final class KeypressManager { 263 @Nonnull 264 private static final Set<Soklet> SOKLET_REGISTRY; 265 @Nonnull 266 private static final AtomicBoolean LISTENER_STARTED; 267 268 static { 269 SOKLET_REGISTRY = new CopyOnWriteArraySet<>(); 270 LISTENER_STARTED = new AtomicBoolean(false); 271 } 272 273 /** 274 * Register a Soklet for Enter-to-stop support. Returns true iff a listener is (or was already) active. 275 * If System.in is not usable (or disabled), returns false and does nothing. 276 */ 277 @Nonnull 278 static Boolean register(@Nonnull Soklet soklet) { 279 requireNonNull(soklet); 280 281 // If stdin is not readable (e.g., container with no TTY), don't start a listener. 282 if (!canReadFromStdin()) 283 return false; 284 285 SOKLET_REGISTRY.add(soklet); 286 287 // Start a single process-wide listener once. 288 if (LISTENER_STARTED.compareAndSet(false, true)) { 289 Thread thread = new Thread(KeypressManager::runLoop, "soklet-keypress-shutdown-listener"); 290 thread.setDaemon(true); // never block JVM exit 291 thread.start(); 292 } 293 294 return true; 295 } 296 297 static void unregister(@Nonnull Soklet soklet) { 298 SOKLET_REGISTRY.remove(soklet); 299 // We intentionally keep the listener alive; it's daemon and cheap. 300 // If stdin hits EOF, the listener exits on its own. 301 } 302 303 /** 304 * Heuristic: if System.in is present and calling available() doesn't throw, 305 * treat it as readable. Works even in IDEs where System.console() is null. 306 */ 307 @Nonnull 308 private static Boolean canReadFromStdin() { 309 if (System.in == null) 310 return false; 311 312 try { 313 // available() >= 0 means stream is open; 0 means no buffered data (that’s fine). 314 return System.in.available() >= 0; 315 } catch (IOException e) { 316 return false; 317 } 318 } 319 320 /** 321 * Single blocking read on stdin. On any line (or EOF), stop all registered servers. 
322 */ 323 private static void runLoop() { 324 try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8))) { 325 // Blocks until newline or EOF; EOF (null) happens with /dev/null or closed pipe. 326 bufferedReader.readLine(); 327 328 // Either a line or EOF → stop everything that’s currently registered. 329 for (Soklet soklet : SOKLET_REGISTRY) { 330 try { 331 soklet.stop(); 332 } catch (Throwable ignored) { 333 // Nothing to do 334 } 335 } 336 } catch (Throwable ignored) { 337 // If stdin is closed mid-run, just exit quietly. 338 } 339 } 340 341 private KeypressManager() {} 342 } 343 344 /** 345 * Nonpublic "informal" implementation of {@link com.soklet.Server.RequestHandler} so Soklet does not need to expose {@code handleRequest} publicly. 346 * Reasoning: users of this library should never call {@code handleRequest} directly - it should only be invoked in response to events 347 * provided by a {@link Server} or {@link ServerSentEventServer} implementation. 
348 */ 349 protected void handleRequest(@Nonnull Request request, 350 @Nonnull Consumer<RequestResult> requestResultConsumer) { 351 requireNonNull(request); 352 requireNonNull(requestResultConsumer); 353 354 Instant processingStarted = Instant.now(); 355 356 SokletConfig sokletConfig = getSokletConfig(); 357 ResourceMethodResolver resourceMethodResolver = sokletConfig.getResourceMethodResolver(); 358 ResponseMarshaler responseMarshaler = sokletConfig.getResponseMarshaler(); 359 LifecycleInterceptor lifecycleInterceptor = sokletConfig.getLifecycleInterceptor(); 360 361 // Holders to permit mutable effectively-final variables 362 AtomicReference<MarshaledResponse> marshaledResponseHolder = new AtomicReference<>(); 363 AtomicReference<Throwable> resourceMethodResolutionExceptionHolder = new AtomicReference<>(); 364 AtomicReference<Request> requestHolder = new AtomicReference<>(request); 365 AtomicReference<ResourceMethod> resourceMethodHolder = new AtomicReference<>(); 366 AtomicReference<RequestResult> requestResultHolder = new AtomicReference<>(); 367 368 // Holders to permit mutable effectively-final state tracking 369 AtomicBoolean willStartResponseWritingCompleted = new AtomicBoolean(false); 370 AtomicBoolean didFinishResponseWritingCompleted = new AtomicBoolean(false); 371 AtomicBoolean didFinishRequestHandlingCompleted = new AtomicBoolean(false); 372 373 List<Throwable> throwables = new ArrayList<>(10); 374 375 Consumer<LogEvent> safelyLog = (logEvent -> { 376 try { 377 lifecycleInterceptor.didReceiveLogEvent(logEvent); 378 } catch (Throwable throwable) { 379 throwable.printStackTrace(); 380 throwables.add(throwable); 381 } 382 }); 383 384 requestHolder.set(request); 385 386 try { 387 // Do we have an exact match for this resource method? 
388 resourceMethodHolder.set(resourceMethodResolver.resourceMethodForRequest(requestHolder.get()).orElse(null)); 389 } catch (Throwable t) { 390 safelyLog.accept(LogEvent.with(LogEventType.RESOURCE_METHOD_RESOLUTION_FAILED, "Unable to resolve Resource Method") 391 .throwable(t) 392 .request(requestHolder.get()) 393 .build()); 394 395 // If an exception occurs here, keep track of it - we will surface them after letting LifecycleInterceptor 396 // see that a request has come in. 397 throwables.add(t); 398 resourceMethodResolutionExceptionHolder.set(t); 399 } 400 401 try { 402 lifecycleInterceptor.wrapRequest(request, resourceMethodHolder.get(), (wrappedRequest) -> { 403 requestHolder.set(wrappedRequest); 404 405 try { 406 lifecycleInterceptor.didStartRequestHandling(requestHolder.get(), resourceMethodHolder.get()); 407 } catch (Throwable t) { 408 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_START_REQUEST_HANDLING_FAILED, 409 format("An exception occurred while invoking %s::didStartRequestHandling", 410 LifecycleInterceptor.class.getSimpleName())) 411 .throwable(t) 412 .request(requestHolder.get()) 413 .resourceMethod(resourceMethodHolder.get()) 414 .build()); 415 416 throwables.add(t); 417 } 418 419 try { 420 lifecycleInterceptor.interceptRequest(request, resourceMethodHolder.get(), (interceptorRequest) -> { 421 requestHolder.set(interceptorRequest); 422 423 try { 424 if (resourceMethodResolutionExceptionHolder.get() != null) 425 throw resourceMethodResolutionExceptionHolder.get(); 426 427 RequestResult requestResult = toRequestResult(requestHolder.get(), resourceMethodHolder.get()); 428 requestResultHolder.set(requestResult); 429 430 MarshaledResponse originalMarshaledResponse = requestResult.getMarshaledResponse(); 431 MarshaledResponse updatedMarshaledResponse = requestResult.getMarshaledResponse(); 432 433 // A few special cases that are "global" in that they can affect all requests and 434 // need to happen after marshaling the 
response... 435 436 // 1. Customize response for HEAD (e.g. remove body, set Content-Length header) 437 updatedMarshaledResponse = applyHeadResponseIfApplicable(request, updatedMarshaledResponse); 438 439 // 2. Apply other standard response customizations (CORS, Content-Length) 440 updatedMarshaledResponse = applyCommonPropertiesToMarshaledResponse(request, updatedMarshaledResponse); 441 442 // Update our result holder with the modified response if necessary 443 if (originalMarshaledResponse != updatedMarshaledResponse) { 444 marshaledResponseHolder.set(updatedMarshaledResponse); 445 requestResultHolder.set(requestResult.copy() 446 .marshaledResponse(updatedMarshaledResponse) 447 .finish()); 448 } 449 450 return updatedMarshaledResponse; 451 } catch (Throwable t) { 452 if (!Objects.equals(t, resourceMethodResolutionExceptionHolder.get())) { 453 throwables.add(t); 454 455 safelyLog.accept(LogEvent.with(LogEventType.REQUEST_PROCESSING_FAILED, 456 "An exception occurred while processing request") 457 .throwable(t) 458 .request(requestHolder.get()) 459 .resourceMethod(resourceMethodHolder.get()) 460 .build()); 461 } 462 463 // Unhappy path. Try to use configuration's exception response marshaler... 
464 try { 465 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 466 marshaledResponse = applyCommonPropertiesToMarshaledResponse(request, marshaledResponse); 467 marshaledResponseHolder.set(marshaledResponse); 468 469 return marshaledResponse; 470 } catch (Throwable t2) { 471 throwables.add(t2); 472 473 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 474 format("An exception occurred while trying to write an exception response for %s", t)) 475 .throwable(t2) 476 .request(requestHolder.get()) 477 .resourceMethod(resourceMethodHolder.get()) 478 .build()); 479 480 // The configuration's exception response marshaler failed - provide a failsafe response to recover 481 return provideFailsafeMarshaledResponse(requestHolder.get(), t2); 482 } 483 } 484 }, (interceptorMarshaledResponse) -> { 485 marshaledResponseHolder.set(interceptorMarshaledResponse); 486 }); 487 } catch (Throwable t) { 488 throwables.add(t); 489 490 try { 491 // In the event that an error occurs during processing of a LifecycleInterceptor method, for example 492 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_INTERCEPT_REQUEST_FAILED, 493 format("An exception occurred while invoking %s::interceptRequest", LifecycleInterceptor.class.getSimpleName())) 494 .throwable(t) 495 .request(requestHolder.get()) 496 .resourceMethod(resourceMethodHolder.get()) 497 .build()); 498 499 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 500 marshaledResponse = applyCommonPropertiesToMarshaledResponse(request, marshaledResponse); 501 marshaledResponseHolder.set(marshaledResponse); 502 } catch (Throwable t2) { 503 throwables.add(t2); 504 505 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 506 format("An exception occurred while invoking %s::forThrowable when trying to write an exception 
response for %s", ResponseMarshaler.class.getSimpleName(), t)) 507 .throwable(t2) 508 .request(requestHolder.get()) 509 .resourceMethod(resourceMethodHolder.get()) 510 .build()); 511 512 marshaledResponseHolder.set(provideFailsafeMarshaledResponse(requestHolder.get(), t2)); 513 } 514 } finally { 515 try { 516 try { 517 lifecycleInterceptor.willStartResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get()); 518 } finally { 519 willStartResponseWritingCompleted.set(true); 520 } 521 522 Instant responseWriteStarted = Instant.now(); 523 524 try { 525 RequestResult requestResult = requestResultHolder.get(); 526 527 if (requestResult != null) 528 requestResultConsumer.accept(requestResult); 529 else 530 requestResultConsumer.accept(RequestResult.withMarshaledResponse(marshaledResponseHolder.get()) 531 .resourceMethod(resourceMethodHolder.get()) 532 .build()); 533 534 Instant responseWriteFinished = Instant.now(); 535 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 536 537 try { 538 lifecycleInterceptor.didFinishResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, null); 539 } catch (Throwable t) { 540 throwables.add(t); 541 542 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_RESPONSE_WRITING_FAILED, 543 format("An exception occurred while invoking %s::didFinishResponseWriting", 544 LifecycleInterceptor.class.getSimpleName())) 545 .throwable(t) 546 .request(requestHolder.get()) 547 .resourceMethod(resourceMethodHolder.get()) 548 .marshaledResponse(marshaledResponseHolder.get()) 549 .build()); 550 } finally { 551 didFinishResponseWritingCompleted.set(true); 552 } 553 } catch (Throwable t) { 554 throwables.add(t); 555 556 Instant responseWriteFinished = Instant.now(); 557 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 558 559 try { 560 
lifecycleInterceptor.didFinishResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, t); 561 } catch (Throwable t2) { 562 throwables.add(t2); 563 564 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_RESPONSE_WRITING_FAILED, 565 format("An exception occurred while invoking %s::didFinishResponseWriting", 566 LifecycleInterceptor.class.getSimpleName())) 567 .throwable(t2) 568 .request(requestHolder.get()) 569 .resourceMethod(resourceMethodHolder.get()) 570 .marshaledResponse(marshaledResponseHolder.get()) 571 .build()); 572 } 573 } 574 } finally { 575 try { 576 Instant processingFinished = Instant.now(); 577 Duration processingDuration = Duration.between(processingStarted, processingFinished); 578 579 lifecycleInterceptor.didFinishRequestHandling(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables)); 580 } catch (Throwable t) { 581 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_REQUEST_HANDLING_FAILED, 582 format("An exception occurred while invoking %s::didFinishRequestHandling", 583 LifecycleInterceptor.class.getSimpleName())) 584 .throwable(t) 585 .request(requestHolder.get()) 586 .resourceMethod(resourceMethodHolder.get()) 587 .marshaledResponse(marshaledResponseHolder.get()) 588 .build()); 589 } finally { 590 didFinishRequestHandlingCompleted.set(true); 591 } 592 } 593 } 594 }); 595 } catch (Throwable t) { 596 // If an error occurred during request wrapping, it's possible a response was never written/communicated back to LifecycleInterceptor. 597 // Detect that here and inform LifecycleInterceptor accordingly. 
598 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_WRAP_REQUEST_FAILED, 599 format("An exception occurred while invoking %s::wrapRequest", 600 LifecycleInterceptor.class.getSimpleName())) 601 .throwable(t) 602 .request(requestHolder.get()) 603 .resourceMethod(resourceMethodHolder.get()) 604 .marshaledResponse(marshaledResponseHolder.get()) 605 .build()); 606 607 // If we don't have a response, let the marshaler try to make one for the exception. 608 // If that fails, use the failsafe. 609 if (marshaledResponseHolder.get() == null) { 610 try { 611 MarshaledResponse marshaledResponse = responseMarshaler.forThrowable(requestHolder.get(), t, resourceMethodHolder.get()); 612 marshaledResponse = applyCommonPropertiesToMarshaledResponse(request, marshaledResponse); 613 marshaledResponseHolder.set(marshaledResponse); 614 } catch (Throwable t2) { 615 throwables.add(t2); 616 617 safelyLog.accept(LogEvent.with(LogEventType.RESPONSE_MARSHALER_FOR_THROWABLE_FAILED, 618 format("An exception occurred during request wrapping while invoking %s::forThrowable", 619 ResponseMarshaler.class.getSimpleName())) 620 .throwable(t2) 621 .request(requestHolder.get()) 622 .resourceMethod(resourceMethodHolder.get()) 623 .marshaledResponse(marshaledResponseHolder.get()) 624 .build()); 625 626 marshaledResponseHolder.set(provideFailsafeMarshaledResponse(requestHolder.get(), t)); 627 } 628 } 629 630 if (!willStartResponseWritingCompleted.get()) { 631 try { 632 lifecycleInterceptor.willStartResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get()); 633 } catch (Throwable t2) { 634 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_WILL_START_RESPONSE_WRITING_FAILED, 635 format("An exception occurred while invoking %s::willStartResponseWriting", 636 LifecycleInterceptor.class.getSimpleName())) 637 .throwable(t2) 638 .request(requestHolder.get()) 639 .resourceMethod(resourceMethodHolder.get()) 640 
.marshaledResponse(marshaledResponseHolder.get()) 641 .build()); 642 } 643 } 644 645 try { 646 Instant responseWriteStarted = Instant.now(); 647 648 if (!didFinishResponseWritingCompleted.get()) { 649 try { 650 RequestResult requestResult = requestResultHolder.get(); 651 652 if (requestResult != null) 653 requestResultConsumer.accept(requestResult); 654 else 655 requestResultConsumer.accept(RequestResult.withMarshaledResponse(marshaledResponseHolder.get()) 656 .resourceMethod(resourceMethodHolder.get()) 657 .build()); 658 659 Instant responseWriteFinished = Instant.now(); 660 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 661 662 try { 663 lifecycleInterceptor.didFinishResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, null); 664 } catch (Throwable t2) { 665 throwables.add(t2); 666 667 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_RESPONSE_WRITING_FAILED, 668 format("An exception occurred while invoking %s::didFinishResponseWriting", 669 LifecycleInterceptor.class.getSimpleName())) 670 .throwable(t2) 671 .request(requestHolder.get()) 672 .resourceMethod(resourceMethodHolder.get()) 673 .marshaledResponse(marshaledResponseHolder.get()) 674 .build()); 675 } 676 } catch (Throwable t2) { 677 throwables.add(t2); 678 679 Instant responseWriteFinished = Instant.now(); 680 Duration responseWriteDuration = Duration.between(responseWriteStarted, responseWriteFinished); 681 682 try { 683 lifecycleInterceptor.didFinishResponseWriting(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), responseWriteDuration, t); 684 } catch (Throwable t3) { 685 throwables.add(t3); 686 687 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_RESPONSE_WRITING_FAILED, 688 format("An exception occurred while invoking %s::didFinishResponseWriting", 689 LifecycleInterceptor.class.getSimpleName())) 690 
.throwable(t3) 691 .request(requestHolder.get()) 692 .resourceMethod(resourceMethodHolder.get()) 693 .marshaledResponse(marshaledResponseHolder.get()) 694 .build()); 695 } 696 } 697 } 698 } finally { 699 if (!didFinishRequestHandlingCompleted.get()) { 700 try { 701 Instant processingFinished = Instant.now(); 702 Duration processingDuration = Duration.between(processingStarted, processingFinished); 703 704 lifecycleInterceptor.didFinishRequestHandling(requestHolder.get(), resourceMethodHolder.get(), marshaledResponseHolder.get(), processingDuration, Collections.unmodifiableList(throwables)); 705 } catch (Throwable t2) { 706 safelyLog.accept(LogEvent.with(LogEventType.LIFECYCLE_INTERCEPTOR_DID_FINISH_REQUEST_HANDLING_FAILED, 707 format("An exception occurred while invoking %s::didFinishRequestHandling", 708 LifecycleInterceptor.class.getSimpleName())) 709 .throwable(t2) 710 .request(requestHolder.get()) 711 .resourceMethod(resourceMethodHolder.get()) 712 .marshaledResponse(marshaledResponseHolder.get()) 713 .build()); 714 } 715 } 716 } 717 } 718 } 719 720 @Nonnull 721 protected RequestResult toRequestResult(@Nonnull Request request, 722 @Nullable ResourceMethod resourceMethod) throws Throwable { 723 ResourceMethodParameterProvider resourceMethodParameterProvider = getSokletConfig().getResourceMethodParameterProvider(); 724 InstanceProvider instanceProvider = getSokletConfig().getInstanceProvider(); 725 CorsAuthorizer corsAuthorizer = getSokletConfig().getCorsAuthorizer(); 726 ResourceMethodResolver resourceMethodResolver = getSokletConfig().getResourceMethodResolver(); 727 ResponseMarshaler responseMarshaler = getSokletConfig().getResponseMarshaler(); 728 CorsPreflight corsPreflight = request.getCorsPreflight().orElse(null); 729 730 // Special short-circuit for big requests 731 if (request.isContentTooLarge()) 732 return RequestResult.withMarshaledResponse(responseMarshaler.forContentTooLarge(request, 
resourceMethodResolver.resourceMethodForRequest(request).orElse(null))) 733 .resourceMethod(resourceMethod) 734 .build(); 735 736 // No resource method was found for this HTTP method and path. 737 if (resourceMethod == null) { 738 // If this was an OPTIONS request, do special processing. 739 // If not, figure out if we should return a 404 or 405. 740 if (request.getHttpMethod() == HttpMethod.OPTIONS) { 741 // See what methods are available to us for this request's path 742 Map<HttpMethod, ResourceMethod> matchingResourceMethodsByHttpMethod = resolveMatchingResourceMethodsByHttpMethod(request, resourceMethodResolver); 743 744 // Special handling for CORS preflight requests, if needed 745 if (corsPreflight != null) { 746 // Let configuration function determine if we should authorize this request. 747 // Discard any OPTIONS references - see https://stackoverflow.com/a/68529748 748 Map<HttpMethod, ResourceMethod> nonOptionsMatchingResourceMethodsByHttpMethod = matchingResourceMethodsByHttpMethod.entrySet().stream() 749 .filter(entry -> entry.getKey() != HttpMethod.OPTIONS) 750 .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); 751 752 CorsPreflightResponse corsPreflightResponse = corsAuthorizer.authorizePreflight(request, corsPreflight, nonOptionsMatchingResourceMethodsByHttpMethod).orElse(null); 753 754 // Allow or reject CORS depending on what the function said to do 755 if (corsPreflightResponse != null) { 756 // Allow 757 MarshaledResponse marshaledResponse = responseMarshaler.forCorsPreflightAllowed(request, corsPreflight, corsPreflightResponse); 758 759 return RequestResult.withMarshaledResponse(marshaledResponse) 760 .corsPreflightResponse(corsPreflightResponse) 761 .resourceMethod(resourceMethod) 762 .build(); 763 } 764 765 // Reject 766 return RequestResult.withMarshaledResponse(responseMarshaler.forCorsPreflightRejected(request, corsPreflight)) 767 .resourceMethod(resourceMethod) 768 .build(); 769 } else { 770 // Just a normal OPTIONS response 
(non-CORS-preflight). 771 // If there's a matching OPTIONS resource method for this OPTIONS request, then invoke it. 772 ResourceMethod optionsResourceMethod = matchingResourceMethodsByHttpMethod.get(HttpMethod.OPTIONS); 773 774 if (optionsResourceMethod != null) { 775 resourceMethod = optionsResourceMethod; 776 } else { 777 // Ensure OPTIONS is always present in the map, even if there is no explicit matching resource method for it 778 if (!matchingResourceMethodsByHttpMethod.containsKey(HttpMethod.OPTIONS)) 779 matchingResourceMethodsByHttpMethod.put(HttpMethod.OPTIONS, null); 780 781 // Ensure HEAD is always present in the map, even if there is no explicit matching resource method for it 782 if (!matchingResourceMethodsByHttpMethod.containsKey(HttpMethod.HEAD)) 783 matchingResourceMethodsByHttpMethod.put(HttpMethod.HEAD, null); 784 785 return RequestResult.withMarshaledResponse(responseMarshaler.forOptions(request, matchingResourceMethodsByHttpMethod.keySet())) 786 .resourceMethod(resourceMethod) 787 .build(); 788 } 789 } 790 } else if (request.getHttpMethod() == HttpMethod.HEAD) { 791 // If there's a matching GET resource method for this HEAD request, then invoke it 792 Request headGetRequest = Request.with(HttpMethod.GET, request.getUri()).build(); 793 ResourceMethod headGetResourceMethod = resourceMethodResolver.resourceMethodForRequest(headGetRequest).orElse(null); 794 795 if (headGetResourceMethod != null) 796 resourceMethod = headGetResourceMethod; 797 else 798 return RequestResult.withMarshaledResponse(responseMarshaler.forNotFound(request)) 799 .resourceMethod(resourceMethod) 800 .build(); 801 } else { 802 // Not an OPTIONS request, so it's possible we have a 405. See if other HTTP methods match... 
803 Map<HttpMethod, ResourceMethod> otherMatchingResourceMethodsByHttpMethod = resolveMatchingResourceMethodsByHttpMethod(request, resourceMethodResolver); 804 805 Set<HttpMethod> matchingNonOptionsHttpMethods = otherMatchingResourceMethodsByHttpMethod.keySet().stream() 806 .filter(httpMethod -> httpMethod != HttpMethod.OPTIONS) 807 .collect(Collectors.toSet()); 808 809 // Ensure OPTIONS is always present in the map, even if there is no explicit matching resource method for it 810 if (!otherMatchingResourceMethodsByHttpMethod.containsKey(HttpMethod.OPTIONS)) 811 otherMatchingResourceMethodsByHttpMethod.put(HttpMethod.OPTIONS, null); 812 813 // Ensure HEAD is always present in the map, even if there is no explicit matching resource method for it 814 if (!otherMatchingResourceMethodsByHttpMethod.containsKey(HttpMethod.HEAD)) 815 otherMatchingResourceMethodsByHttpMethod.put(HttpMethod.HEAD, null); 816 817 if (matchingNonOptionsHttpMethods.size() > 0) { 818 // ...if some do, it's a 405 819 return RequestResult.withMarshaledResponse(responseMarshaler.forMethodNotAllowed(request, otherMatchingResourceMethodsByHttpMethod.keySet())) 820 .resourceMethod(resourceMethod) 821 .build(); 822 } else { 823 // no matching resource method found, it's a 404 824 return RequestResult.withMarshaledResponse(responseMarshaler.forNotFound(request)) 825 .resourceMethod(resourceMethod) 826 .build(); 827 } 828 } 829 } 830 831 // Found a resource method - happy path. 832 // 1. Get an instance of the resource class 833 // 2. Get values to pass to the resource method on the resource class 834 // 3. 
Invoke the resource method and use its return value to drive a response 835 Class<?> resourceClass = resourceMethod.getMethod().getDeclaringClass(); 836 Object resourceClassInstance; 837 838 try { 839 resourceClassInstance = instanceProvider.provide(resourceClass); 840 } catch (Exception e) { 841 throw new IllegalArgumentException(format("Unable to acquire an instance of %s", resourceClass.getName()), e); 842 } 843 844 List<Object> parameterValues = resourceMethodParameterProvider.parameterValuesForResourceMethod(request, resourceMethod); 845 846 Object responseObject; 847 848 try { 849 responseObject = resourceMethod.getMethod().invoke(resourceClassInstance, parameterValues.toArray()); 850 } catch (InvocationTargetException e) { 851 if (e.getTargetException() != null) 852 throw e.getTargetException(); 853 854 throw e; 855 } 856 857 // Unwrap the Optional<T>, if one exists. We do not recurse deeper than one level 858 if (responseObject instanceof Optional<?>) 859 responseObject = ((Optional<?>) responseObject).orElse(null); 860 861 Response response; 862 863 // If null/void return, it's a 204 864 // If it's a MarshaledResponse object, no marshaling + return it immediately - caller knows exactly what it wants to write. 865 // If it's a Response object, use as is. 866 // If it's a non-Response type of object, assume it's the response body and wrap in a Response. 
867 if (responseObject == null) 868 response = Response.withStatusCode(204).build(); 869 else if (responseObject instanceof MarshaledResponse) 870 return RequestResult.withMarshaledResponse((MarshaledResponse) responseObject) 871 .resourceMethod(resourceMethod) 872 .build(); 873 else if (responseObject instanceof Response) 874 response = (Response) responseObject; 875 else 876 response = Response.withStatusCode(200).body(responseObject).build(); 877 878 MarshaledResponse marshaledResponse = responseMarshaler.forHappyPath(request, response, resourceMethod); 879 880 return RequestResult.withMarshaledResponse(marshaledResponse) 881 .response(response) 882 .resourceMethod(resourceMethod) 883 .build(); 884 } 885 886 @Nonnull 887 protected MarshaledResponse applyHeadResponseIfApplicable(@Nonnull Request request, 888 @Nonnull MarshaledResponse marshaledResponse) { 889 if (request.getHttpMethod() != HttpMethod.HEAD) 890 return marshaledResponse; 891 892 return getSokletConfig().getResponseMarshaler().forHead(request, marshaledResponse); 893 } 894 895 // Hat tip to Aslan Parçası and GrayStar 896 @Nonnull 897 protected MarshaledResponse applyCommonPropertiesToMarshaledResponse(@Nonnull Request request, 898 @Nonnull MarshaledResponse marshaledResponse) { 899 requireNonNull(request); 900 requireNonNull(marshaledResponse); 901 902 marshaledResponse = applyContentLengthIfApplicable(request, marshaledResponse); 903 marshaledResponse = applyCorsResponseIfApplicable(request, marshaledResponse); 904 905 return marshaledResponse; 906 } 907 908 @Nonnull 909 protected MarshaledResponse applyContentLengthIfApplicable(@Nonnull Request request, 910 @Nonnull MarshaledResponse marshaledResponse) { 911 requireNonNull(request); 912 requireNonNull(marshaledResponse); 913 914 Set<String> normalizedHeaderNames = marshaledResponse.getHeaders().keySet().stream() 915 .map(headerName -> headerName.toLowerCase(Locale.US)) 916 .collect(Collectors.toSet()); 917 918 // If Content-Length is already 
specified, don't do anything 919 if (normalizedHeaderNames.contains("content-length")) 920 return marshaledResponse; 921 922 // If Content-Length is not specified, specify as the number of bytes in the body 923 return marshaledResponse.copy() 924 .headers((mutableHeaders) -> { 925 String contentLengthHeaderValue = String.valueOf(marshaledResponse.getBody().orElse(emptyByteArray()).length); 926 mutableHeaders.put("Content-Length", Set.of(contentLengthHeaderValue)); 927 }).finish(); 928 } 929 930 @Nonnull 931 protected MarshaledResponse applyCorsResponseIfApplicable(@Nonnull Request request, 932 @Nonnull MarshaledResponse marshaledResponse) { 933 requireNonNull(request); 934 requireNonNull(marshaledResponse); 935 936 Cors cors = request.getCors().orElse(null); 937 938 // If non-CORS request, nothing further to do (note that CORS preflight was handled earlier) 939 if (cors == null) 940 return marshaledResponse; 941 942 CorsAuthorizer corsAuthorizer = getSokletConfig().getCorsAuthorizer(); 943 944 // Does the authorizer say we are authorized? 
945 CorsResponse corsResponse = corsAuthorizer.authorize(request, cors).orElse(null); 946 947 // Not authorized - don't apply CORS headers to the response 948 if (corsResponse == null) 949 return marshaledResponse; 950 951 // Authorized - OK, let's apply the headers to the response 952 return getSokletConfig().getResponseMarshaler().forCorsAllowed(request, cors, corsResponse, marshaledResponse); 953 } 954 955 @Nonnull 956 protected Map<HttpMethod, ResourceMethod> resolveMatchingResourceMethodsByHttpMethod(@Nonnull Request request, 957 @Nonnull ResourceMethodResolver resourceMethodResolver) { 958 requireNonNull(request); 959 requireNonNull(resourceMethodResolver); 960 961 Map<HttpMethod, ResourceMethod> matchingResourceMethodsByHttpMethod = new LinkedHashMap<>(HttpMethod.values().length); 962 963 for (HttpMethod httpMethod : HttpMethod.values()) { 964 Request otherRequest = Request.with(httpMethod, request.getUri()).build(); 965 ResourceMethod resourceMethod = resourceMethodResolver.resourceMethodForRequest(otherRequest).orElse(null); 966 967 if (resourceMethod != null) 968 matchingResourceMethodsByHttpMethod.put(httpMethod, resourceMethod); 969 } 970 971 return matchingResourceMethodsByHttpMethod; 972 } 973 974 @Nonnull 975 protected MarshaledResponse provideFailsafeMarshaledResponse(@Nonnull Request request, 976 @Nonnull Throwable throwable) { 977 requireNonNull(request); 978 requireNonNull(throwable); 979 980 Integer statusCode = 500; 981 Charset charset = StandardCharsets.UTF_8; 982 983 return MarshaledResponse.withStatusCode(statusCode) 984 .headers(Map.of("Content-Type", Set.of(format("text/plain; charset=%s", charset.name())))) 985 .body(format("HTTP %d: %s", statusCode, StatusCode.fromStatusCode(statusCode).get().getReasonPhrase()).getBytes(charset)) 986 .build(); 987 } 988 989 /** 990 * Synonym for {@link #stop()}. 
991 */ 992 @Override 993 public void close() { 994 stop(); 995 } 996 997 /** 998 * Is either the managed {@link Server} or {@link ServerSentEventServer} started? 999 * 1000 * @return {@code true} if at least one is started, {@code false} otherwise 1001 */ 1002 @Nonnull 1003 public Boolean isStarted() { 1004 getLock().lock(); 1005 1006 try { 1007 if (getSokletConfig().getServer().isStarted()) 1008 return true; 1009 1010 ServerSentEventServer serverSentEventServer = getSokletConfig().getServerSentEventServer().orElse(null); 1011 return serverSentEventServer != null && serverSentEventServer.isStarted(); 1012 } finally { 1013 getLock().unlock(); 1014 } 1015 } 1016 1017 /** 1018 * Runs Soklet with a special "simulator" server that is useful for integration testing. 1019 * <p> 1020 * See <a href="https://www.soklet.com/docs/automated-testing">https://www.soklet.com/docs/automated-testing</a> for how to write these tests. 1021 * 1022 * @param sokletConfig configuration that drives the Soklet system 1023 * @param simulatorConsumer code to execute within the context of the simulator 1024 */ 1025 public static void runSimulator(@Nonnull SokletConfig sokletConfig, 1026 @Nonnull Consumer<Simulator> simulatorConsumer) { 1027 requireNonNull(sokletConfig); 1028 requireNonNull(simulatorConsumer); 1029 1030 MockServer server = new MockServer(); 1031 MockServerSentEventServer serverSentEventServer = new MockServerSentEventServer(); 1032 1033 SokletConfig mockConfiguration = sokletConfig.copy() 1034 .server(server) 1035 .serverSentEventServer(serverSentEventServer) 1036 .finish(); 1037 1038 Simulator simulator = new DefaultSimulator(server, serverSentEventServer); 1039 1040 try (Soklet soklet = Soklet.withConfig(mockConfiguration)) { 1041 soklet.start(); 1042 simulatorConsumer.accept(simulator); 1043 } catch (RuntimeException e) { 1044 throw e; 1045 } catch (Exception e) { 1046 throw new RuntimeException(e); 1047 } 1048 } 1049 1050 @Nonnull 1051 protected SokletConfig 
getSokletConfig() { 1052 return this.sokletConfig; 1053 } 1054 1055 @Nonnull 1056 protected ReentrantLock getLock() { 1057 return this.lock; 1058 } 1059 1060 @Nonnull 1061 protected AtomicReference<CountDownLatch> getAwaitShutdownLatchReference() { 1062 return this.awaitShutdownLatchReference; 1063 } 1064 1065 @ThreadSafe 1066 static class DefaultSimulator implements Simulator { 1067 @Nullable 1068 private MockServer server; 1069 @Nullable 1070 private MockServerSentEventServer serverSentEventServer; 1071 1072 public DefaultSimulator(@Nonnull MockServer server, 1073 @Nonnull MockServerSentEventServer serverSentEventServer) { 1074 requireNonNull(server); 1075 requireNonNull(serverSentEventServer); 1076 1077 this.server = server; 1078 this.serverSentEventServer = serverSentEventServer; 1079 } 1080 1081 @Nonnull 1082 @Override 1083 public RequestResult performRequest(@Nonnull Request request) { 1084 AtomicReference<RequestResult> requestResultHolder = new AtomicReference<>(); 1085 Server.RequestHandler requestHandler = getServer().getRequestHandler().orElse(null); 1086 1087 if (requestHandler == null) 1088 throw new IllegalStateException("You must register a request handler prior to simulating requests"); 1089 1090 requestHandler.handleRequest(request, (requestResult -> { 1091 requestResultHolder.set(requestResult); 1092 })); 1093 1094 return requestResultHolder.get(); 1095 } 1096 1097 @Override 1098 public void registerServerSentEventConsumer(@Nonnull ResourcePath resourcePath, 1099 @Nonnull Consumer<ServerSentEvent> serverSentEventConsumer) { 1100 requireNonNull(resourcePath); 1101 requireNonNull(serverSentEventConsumer); 1102 1103 // Delegate to the mock SSE server 1104 getServerSentEventServer().registerServerSentEventConsumer(resourcePath, serverSentEventConsumer); 1105 } 1106 1107 @Nonnull 1108 @Override 1109 public ServerSentEventBroadcaster acquireServerSentEventBroadcaster(@Nonnull ResourcePath resourcePath) { 1110 requireNonNull(resourcePath); 1111 1112 // 
Delegate to the mock SSE server. 1113 // We know the mock will always provide us with a broadcaster, so it's safe to immediately "get" the result 1114 return getServerSentEventServer().acquireBroadcaster(resourcePath).get(); 1115 } 1116 1117 @Nullable 1118 protected MockServer getServer() { 1119 return this.server; 1120 } 1121 1122 @Nullable 1123 protected MockServerSentEventServer getServerSentEventServer() { 1124 return this.serverSentEventServer; 1125 } 1126 } 1127 1128 /** 1129 * Mock server that doesn't touch the network at all, useful for testing. 1130 * 1131 * @author <a href="https://www.revetkn.com">Mark Allen</a> 1132 */ 1133 @ThreadSafe 1134 static class MockServer implements Server { 1135 @Nullable 1136 private SokletConfig sokletConfig; 1137 @Nullable 1138 private Server.RequestHandler requestHandler; 1139 1140 @Override 1141 public void start() { 1142 // No-op 1143 } 1144 1145 @Override 1146 public void stop() { 1147 // No-op 1148 } 1149 1150 @Nonnull 1151 @Override 1152 public Boolean isStarted() { 1153 return true; 1154 } 1155 1156 @Override 1157 public void initialize(@Nonnull SokletConfig sokletConfig, 1158 @Nonnull RequestHandler requestHandler) { 1159 requireNonNull(sokletConfig); 1160 requireNonNull(requestHandler); 1161 1162 this.requestHandler = requestHandler; 1163 } 1164 1165 @Nonnull 1166 protected Optional<SokletConfig> getSokletConfig() { 1167 return Optional.ofNullable(this.sokletConfig); 1168 } 1169 1170 @Nonnull 1171 protected Optional<RequestHandler> getRequestHandler() { 1172 return Optional.ofNullable(this.requestHandler); 1173 } 1174 } 1175 1176 /** 1177 * Mock Server-Sent Event broadcaster that doesn't touch the network at all, useful for testing. 
1178 */ 1179 @ThreadSafe 1180 static class MockServerSentEventBroadcaster implements ServerSentEventBroadcaster { 1181 @Nonnull 1182 private final ResourcePath resourcePath; 1183 @Nonnull 1184 private final Set<Consumer<ServerSentEvent>> serverSentEventConsumers; 1185 1186 public MockServerSentEventBroadcaster(@Nonnull ResourcePath resourcePath) { 1187 requireNonNull(resourcePath); 1188 1189 this.resourcePath = resourcePath; 1190 this.serverSentEventConsumers = ConcurrentHashMap.newKeySet(); 1191 } 1192 1193 @Nonnull 1194 @Override 1195 public ResourcePath getResourcePath() { 1196 return this.resourcePath; 1197 } 1198 1199 @Nonnull 1200 @Override 1201 public Long getClientCount() { 1202 return Long.valueOf(getServerSentEventConsumers().size()); 1203 } 1204 1205 @Override 1206 public void broadcast(@Nonnull ServerSentEvent serverSentEvent) { 1207 requireNonNull(serverSentEvent); 1208 1209 for (Consumer<ServerSentEvent> serverSentEventConsumer : getServerSentEventConsumers()) { 1210 try { 1211 serverSentEventConsumer.accept(serverSentEvent); 1212 } catch (Throwable throwable) { 1213 // TODO: revisit this - should we communicate back exceptions, and should we fire these on separate threads for "realism" (probably not)? 1214 throwable.printStackTrace(); 1215 } 1216 } 1217 } 1218 1219 @Nonnull 1220 public Boolean registerServerSentEventConsumer(@Nonnull Consumer<ServerSentEvent> serverSentEventConsumer) { 1221 requireNonNull(serverSentEventConsumer); 1222 return getServerSentEventConsumers().add(serverSentEventConsumer); 1223 } 1224 1225 @Nonnull 1226 protected Set<Consumer<ServerSentEvent>> getServerSentEventConsumers() { 1227 return this.serverSentEventConsumers; 1228 } 1229 } 1230 1231 /** 1232 * Mock Server-Sent Event server that doesn't touch the network at all, useful for testing. 
*
   * @author <a href="https://www.revetkn.com">Mark Allen</a>
   */
  @ThreadSafe
  static class MockServerSentEventServer implements ServerSentEventServer {
    @Nullable
    private SokletConfig sokletConfig;
    @Nullable
    private ServerSentEventServer.RequestHandler requestHandler;
    @Nonnull
    private final ConcurrentHashMap<ResourcePath, MockServerSentEventBroadcaster> broadcastersByResourcePath;

    public MockServerSentEventServer() {
      this.broadcastersByResourcePath = new ConcurrentHashMap<>();
    }

    @Override
    public void start() {
      // No-op
    }

    @Override
    public void stop() {
      // No-op
    }

    @Nonnull
    @Override
    public Boolean isStarted() {
      return true;
    }

    /**
     * Provides the broadcaster for the given resource path, creating it on first use.
     *
     * @param resourcePath the resource path to broadcast on, or {@code null}
     * @return the broadcaster for the path, or empty if {@code resourcePath} is {@code null}
     */
    @Nonnull
    @Override
    public Optional<? extends ServerSentEventBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath) {
      if (resourcePath == null)
        return Optional.empty();

      return Optional.of(broadcasterForResourcePath(resourcePath));
    }

    /**
     * Registers a consumer with the broadcaster for the given resource path, creating the broadcaster on first use.
     *
     * @param resourcePath            the resource path whose events the consumer should receive
     * @param serverSentEventConsumer the consumer to register
     */
    public void registerServerSentEventConsumer(@Nonnull ResourcePath resourcePath,
                                                @Nonnull Consumer<ServerSentEvent> serverSentEventConsumer) {
      requireNonNull(resourcePath);
      requireNonNull(serverSentEventConsumer);

      broadcasterForResourcePath(resourcePath).registerServerSentEventConsumer(serverSentEventConsumer);
    }

    @Override
    public void initialize(@Nonnull SokletConfig sokletConfig,
                           @Nonnull ServerSentEventServer.RequestHandler requestHandler) {
      requireNonNull(sokletConfig);
      requireNonNull(requestHandler);

      this.sokletConfig = sokletConfig;
      this.requestHandler = requestHandler;
    }

    // The Optional itself is never null, so these accessors are @Nonnull
    @Nonnull
    protected Optional<SokletConfig> getSokletConfig() {
      return Optional.ofNullable(this.sokletConfig);
    }

    @Nonnull
    protected Optional<ServerSentEventServer.RequestHandler> getRequestHandler() {
      return Optional.ofNullable(this.requestHandler);
    }

    @Nonnull
    protected ConcurrentHashMap<ResourcePath, MockServerSentEventBroadcaster> getBroadcastersByResourcePath() {
      return this.broadcastersByResourcePath;
    }

    // Single place where broadcasters are looked up or lazily created for a resource path
    @Nonnull
    private MockServerSentEventBroadcaster broadcasterForResourcePath(@Nonnull ResourcePath resourcePath) {
      return getBroadcastersByResourcePath()
          .computeIfAbsent(resourcePath, rp -> new MockServerSentEventBroadcaster(rp));
    }
  }
}