001/*
002 * Copyright 2022-2025 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet.core.impl;
018
019import com.soklet.SokletConfiguration;
020import com.soklet.core.HttpMethod;
021import com.soklet.core.LifecycleInterceptor;
022import com.soklet.core.LogEvent;
023import com.soklet.core.LogEventType;
024import com.soklet.core.MarshaledResponse;
025import com.soklet.core.Request;
026import com.soklet.core.RequestResult;
027import com.soklet.core.ResourceMethod;
028import com.soklet.core.ResourcePath;
029import com.soklet.core.ResourcePathDeclaration;
030import com.soklet.core.ServerSentEvent;
031import com.soklet.core.ServerSentEventBroadcaster;
032import com.soklet.core.ServerSentEventServer;
033import com.soklet.core.StatusCode;
034import com.soklet.core.Utilities;
035import com.soklet.internal.spring.LinkedCaseInsensitiveMap;
036import com.soklet.internal.util.ConcurrentLruMap;
037
038import javax.annotation.Nonnull;
039import javax.annotation.Nullable;
040import javax.annotation.concurrent.NotThreadSafe;
041import javax.annotation.concurrent.ThreadSafe;
042import java.io.IOException;
043import java.io.InterruptedIOException;
044import java.io.UncheckedIOException;
045import java.net.InetSocketAddress;
046import java.net.SocketTimeoutException;
047import java.net.URI;
048import java.nio.ByteBuffer;
049import java.nio.channels.ServerSocketChannel;
050import java.nio.channels.SocketChannel;
051import java.nio.charset.StandardCharsets;
052import java.time.Duration;
053import java.time.Instant;
054import java.util.ArrayList;
055import java.util.Collection;
056import java.util.LinkedHashSet;
057import java.util.List;
058import java.util.Locale;
059import java.util.Map;
060import java.util.Map.Entry;
061import java.util.Optional;
062import java.util.Set;
063import java.util.concurrent.ArrayBlockingQueue;
064import java.util.concurrent.BlockingQueue;
065import java.util.concurrent.ConcurrentHashMap;
066import java.util.concurrent.ExecutionException;
067import java.util.concurrent.ExecutorService;
068import java.util.concurrent.Executors;
069import java.util.concurrent.Future;
070import java.util.concurrent.ScheduledExecutorService;
071import java.util.concurrent.ThreadFactory;
072import java.util.concurrent.TimeUnit;
073import java.util.concurrent.TimeoutException;
074import java.util.concurrent.atomic.AtomicBoolean;
075import java.util.concurrent.atomic.AtomicInteger;
076import java.util.concurrent.locks.ReentrantLock;
077import java.util.function.Consumer;
078import java.util.function.Function;
079import java.util.function.Supplier;
080import java.util.stream.Collectors;
081
082import static java.lang.String.format;
083import static java.util.Objects.requireNonNull;
084
085/**
086 * @author <a href="https://www.revetkn.com">Mark Allen</a>
087 */
088@ThreadSafe
089public class DefaultServerSentEventServer implements ServerSentEventServer {
090        @Nonnull
091        private static final String DEFAULT_HOST;
092        @Nonnull
093        private static final Duration DEFAULT_REQUEST_TIMEOUT;
094        @Nonnull
095        private static final Integer DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES;
096        @Nonnull
097        private static final Integer DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES;
098        @Nonnull
099        private static final Duration DEFAULT_SHUTDOWN_TIMEOUT;
100
101        @Nonnull
102        private static final ServerSentEvent SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK;
103        @Nonnull
104        private static final ServerSentEvent SERVER_SENT_EVENT_POISON_PILL;
105
106        static {
107                DEFAULT_HOST = "0.0.0.0";
108                DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60);
109                DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES = 1_024 * 1_024;
110                DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES = 1_024;
111                DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
112
113                // Make a unique "validity check" server-sent event used to wake a socket listener thread by injecting it into the relevant write queue.
114                // When this event is taken off of the queue, a validity check is performed on the socket to see if it's still active.
115                // If not, socket is torn down and the thread finishes running.
116                // The contents don't matter; the object reference is used to determine if it's a validity check.
117                SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK = ServerSentEvent.withEvent("validity-check").build();
118
119                // Make a unique "poison pill" server-sent event used to stop a socket listener thread by injecting it into the relevant write queue.
120                // When this event is taken off of the queue, the socket is torn down and the thread finishes running.
121                // The contents don't matter; the object reference is used to determine if it's poison.
122                SERVER_SENT_EVENT_POISON_PILL = ServerSentEvent.withEvent("poison").build();
123        }
124
125        @Nonnull
           // Port passed to the listening socket's bind call in startInternal(); copied as-is from the builder.
126        private final Integer port;
           // Host from the builder, or DEFAULT_HOST when the builder leaves it null.
127        @Nonnull
128        private final String host;
           // Request timeout from the builder, or DEFAULT_REQUEST_TIMEOUT when unset.
129        @Nonnull
130        private final Duration requestTimeout;
           // Shutdown timeout from the builder, or DEFAULT_SHUTDOWN_TIMEOUT when unset.
131        @Nonnull
132        private final Duration shutdownTimeout;
           // Always > 0 (enforced by the constructor).
133        @Nonnull
134        private final Integer maximumRequestSizeInBytes;
           // Always > 0 (enforced by the constructor).
135        @Nonnull
136        private final Integer requestReadBufferSizeInBytes;
           // Live broadcasters keyed by concrete resource path. The periodic validity task removes
           // entries whose broadcaster has no remaining connections.
137        // TODO: we probably want to convert to ConcurrentLruMap
138        @Nonnull
139        private final ConcurrentHashMap<ResourcePath, DefaultServerSentEventBroadcaster> broadcastersByResourcePath;
           // NOTE(review): presumably memoizes which declaration a concrete path matched - confirm against matchingResourcePath().
140        // TODO: we probably want to convert to ConcurrentLruMap
141        @Nonnull
142        private final ConcurrentHashMap<ResourcePath, ResourcePathDeclaration> resourcePathDeclarationsByResourcePathCache;
           // All connections across all broadcasters, capped at concurrentConnectionLimit.
           // The eviction callback unregisters the evicted connection from its broadcaster and sends it a poison pill.
143        @Nonnull
144        private final ConcurrentLruMap<ServerSentEventConnection, DefaultServerSentEventBroadcaster> globalConnections;
           // Held for the duration of start().
145        @Nonnull
146        private final ReentrantLock lock;
           // Invoked by start() to create the executor that runs one task per accepted client socket.
147        @Nonnull
148        private final Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
           // Invoked by start() to create the request-reader executor.
149        @Nonnull
150        private final Supplier<ExecutorService> requestReaderExecutorServiceSupplier;
           // Capacity of globalConnections; builder value or 8,192, and always >= 1 (enforced by the constructor).
151        @Nonnull
152        private final Integer concurrentConnectionLimit;
           // The accept loop in startInternal() keeps running while this is false.
153        @Nonnull
154        private final AtomicBoolean stopPoisonPill;
           // The three executors below are null until start() assigns them.
155        @Nullable
156        private volatile ExecutorService requestHandlerExecutorService;
157        @Nullable
158        private volatile ExecutorService requestReaderExecutorService;
           // Single-threaded scheduler that periodically runs performConnectionValidityTask().
159        @Nullable
160        private volatile ScheduledExecutorService connectionValidityExecutorService;
           // Lifecycle flags; the constructor initializes both to false and start() sets 'started' to true.
161        @Nonnull
162        private volatile Boolean started;
163        @Nonnull
164        private volatile Boolean stopping;
           // Thread running startInternal(); created by start().
           // NOTE(review): this field and the three below are neither final nor volatile although other threads
           // read them - safe only if initialize()/start() happen-before those reads; confirm.
