001/* 002 * Copyright 2022-2025 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet.core.impl; 018 019import com.soklet.SokletConfiguration; 020import com.soklet.core.HttpMethod; 021import com.soklet.core.LifecycleInterceptor; 022import com.soklet.core.LogEvent; 023import com.soklet.core.LogEventType; 024import com.soklet.core.MarshaledResponse; 025import com.soklet.core.Request; 026import com.soklet.core.RequestResult; 027import com.soklet.core.ResourceMethod; 028import com.soklet.core.ResourcePath; 029import com.soklet.core.ResourcePathDeclaration; 030import com.soklet.core.ServerSentEvent; 031import com.soklet.core.ServerSentEventBroadcaster; 032import com.soklet.core.ServerSentEventServer; 033import com.soklet.core.StatusCode; 034import com.soklet.core.Utilities; 035import com.soklet.internal.spring.LinkedCaseInsensitiveMap; 036import com.soklet.internal.util.ConcurrentLruMap; 037 038import javax.annotation.Nonnull; 039import javax.annotation.Nullable; 040import javax.annotation.concurrent.NotThreadSafe; 041import javax.annotation.concurrent.ThreadSafe; 042import java.io.IOException; 043import java.io.InterruptedIOException; 044import java.io.UncheckedIOException; 045import java.net.InetSocketAddress; 046import java.net.SocketTimeoutException; 047import java.net.URI; 048import java.nio.ByteBuffer; 049import java.nio.channels.ServerSocketChannel; 050import 
java.nio.channels.SocketChannel; 051import java.nio.charset.StandardCharsets; 052import java.time.Duration; 053import java.time.Instant; 054import java.util.ArrayList; 055import java.util.Collection; 056import java.util.LinkedHashSet; 057import java.util.List; 058import java.util.Locale; 059import java.util.Map; 060import java.util.Map.Entry; 061import java.util.Optional; 062import java.util.Set; 063import java.util.concurrent.ArrayBlockingQueue; 064import java.util.concurrent.BlockingQueue; 065import java.util.concurrent.ConcurrentHashMap; 066import java.util.concurrent.ExecutionException; 067import java.util.concurrent.ExecutorService; 068import java.util.concurrent.Executors; 069import java.util.concurrent.Future; 070import java.util.concurrent.ScheduledExecutorService; 071import java.util.concurrent.ThreadFactory; 072import java.util.concurrent.TimeUnit; 073import java.util.concurrent.TimeoutException; 074import java.util.concurrent.atomic.AtomicBoolean; 075import java.util.concurrent.atomic.AtomicInteger; 076import java.util.concurrent.locks.ReentrantLock; 077import java.util.function.Consumer; 078import java.util.function.Function; 079import java.util.function.Supplier; 080import java.util.stream.Collectors; 081 082import static java.lang.String.format; 083import static java.util.Objects.requireNonNull; 084 085/** 086 * @author <a href="https://www.revetkn.com">Mark Allen</a> 087 */ 088@ThreadSafe 089public class DefaultServerSentEventServer implements ServerSentEventServer { 090 @Nonnull 091 private static final String DEFAULT_HOST; 092 @Nonnull 093 private static final Duration DEFAULT_REQUEST_TIMEOUT; 094 @Nonnull 095 private static final Integer DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES; 096 @Nonnull 097 private static final Integer DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES; 098 @Nonnull 099 private static final Duration DEFAULT_SHUTDOWN_TIMEOUT; 100 101 @Nonnull 102 private static final ServerSentEvent SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK; 103 
@Nonnull
    private static final ServerSentEvent SERVER_SENT_EVENT_POISON_PILL;

    static {
        DEFAULT_HOST = "0.0.0.0";
        DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60);
        DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES = 1_024 * 1_024;
        DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES = 1_024;
        DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);

        // Make a unique "validity check" server-sent event used to wake a socket listener thread by injecting it into the relevant write queue.
        // When this event is taken off of the queue, a validity check is performed on the socket to see if it's still active.
        // If not, socket is torn down and the thread finishes running.
        // The contents don't matter; the object reference is used to determine if it's a validity check.
        SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK = ServerSentEvent.withEvent("validity-check").build();

        // Make a unique "poison pill" server-sent event used to stop a socket listener thread by injecting it into the relevant write queue.
        // When this event is taken off of the queue, the socket is torn down and the thread finishes running.
        // The contents don't matter; the object reference is used to determine if it's poison.
        SERVER_SENT_EVENT_POISON_PILL = ServerSentEvent.withEvent("poison").build();
    }

    @Nonnull
    private final Integer port;
    @Nonnull
    private final String host;
    @Nonnull
    private final Duration requestTimeout;
    @Nonnull
    private final Duration shutdownTimeout;
    @Nonnull
    private final Integer maximumRequestSizeInBytes;
    @Nonnull
    private final Integer requestReadBufferSizeInBytes;
    // TODO: we probably want to convert to ConcurrentLruMap
    @Nonnull
    private final ConcurrentHashMap<ResourcePath, DefaultServerSentEventBroadcaster> broadcastersByResourcePath;
    // TODO: we probably want to convert to ConcurrentLruMap
    @Nonnull
    private final ConcurrentHashMap<ResourcePath, ResourcePathDeclaration> resourcePathDeclarationsByResourcePathCache;
    @Nonnull
    private final ConcurrentLruMap<ServerSentEventConnection, DefaultServerSentEventBroadcaster> globalConnections;
    @Nonnull
    private final ReentrantLock lock;
    @Nonnull
    private final Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
    @Nonnull
    private final Supplier<ExecutorService> requestReaderExecutorServiceSupplier;
    @Nonnull
    private final Integer concurrentConnectionLimit;
    @Nonnull
    private final AtomicBoolean stopPoisonPill;
    @Nullable
    private volatile ExecutorService requestHandlerExecutorService;
    @Nullable
    private volatile ExecutorService requestReaderExecutorService;
    @Nullable
    private volatile ScheduledExecutorService connectionValidityExecutorService;
    @Nonnull
    private volatile Boolean started;
    @Nonnull
    private volatile Boolean stopping;
    @Nullable
    private Thread eventLoopThread;
    // Does not need to be concurrent because it's calculated just once at initialization time and is never modified after
    @Nonnull
    private Map<ResourcePathDeclaration, ResourceMethod> resourceMethodsByResourcePathDeclaration;
    @Nullable
    private RequestHandler requestHandler;
    @Nullable
    private LifecycleInterceptor lifecycleInterceptor;

    /**
     * Fans server-sent events out to every connection registered for a single resource path.
     * <p>
     * Events are not written to sockets here; they are placed on each connection's write queue,
     * which is drained by the per-socket thread running {@code handleClientSocketChannel}.
     */
    @ThreadSafe
    protected static class DefaultServerSentEventBroadcaster implements ServerSentEventBroadcaster {
        @Nonnull
        private final ResourceMethod resourceMethod;
        @Nonnull
        private final ResourcePath resourcePath;
        @Nonnull
        private final Consumer<ServerSentEventConnection> connectionUnregisteredListener;
        // This must be threadsafe, e.g. via ConcurrentHashMap#newKeySet
        @Nonnull
        private final Set<ServerSentEventConnection> serverSentEventConnections;

        /**
         * @param resourceMethod                 the resource method this broadcaster serves
         * @param resourcePath                   the concrete resource path this broadcaster serves
         * @param connectionUnregisteredListener invoked with each connection after it is removed from this broadcaster
         */
        public DefaultServerSentEventBroadcaster(@Nonnull ResourceMethod resourceMethod,
                                                 @Nonnull ResourcePath resourcePath,
                                                 @Nonnull Consumer<ServerSentEventConnection> connectionUnregisteredListener) {
            requireNonNull(resourceMethod);
            requireNonNull(resourcePath);
            requireNonNull(connectionUnregisteredListener);

            this.resourceMethod = resourceMethod;
            this.resourcePath = resourcePath;
            this.connectionUnregisteredListener = connectionUnregisteredListener;
            // TODO: let clients specify capacity
            this.serverSentEventConnections = ConcurrentHashMap.newKeySet(1_024);
        }

        @Nonnull
        public ResourceMethod getResourceMethod() {
            return this.resourceMethod;
        }

        @Nonnull
        @Override
        public ResourcePath getResourcePath() {
            return this.resourcePath;
        }

        /**
         * @return the number of connections currently registered with this broadcaster
         */
        @Nonnull
        @Override
        public Long getClientCount() {
            return (long) getServerSentEventConnections().size();
        }

        /**
         * Enqueues the event on the write queue of every registered connection.
         *
         * @param serverSentEvent the event to deliver
         */
        @Override
        public void broadcast(@Nonnull ServerSentEvent serverSentEvent) {
            requireNonNull(serverSentEvent);

            // We can broadcast from the current thread because putting elements onto blocking queues is reasonably fast.
            // The blocking queues are consumed by separate per-socket-channel threads.
            // NOTE(review): the write queue is bounded and BlockingQueue#add throws IllegalStateException when it is full,
            // which aborts delivery to the remaining connections. Confirm whether that is the intended backpressure policy.
            for (ServerSentEventConnection serverSentEventConnection : getServerSentEventConnections())
                serverSentEventConnection.getWriteQueue().add(serverSentEvent);
        }

        /**
         * Adds a connection to this broadcaster.
         *
         * @param serverSentEventConnection the connection to add (a null value is ignored)
         * @return {@code true} if the connection was newly added
         */
        @Nonnull
        public Boolean registerServerSentEventConnection(@Nullable ServerSentEventConnection serverSentEventConnection) {
            if (serverSentEventConnection == null)
                return false;

            // Underlying set is threadsafe so this is OK
            return getServerSentEventConnections().add(serverSentEventConnection);
        }

        /**
         * Removes a connection from this broadcaster and notifies the unregistered-listener if it was present.
         *
         * @param serverSentEventConnection the connection to remove (a null value is ignored)
         * @param sendPoisonPill            whether to enqueue the poison pill so the connection's socket thread terminates
         * @return {@code true} if the connection was registered and has now been removed
         */
        @Nonnull
        public Boolean unregisterServerSentEventConnection(@Nullable ServerSentEventConnection serverSentEventConnection,
                                                           @Nonnull Boolean sendPoisonPill) {
            requireNonNull(sendPoisonPill);

            if (serverSentEventConnection == null)
                return false;

            // Underlying set is threadsafe so this is OK
            boolean unregistered = getServerSentEventConnections().remove(serverSentEventConnection);

            if (!unregistered)
                return false;

            getConnectionUnregisteredListener().accept(serverSentEventConnection);

            // If requested, send a poison pill so the socket thread gets terminated.
            // NOTE(review): BlockingQueue#add throws if the bounded queue is full; confirm desired handling.
            if (sendPoisonPill)
                serverSentEventConnection.getWriteQueue().add(SERVER_SENT_EVENT_POISON_PILL);

            return true;
        }

        /**
         * Unregisters every connection currently registered with this broadcaster.
         *
         * @param sendPoisonPill whether to enqueue the poison pill on each connection
         */
        public void unregisterAllServerSentEventConnections(@Nonnull Boolean sendPoisonPill) {
            requireNonNull(sendPoisonPill);

            // TODO: we probably want to have a lock around registration/unregistration
            for (ServerSentEventConnection serverSentEventConnection : getServerSentEventConnections())
                unregisterServerSentEventConnection(serverSentEventConnection, sendPoisonPill);
        }

        @Nonnull
        protected Set<ServerSentEventConnection> getServerSentEventConnections() {
            return this.serverSentEventConnections;
        }

        @Nonnull
        protected Consumer<ServerSentEventConnection> getConnectionUnregisteredListener() {
            return this.connectionUnregisteredListener;
        }
    }

    /**
     * Starts building a server that will listen on the given port.
     *
     * @param port the port to listen on
     * @return a builder for further configuration
     */
    @Nonnull
    public static Builder withPort(@Nonnull Integer port) {
        requireNonNull(port);
        return new Builder(port);
    }

    /**
     * Creates a server from builder settings, applying defaults for anything left unset.
     *
     * @param builder the configured builder
     * @throws IllegalArgumentException if a size or the connection limit is not positive
     * @throws IllegalStateException    if the runtime does not support virtual threads
     */
    protected DefaultServerSentEventServer(@Nonnull Builder builder) {
        requireNonNull(builder);

        this.stopPoisonPill = new AtomicBoolean(false);
        this.started = false;
        this.stopping = false;
        this.lock = new ReentrantLock();
        this.port = builder.port;
        this.host = builder.host != null ? builder.host : DEFAULT_HOST;
        this.maximumRequestSizeInBytes = builder.maximumRequestSizeInBytes != null ? builder.maximumRequestSizeInBytes : DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES;
        this.requestReadBufferSizeInBytes = builder.requestReadBufferSizeInBytes != null ? builder.requestReadBufferSizeInBytes : DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES;
        this.requestTimeout = builder.requestTimeout != null ? builder.requestTimeout : DEFAULT_REQUEST_TIMEOUT;
        this.shutdownTimeout = builder.shutdownTimeout != null ? builder.shutdownTimeout : DEFAULT_SHUTDOWN_TIMEOUT;
        this.resourceMethodsByResourcePathDeclaration = Map.of(); // Temporary to remain non-null; will be overridden by Soklet via #initialize

        if (this.maximumRequestSizeInBytes <= 0)
            throw new IllegalArgumentException("Maximum request size must be > 0");

        if (this.requestReadBufferSizeInBytes <= 0)
            throw new IllegalArgumentException("Request read buffer size must be > 0");

        // TODO: let clients specify initial capacity
        this.broadcastersByResourcePath = new ConcurrentHashMap<>(1_024);
        this.resourcePathDeclarationsByResourcePathCache = new ConcurrentHashMap<>(1_024);

        // Cowardly refuse to run on anything other than a runtime that supports Virtual threads.
        if (!Utilities.virtualThreadsAvailable())
            throw new IllegalStateException(format("Virtual threads are required for %s", getClass().getSimpleName()));

        this.requestHandlerExecutorServiceSupplier = builder.requestHandlerExecutorServiceSupplier != null ? builder.requestHandlerExecutorServiceSupplier : () -> {
            String threadNamePrefix = "sse-request-handler-";

            return Utilities.createVirtualThreadsNewThreadPerTaskExecutor(threadNamePrefix, (Thread thread, Throwable throwable) -> {
                try {
                    safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unexpected exception occurred during server Server-Sent Event processing")
                            .throwable(throwable)
                            .build());
                } catch (Throwable loggingThrowable) {
                    // We are in a bad state - the log operation in the uncaught exception handler failed.
                    // Not much else we can do here but dump both failures to stderr.
                    throwable.printStackTrace();
                    loggingThrowable.printStackTrace();
                }
            });
        };

        this.requestReaderExecutorServiceSupplier = () -> {
            String threadNamePrefix = "sse-request-reader-";

            return Utilities.createVirtualThreadsNewThreadPerTaskExecutor(threadNamePrefix, (Thread thread, Throwable throwable) -> {
                try {
                    safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unexpected exception occurred during server Server-Sent Event request reading")
                            .throwable(throwable)
                            .build());
                } catch (Throwable loggingThrowable) {
                    // We are in a bad state - the log operation in the uncaught exception handler failed.
                    // Not much else we can do here but dump both failures to stderr.
                    throwable.printStackTrace();
                    loggingThrowable.printStackTrace();
                }
            });
        };

        this.concurrentConnectionLimit = builder.concurrentConnectionLimit != null ? builder.concurrentConnectionLimit : 8_192;

        if (this.concurrentConnectionLimit < 1)
            throw new IllegalArgumentException("The value for concurrentConnectionLimit must be > 0");

        // Initialize the global LRU map with the specified limit.
        // The second argument is the callback invoked when a connection is evicted from the map:
        // unregister the evicted connection from its broadcaster and send a poison pill to close it.
        this.globalConnections = new ConcurrentLruMap<>(this.concurrentConnectionLimit, (evictedConnection, broadcaster) -> {
            broadcaster.unregisterServerSentEventConnection(evictedConnection, true);
        });
    }

    /**
     * Wires this server to the Soklet configuration and request handler, and indexes all
     * Server-Sent Event source resource methods for lookup.
     *
     * @param sokletConfiguration source of the lifecycle interceptor and resource methods
     * @param requestHandler      handler used to produce the initial handshake response
     */
    @Override
    public void initialize(@Nonnull SokletConfiguration sokletConfiguration,
                           @Nonnull RequestHandler requestHandler) {
        requireNonNull(sokletConfiguration);
        requireNonNull(requestHandler);

        this.lifecycleInterceptor = sokletConfiguration.getLifecycleInterceptor();
        this.requestHandler = requestHandler;

        // Pick out all the @ServerSentEventSource resource methods and store off keyed on resource path for ease of lookup.
        // This is computed just once here and will never change.
        // TODO: we should fail-fast if there are multiple @ServerSentEventSource annotations with the same resource path. Should that happen here or at the Soklet level?
        this.resourceMethodsByResourcePathDeclaration = sokletConfiguration.getResourceMethodResolver().getResourceMethods().stream()
                .filter(resourceMethod -> resourceMethod.isServerSentEventSource())
                .collect(Collectors.toMap(ResourceMethod::getResourcePath, Function.identity()));
    }

    /**
     * Starts the server if it is not already started: creates the executor services, schedules the
     * periodic connection validity task, and launches the accept loop on its own thread.
     *
     * @throws IllegalStateException if no request handler or lifecycle interceptor has been registered
     */
    @Override
    public void start() {
        getLock().lock();

        try {
            if (isStarted())
                return;

            // Should never happen, this would already be set by the Soklet instance
            if (getRequestHandler().isEmpty())
                throw new IllegalStateException(format("No %s was registered for %s", RequestHandler.class, getClass()));

            if (getLifecycleInterceptor().isEmpty())
                throw new IllegalStateException(format("No %s was registered for %s", LifecycleInterceptor.class, getClass()));

            this.requestHandlerExecutorService = getRequestHandlerExecutorServiceSupplier().get();
            this.requestReaderExecutorService = getRequestReaderExecutorServiceSupplier().get();

            ScheduledExecutorService validityExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                @Nonnull
                public Thread newThread(@Nonnull Runnable runnable) {
                    requireNonNull(runnable);
                    return new Thread(runnable, "sse-connection-validator");
                }
            });

            this.connectionValidityExecutorService = validityExecutorService;

            // TODO: make durations configurable
            validityExecutorService.scheduleWithFixedDelay(() -> {
                try {
                    performConnectionValidityTask();
                } catch (Throwable throwable) {
                    safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Server-Sent Event connection validity checker encountered an error")
                            .throwable(throwable)
                            .build());
                }
            }, 5, 15, TimeUnit.SECONDS);

            // The event loop exits immediately if it observes "not started", and it does not take the lock,
            // so the flag must be set BEFORE the thread is started.
            this.started = true;

            Thread eventLoop = new Thread(this::startInternal, "sse-event-loop");
            this.eventLoopThread = eventLoop;
            eventLoop.start();
        } finally {
            getLock().unlock();
        }
    }

    /**
     * Periodic housekeeping: drops broadcasters that have no connections and enqueues a validity-check
     * event on every remaining connection so its socket thread wakes up and writes a heartbeat.
     */
    protected void performConnectionValidityTask() {
        for (DefaultServerSentEventBroadcaster broadcaster : getBroadcastersByResourcePath().values()) {
            Set<ServerSentEventConnection> serverSentEventConnections = broadcaster.getServerSentEventConnections();

            if (serverSentEventConnections.isEmpty()) {
                // This broadcaster can be entirely dealloced because it has no more connections.
                // TODO: this should be more of a failsafe, we should factor into its own method and call this at the end of a socket thread too for immediate cleanup
                // TODO: broadcaster removes/adds be protected with a "broadcasterLock"
                getBroadcastersByResourcePath().remove(broadcaster.getResourcePath());
                continue;
            }

            // TODO: keep track of when the most recent validity check was done so we don't do it too frequently, e.g. with AtomicReference<Instant> on ServerSentEventConnection (nice-to-have)
            // TODO: should this just be the heartbeat event?
            // offer() instead of add(): a full queue means the socket thread already has pending writes that will
            // exercise the socket, and a throwing add() would abort the checks for every remaining connection.
            for (ServerSentEventConnection serverSentEventConnection : serverSentEventConnections)
                serverSentEventConnection.getWriteQueue().offer(SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK);
        }
    }

    /**
     * Body of the event loop thread: binds the server socket and hands each accepted client channel
     * to the request handler executor until the stop flag is set. Always calls {@code stop()} once
     * the socket has been opened and the loop exits.
     *
     * @throws UncheckedIOException if opening, binding or accepting fails
     */
    protected void startInternal() {
        // Handle scenario where server is stopped immediately after starting (and before this thread is scheduled)
        // TODO: clean this up
        if (!isStarted() || isStopping())
            return;

        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            // Bind to the configured host as well as the port; binding by port alone ignores the host setting
            serverSocketChannel.bind(new InetSocketAddress(this.host, getPort()));

            // Handle scenario where server is stopped immediately after starting (and before this thread is scheduled)
            // TODO: clean this up
            if (!isStarted() || isStopping())
                return;

            ExecutorService executorService = getRequestHandlerExecutorService().get();

            while (!getStopPoisonPill().get()) {
                SocketChannel clientSocketChannel = serverSocketChannel.accept();
                executorService.submit(() -> handleClientSocketChannel(clientSocketChannel));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // In case we are not already being stopped, force a stop
            stop();
        }
    }

    /**
     * Services one client connection for its whole lifetime: reads and parses the request, writes the
     * handshake response produced by the request handler, and - for a matched resource method with a
     * status below 300 - registers the connection and writes queued events until a poison pill arrives
     * or a write fails. Lifecycle interceptor callbacks are invoked around establishment, each write,
     * and termination.
     *
     * @param clientSocketChannel the accepted client channel; closed before this method returns
     */
    protected void handleClientSocketChannel(@Nonnull SocketChannel clientSocketChannel) {
        requireNonNull(clientSocketChannel);

        ClientSocketChannelRegistration clientSocketChannelRegistration = null;
        Request request = null;
        ResourceMethod resourceMethod = null;
        Throwable throwable = null;

        try (clientSocketChannel) {
            // Use the socket's address as an identifier
            String requestIdentifier = clientSocketChannel.getRemoteAddress().toString();

            try {
                // TODO: in a future version, we might introduce lifecycle interceptor options here and for Server for
                // "will/didInitiateConnection", "request timed out" and "request parsing failed".
                // Until then, any other failure simply propagates to the outer handler.
                String rawRequest = readRequest(requestIdentifier, clientSocketChannel);
                request = parseRequest(requestIdentifier, rawRequest);
            } catch (RequestTooLargeIOException e) {
                // Exception provides a "too large"-flagged request with whatever data we could pull out of it
                request = e.getTooLargeRequest();
            }

            // Determine the resource path
            ResourcePathDeclaration resourcePathDeclaration = matchingResourcePath(request.getResourcePath()).orElse(null);

            if (resourcePathDeclaration != null)
                resourceMethod = getResourceMethodsByResourcePathDeclaration().get(resourcePathDeclaration);

            AtomicInteger marshaledResponseStatusCode = new AtomicInteger(500);

            // OK, we have a request.
            // First thing to do is write an HTTP response (status, headers) - and then we keep the socket open for subsequent writes.
            // To write this initial "handshake" response, we delegate to the Soklet instance, handing it the request we just parsed
            // and receiving a MarshaledResponse to write. This lets the normal Soklet request processing flow occur.
            // Subsequent writes to the open socket are done via a ServerSentEventBroadcaster and sidestep the Soklet request processing flow.
            getRequestHandler().get().handleRequest(request, (@Nonnull RequestResult requestResult) -> {
                MarshaledResponse marshaledResponse = requestResult.getMarshaledResponse();

                marshaledResponseStatusCode.set(marshaledResponse.getStatusCode());
                String handshakeResponse = toHandshakeResponse(marshaledResponse);

                try {
                    clientSocketChannel.write(ByteBuffer.wrap(handshakeResponse.getBytes(StandardCharsets.UTF_8)));
                } catch (IOException e) {
                    // TODO: log out?
                    throw new UncheckedIOException("Unable to write initial SSE handshake response", e);
                }
            });

            // Happy path? Register the channel for future ServerSentEvent writes.
            // If no resource method matched (404) or the status is >= 300, we're done immediately now that
            // initial data has been written: fall through and let the socket close.
            if (resourceMethod != null && marshaledResponseStatusCode.get() < 300) {
                getLifecycleInterceptor().get().willEstablishServerSentEventConnection(request, resourceMethod);

                clientSocketChannelRegistration = registerClientSocketChannel(clientSocketChannel, request).get();

                // TODO: is this the right spot? Should it be lower down?
                getLifecycleInterceptor().get().didEstablishServerSentEventConnection(request, resourceMethod);

                ServerSentEventConnection serverSentEventConnection = clientSocketChannelRegistration.serverSentEventConnection();

                while (true) {
                    ServerSentEvent serverSentEvent = serverSentEventConnection.getWriteQueue().take();

                    if (serverSentEvent == SERVER_SENT_EVENT_POISON_PILL)
                        break;

                    // A validity check is performed by writing a heartbeat; a dead socket makes the write fail
                    ServerSentEvent eventToWrite = serverSentEvent == SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK
                            ? ServerSentEvent.forHeartbeat()
                            : serverSentEvent;

                    ByteBuffer buffer = ByteBuffer.wrap(formatForResponse(eventToWrite).getBytes(StandardCharsets.UTF_8));

                    getLifecycleInterceptor().get().willStartServerSentEventWriting(request,
                            serverSentEventConnection.getResourceMethod(), serverSentEvent);

                    Instant writeStarted = Instant.now();
                    Throwable writeThrowable = null;

                    try {
                        clientSocketChannel.write(buffer);
                    } catch (Throwable t) {
                        writeThrowable = t;
                    } finally {
                        Duration writeDuration = Duration.between(writeStarted, Instant.now());

                        // Report the failure of THIS write (not the connection-level throwable, which is still unset here)
                        getLifecycleInterceptor().get().didFinishServerSentEventWriting(request,
                                serverSentEventConnection.getResourceMethod(), serverSentEvent, writeDuration, writeThrowable);

                        if (writeThrowable != null)
                            throw writeThrowable;
                    }
                }
            }
        } catch (Throwable t) {
            throwable = t;

            if (t instanceof InterruptedException)
                Thread.currentThread().interrupt(); // Restore interrupt status
        } finally {
            // First, tell the event source to unregister the connection
            if (clientSocketChannelRegistration != null) {
                if (resourceMethod != null)
                    getLifecycleInterceptor().get().willTerminateServerSentEventConnection(request, resourceMethod, throwable);

                try {
                    clientSocketChannelRegistration.broadcaster().unregisterServerSentEventConnection(clientSocketChannelRegistration.serverSentEventConnection(), false);
                } catch (Exception exception) {
                    safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to de-register Server-Sent Event connection")
                            .throwable(exception)
                            .build());
                }
            }

            // Then, close the channel itself
            if (clientSocketChannel != null) {
                try {
                    // Should already be closed, but just in case
                    clientSocketChannel.close();
                } catch (Exception exception) {
                    safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to close Server-Sent Event connection socket channel")
                            .throwable(exception)
                            .build());
                } finally {
                    if (clientSocketChannelRegistration != null && resourceMethod != null) {
                        Instant connectionFinished = Instant.now();
                        Duration connectionDuration = Duration.between(clientSocketChannelRegistration.serverSentEventConnection().getEstablishedAt(), connectionFinished);

                        getLifecycleInterceptor().get().didTerminateServerSentEventConnection(request, resourceMethod, connectionDuration, throwable);
                    }
                }
            }
        }
    }

    /**
     * Renders the status line and headers that open an event stream.
     * <p>
     * A fixed set of streaming headers is always written. Headers from the marshaled response are appended
     * unless they collide (ignoring case) with one of those defaults or with {@code Content-Length}.
     *
     * @param marshaledResponse supplies the status code and custom headers
     * @return the response head, terminated by a blank line
     */
    @Nonnull
    protected String toHandshakeResponse(@Nonnull MarshaledResponse marshaledResponse) {
        requireNonNull(marshaledResponse);

        // For example:
        //
        // "HTTP/1.1 200 OK\r\n" +
        // "Content-Type: text/event-stream\r\n" +
        // "Cache-Control: no-cache\r\n" +
        // "X-Accel-Buffering: no\r\n" +
        // "Connection: keep-alive\r\n\r\n";

        // TODO: make this a configurable value. It's just here temporarily.
        final Map<String, Set<String>> DEFAULT_HEADERS = Map.of(
                "Content-Type", Set.of("text/event-stream"),
                "Cache-Control", Set.of("no-cache"),
                "Connection", Set.of("keep-alive"),
                // "no" tells nginx-style proxies not to buffer the stream
                "X-Accel-Buffering", Set.of("no")
        );

        final Set<String> ILLEGAL_HEADER_NAMES = Set.of("Content-Length");

        // HTTP header names are case-insensitive, so compare on a lowercased form
        Set<String> reservedHeaderNames = new LinkedHashSet<>();

        for (String defaultHeaderName : DEFAULT_HEADERS.keySet())
            reservedHeaderNames.add(defaultHeaderName.toLowerCase(Locale.ROOT));

        for (String illegalHeaderName : ILLEGAL_HEADER_NAMES)
            reservedHeaderNames.add(illegalHeaderName.toLowerCase(Locale.ROOT));

        int statusCode = marshaledResponse.getStatusCode();
        Map<String, Set<String>> headers = marshaledResponse.getHeaders();

        List<String> lines = new ArrayList<>(1 + DEFAULT_HEADERS.size() + headers.size());

        // e.g. "HTTP/1.1 200 OK"
        lines.add(format("HTTP/1.1 %d %s", statusCode, StatusCode.fromStatusCode(statusCode).get().getReasonPhrase()));

        // Write default headers
        // TODO: do these apply for responses > HTTP 299? Probably not.
        for (Entry<String, Set<String>> entry : DEFAULT_HEADERS.entrySet())
            for (String headerValue : entry.getValue())
                lines.add(format("%s: %s", entry.getKey(), headerValue));

        // Write custom headers, skipping anything reserved
        for (Entry<String, Set<String>> entry : headers.entrySet()) {
            String headerName = entry.getKey();
            Set<String> headerValues = entry.getValue();

            if (headerValues == null || reservedHeaderNames.contains(headerName.toLowerCase(Locale.ROOT)))
                continue;

            for (String headerValue : headerValues)
                lines.add(format("%s: %s", headerName, headerValue));
        }

        return String.join("\r\n", lines) + "\r\n\r\n";
    }

    /**
     * Serializes an event into the text written to the socket.
     * <p>
     * Heartbeats, and events with no fields at all, are rendered as a lone comment line ({@code ":"}).
     * Otherwise the fields are emitted in the order event, id, retry (milliseconds), then one
     * {@code data:} line per line of the data.
     *
     * @param serverSentEvent the event to serialize
     * @return the serialized event, terminated by a blank line
     */
    @Nonnull
    protected String formatForResponse(@Nonnull ServerSentEvent serverSentEvent) {
        requireNonNull(serverSentEvent);

        if (serverSentEvent.isHeartbeat())
            return ":\n\n";

        String event = serverSentEvent.getEvent().orElse(null);
        String id = serverSentEvent.getId().orElse(null);
        Duration retry = serverSentEvent.getRetry().orElse(null);
        String data = serverSentEvent.getData().orElse(null);

        List<String> lines = new ArrayList<>(16);

        if (event != null)
            lines.add(format("event: %s", event));

        if (id != null)
            lines.add(format("id: %s", id));

        if (retry != null)
            lines.add(format("retry: %d", retry.toMillis()));

        if (data != null)
            data.lines().forEach(line -> lines.add(format("data: %s", line)));

        if (lines.isEmpty())
            return ":\n\n";

        return format("%s\n\n", String.join("\n", lines));
    }

    /**
     * @param request the request to describe
     * @return the request's id, HTTP method name and URI separated by spaces, for diagnostics
     */
    @Nonnull
    protected String debuggingString(@Nonnull Request request) {
        requireNonNull(request);
        return format("%s %s %s", request.getId(), request.getHttpMethod().name(), request.getUri());
    }

    /**
     * State for one open client connection: the originating request, the resource method serving it,
     * the bounded queue of events waiting to be written, and the time the connection object was created.
     */
    @ThreadSafe
    protected static class ServerSentEventConnection {
        @Nonnull
        private final Request request;
        @Nonnull
        private final ResourceMethod resourceMethod;
        @Nonnull
        private final BlockingQueue<ServerSentEvent> writeQueue;
        @Nonnull
        private final Instant establishedAt;

        public ServerSentEventConnection(@Nonnull Request request,
                                         @Nonnull ResourceMethod resourceMethod) {
            requireNonNull(request);
            requireNonNull(resourceMethod);

            this.request = request;
            this.resourceMethod = resourceMethod;
            // TODO: let clients specify capacity
            this.writeQueue = new ArrayBlockingQueue<>(8);
            // Used to compute the connection duration reported when the connection terminates
            this.establishedAt = Instant.now();
        }

        @Nonnull
        public Request getRequest() {
            return this.request;
        }

        @Nonnull
        public ResourceMethod getResourceMethod() {
            return this.resourceMethod;
        }

        @Nonnull
        public BlockingQueue<ServerSentEvent> getWriteQueue() {
            return this.writeQueue;
        }

        @Nonnull
        public Instant getEstablishedAt() {
            return this.establishedAt;
        }
    }
