001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import com.soklet.Soklet.DefaultSimulator;
020import com.soklet.Soklet.MockSseUnicaster;
021import org.jspecify.annotations.NonNull;
022import org.jspecify.annotations.Nullable;
023
024import javax.annotation.concurrent.ThreadSafe;
025import java.util.List;
026import java.util.Objects;
027import java.util.Optional;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.atomic.AtomicReference;
030import java.util.concurrent.locks.ReentrantLock;
031import java.util.function.Consumer;
032
033import static java.lang.String.format;
034import static java.util.Objects.requireNonNull;
035
036/**
037 * Sealed interface used by {@link Simulator#performSseRequest(Request)} during integration tests, which encapsulates the 3 logical outcomes for SSE connections: accepted handshake, rejected handshake, and general request failure.
038 * <p>
039 * See <a href="https://www.soklet.com/docs/testing#integration-testing">https://www.soklet.com/docs/testing#integration-testing</a> for detailed documentation.
040 *
041 * @author <a href="https://www.revetkn.com">Mark Allen</a>
042 */
043public sealed interface SseRequestResult permits SseRequestResult.HandshakeAccepted, SseRequestResult.HandshakeRejected, SseRequestResult.RequestFailed {
044        /**
045         * Represents the result of an SSE accepted handshake (connection stays open) when simulated by {@link Simulator#performSseRequest(Request)}.
046         * <p>
047         * The {@link #registerEventConsumer(Consumer)} and {@link #registerCommentConsumer(Consumer)} methods can be used to "listen" for Server-Sent Events and Comments, respectively.
048         * <p>
049         * The data provided when the handshake was accepted is available via {@link #getSseHandshakeResult()}, and the final data sent to the client is available via {@link #getHttpRequestResult()}.
050         */
051        @ThreadSafe
052        final class HandshakeAccepted implements SseRequestResult {
053                private final SseHandshakeResult.@NonNull Accepted sseHandshakeResult;
054                @NonNull
055                private final ResourcePath resourcePath;
056                @NonNull
057                private final HttpRequestResult requestResult;
058                @NonNull
059                private final DefaultSimulator simulator;
060                @NonNull
061                private final AtomicReference<@Nullable Consumer<Throwable>> unicastErrorHandler;
062                @NonNull
063                private List<@NonNull SseEvent> clientInitializerEvents;
064                @NonNull
065                private List<@NonNull SseComment> clientInitializerComments;
066                @NonNull
067                private final ReentrantLock lock;
068                @Nullable
069                private Consumer<SseEvent> eventConsumer;
070                @Nullable
071                private Consumer<SseComment> commentConsumer;
072
073                HandshakeAccepted(SseHandshakeResult.@NonNull Accepted sseHandshakeResult,
074                                                                                        @NonNull ResourcePath resourcePath,
075                                                                                        @NonNull HttpRequestResult requestResult,
076                                                                                        @NonNull DefaultSimulator simulator,
077                                                                                        @Nullable Consumer<SseUnicaster> clientInitializer) {
078                        requireNonNull(sseHandshakeResult);
079                        requireNonNull(resourcePath);
080                        requireNonNull(requestResult);
081                        requireNonNull(simulator);
082
083                        this.sseHandshakeResult = sseHandshakeResult;
084                        this.resourcePath = resourcePath;
085                        this.requestResult = requestResult;
086                        this.simulator = simulator;
087                        this.unicastErrorHandler = simulator.getSseServer()
088                                        .map(sseServer -> sseServer.getUnicastErrorHandler())
089                                        .orElseGet(AtomicReference::new);
090                        this.eventConsumer = null;
091                        this.commentConsumer = null;
092                        this.lock = new ReentrantLock();
093
094                        this.clientInitializerEvents = new CopyOnWriteArrayList<>();
095                        this.clientInitializerComments = new CopyOnWriteArrayList<>();
096
097                        if (clientInitializer != null) {
098                                clientInitializer.accept(new MockSseUnicaster(
099                                                getResourcePath(),
100                                                (sseEvent) -> {
101                                                        requireNonNull(sseEvent);
102
103                                                        // If we don't have an event consumer registered, collect the events in a list to be fired off once the consumer is registered.
104                                                        // If we do have the event consumer registered, send immediately
105                                                        Consumer<SseEvent> eventConsumer = getEventConsumer().orElse(null);
106
107                                                        if (eventConsumer == null)
108                                                                clientInitializerEvents.add(sseEvent);
109                                                        else {
110                                                                try {
111                                                                        eventConsumer.accept(sseEvent);
112                                                                } catch (Throwable throwable) {
113                                                                        handleUnicastError(throwable);
114                                                                }
115                                                        }
116                                                },
117                                                (sseComment) -> {
118                                                        requireNonNull(sseComment);
119
120                                                        // If we don't have an event consumer registered, collect the events in a list to be fired off once the consumer is registered.
121                                                        // If we do have the event consumer registered, send immediately
122                                                        Consumer<SseComment> commentConsumer = getCommentConsumer().orElse(null);
123
124                                                        if (commentConsumer == null)
125                                                                clientInitializerComments.add(sseComment);
126                                                        else {
127                                                                try {
128                                                                        commentConsumer.accept(sseComment);
129                                                                } catch (Throwable throwable) {
130                                                                        handleUnicastError(throwable);
131                                                                }
132                                                        }
133                                                },
134                                                getUnicastErrorHandler(),
135                                                this::safelyLog)
136                                );
137                        }
138                }
139
140                /**
141                 * Registers a {@link SseEvent} "consumer" for this connection - similar to how a real client would listen for Server-Sent Events.
142                 * <p>
143                 * Each connection may have at most 1 event consumer.
144                 * <p>
145                 * See documentation at <a href="https://www.soklet.com/docs/testing#server-sent-events">https://www.soklet.com/docs/testing#server-sent-events</a>.
146                 *
147                 * @param eventConsumer function to be invoked when a Server-Sent Event has been unicast/broadcast on the Resource Path
148                 * @throws IllegalStateException if you attempt to register more than 1 event consumer
149                 */
150                public void registerEventConsumer(@NonNull Consumer<SseEvent> eventConsumer) {
151                        requireNonNull(eventConsumer);
152
153                        getLock().lock();
154
155                        try {
156                                if (getEventConsumer().isPresent())
157                                        throw new IllegalStateException(format("You cannot specify more than one event consumer for the same %s", HandshakeAccepted.class.getSimpleName()));
158
159                                this.eventConsumer = eventConsumer;
160
161                                // Send client initializer unicast events immediately, before any broadcasts can make it through
162                                for (SseEvent event : getClientInitializerEvents()) {
163                                        try {
164                                                eventConsumer.accept(event);
165                                        } catch (Throwable throwable) {
166                                                handleUnicastError(throwable);
167                                        }
168                                }
169
170                                // Register with the mock SSE server broadcaster, preserving client context
171                                Object clientContext = getSseHandshakeResult().getClientContext().orElse(null);
172                                getSimulator().getSseServer().get().registerEventConsumer(getResourcePath(), eventConsumer, clientContext);
173                        } finally {
174                                getLock().unlock();
175                        }
176                }
177
178                /**
179                 * Registers a Server-Sent comment "consumer" for this connection - similar to how a real client would listen for Server-Sent comment payloads.
180                 * <p>
181                 * Each connection may have at most 1 comment consumer.
182                 * <p>
183                 * See documentation at <a href="https://www.soklet.com/docs/testing#server-sent-events">https://www.soklet.com/docs/testing#server-sent-events</a>.
184                 *
185                 * @param commentConsumer function to be invoked when a Server-Sent comment has been unicast/broadcast on the Resource Path
186                 * @throws IllegalStateException if you attempt to register more than 1 comment consumer
187                 */
188                public void registerCommentConsumer(@NonNull Consumer<SseComment> commentConsumer) {
189                        requireNonNull(commentConsumer);
190
191                        getLock().lock();
192
193                        try {
194                                if (getCommentConsumer().isPresent())
195                                        throw new IllegalStateException(format("You cannot specify more than one comment consumer for the same %s", HandshakeAccepted.class.getSimpleName()));
196
197                                this.commentConsumer = commentConsumer;
198
199                                // Send client initializer unicast comments immediately, before any broadcasts can make it through
200                                for (SseComment comment : getClientInitializerComments()) {
201                                        try {
202                                                commentConsumer.accept(comment);
203                                        } catch (Throwable throwable) {
204                                                handleUnicastError(throwable);
205                                        }
206                                }
207
208                                // Register with the mock SSE server broadcaster, preserving client context
209                                Object clientContext = getSseHandshakeResult().getClientContext().orElse(null);
210                                getSimulator().getSseServer().get().registerCommentConsumer(getResourcePath(), commentConsumer, clientContext);
211                        } finally {
212                                getLock().unlock();
213                        }
214                }
215
216                void unregisterConsumers() {
217                        getLock().lock();
218
219                        try {
220                                getEventConsumer().ifPresent((eventConsumer ->
221                                                getSimulator().getSseServer().get().unregisterEventConsumer(getResourcePath(), eventConsumer)));
222
223                                getCommentConsumer().ifPresent((commentConsumer ->
224                                                getSimulator().getSseServer().get().unregisterCommentConsumer(getResourcePath(), commentConsumer)));
225                        } finally {
226                                getLock().unlock();
227                        }
228                }
229
230                /**
231                 * Gets the data provided when the handshake was accepted by the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em>.
232                 *
233                 * @return the data provided when the handshake was accepted
234                 */
235                public SseHandshakeResult.@NonNull Accepted getSseHandshakeResult() {
236                        return this.sseHandshakeResult;
237                }
238
239                @Override
240                public String toString() {
241                        return format("%s{sseHandshakeResult=%s}", HandshakeAccepted.class.getSimpleName(), getSseHandshakeResult());
242                }
243
244                /**
245                 * The initial result of the handshake, as written back to the client (note that the connection remains open).
246                 * <p>
247                 * Useful for examining headers/cookies written via {@link HttpRequestResult#getMarshaledResponse()}.
248                 *
249                 * @return the result of this request
250                 */
251                @NonNull
252                public HttpRequestResult getHttpRequestResult() {
253                        return this.requestResult;
254                }
255
256                @NonNull
257                private ResourcePath getResourcePath() {
258                        return this.resourcePath;
259                }
260
261                @NonNull
262                private DefaultSimulator getSimulator() {
263                        return this.simulator;
264                }
265
266                @NonNull
267                private AtomicReference<@Nullable Consumer<Throwable>> getUnicastErrorHandler() {
268                        return this.unicastErrorHandler;
269                }
270
271                private void handleUnicastError(@NonNull Throwable throwable) {
272                        requireNonNull(throwable);
273                        Consumer<Throwable> handler = getUnicastErrorHandler().get();
274
275                        if (handler != null) {
276                                try {
277                                        handler.accept(throwable);
278                                        return;
279                                } catch (Throwable ignored) {
280                                        // Fall through to default behavior
281                                }
282                        }
283
284                        safelyLog(LogEvent.with(LogEventType.SSE_SERVER_INTERNAL_ERROR,
285                                                        "SSE simulator unicast consumer failed")
286                                        .throwable(throwable)
287                                        .build());
288                }
289
290                private void safelyLog(@NonNull LogEvent logEvent) {
291                        requireNonNull(logEvent);
292
293                        getSimulator().getSseServer().ifPresent(sseServer -> sseServer.safelyLog(logEvent));
294                }
295
296                @NonNull
297                private List<@NonNull SseEvent> getClientInitializerEvents() {
298                        return this.clientInitializerEvents;
299                }
300
301                @NonNull
302                private List<@NonNull SseComment> getClientInitializerComments() {
303                        return this.clientInitializerComments;
304                }
305
306                @NonNull
307                private Optional<Consumer<SseEvent>> getEventConsumer() {
308                        return Optional.ofNullable(this.eventConsumer);
309                }
310
311                @NonNull
312                private Optional<Consumer<SseComment>> getCommentConsumer() {
313                        return Optional.ofNullable(this.commentConsumer);
314                }
315
316                @NonNull
317                private ReentrantLock getLock() {
318                        return this.lock;
319                }
320        }
321
322        /**
323         * Represents the result of an SSE rejected handshake (explicit rejection; connection closed) when simulated by {@link Simulator#performSseRequest(Request)}.
324         * <p>
325         * The data provided when the handshake was rejected is available via {@link #getSseHandshakeResult()}, and the final data sent to the client is available via {@link #getHttpRequestResult()}.
326         */
327        @ThreadSafe
328        final class HandshakeRejected implements SseRequestResult {
329                private final SseHandshakeResult.@NonNull Rejected sseHandshakeResult;
330                @NonNull
331                private final HttpRequestResult requestResult;
332
333                HandshakeRejected(SseHandshakeResult.@NonNull Rejected sseHandshakeResult,
334                                                                                        @NonNull HttpRequestResult requestResult) {
335                        requireNonNull(sseHandshakeResult);
336                        requireNonNull(requestResult);
337
338                        this.sseHandshakeResult = sseHandshakeResult;
339                        this.requestResult = requestResult;
340                }
341
342                /**
343                 * Gets the data provided when the handshake was explicitly rejected by the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em>.
344                 *
345                 * @return the data provided when the handshake was rejected
346                 */
347                public SseHandshakeResult.@NonNull Rejected getSseHandshakeResult() {
348                        return this.sseHandshakeResult;
349                }
350
351                /**
352                 * The result of the handshake, as written back to the client (the connection is then closed).
353                 *
354                 * @return the result of this request
355                 */
356                @NonNull
357                public HttpRequestResult getHttpRequestResult() {
358                        return this.requestResult;
359                }
360
361                @Override
362                public String toString() {
363                        return format("%s{sseHandshakeResult=%s, requestResult=%s}", HandshakeRejected.class.getSimpleName(), getSseHandshakeResult(), getHttpRequestResult());
364                }
365
366                @Override
367                public boolean equals(@Nullable Object object) {
368                        if (this == object)
369                                return true;
370
371                        if (!(object instanceof HandshakeRejected handshakeRejected))
372                                return false;
373
374                        return Objects.equals(getSseHandshakeResult(), handshakeRejected.getSseHandshakeResult())
375                                        && Objects.equals(getHttpRequestResult(), handshakeRejected.getHttpRequestResult());
376                }
377
378                @Override
379                public int hashCode() {
380                        return Objects.hash(getSseHandshakeResult(), getHttpRequestResult());
381                }
382        }
383
384        /**
385         * Represents the result of an SSE request failure (implicit rejection, e.g. an exception occurred; connection closed) when simulated by {@link Simulator#performSseRequest(Request)}.
386         * <p>
387         * The final data sent to the client is available via {@link #getHttpRequestResult()}.
388         */
389        @ThreadSafe
390        final class RequestFailed implements SseRequestResult {
391                @NonNull
392                private final HttpRequestResult requestResult;
393
394                RequestFailed(@NonNull HttpRequestResult requestResult) {
395                        requireNonNull(requestResult);
396                        this.requestResult = requestResult;
397                }
398
399                /**
400                 * The result of the handshake, as written back to the client (the connection is then closed).
401                 *
402                 * @return the result of this request
403                 */
404                @NonNull
405                public HttpRequestResult getHttpRequestResult() {
406                        return this.requestResult;
407                }
408
409                @Override
410                public String toString() {
411                        return format("%s{requestResult=%s}", RequestFailed.class.getSimpleName(), getHttpRequestResult());
412                }
413
414                @Override
415                public boolean equals(@Nullable Object object) {
416                        if (this == object)
417                                return true;
418
419                        if (!(object instanceof RequestFailed requestFailed))
420                                return false;
421
422                        return Objects.equals(getHttpRequestResult(), requestFailed.getHttpRequestResult());
423                }
424
425                @Override
426                public int hashCode() {
427                        return Objects.hash(getHttpRequestResult());
428                }
429        }
430}