165        @Nullable
166        private Thread eventLoopThread;
167        // Does not need to be concurrent because it's calculated just once at initialization time and is never modified after
168        @Nonnull
169        private Map<ResourcePathDeclaration, ResourceMethod> resourceMethodsByResourcePathDeclaration;
           // Supplied via initialize(); start() refuses to run while it is absent.
170        @Nullable
171        private RequestHandler requestHandler;
           // Taken from the SokletConfiguration in initialize(); start() refuses to run while it is absent.
172        @Nullable
173        private LifecycleInterceptor lifecycleInterceptor;
174
175        @ThreadSafe
176        protected static class DefaultServerSentEventBroadcaster implements ServerSentEventBroadcaster {
177                @Nonnull
178                private final ResourceMethod resourceMethod;
179                @Nonnull
180                private final ResourcePath resourcePath;
181                @Nonnull
182                private final Consumer<ServerSentEventConnection> connectionUnregisteredListener;
183                // This must be threadsafe, e.g. via ConcurrentHashMap#newKeySet
184                @Nonnull
185                private final Set<ServerSentEventConnection> serverSentEventConnections;
186
187                public DefaultServerSentEventBroadcaster(@Nonnull ResourceMethod resourceMethod,
188                                                                                                                                                                                 @Nonnull ResourcePath resourcePath,
189                                                                                                                                                                                 @Nonnull Consumer<ServerSentEventConnection> connectionUnregisteredListener) {
190                        requireNonNull(resourceMethod);
191                        requireNonNull(resourcePath);
192                        requireNonNull(connectionUnregisteredListener);
193
194                        this.resourceMethod = resourceMethod;
195                        this.resourcePath = resourcePath;
196                        this.connectionUnregisteredListener = connectionUnregisteredListener;
197                        // TODO: let clients specify capacity
198                        this.serverSentEventConnections = ConcurrentHashMap.newKeySet(1_024);
199                }
200
201                @Nonnull
202                public ResourceMethod getResourceMethod() {
203                        return this.resourceMethod;
204                }
205
206                @Nonnull
207                @Override
208                public ResourcePath getResourcePath() {
209                        return this.resourcePath;
210                }
211
212                @Nonnull
213                @Override
214                public Long getClientCount() {
215                        return (long) getServerSentEventConnections().size();
216                }
217
218                @Override
219                public void broadcast(@Nonnull ServerSentEvent serverSentEvent) {
220                        requireNonNull(serverSentEvent);
221
222                        // We can broadcast from the current thread because putting elements onto blocking queues is reasonably fast.
223                        // The blocking queues are consumed by separate per-socket-channel threads
224                        for (ServerSentEventConnection serverSentEventConnection : getServerSentEventConnections())
225                                serverSentEventConnection.getWriteQueue().add(serverSentEvent);
226                }
227
228                @Nonnull
229                public Boolean registerServerSentEventConnection(@Nullable ServerSentEventConnection serverSentEventConnection) {
230                        if (serverSentEventConnection == null)
231                                return false;
232
233                        // Underlying set is threadsafe so this is OK
234                        return getServerSentEventConnections().add(serverSentEventConnection);
235                }
236
237                @Nonnull
238                public Boolean unregisterServerSentEventConnection(@Nullable ServerSentEventConnection serverSentEventConnection,
239                                                                                                                                                                                                                         @Nonnull Boolean sendPoisonPill) {
240                        requireNonNull(sendPoisonPill);
241
242                        if (serverSentEventConnection == null)
243                                return false;
244
245                        // Underlying set is threadsafe so this is OK
246                        boolean unregistered = getServerSentEventConnections().remove(serverSentEventConnection);
247
248                        if (unregistered) {
249                                getConnectionUnregisteredListener().accept(serverSentEventConnection);
250
251                                // If requested, send a poison pill so the socket thread gets terminated
252                                if (sendPoisonPill)
253                                        serverSentEventConnection.getWriteQueue().add(SERVER_SENT_EVENT_POISON_PILL);
254                        }
255
256                        return unregistered;
257                }
258
259                @Nonnull
260                public void unregisterAllServerSentEventConnections(@Nonnull Boolean sendPoisonPill) {
261                        requireNonNull(sendPoisonPill);
262
263                        // TODO: we probably want to have a lock around registration/unregistration
264                        for (ServerSentEventConnection serverSentEventConnection : getServerSentEventConnections())
265                                unregisterServerSentEventConnection(serverSentEventConnection, sendPoisonPill);
266                }
267
268                @Nonnull
269                protected Set<ServerSentEventConnection> getServerSentEventConnections() {
270                        return this.serverSentEventConnections;
271                }
272
273                @Nonnull
274                protected Consumer<ServerSentEventConnection> getConnectionUnregisteredListener() {
275                        return this.connectionUnregisteredListener;
276                }
277        }
278
279        @Nonnull
280        public static Builder withPort(@Nonnull Integer port) {
281                requireNonNull(port);
282                return new Builder(port);
283        }
284
285        protected DefaultServerSentEventServer(@Nonnull Builder builder) {
286                requireNonNull(builder);
287
288                this.stopPoisonPill = new AtomicBoolean(false);
289                this.started = false;
290                this.stopping = false;
291                this.lock = new ReentrantLock();
292                this.port = builder.port;
293                this.host = builder.host != null ? builder.host : DEFAULT_HOST;
294                this.maximumRequestSizeInBytes = builder.maximumRequestSizeInBytes != null ? builder.maximumRequestSizeInBytes : DEFAULT_MAXIMUM_REQUEST_SIZE_IN_BYTES;
295                this.requestReadBufferSizeInBytes = builder.requestReadBufferSizeInBytes != null ? builder.requestReadBufferSizeInBytes : DEFAULT_REQUEST_READ_BUFFER_SIZE_IN_BYTES;
296                this.requestTimeout = builder.requestTimeout != null ? builder.requestTimeout : DEFAULT_REQUEST_TIMEOUT;
297                this.shutdownTimeout = builder.shutdownTimeout != null ? builder.shutdownTimeout : DEFAULT_SHUTDOWN_TIMEOUT;
298                this.resourceMethodsByResourcePathDeclaration = Map.of(); // Temporary to remain non-null; will be overridden by Soklet via #initialize
299
300                if (this.maximumRequestSizeInBytes <= 0)
301                        throw new IllegalArgumentException("Maximum request size must be > 0");
302
303                if (this.requestReadBufferSizeInBytes <= 0)
304                        throw new IllegalArgumentException("Request read buffer size must be > 0");
305
306                // TODO: let clients specify initial capacity
307                this.broadcastersByResourcePath = new ConcurrentHashMap<>(1_024);
308                this.resourcePathDeclarationsByResourcePathCache = new ConcurrentHashMap<>(1_024);
309
310                // Cowardly refuse to run on anything other than a runtime that supports Virtual threads.
311                if (!Utilities.virtualThreadsAvailable())
312                        throw new IllegalStateException(format("Virtual threads are required for %s", getClass().getSimpleName()));
313
314                this.requestHandlerExecutorServiceSupplier = builder.requestHandlerExecutorServiceSupplier != null ? builder.requestHandlerExecutorServiceSupplier : () -> {
315                        String threadNamePrefix = "sse-request-handler-";
316
317                        return Utilities.createVirtualThreadsNewThreadPerTaskExecutor(threadNamePrefix, (Thread thread, Throwable throwable) -> {
318                                try {
319                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unexpected exception occurred during server Server-Sent Event processing")
320                                                        .throwable(throwable)
321                                                        .build());
322                                } catch (Throwable loggingThrowable) {
323                                        // We are in a bad state - the log operation in the uncaught exception handler failed.
324                                        // Not much else we can do here but dump to stderr and try to stop the server.
325                                        throwable.printStackTrace();
326                                        loggingThrowable.printStackTrace();
327                                }
328                        });
329                };
330
331                this.requestReaderExecutorServiceSupplier = () -> {
332                        String threadNamePrefix = "sse-request-reader-";
333
334                        return Utilities.createVirtualThreadsNewThreadPerTaskExecutor(threadNamePrefix, (Thread thread, Throwable throwable) -> {
335                                try {
336                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unexpected exception occurred during server Server-Sent Event request reading")
337                                                        .throwable(throwable)
338                                                        .build());
339                                } catch (Throwable loggingThrowable) {
340                                        // We are in a bad state - the log operation in the uncaught exception handler failed.
341                                        // Not much else we can do here but dump to stderr and try to stop the server.
342                                        throwable.printStackTrace();
343                                        loggingThrowable.printStackTrace();
344                                }
345                        });
346                };
347
348                this.concurrentConnectionLimit = builder.concurrentConnectionLimit != null ? builder.concurrentConnectionLimit : 8_192;
349
350                if (this.concurrentConnectionLimit < 1)
351                        throw new IllegalArgumentException("The value for concurrentConnectionLimit must be > 0");
352
353                // Initialize the global LRU map with the specified limit. Assume ConcurrentLRUMap supports a removal listener.
354                this.globalConnections = new ConcurrentLruMap<>(this.concurrentConnectionLimit, (evictedConnection, broadcaster) -> {
355                        // This callback is triggered when a connection is evicted from the global LRU map.
356                        // Unregister the evicted connection from the broadcaster and send poison pill to close it.
357                        broadcaster.unregisterServerSentEventConnection(evictedConnection, true);
358                });
359        }
360
361        @Override
362        public void initialize(@Nonnull SokletConfiguration sokletConfiguration,
363                                                                                                 @Nonnull RequestHandler requestHandler) {
364                requireNonNull(sokletConfiguration);
365                requireNonNull(requestHandler);
366
367                this.lifecycleInterceptor = sokletConfiguration.getLifecycleInterceptor();
368                this.requestHandler = requestHandler;
369
370                // Pick out all the @ServerSentEventSource resource methods and store off keyed on resource path for ease of lookup.
371                // This is computed just once here and will never change.
372                // TODO: we should fail-fast if there are multiple @ServerSentEventSource annotations with the same resource path.  Should that happen here or at the Soklet level?
373                this.resourceMethodsByResourcePathDeclaration = sokletConfiguration.getResourceMethodResolver().getResourceMethods().stream()
374                                .filter(resourceMethod -> resourceMethod.isServerSentEventSource())
375                                .collect(Collectors.toMap(ResourceMethod::getResourcePath, Function.identity()));
376        }
377
378        @Override
379        public void start() {
380                getLock().lock();
381
382                try {
383                        if (isStarted())
384                                return;
385
386                        // Should never happen, this would already be set by the Soklet instance
387                        if (getRequestHandler().isEmpty())
388                                throw new IllegalStateException(format("No %s was registered for %s", RequestHandler.class, getClass()));
389
390                        if (getLifecycleInterceptor().isEmpty())
391                                throw new IllegalStateException(format("No %s was registered for %s", LifecycleInterceptor.class, getClass()));
392
393                        this.requestHandlerExecutorService = getRequestHandlerExecutorServiceSupplier().get();
394                        this.requestReaderExecutorService = getRequestReaderExecutorServiceSupplier().get();
395                        this.eventLoopThread = new Thread(this::startInternal, "sse-event-loop");
396                        eventLoopThread.start();
397
398                        this.connectionValidityExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
399                                @Override
400                                @Nonnull
401                                public Thread newThread(@Nonnull Runnable runnable) {
402                                        requireNonNull(runnable);
403                                        return new Thread(runnable, "sse-connection-validator");
404                                }
405                        });
406
407                        // TODO: make durations configurable
408                        this.connectionValidityExecutorService.scheduleWithFixedDelay(() -> {
409                                try {
410                                        performConnectionValidityTask();
411                                } catch (Throwable throwable) {
412                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Server-Sent Event connection validity checker encountered an error")
413                                                        .throwable(throwable)
414                                                        .build());
415                                }
416                        }, 5, 15, TimeUnit.SECONDS);
417
418                        this.started = true;
419                } finally {
420                        getLock().unlock();
421                }
422        }
423
424        protected void performConnectionValidityTask() {
425                Collection<DefaultServerSentEventBroadcaster> broadcasters = getBroadcastersByResourcePath().values();
426                int i = 0;
427
428                //System.out.println("Global connections (" + getGlobalConnections().size() + "): " + getGlobalConnections());
429
430                if (broadcasters.size() > 0) {
431                        for (DefaultServerSentEventBroadcaster broadcaster : broadcasters) {
432                                ++i;
433                                //System.out.println(format("Performing validity checks for broadcaster %d of %d (%s)...", i, broadcasters.size(), broadcaster.getResourcePath().getPath()));
434
435                                Set<ServerSentEventConnection> serverSentEventConnections = broadcaster.getServerSentEventConnections();
436
437                                //System.out.println(format("This broadcaster has %d SSE connections", serverSentEventConnections.size()));
438
439                                if (serverSentEventConnections.size() == 0) {
440                                        // This broadcaster can be entirely dealloced because it has no more connections.
441                                        // TODO: this should be more of a failsafe, we should factor into its own method and call this at the end of a socket thread too for immediate cleanup
442                                        // TODO: broadcaster removes/adds be protected with a "broadcasterLock"
443                                        //System.out.println("Because this broadcaster has no connections, removing it.");
444                                        getBroadcastersByResourcePath().remove(broadcaster.getResourcePath());
445                                } else {
446                                        int j = 0;
447
448                                        for (ServerSentEventConnection serverSentEventConnection : serverSentEventConnections) {
449                                                ++j;
450                                                //System.out.println(format("Enqueuing heartbeat for socket %d of %d...", j, serverSentEventConnections.size()));
451                                                // TODO: keep track of when the most recent validity check was done so we don't do it too frequently, e.g. with AtomicReference<Instant> on ServerSentEventConnection (nice-to-have)
452                                                serverSentEventConnection.writeQueue.add(SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK); // TODO: should this just be the heartbeat event?
453                                        }
454                                }
455                        }
456                }
457        }
458
459        protected void startInternal() {
460                // Handle scenario where server is stopped immediately after starting (and before this thread is scheduled)
461                // TODO: clean this up
462                if (!isStarted() || isStopping()) {
463                        //System.out.println("Server is stopped or stopping, exiting SSE event loop...");
464                        return;
465                }
466
467                try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
468                        serverSocketChannel.bind(new InetSocketAddress(getPort()));
469
470                        // Handle scenario where server is stopped immediately after starting (and before this thread is scheduled)
471                        // TODO: clean this up
472                        if (!isStarted() || isStopping()) {
473                                //System.out.println("Server is stopped or stopping, exiting SSE event loop...");
474                                return;
475                        }
476
477                        ExecutorService executorService = getRequestHandlerExecutorService().get();
478
479                        while (!getStopPoisonPill().get()) {
480                                SocketChannel clientSocketChannel = serverSocketChannel.accept();
481                                executorService.submit(() -> handleClientSocketChannel(clientSocketChannel));
482                        }
483                } catch (IOException e) {
484                        throw new UncheckedIOException(e);
485                } finally {
486                        // In case we are not already being stopped, force a stop
487                        stop();
488                }
489        }
490
491        protected void handleClientSocketChannel(@Nonnull SocketChannel clientSocketChannel) {
492                requireNonNull(clientSocketChannel);
493
494                ClientSocketChannelRegistration clientSocketChannelRegistration = null;
495                Request request = null;
496                ResourceMethod resourceMethod = null;
497                ServerSentEvent serverSentEvent;
498                Instant writeStarted;
499                Throwable throwable = null;
500
501                try (clientSocketChannel) {
502                        // Use the socket's address as an identifier
503                        String requestIdentifier = clientSocketChannel.getRemoteAddress().toString();
504
505                        try {
506                                // TODO: in a future version, we might introduce lifecycle interceptor option here and for Server for "will/didInitiateConnection"
507                                String rawRequest = readRequest(requestIdentifier, clientSocketChannel);
508                                request = parseRequest(requestIdentifier, rawRequest);
509                        } catch (RequestTooLargeIOException e) {
510                                // Exception provides a "too large"-flagged request with whatever data we could pull out of it
511                                request = e.getTooLargeRequest();
512                        } catch (SocketTimeoutException e) {
513                                // TODO: in a future version, we might introduce lifecycle interceptor option here and for Server for "request timed out"
514                                throw e;
515                        } catch (Exception e) {
516                                // TODO: in a future version, we might introduce lifecycle interceptor option here and for Server for "request parsing failed"
517                                throw e;
518                        }
519
520                        //System.out.println(format("Received SSE request on socket: %s", debuggingString(request)));
521
522                        // Determine the resource path
523                        ResourcePathDeclaration resourcePathDeclaration = matchingResourcePath(request.getResourcePath()).orElse(null);
524
525                        if (resourcePathDeclaration != null)
526                                resourceMethod = getResourceMethodsByResourcePathDeclaration().get(resourcePathDeclaration);
527
528                        AtomicInteger marshaledResponseStatusCode = new AtomicInteger(500);
529
530                        // OK, we have a request.
531                        // First thing to do is write an HTTP response (status, headers) - and then we keep the socket open for subsequent writes.
532                        // To write this initial "handshake" response, we delegate to the Soklet instance, handing it the request we just parsed
533                        // and receiving a MarshaledResponse to write.  This lets the normal Soklet request processing flow occur.
534                        // Subsequent writes to the open socket are done via a ServerSentEventBroadcaster and sidestep the Soklet request processing flow.
535                        getRequestHandler().get().handleRequest(request, (@Nonnull RequestResult requestResult) -> {
536                                MarshaledResponse marshaledResponse = requestResult.getMarshaledResponse();
537
538                                marshaledResponseStatusCode.set(marshaledResponse.getStatusCode());
539                                String handshakeResponse = toHandshakeResponse(marshaledResponse);
540
541                                try {
542                                        clientSocketChannel.write(ByteBuffer.wrap(handshakeResponse.getBytes(StandardCharsets.UTF_8)));
543                                } catch (IOException e) {
544                                        // TODO: log out?
545                                        throw new UncheckedIOException("Unable to write initial SSE handshake response", e);
546                                }
547                        });
548
549                        // Happy path? Register the channel for future ServerSentEvent writes.
550                        // If no socket channel registration (404) or >= 300 HTTP status, we're done immediately now that initial data has been written.
551                        if (resourceMethod != null && marshaledResponseStatusCode.get() < 300) {
552                                getLifecycleInterceptor().get().willEstablishServerSentEventConnection(request, resourceMethod);
553
554                                clientSocketChannelRegistration = registerClientSocketChannel(clientSocketChannel, request).get();
555
556                                // TODO: is this the right spot?  Should it be lower down?
557                                getLifecycleInterceptor().get().didEstablishServerSentEventConnection(request, resourceMethod);
558
559                                while (true) {
560                                        //System.out.println(format("Waiting for SSE broadcasts on socket: %s", debuggingString(request)));
561                                        serverSentEvent = clientSocketChannelRegistration.serverSentEventConnection().getWriteQueue().take();
562
563                                        if (serverSentEvent == SERVER_SENT_EVENT_POISON_PILL) {
564                                                //System.out.println("Encountered poison pill, exiting...");
565                                                break;
566                                        }
567
568                                        ByteBuffer buffer = null;
569
570                                        if (serverSentEvent == SERVER_SENT_EVENT_CONNECTION_VALIDITY_CHECK) {
571                                                //System.out.println("Performing socket validity check by writing a heartbeat message...");
572                                                String message = formatForResponse(ServerSentEvent.forHeartbeat());
573                                                buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
574                                        } else {
575                                                //System.out.println(format("Writing %s to %s...", serverSentEvent, debuggingString(request)));
576                                                String message = formatForResponse(serverSentEvent);
577                                                buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
578                                        }
579
580                                        getLifecycleInterceptor().get().willStartServerSentEventWriting(request,
581                                                        clientSocketChannelRegistration.serverSentEventConnection().getResourceMethod(), serverSentEvent);
582
583                                        writeStarted = Instant.now();
584                                        Throwable writeThrowable = null;
585
586                                        try {
587                                                clientSocketChannel.write(buffer);
588                                        } catch (Throwable t) {
589                                                writeThrowable = t;
590                                        } finally {
591                                                Instant writeFinished = Instant.now();
592                                                Duration writeDuration = Duration.between(writeStarted, writeFinished);
593
594                                                getLifecycleInterceptor().get().didFinishServerSentEventWriting(request,
595                                                                clientSocketChannelRegistration.serverSentEventConnection().getResourceMethod(), serverSentEvent, writeDuration, throwable);
596
597                                                if (writeThrowable != null)
598                                                        throw writeThrowable;
599                                        }
600                                }
601                        } else {
602                                String reason = "unknown";
603
604                                if (resourceMethod == null)
605                                        reason = format("no SSE resource method exists for %s", request.getUri());
606                                else if (marshaledResponseStatusCode.get() >= 300)
607                                        reason = format("SSE resource method status code is %d", marshaledResponseStatusCode.get());
608
609                                //System.out.println(format("Closing socket %s immediately after handshake instead of waiting for broadcasts. Reason: %s", debuggingString(request), reason));
610                        }
611                } catch (Throwable t) {
612                        throwable = t;
613                        // System.out.println("Closing socket due to exception: " + t.getMessage());
614
615                        if (t instanceof InterruptedException)
616                                Thread.currentThread().interrupt();  // Restore interrupt status
617                } finally {
618                        // First, tell the event source to unregister the connection
619                        if (clientSocketChannelRegistration != null) {
620                                if (resourceMethod != null)
621                                        getLifecycleInterceptor().get().willTerminateServerSentEventConnection(request, resourceMethod, throwable);
622
623                                try {
624                                        clientSocketChannelRegistration.broadcaster().unregisterServerSentEventConnection(clientSocketChannelRegistration.serverSentEventConnection(), false);
625
626                                        // System.out.println(format("SSE socket thread completed for request: %s", debuggingString(clientSocketChannelRegistration.serverSentEventConnection().getRequest())));
627                                } catch (Exception exception) {
628                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to de-register Server-Sent Event connection")
629                                                        .throwable(exception)
630                                                        .build());
631                                }
632                        }
633
634                        // Then, close the channel itself
635                        if (clientSocketChannel != null) {
636                                try {
637                                        // Should already be closed, but just in case
638                                        clientSocketChannel.close();
639                                } catch (Exception exception) {
640                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to close Server-Sent Event connection socket channel")
641                                                        .throwable(exception)
642                                                        .build());
643                                } finally {
644                                        if (clientSocketChannelRegistration != null && resourceMethod != null) {
645                                                Instant connectionFinished = Instant.now();
646                                                Duration connectionDuration = Duration.between(clientSocketChannelRegistration.serverSentEventConnection().getEstablishedAt(), connectionFinished);
647
648                                                getLifecycleInterceptor().get().didTerminateServerSentEventConnection(request, resourceMethod, connectionDuration, throwable);
649                                        }
650                                }
651                        }
652                }
653        }
654
655        @Nonnull
656        protected String toHandshakeResponse(@Nonnull MarshaledResponse marshaledResponse) {
657                requireNonNull(marshaledResponse);
658
659                // For example:
660                //
661                // "HTTP/1.1 200 OK\r\n" +
662                // "Content-Type: text/event-stream\r\n" +
663                // "Cache-Control: no-cache\r\n" +
664                // "X-Accel-Buffering: no\r\n" +
665                // "Connection: keep-alive\r\n\r\n";
666
667                // TODO: make this a configurable value.  It's just here temporarily.
668                final Map<String, Set<String>> DEFAULT_HEADERS = Map.of(
669                                "Content-Type", Set.of("text/event-stream"),
670                                "Cache-Control", Set.of("no-cache"),
671                                "Connection", Set.of("keep-alive"),
672                                "X-Accel-Buffering", Set.of("on")
673                );
674
675                final Set<String> ILLEGAL_HEADER_NAMES = Set.of("Content-Length");
676
677                int statusCode = marshaledResponse.getStatusCode();
678                Map<String, Set<String>> headers = marshaledResponse.getHeaders();
679
680                List<String> lines = new ArrayList<>(1 + headers.size());
681
682                // e.g. "HTTP/1.1 200 OK"
683                lines.add(format("HTTP/1.1 %d %s", statusCode, StatusCode.fromStatusCode(statusCode).get().getReasonPhrase()));
684
685                // Write default headers
686                // TODO: do these apply for responses > HTTP 299?  Probably not.
687                for (Entry<String, Set<String>> entry : DEFAULT_HEADERS.entrySet()) {
688                        String headerName = entry.getKey();
689                        Set<String> headerValues = entry.getValue();
690
691                        if (headerValues != null)
692                                for (String headerValue : headerValues)
693                                        lines.add(format("%s: %s", headerName, headerValue));
694                }
695
696                // Write custom headers
697                for (Entry<String, Set<String>> entry : headers.entrySet()) {
698                        // TODO: case-insensitive comparison
699                        String headerName = entry.getKey();
700                        Set<String> headerValues = entry.getValue();
701
702                        // Only write headers that are not part of the default set
703                        if (!DEFAULT_HEADERS.containsKey(headerName) && !ILLEGAL_HEADER_NAMES.contains(headerName))
704                                if (headerValues != null)
705                                        for (String headerValue : headerValues)
706                                                lines.add(format("%s: %s", headerName, headerValue));
707                }
708
709                return lines.stream().collect(Collectors.joining("\r\n")) + "\r\n\r\n";
710        }
711
712        @Nonnull
713        protected String formatForResponse(@Nonnull ServerSentEvent serverSentEvent) {
714                requireNonNull(serverSentEvent);
715
716                if (serverSentEvent.isHeartbeat())
717                        return ":\n\n";
718
719                String event = serverSentEvent.getEvent().orElse(null);
720
721                String data = serverSentEvent.getData().orElse(null);
722                List<String> dataLines = data == null ? List.of() : data.lines()
723                                .map(line -> format("data: %s", line))
724                                .toList();
725
726                String id = serverSentEvent.getId().orElse(null);
727                Duration retry = serverSentEvent.getRetry().orElse(null);
728
729                List<String> lines = new ArrayList<>(16);
730
731                if (event != null)
732                        lines.add(format("event: %s", event));
733
734                if (id != null)
735                        lines.add(format("id: %s", id));
736
737                if (retry != null)
738                        lines.add(format("retry: %d", retry.toMillis()));
739
740                if (dataLines.size() > 0)
741                        lines.addAll(dataLines);
742
743                if (lines.size() == 0)
744                        return ":\n\n";
745
746                return format("%s\n\n", lines.stream().collect(Collectors.joining("\n")));
747        }
748
749        @Nonnull
750        protected String debuggingString(@Nonnull Request request) {
751                requireNonNull(request);
752                return format("%s %s %s", request.getId(), request.getHttpMethod().name(), request.getUri());
753        }
754
755        @ThreadSafe
756        protected static class ServerSentEventConnection {
757                @Nonnull
758                private final Request request;
759                @Nonnull
760                private final ResourceMethod resourceMethod;
761                @Nonnull
762                private final BlockingQueue<ServerSentEvent> writeQueue;
763                @Nonnull
764                private final Instant establishedAt;
765
766                public ServerSentEventConnection(@Nonnull Request request,
767                                                                                                                                                 @Nonnull ResourceMethod resourceMethod) {
768                        requireNonNull(request);
769                        requireNonNull(resourceMethod);
770
771                        this.request = request;
772                        this.resourceMethod = resourceMethod;
773                        this.writeQueue = new ArrayBlockingQueue<>(8);
774                        this.establishedAt = Instant.now(); // Don't use this for anything currently, but might later
775                }
776
777                @Nonnull
778                public Request getRequest() {
779                        return this.request;
780                }
781
782                @Nonnull
783                public ResourceMethod getResourceMethod() {
784                        return this.resourceMethod;
785                }
786
787                @Nonnull
788                public BlockingQueue<ServerSentEvent> getWriteQueue() {
789                        return this.writeQueue;
790                }
791
792                @Nonnull
793                public Instant getEstablishedAt() {
794                        return this.establishedAt;
795                }
796        }
797
798        protected record ClientSocketChannelRegistration(@Nonnull ServerSentEventConnection serverSentEventConnection,
799                                                                                                                                                                                                         @Nonnull DefaultServerSentEventBroadcaster broadcaster) {
800                public ClientSocketChannelRegistration {
801                        requireNonNull(serverSentEventConnection);
802                        requireNonNull(broadcaster);
803                }
804        }
805
806        @Nonnull
807        protected Optional<ClientSocketChannelRegistration> registerClientSocketChannel(@Nonnull SocketChannel clientSocketChannel,
808                                                                                                                                                                                                                                                                                                                                        @Nonnull Request request) {
809                requireNonNull(clientSocketChannel);
810                requireNonNull(request);
811
812                ResourcePath resourcePath = request.getResourcePath();
813
814                if (!matchingResourcePath(resourcePath).isPresent())
815                        return Optional.empty();
816
817                // Get a handle to the event source (it will be created if necessary)
818                DefaultServerSentEventBroadcaster broadcaster = acquireBroadcasterInternal(resourcePath).get();
819
820                // Create the connection and register it with the EventSource
821                ServerSentEventConnection serverSentEventConnection = new ServerSentEventConnection(request, broadcaster.getResourceMethod());
822
823                if (!broadcaster.registerServerSentEventConnection(serverSentEventConnection))
824                        return Optional.empty();
825
826                // Also register the connection globally so we can enforce an overall limit on the number of open connections.
827                // If this causes an eviction, the eviction callback supplied to the ConcurrentLruMap will handle cleanup.
828                getGlobalConnections().put(serverSentEventConnection, broadcaster);
829
830                return Optional.of(new ClientSocketChannelRegistration(serverSentEventConnection, broadcaster));
831        }
832
833        @Nonnull
834        protected Request parseRequest(@Nonnull String requestIdentifier,
835                                                                                                                                 @Nonnull String rawRequest) {
836                requireNonNull(requestIdentifier);
837                requireNonNull(rawRequest);
838
839                rawRequest = Utilities.trimAggressivelyToNull(rawRequest);
840
841                if (rawRequest == null)
842                        throw new IllegalStateException("Server-Sent Event HTTP request has no data");
843
844                // Example request structure:
845                //
846                // GET /testing?one=two HTTP/1.1
847                // Host: localhost:8081
848                // Connection: keep-alive
849                // sec-ch-ua-platform: "macOS"
850                // Cache-Control: no-cache
851                // User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36
852                // Accept: text/event-stream
853                // sec-ch-ua: "Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"
854                // sec-ch-ua-mobile: ?0
855                // Origin: null
856                // Sec-Fetch-Site: cross-site
857                // Sec-Fetch-Mode: cors
858                // Sec-Fetch-Dest: empty
859                // Accept-Encoding: gzip, deflate, br, zstd
860                // Accept-Language: en-US,en;q=0.9,fr-CA;q=0.8,fr;q=0.7
861
862                // We know any EventSource request must be a GET.  As a result, we know there is no request body.
863
864                // First line is the URL and the rest are headers.
865                // Line 1: GET /testing?one=two HTTP/1.1
866                // Line 2: Accept-Encoding: gzip, deflate, br, zstd
867                // ...and so forth.
868
869                Request.Builder requestBuilder = null;
870                Map<String, Set<String>> headers = new LinkedCaseInsensitiveMap<>(32);
871
872                for (String line : rawRequest.lines().toList()) {
873                        line = Utilities.trimAggressivelyToNull(line);
874
875                        if (line == null)
876                                continue;
877
878                        if (requestBuilder == null) {
879                                // This is the first line.
880                                // Example: GET /testing?one=two HTTP/1.1
881                                String[] components = line.split(" ");
882
883                                if (components.length != 3)
884                                        throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line));
885
886                                String httpMethod = components[0];
887                                String rawUri = components[1];
888                                URI uri = null;
889
890                                if (!httpMethod.equals("GET"))
891                                        throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line));
892
893                                if (rawUri != null) {
894                                        try {
895                                                uri = new URI(rawUri);
896                                        } catch (Exception ignored) {
897                                                // Malformed URI
898                                        }
899                                }
900
901                                if (uri == null)
902                                        throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'GET /example?one=two HTTP/1.1'", line));
903
904                                requestBuilder = Request.with(HttpMethod.GET, rawUri);
905                        } else {
906                                // This is a header line.
907                                // Example: Accept-Encoding: gzip, deflate, br, zstd
908                                int indexOfFirstColon = line.indexOf(":");
909
910                                if (indexOfFirstColon == -1)
911                                        throw new IllegalStateException(format("Malformed Server-Sent Event request line '%s'. Expected a format like 'Header-Name: Value", line));
912
913                                String headerName = line.substring(0, indexOfFirstColon);
914                                String headerValue = Utilities.trimAggressivelyToNull(line.substring(indexOfFirstColon + 1));
915
916                                Set<String> headerValues = headers.get(headerName);
917
918                                if (headerValues == null) {
919                                        headerValues = new LinkedHashSet<>();
920                                        headers.put(headerName, headerValues);
921                                }
922
923                                // Blank headers will have a key in the map, but an empty set of header values.
924                                if (headerValue != null)
925                                        headerValues.add(headerValue);
926                        }
927                }
928
929                return requestBuilder.id(requestIdentifier).headers(headers).build();
930        }
931
932        @Nonnull
933        protected String readRequest(@Nonnull String requestIdentifier,
934                                                                                                                         @Nonnull SocketChannel clientSocketChannel) throws IOException {
935                requireNonNull(requestIdentifier);
936                requireNonNull(clientSocketChannel);
937
938                // Because reads from the socket channel are blocking, there is no way to specify a timeout for it.
939                // We work around this by performing the read in a virtual thread, and use the timeout functionality
940                // built in to Futures to interrupt the thread if it doesn't finish in time.
941                Future<String> readFuture = null;
942
943                try {
944                        readFuture = getRequestReaderExecutorService().get().submit(() -> {
945                                ByteBuffer buffer = ByteBuffer.allocate(getRequestReadBufferSizeInBytes());
946                                StringBuilder requestBuilder = new StringBuilder();
947                                boolean headersComplete = false;
948                                int totalBytesRead = 0;
949
950                                while (!headersComplete) {
951                                        int bytesRead = clientSocketChannel.read(buffer);
952
953                                        // If the thread was interrupted while blocked in read(...),
954                                        // the read call should throw InterruptedIOException or similar.
955                                        if (Thread.interrupted())
956                                                throw new InterruptedIOException("Thread interrupted while reading request data");
957
958                                        // End of stream (connection closed by client)
959                                        if (bytesRead == -1)
960                                                throw new IOException("Client closed the connection before request was complete");
961
962                                        // Flip the buffer to read mode
963                                        buffer.flip();
964
965                                        // Decode the buffer content to a string and append to the request
966                                        byte[] bytes = new byte[buffer.remaining()];
967                                        buffer.get(bytes);
968
969                                        totalBytesRead += bytes.length;
970
971                                        // Check size limit
972                                        // To test:
973                                        // echo -ne 'GET /example HTTP/1.1\r\nHost: example.com FILLER_UNTIL_WE_ARE_TOO_BIG\r\n\r\n' | netcat -v localhost 8081
974                                        if (totalBytesRead > getMaximumRequestSizeInBytes()) {
975                                                String rawRequest = requestBuilder.toString();
976
977                                                // Given our partial raw request, try to parse it into a request...
978                                                Request tooLargeRequest = parseTooLargeRequestForRawRequest(requestIdentifier, rawRequest).orElse(null);
979
980                                                // ...if unable to parse into a request (as in, we can't even make it through the first line), bail
981                                                if (tooLargeRequest == null)
982                                                        throw new IOException(format("Request is too large (exceeded %d bytes) but we do not have enough data available to know its path", getMaximumRequestSizeInBytes()));
983
984                                                throw new RequestTooLargeIOException(format("Request too large (exceeded %d bytes)", getMaximumRequestSizeInBytes()), tooLargeRequest);
985                                        }
986
987                                        requestBuilder.append(new String(bytes, StandardCharsets.UTF_8));
988
989                                        // Check if the headers are complete (look for the "\r\n\r\n" marker)
990                                        if (requestBuilder.indexOf("\r\n\r\n") != -1)
991                                                headersComplete = true;
992
993                                        // Clear the buffer for the next read
994                                        buffer.clear();
995                                }
996
997                                return requestBuilder.toString();
998                        });
999
1000                        // Wait up to the specified timeout for reading to complete
1001                        return readFuture.get(getRequestTimeout().getSeconds(), TimeUnit.SECONDS);
1002                } catch (TimeoutException e) {
1003                        // Time's up; cancel the task so the blocking read is interrupted
1004                        if (readFuture != null)
1005                                readFuture.cancel(true);
1006
1007                        throw new SocketTimeoutException(format("Reading request took longer than %d seconds", getRequestTimeout().getSeconds()));
1008                } catch (InterruptedException e) {
1009                        // Current thread interrupted while waiting
1010                        Thread.currentThread().interrupt(); // restore interrupt status
1011                        throw new IOException("Interrupted while awaiting request data", e);
1012                } catch (ExecutionException e) {
1013                        // The task itself threw an exception
1014                        if (e.getCause() instanceof IOException)
1015                                throw (IOException) e.getCause();
1016
1017                        throw new IOException("Unexpected exception while reading request", e.getCause());
1018                }
1019        }
1020
1021        /**
1022         * Given partial raw request data (once we hit max size threshold, we stop collecting it), parse out what we have as
1023         * best we can into a request that is marked "too large".
1024         * <p>
1025         * If there isn't sufficient data to parse into a request (or if the data is malformed), then return the empty value.
1026         */
1027        @Nonnull
1028        protected Optional<Request> parseTooLargeRequestForRawRequest(@Nonnull String requestIdentifier,
1029                                                                                                                                                                                                                                                                @Nullable String rawRequest) {
1030                requireNonNull(requestIdentifier);
1031
1032                // Supports both relative and absolute paths.
1033                // e.g. "GET /index.html HTTP/1.1\r\n" would return "/index.html".
1034                // e.g. "GET https://www.soklet.com/index.html HTTP/1.1\r\n" would return "/index.html".
1035                String firstLine = null;
1036
1037                int crLfIndex = rawRequest.indexOf("\r\n");
1038
1039                if (crLfIndex != -1)
1040                        firstLine = rawRequest.substring(0, crLfIndex).trim();
1041
1042                // We don't even have a complete first line of the request
1043                if (firstLine == null || firstLine.length() == 0)
1044                        return Optional.empty();
1045
1046                String[] parts = firstLine.split(" ");
1047
1048                // First line of the request is malformed
1049                if (parts.length < 2)
1050                        return Optional.empty();
1051
1052                String rawHttpMethod = parts[0];
1053
1054                if (rawHttpMethod != null)
1055                        rawHttpMethod = rawHttpMethod.trim().toUpperCase(Locale.ENGLISH);
1056
1057                HttpMethod httpMethod = null;
1058
1059                try {
1060                        httpMethod = HttpMethod.valueOf(rawHttpMethod);
1061                } catch (IllegalArgumentException e) {
1062                        // Malformed HTTP method specified
1063                        return Optional.empty();
1064                }
1065
1066                String rawUri = parts[1];
1067
1068                // Validate URI
1069                if (rawUri != null) {
1070                        try {
1071                                new URI(rawUri.trim());
1072                        } catch (Exception e) {
1073                                // Malformed URI specified
1074                                return Optional.empty();
1075                        }
1076                }
1077
1078                // TODO: eventually would be nice to parse headers as best we can.  For now, we just parse the first request line
1079                return Optional.of(Request.with(httpMethod, rawUri)
1080                                .id(requestIdentifier)
1081                                .contentTooLarge(true)
1082                                .build());
1083        }
1084
1085        @NotThreadSafe
1086        protected static class RequestTooLargeIOException extends IOException {
1087                @Nonnull
1088                private final Request tooLargeRequest;
1089
1090                public RequestTooLargeIOException(@Nullable String message,
1091                                                                                                                                                        @Nonnull Request tooLargeRequest) {
1092                        super(message);
1093                        this.tooLargeRequest = requireNonNull(tooLargeRequest);
1094                }
1095
1096                @Nonnull
1097                public Request getTooLargeRequest() {
1098                        return this.tooLargeRequest;
1099                }
1100        }
1101
1102        @Override
1103        public void stop() {
1104                getLock().lock();
1105
1106                boolean interrupted = false;
1107                this.stopping = false;
1108
1109                try {
1110                        if (!isStarted())
1111                                return;
1112
1113                        this.stopping = true;
1114
1115                        try {
1116                                this.connectionValidityExecutorService.shutdown();
1117                                this.connectionValidityExecutorService.awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS);
1118                        } catch (InterruptedException e) {
1119                                interrupted = true;
1120                        } catch (Exception e) {
1121                                safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event connection validity checker")
1122                                                .throwable(e)
1123                                                .build());
1124                        }
1125
1126                        getStopPoisonPill().set(true);
1127
1128                        // TODO: need an additional check/lock to prevent race condition where someone acquires an event source while we are shutting down
1129                        for (DefaultServerSentEventBroadcaster broadcaster : getBroadcastersByResourcePath().values()) {
1130                                try {
1131                                        broadcaster.unregisterAllServerSentEventConnections(true);
1132                                } catch (Exception e) {
1133                                        safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down open Server-Sent Event connections")
1134                                                        .throwable(e)
1135                                                        .build());
1136                                }
1137                        }
1138
1139                        // Clear global connections map for sanity (though it should be empty by this point)
1140                        getGlobalConnections().clear();
1141
1142                        try {
1143                                getRequestHandlerExecutorService().get().shutdown();
1144                                getRequestHandlerExecutorService().get().awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS);
1145                        } catch (InterruptedException e) {
1146                                interrupted = true;
1147                        } catch (Exception e) {
1148                                safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event request handler")
1149                                                .throwable(e)
1150                                                .build());
1151                        }
1152
1153                        try {
1154                                getRequestReaderExecutorService().get().shutdown();
1155                                getRequestReaderExecutorService().get().awaitTermination(getShutdownTimeout().getSeconds(), TimeUnit.SECONDS);
1156                        } catch (InterruptedException e) {
1157                                interrupted = true;
1158                        } catch (Exception e) {
1159                                safelyLog(LogEvent.with(LogEventType.SERVER_SENT_EVENT_SERVER_INTERNAL_ERROR, "Unable to shut down Server-Sent Event request reader")
1160                                                .throwable(e)
1161                                                .build());
1162                        }
1163                } finally {
1164                        this.started = false;
1165                        this.eventLoopThread = null;
1166                        this.requestHandlerExecutorService = null;
1167                        this.requestReaderExecutorService = null;
1168                        this.connectionValidityExecutorService = null;
1169                        this.getBroadcastersByResourcePath().clear();
1170                        this.getResourcePathDeclarationsByResourcePathCache().clear();
1171                        getStopPoisonPill().set(false);
1172
1173                        if (interrupted)
1174                                Thread.currentThread().interrupt();
1175
1176                        getLock().unlock();
1177                }
1178        }
1179
1180        @Nonnull
1181        @Override
1182        public Boolean isStarted() {
1183                getLock().lock();
1184
1185                try {
1186                        return this.started;
1187                } finally {
1188                        getLock().unlock();
1189                }
1190        }
1191
1192        @Nonnull
1193        protected Boolean isStopping() {
1194                getLock().lock();
1195
1196                try {
1197                        return this.stopping;
1198                } finally {
1199                        getLock().unlock();
1200                }
1201        }
1202
1203        @Nonnull
1204        @Override
1205        public Optional<? extends ServerSentEventBroadcaster> acquireBroadcaster(@Nullable ResourcePath resourcePath) {
1206                if (resourcePath == null)
1207                        return Optional.empty();
1208
1209                return acquireBroadcasterInternal(resourcePath);
1210        }
1211
1212        @Nonnull
1213        protected Optional<DefaultServerSentEventBroadcaster> acquireBroadcasterInternal(@Nullable ResourcePath resourcePath) {
1214                if (resourcePath == null)
1215                        return Optional.empty();
1216
1217                ResourcePathDeclaration resourcePathDeclaration = matchingResourcePath(resourcePath).orElse(null);
1218
1219                if (resourcePathDeclaration == null)
1220                        return Optional.empty();
1221
1222                ResourceMethod resourceMethod = getResourceMethodsByResourcePathDeclaration().get(resourcePathDeclaration);
1223
1224                // TODO: should this be sent as a LogEvent?
1225                if (resourceMethod == null)
1226                        throw new IllegalStateException(format("Internal error: unable to find %s instance that matches %s", ResourceMethod.class, resourcePathDeclaration));
1227
1228                // Create the event source if it does not already exist
1229                DefaultServerSentEventBroadcaster broadcaster = getBroadcastersByResourcePath()
1230                                .computeIfAbsent(resourcePath, (ignored) -> new DefaultServerSentEventBroadcaster(resourceMethod, resourcePath, (serverSentEventConnection -> {
1231                                        // When the broadcaster unregisters a connection it manages, remove it from the global set of connections as well
1232                                        getGlobalConnections().remove(serverSentEventConnection);
1233                                })));
1234
1235                return Optional.of(broadcaster);
1236        }
1237
1238        @Nonnull
1239        protected Optional<ResourcePathDeclaration> matchingResourcePath(@Nullable ResourcePath resourcePath) {
1240                if (resourcePath == null)
1241                        return Optional.empty();
1242
1243                // TODO: convert to computeIfAbsent()
1244
1245                // Try a cache lookup first
1246                ResourcePathDeclaration resourcePathDeclaration = getResourcePathDeclarationsByResourcePathCache().get(resourcePath);
1247
1248                if (resourcePathDeclaration == null) {
1249                        // If the cache lookup fails, perform a manual lookup
1250                        for (ResourcePathDeclaration registeredResourcePathDeclaration : getResourceMethodsByResourcePathDeclaration().keySet()) {
1251                                if (registeredResourcePathDeclaration.matches(resourcePath)) {
1252                                        resourcePathDeclaration = registeredResourcePathDeclaration;
1253                                        break;
1254                                }
1255                        }
1256
1257                        // Put the value in the cache for quick access later
1258                        if (resourcePathDeclaration != null)
1259                                getResourcePathDeclarationsByResourcePathCache().put(resourcePath, resourcePathDeclaration);
1260                }
1261
1262                return Optional.ofNullable(resourcePathDeclaration);
1263        }
1264
1265        protected void safelyLog(@Nonnull LogEvent logEvent) {
1266                requireNonNull(logEvent);
1267
1268                try {
1269                        getLifecycleInterceptor().get().didReceiveLogEvent(logEvent);
1270                } catch (Throwable throwable) {
1271                        // The LifecycleInterceptor implementation errored out, but we can't let that affect us - swallow its exception.
1272                        // Not much else we can do here but dump to stderr
1273                        throwable.printStackTrace();
1274                }
1275        }
1276
1277        @Nonnull
1278        protected Integer getPort() {
1279                return this.port;
1280        }
1281
1282        @Nonnull
1283        protected String getHost() {
1284                return this.host;
1285        }
1286
1287        @Nonnull
1288        protected Duration getRequestTimeout() {
1289                return this.requestTimeout;
1290        }
1291
1292        @Nonnull
1293        protected Duration getShutdownTimeout() {
1294                return this.shutdownTimeout;
1295        }
1296
1297        @Nonnull
1298        protected Integer getMaximumRequestSizeInBytes() {
1299                return this.maximumRequestSizeInBytes;
1300        }
1301
1302        @Nonnull
1303        protected Integer getRequestReadBufferSizeInBytes() {
1304                return this.requestReadBufferSizeInBytes;
1305        }
1306
1307        @Nonnull
1308        public Map<ResourcePathDeclaration, ResourceMethod> getResourceMethodsByResourcePathDeclaration() {
1309                return this.resourceMethodsByResourcePathDeclaration;
1310        }
1311
1312        @Nonnull
1313        protected ConcurrentHashMap<ResourcePath, DefaultServerSentEventBroadcaster> getBroadcastersByResourcePath() {
1314                return this.broadcastersByResourcePath;
1315        }
1316
1317        @Nonnull
1318        protected ConcurrentHashMap<ResourcePath, ResourcePathDeclaration> getResourcePathDeclarationsByResourcePathCache() {
1319                return this.resourcePathDeclarationsByResourcePathCache;
1320        }
1321
1322        @Nonnull
1323        protected Optional<ExecutorService> getRequestHandlerExecutorService() {
1324                return Optional.ofNullable(this.requestHandlerExecutorService);
1325        }
1326
1327        @Nonnull
1328        protected Optional<ExecutorService> getRequestReaderExecutorService() {
1329                return Optional.ofNullable(this.requestReaderExecutorService);
1330        }
1331
1332        @Nonnull
1333        protected ReentrantLock getLock() {
1334                return this.lock;
1335        }
1336
1337        @Nonnull
1338        protected Supplier<ExecutorService> getRequestHandlerExecutorServiceSupplier() {
1339                return this.requestHandlerExecutorServiceSupplier;
1340        }
1341
1342        @Nonnull
1343        protected Supplier<ExecutorService> getRequestReaderExecutorServiceSupplier() {
1344                return this.requestReaderExecutorServiceSupplier;
1345        }
1346
1347        @Nonnull
1348        protected Integer getConcurrentConnectionLimit() {
1349                return this.concurrentConnectionLimit;
1350        }
1351
1352        @Nonnull
1353        protected ConcurrentLruMap<ServerSentEventConnection, DefaultServerSentEventBroadcaster> getGlobalConnections() {
1354                return this.globalConnections;
1355        }
1356
1357        @Nullable
1358        protected ScheduledExecutorService getConnectionValidityExecutorService() {
1359                return this.connectionValidityExecutorService;
1360        }
1361
1362        @Nonnull
1363        protected AtomicBoolean getStopPoisonPill() {
1364                return this.stopPoisonPill;
1365        }
1366
1367        @Nonnull
1368        protected Optional<Thread> getEventLoopThread() {
1369                return Optional.ofNullable(this.eventLoopThread);
1370        }
1371
1372        @Nonnull
1373        protected Optional<RequestHandler> getRequestHandler() {
1374                return Optional.ofNullable(this.requestHandler);
1375        }
1376
1377        @Nonnull
1378        protected Optional<LifecycleInterceptor> getLifecycleInterceptor() {
1379                return Optional.ofNullable(this.lifecycleInterceptor);
1380        }
1381
1382        /**
1383         * Builder used to construct instances of {@link DefaultServerSentEventServer}.
1384         * <p>
1385         * This class is intended for use by a single thread.
1386         *
1387         * @author <a href="https://www.revetkn.com">Mark Allen</a>
1388         */
1389        @NotThreadSafe
1390        public static class Builder {
1391                @Nonnull
1392                private Integer port;
1393                @Nullable
1394                private String host;
1395                @Nullable
1396                private Duration requestTimeout;
1397                @Nullable
1398                private Duration shutdownTimeout;
1399                @Nullable
1400                private Integer maximumRequestSizeInBytes;
1401                @Nullable
1402                private Integer requestReadBufferSizeInBytes;
1403                @Nullable
1404                private Supplier<ExecutorService> requestHandlerExecutorServiceSupplier;
1405                @Nullable
1406                private Integer concurrentConnectionLimit;
1407
1408                @Nonnull
1409                protected Builder(@Nonnull Integer port) {
1410                        requireNonNull(port);
1411                        this.port = port;
1412                }
1413
1414                @Nonnull
1415                public Builder port(@Nonnull Integer port) {
1416                        requireNonNull(port);
1417                        this.port = port;
1418                        return this;
1419                }
1420
1421                @Nonnull
1422                public Builder host(@Nullable String host) {
1423                        this.host = host;
1424                        return this;
1425                }
1426
1427                @Nonnull
1428                public Builder requestTimeout(@Nullable Duration requestTimeout) {
1429                        this.requestTimeout = requestTimeout;
1430                        return this;
1431                }
1432
1433                @Nonnull
1434                public Builder shutdownTimeout(@Nullable Duration shutdownTimeout) {
1435                        this.shutdownTimeout = shutdownTimeout;
1436                        return this;
1437                }
1438
1439                @Nonnull
1440                public Builder maximumRequestSizeInBytes(@Nullable Integer maximumRequestSizeInBytes) {
1441                        this.maximumRequestSizeInBytes = maximumRequestSizeInBytes;
1442                        return this;
1443                }
1444
1445                @Nonnull
1446                public Builder requestReadBufferSizeInBytes(@Nullable Integer requestReadBufferSizeInBytes) {
1447                        this.requestReadBufferSizeInBytes = requestReadBufferSizeInBytes;
1448                        return this;
1449                }
1450
1451                @Nonnull
1452                public Builder concurrentConnectionLimit(@Nullable Integer concurrentConnectionLimit) {
1453                        this.concurrentConnectionLimit = concurrentConnectionLimit;
1454                        return this;
1455                }
1456
1457                @Nonnull
1458                public Builder requestHandlerExecutorServiceSupplier(@Nullable Supplier<ExecutorService> requestHandlerExecutorServiceSupplier) {
1459                        this.requestHandlerExecutorServiceSupplier = requestHandlerExecutorServiceSupplier;
1460                        return this;
1461                }
1462
1463                @Nonnull
1464                public DefaultServerSentEventServer build() {
1465                        return new DefaultServerSentEventServer(this);
1466                }
1467        }
1468}