797 798 protected record ClientSocketChannelRegistration(@Nonnull ServerSentEventConnection serverSentEventConnection, 799 @Nonnull DefaultServerSentEventBroadcaster broadcaster) { 800 public ClientSocketChannelRegistration { 801 requireNonNull(serverSentEventConnection); 802 requireNonNull(broadcaster); 803 } 804 } 805 806 @Nonnull 807 protected Optional<ClientSocketChannelRegistration> registerClientSocketChannel(@Nonnull SocketChannel clientSocketChannel, 808 @Nonnull Request request) { 809 requireNonNull(clientSocketChannel); 810 requireNonNull(request); 811 812 ResourcePath resourcePath = request.getResourcePath(); 813 814 if (!matchingResourcePath(resourcePath).isPresent()) 815 return Optional.empty(); 816 817 // Get a handle to the event source (it will be created if necessary) 818 DefaultServerSentEventBroadcaster broadcaster = acquireBroadcasterInternal(resourcePath).get(); 819 820 // Create the connection and register it with the EventSource 821 ServerSentEventConnection serverSentEventConnection = new ServerSentEventConnection(request, broadcaster.getResourceMethod()); 822 823 if (!broadcaster.registerServerSentEventConnection(serverSentEventConnection)) 824 return Optional.empty(); 825 826 // Also register the connection globally so we can enforce an overall limit on the number of open connections. 827 // If this causes an eviction, the eviction callback supplied to the ConcurrentLruMap will handle cleanup. 
828 getGlobalConnections().put(serverSentEventConnection, broadcaster); 829 830 return Optional.of(new ClientSocketChannelRegistration(serverSentEventConnection, broadcaster)); 831 } 832 833 @Nonnull 834 protected Request parseRequest(@Nonnull String requestIdentifier, 835 @Nonnull String rawRequest) { 836 requireNonNull(requestIdentifier); 837 requireNonNull(rawRequest); 838 839 rawRequest = Utilities.trimAggressivelyToNull(rawRequest); 840 841 if (rawRequest == null) 842 throw new IllegalStateException("Server-Sent Event HTTP request has no data"); 843 844 // Example request structure: 845 // 846 // GET /testing?one=two HTTP/1.1 847 // Host: localhost:8081 848 // Connection: keep-alive 849 // sec-ch-ua-platform: "macOS" 850 // Cache-Control: no-cache 851 // User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 852 // Accept: text/event-stream 853 // sec-ch-ua: "Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99" 854 // sec-ch-ua-mobile: ?0 855 // Origin: null 856 // Sec-Fetch-Site: cross-site 857 // Sec-Fetch-Mode: cors 858 // Sec-Fetch-Dest: empty 859 // Accept-Encoding: gzip, deflate, br, zstd 860 // Accept-Language: en-US,en;q=0.9,fr-CA;q=0.8,fr;q=0.7 861 862 // We know any EventSource request must be a GET. As a result, we know there is no request body. 863 864 // First line is the URL and the rest are headers. 865 // Line 1: GET /testing?one=two HTTP/1.1 866 // Line 2: Accept-Encoding: gzip, deflate, br, zstd 867 // ...and so forth. 868 869 Request.Builder requestBuilder = null; 870 Map<String, Set<String>> headers = new LinkedCaseInsensitiveMap<>(32); 871 872 for (String line : rawRequest.lines().toList()) { 873 line = Utilities.trimAggressivelyToNull(line); 874 875 if (line == null) 876 continue; 877 878 if (requestBuilder == null) { 879 // This is the first line. 
880 // Example: GET /testing?one=two HTTP/1.1 881 String[] components = line.split(" "); 882 883 if (components.length != 3) 884 throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line)); 885 886 String httpMethod = components[0]; 887 String rawUri = components[1]; 888 URI uri = null; 889 890 if (!httpMethod.equals("GET")) 891 throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line)); 892 893 if (rawUri != null) { 894 try { 895 uri = new URI(rawUri); 896 } catch (Exception ignored) { 897 // Malformed URI 898 } 899 } 900 901 if (uri == null) 902 throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line)); 903 904 requestBuilder = Request.with(HttpMethod.GET, rawUri); 905 } else { 906 // This is a header line. 907 // Example: Accept-Encoding: gzip, deflate, br, zstd 908 int indexOfFirstColon = line.indexOf(":"); 909 910 if (indexOfFirstColon == -1) 911 throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'Header-Name: Value", line)); 912 913 String headerName = line.substring(0, indexOfFirstColon); 914 String headerValue = Utilities.trimAggressivelyToNull(line.substring(indexOfFirstColon + 1)); 915 916 Set<String> headerValues = headers.get(headerName); 917 918 if (headerValues == null) { 919 headerValues = new LinkedHashSet<>(); 920 headers.put(headerName, headerValues); 921 } 922 923 // Blank headers will have a key in the map, but an empty set of header values. 
924 if (headerValue != null) 925 headerValues.add(headerValue); 926 } 927 } 928 929 return requestBuilder.id(requestIdentifier).headers(headers).build(); 930 } 931 932 @Nonnull 933 protected String readRequest(@Nonnull String requestIdentifier, 934 @Nonnull SocketChannel clientSocketChannel) throws IOException { 935 requireNonNull(requestIdentifier); 936 requireNonNull(clientSocketChannel); 937 938 // Because reads from the socket channel are blocking, there is no way to specify a timeout for it. 939 // We work around this by performing the read in a virtual thread, and use the timeout functionality 940 // built in to Futures to interrupt the thread if it doesn't finish in time. 941 Future<String> readFuture = null; 942 943 try { 944 readFuture = getRequestReaderExecutorService().get().submit(() -> { 945 ByteBuffer buffer = ByteBuffer.allocate(getRequestReadBufferSizeInBytes()); 946 StringBuilder requestBuilder = new StringBuilder(); 947 boolean headersComplete = false; 948 int totalBytesRead = 0; 949 950 while (!headersComplete) { 951 int bytesRead = clientSocketChannel.read(buffer); 952 953 // If the thread was interrupted while blocked in read(...), 954 // the read call should throw InterruptedIOException or similar. 
955 if (Thread.interrupted()) 956 throw new InterruptedIOException("Thread interrupted while reading request data"); 957 958 // End of stream (connection closed by client) 959 if (bytesRead == -1) 960 throw new IOException("Client closed the connection before request was complete"); 961 962 // Flip the buffer to read mode 963 buffer.flip(); 964 965 // Decode the buffer content to a string and append to the request 966 byte[] bytes = new byte[buffer.remaining()]; 967 buffer.get(bytes); 968 969 totalBytesRead += bytes.length; 970 971 // Check size limit 972 // To test: 973 // echo -ne 'GET /example HTTP/1.1\r\nHost: example.com FILLER_UNTIL_WE_ARE_TOO_BIG\r\n\r\n' | netcat -v localhost 8081 974 if (totalBytesRead > getMaximumRequestSizeInBytes()) { 975 String rawRequest = requestBuilder.toString(); 976 977 // Given our partial raw request, try to parse it into a request... 978 Request tooLargeRequest = parseTooLargeRequestForRawRequest(requestIdentifier, rawRequest).orElse(null); 979 980 // ...if unable to parse into a request (as in, we can't even make it through the first line), bail 981 if (tooLargeRequest == null) 982 throw new IOException(format("Request is too large (exceeded %d bytes) but we do not have enough data available to know its path", getMaximumRequestSizeInBytes())); 983 984 throw new RequestTooLargeIOException(format("Request too large (exceeded %d bytes)", getMaximumRequestSizeInBytes()), tooLargeRequest); 985 } 986 987 requestBuilder.append(new String(bytes, StandardCharsets.UTF_8)); 988 989 // Check if the headers are complete (look for the "\r\n\r\n" marker) 990 if (requestBuilder.indexOf("\r\n\r\n") != -1) 991 headersComplete = true; 992 993 // Clear the buffer for the next read 994 buffer.clear(); 995 } 996 997 return requestBuilder.toString(); 998 }); 999 1000 // Wait up to the specified timeout for reading to complete 1001 return readFuture.get(getRequestTimeout().getSeconds(), TimeUnit.SECONDS); 1002 } catch (TimeoutException e) { 1003 // 
Time's up; cancel the task so the blocking read is interrupted 1004 if (readFuture != null) 1005 readFuture.cancel(true); 1006 1007 throw new SocketTimeoutException(format("Reading request took longer than %d seconds", getRequestTimeout().getSeconds())); 1008 } catch (InterruptedException e) { 1009 // Current thread interrupted while waiting 1010 Thread.currentThread().interrupt(); // restore interrupt status 1011 throw new IOException("Interrupted while awaiting request data", e); 1012 } catch (ExecutionException e) { 1013 // The task itself threw an exception 1014 if (e.getCause() instanceof IOException) 1015 throw (IOException) e.getCause(); 1016 1017 throw new IOException("Unexpected exception while reading request", e.getCause()); 1018 } 1019 } 1020 1021 /** 1022 * Given partial raw request data (once we hit max size threshold, we stop collecting it), parse out what we have as 1023 * best we can into a request that is marked "too large". 1024 * <p> 1025 * If there isn't sufficient data to parse into a request (or if the data is malformed), then return the empty value. 1026 */ 1027 @Nonnull 1028 protected Optional<Request> parseTooLargeRequestForRawRequest(@Nonnull String requestIdentifier, 1029 @Nullable String rawRequest) { 1030 requireNonNull(requestIdentifier); 1031 1032 // Supports both relative and absolute paths. 1033 // e.g. "GET /index.html HTTP/1.1\r\n" would return "/index.html". 1034 // e.g. "GET https://www.soklet.com/index.html HTTP/1.1\r\n" would return "/index.html". 
1035 String firstLine = null; 1036 1037 int crLfIndex = rawRequest.indexOf("\r\n"); 1038 1039 if (crLfIndex != -1) 1040 firstLine = rawRequest.substring(0, crLfIndex).trim(); 1041 1042 // We don't even have a complete first line of the request 1043 if (firstLine == null || firstLine.length() == 0) 1044 return Optional.empty(); 1045 1046 String[] parts = firstLine.split(" "); 1047 1048 // First line of the request is malformed 1049 if (parts.length < 2) 1050 return Optional.empty(); 1051 1052 String rawHttpMethod = parts[0]; 1053 1054 if (rawHttpMethod != null) 1055 rawHttpMethod = rawHttpMethod.trim().toUpperCase(Locale.ENGLISH); 1056 1057 HttpMethod httpMethod = null; 1058 1059 try { 1060 httpMethod = HttpMethod.valueOf(rawHttpMethod); 1061 } catch (IllegalArgumentException e) { 1062 // Malformed HTTP method specified 1063 return Optional.empty(); 1064 } 1065 1066 String rawUri = parts[1]; 1067 1068 // Validate URI 1069 if (rawUri != null) { 1070 try { 1071 new URI(rawUri.trim()); 1072 } catch (Exception e) { 1073 // Malformed URI specified 1074 return Optional.empty(); 1075 } 1076 } 1077 1078 // TODO: eventually would be nice to parse headers as best we can. 
For now, we just parse the first request line 1079 return Optional.of(Request.with(httpMethod, rawUri) 1080 .id(requestIdentifier) 1081 .contentTooLarge(true) 1082 .build()); 1083 } 1084 1085 @NotThreadSafe 1086 protected static class RequestTooLargeIOException extends IOException { 1087 @Nonnull 1088 private final Request tooLargeRequest; 1089 1090 public RequestTooLargeIOException(@Nullable String message, 1091 @Nonnull Request tooLargeRequest) { 1092 super(message); 1093 this.tooLargeRequest = requireNonNull(tooLargeRequest); 1094 } 1095 1096 @Nonnull 1097 public Request getTooLargeRequest() { 1098 return this.tooLargeRequest; 1099 } 1100 } 1101 1102 @Override 1103 public void stop() { 1104 getLock().lock(); 1105 1106 boolean interrupted = false; 1107 this.stopping = false; 1108 1109 try { 1110 if (!isStarted()) 1111 return; 1112 1113 this.stopping = true; 1114 1115 try { 1116 this.connectionValidityExecutorService.shutdown(); 1117 this.connectionValidityExecutorService.awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS); 1118 } catch (InterruptedException e) { 1119 interrupted = true; 1120 } catch (Exception e) { 1121 safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event connection validity checker") 1122 .throwable(e) 1123 .build()); 1124 } 1125 1126 getStopPoisonPill().set(true); 1127 1128 // TODO: need an additional check/lock to prevent race condition where someone acquires an event source while we are shutting down 1129 for (DefaultServerSentEventBroadcaster broadcaster : getBroadcastersByResourcePath().values()) { 1130 try { 1131 broadcaster.unregisterAllServerSentEventConnections(true); 1132 } catch (Exception e) { 1133 safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down open Server-Sent Event connections") 1134 .throwable(e) 1135 .build()); 1136 } 1137 } 1138 1139 // Clear global connections map for sanity (though it should be 
empty by this point) 1140 getGlobalConnections().clear(); 1141 1142 try { 1143 getRequestHandlerExecutorService().get().shutdown(); 1144 getRequestHandlerExecutorService().get().awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS); 1145 } catch (InterruptedException e) { 1146 interrupted = true; 1147 } catch (Exception e) { 1148 safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event request handler") 1149 .throwable(e) 1150 .build()); 1151 } 1152 1153 try { 1154 getRequestReaderExecutorService().get().shutdown(); 1155 getRequestReaderExecutorService().get().awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS); 1156 } catch (InterruptedException e) { 1157 interrupted = true; 1158 } catch (Exception e) { 1159 safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event request reader") 1160 .throwable(e) 1161 .build()); 1162 } 1163 } finally { 1164 this.started = false; 1165 this.eventLoopThread = null; 1166 this.requestHandlerExecutorService = null; 1167 this.requestReaderExecutorService = null; 1168 this.connectionValidityExecutorService = null; 1169 this.getBroadcastersByResourcePath().clear(); 1170 this.getResourcePathDeclarationsByResourcePathCache().clear(); 1171 getStopPoisonPill().set(false); 1172 1173 if (interrupted) 1174 Thread.currentThread().interrupt(); 1175 1176 getLock().unlock(); 1177 } 1178 } 1179 1180 @Nonnull 1181 @Override 1182 public Boolean isStarted() { 1183 getLock().lock(); 1184 1185 try { 1186 return this.started; 1187 } finally { 1188 getLock().unlock(); 1189 } 1190 } 1191 1192 @Nonnull 1193 protected Boolean isStopping() { 1194 getLock().lock(); 1195 1196 try { 1197 return this.stopping; 1198 } finally { 1199 getLock().unlock(); 1200 } 1201 } 1202 1203 @Nonnull 1204 @Override 1205 public Optional<? 
extends ServerSentEventBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath) { 1206 if (resourcePath == null) 1207 return Optional.empty(); 1208 1209 return acquireBroadcasterInternal(resourcePath); 1210 } 1211 1212 @Nonnull 1213 protected Optional<DefaultServerSentEventBroadcaster> acquireBroadcasterInternal(@Nullable ResourcePath resourcePath) { 1214 if (resourcePath == null) 1215 return Optional.empty(); 1216 1217 ResourcePathDeclaration resourcePathDeclaration = matchingResourcePath(resourcePath).orElse(null); 1218 1219 if (resourcePathDeclaration == null) 1220 return Optional.empty(); 1221 1222 ResourceMethod resourceMethod = getResourceMethodsByResourcePathDeclaration().get(resourcePathDeclaration); 1223 1224 // TODO: should this be sent as a LogEvent? 1225 if (resourceMethod == null) 1226 throw new IllegalStateException(format("Internal error: unable to find %s instance that matches %s", ResourceMethod.class, resourcePathDeclaration)); 1227 1228 // Create the event source if it does not already exist 1229 DefaultServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath() 1230 .computeIfAbsent(resourcePath, (ignored) -> new DefaultServerSentEventBroadcaster(resourceMethod, resourcePath, (serverSentEventConnection -> { 1231 // When the broadcaster unregisters a connection it manages, remove it from the global set of connections as well 1232 getGlobalConnections().remove(serverSentEventConnection); 1233 }))); 1234 1235 return Optional.of(broadcaster); 1236 } 1237 1238 @Nonnull 1239 protected Optional<ResourcePathDeclaration> matchingResourcePath(@Nullable ResourcePath resourcePath) { 1240 if (resourcePath == null) 1241 return Optional.empty(); 1242 1243 // TODO: convert to computeIfAbsent() 1244 1245 // Try a cache lookup first 1246 ResourcePathDeclaration resourcePathDeclaration = getResourcePathDeclarationsByResourcePathCache().get(resourcePath); 1247 1248 if (resourcePathDeclaration == null) { 1249 // If the cache lookup fails, 
perform a manual lookup 1250 for (ResourcePathDeclaration registeredResourcePathDeclaration : getResourceMethodsByResourcePathDeclaration().keySet()) { 1251 if (registeredResourcePathDeclaration.matches(resourcePath)) { 1252 resourcePathDeclaration = registeredResourcePathDeclaration; 1253 break; 1254 } 1255 } 1256 1257 // Put the value in the cache for quick access later 1258 if (resourcePathDeclaration != null) 1259 getResourcePathDeclarationsByResourcePathCache().put(resourcePath, resourcePathDeclaration); 1260 } 1261 1262 return Optional.ofNullable(resourcePathDeclaration); 1263 } 1264 1265 protected void safelyLog(@Nonnull LogEvent logEvent) { 1266 requireNonNull(logEvent); 1267 1268 try { 1269 getLifecycleInterceptor().get().didReceiveLogEvent(logEvent); 1270 } catch (Throwable throwable) { 1271 // The LifecycleInterceptor implementation errored out, but we can't let that affect us - swallow its exception. 1272 // Not much else we can do here but dump to stderr 1273 throwable.printStackTrace(); 1274 } 1275 } 1276 1277 @Nonnull 1278 protected Integer getPort() { 1279 return this.port; 1280 } 1281 1282 @Nonnull 1283 protected String getHost() { 1284 return this.host; 1285 } 1286 1287 @Nonnull 1288 protected Duration getRequestTimeout() { 1289 return this.requestTimeout; 1290 } 1291 1292 @Nonnull 1293 protected Duration getShutdownTimeout() { 1294 return this.shutdownTimeout; 1295 } 1296 1297 @Nonnull 1298 protected Integer getMaximumRequestSizeInBytes() { 1299 return this.maximumRequestSizeInBytes; 1300 } 1301 1302 @Nonnull 1303 protected Integer getRequestReadBufferSizeInBytes() { 1304 return this.requestReadBufferSizeInBytes; 1305 } 1306 1307 @Nonnull 1308 public Map<ResourcePathDeclaration, ResourceMethod> getResourceMethodsByResourcePathDeclaration() { 1309 return this.resourceMethodsByResourcePathDeclaration; 1310 } 1311 1312 @Nonnull 1313 protected ConcurrentHashMap<ResourcePath, DefaultServerSentEventBroadcaster> getBroadcastersByResourcePath() { 1314 
return this.broadcastersByResourcePath; 1315 } 1316 1317 @Nonnull 1318 protected ConcurrentHashMap<ResourcePath, ResourcePathDeclaration> getResourcePathDeclarationsByResourcePathCache() { 1319 return this.resourcePathDeclarationsByResourcePathCache; 1320 } 1321 1322 @Nonnull 1323 protected Optional<ExecutorService> getRequestHandlerExecutorService() { 1324 return Optional.ofNullable(this.requestHandlerExecutorService); 1325 } 1326 1327 @Nonnull 1328 protected Optional<ExecutorService> getRequestReaderExecutorService() { 1329 return Optional.ofNullable(this.requestReaderExecutorService); 1330 } 1331 1332 @Nonnull 1333 protected ReentrantLock getLock() { 1334 return this.lock; 1335 } 1336 1337 @Nonnull 1338 protected Supplier<ExecutorService> getRequestHandlerExecutorServiceSupplier() { 1339 return this.requestHandlerExecutorServiceSupplier; 1340 } 1341 1342 @Nonnull 1343 protected Supplier<ExecutorService> getRequestReaderExecutorServiceSupplier() { 1344 return this.requestReaderExecutorServiceSupplier; 1345 } 1346 1347 @Nonnull 1348 protected Integer getConcurrentConnectionLimit() { 1349 return this.concurrentConnectionLimit; 1350 } 1351 1352 @Nonnull 1353 protected ConcurrentLruMap<ServerSentEventConnection, DefaultServerSentEventBroadcaster> getGlobalConnections() { 1354 return this.globalConnections; 1355 } 1356 1357 @Nullable 1358 protected ScheduledExecutorService getConnectionValidityExecutorService() { 1359 return this.connectionValidityExecutorService; 1360 } 1361 1362 @Nonnull 1363 protected AtomicBoolean getStopPoisonPill() { 1364 return this.stopPoisonPill; 1365 } 1366 1367 @Nonnull 1368 protected Optional<Thread> getEventLoopThread() { 1369 return Optional.ofNullable(this.eventLoopThread); 1370 } 1371 1372 @Nonnull 1373 protected Optional<RequestHandler> getRequestHandler() { 1374 return Optional.ofNullable(this.requestHandler); 1375 } 1376 1377 @Nonnull 1378 protected Optional<LifecycleInterceptor> getLifecycleInterceptor() { 1379 return 
Optional.ofNullable(this.lifecycleInterceptor); 1380 } 1381 1382 /** 1383 * Builder used to construct instances of {@link DefaultServerSentEventServer}. 1384 * <p> 1385 * This class is intended for use by a single thread. 1386 * 1387 * @author <a href="https://www.revetkn.com">Mark Allen</a> 1388 */ 1389 @NotThreadSafe 1390 public static class Builder { 1391 @Nonnull 1392 private Integer port; 1393 @Nullable 1394 private String host; 1395 @Nullable 1396 private Duration requestTimeout; 1397 @Nullable 1398 private Duration shutdownTimeout; 1399 @Nullable 1400 private Integer maximumRequestSizeInBytes; 1401 @Nullable 1402 private Integer requestReadBufferSizeInBytes; 1403 @Nullable 1404 private Supplier<ExecutorService> requestHandlerExecutorServiceSupplier; 1405 @Nullable 1406 private Integer concurrentConnectionLimit; 1407 1408 @Nonnull 1409 protected Builder(@Nonnull Integer port) { 1410 requireNonNull(port); 1411 this.port = port; 1412 } 1413 1414 @Nonnull 1415 public Builder port(@Nonnull Integer port) { 1416 requireNonNull(port); 1417 this.port = port; 1418 return this; 1419 } 1420 1421 @Nonnull 1422 public Builder host(@Nullable String host) { 1423 this.host = host; 1424 return this; 1425 } 1426 1427 @Nonnull 1428 public Builder requestTimeout(@Nullable Duration requestTimeout) { 1429 this.requestTimeout = requestTimeout; 1430 return this; 1431 } 1432 1433 @Nonnull 1434 public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) { 1435 this.shutdownTimeout = shutdownTimeout; 1436 return this; 1437 } 1438 1439 @Nonnull 1440 public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) { 1441 this.maximumRequestSizeInBytes = maximumRequestSizeInBytes; 1442 return this; 1443 } 1444 1445 @Nonnull 1446 public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) { 1447 this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes; 1448 return this; 1449 } 1450 1451 @Nonnull 1452 public Builder 
concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) { 1453 this.concurrentConnectionLimit = concurrentConnectionLimit; 1454 return this; 1455 } 1456 1457 @Nonnull 1458 public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) { 1459 this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier; 1460 return this; 1461 } 1462 1463 @Nonnull 1464 public DefaultServerSentEventServer build() { 1465 return new DefaultServerSentEventServer(this); 1466 } 1467 } 1468}