/*
 * Copyright 2022-2026 Revetware LLC.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.soklet;

import com.soklet.Soklet.DefaultSimulator;
import com.soklet.Soklet.MockSseUnicaster;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import javax.annotation.concurrent.ThreadSafe;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Sealed interface used by {@link Simulator#performSseRequest(Request)} during integration tests, which encapsulates the 3 logical outcomes for SSE connections: accepted handshake, rejected handshake, and general request failure.
 * <p>
 * See <a href="https://www.soklet.com/docs/testing#integration-testing">https://www.soklet.com/docs/testing#integration-testing</a> for detailed documentation.
 *
 * @author <a href="https://www.revetkn.com">Mark Allen</a>
 */
public sealed interface SseRequestResult permits SseRequestResult.HandshakeAccepted, SseRequestResult.HandshakeRejected, SseRequestResult.RequestFailed {
	/**
	 * Represents the result of an SSE accepted handshake (connection stays open) when simulated by {@link Simulator#performSseRequest(Request)}.
	 * <p>
	 * The {@link #registerEventConsumer(Consumer)} and {@link #registerCommentConsumer(Consumer)} methods can be used to "listen" for Server-Sent Events and Comments, respectively.
	 * <p>
	 * The data provided when the handshake was accepted is available via {@link #getSseHandshakeResult()}, and the final data sent to the client is available via {@link #getHttpRequestResult()}.
	 * <p>
	 * Events and comments unicast by the client initializer before a consumer is registered are buffered and replayed, in order, when the corresponding consumer is registered.
	 * Consumer registration and the buffer-or-deliver decision for client-initializer unicasts are guarded by a single internal lock.
	 */
	@ThreadSafe
	final class HandshakeAccepted implements SseRequestResult {
		private final SseHandshakeResult.@NonNull Accepted sseHandshakeResult;
		@NonNull
		private final ResourcePath resourcePath;
		@NonNull
		private final HttpRequestResult requestResult;
		@NonNull
		private final DefaultSimulator simulator;
		// Shared with the mock SSE server when one exists; otherwise a private, initially-empty reference
		@NonNull
		private final AtomicReference<@Nullable Consumer<Throwable>> unicastErrorHandler;
		// Client-initializer events unicast before an event consumer was registered
		@NonNull
		private final List<@NonNull SseEvent> clientInitializerEvents;
		// Client-initializer comments unicast before a comment consumer was registered
		@NonNull
		private final List<@NonNull SseComment> clientInitializerComments;
		// Guards eventConsumer and commentConsumer, plus the decision to buffer vs. deliver a unicast
		@NonNull
		private final ReentrantLock lock;
		// Only read or written while holding lock
		@Nullable
		private Consumer<SseEvent> eventConsumer;
		// Only read or written while holding lock
		@Nullable
		private Consumer<SseComment> commentConsumer;

		/**
		 * Creates an accepted-handshake result and, if a client initializer is supplied, immediately invokes it with a
		 * {@link MockSseUnicaster} whose unicast events/comments are routed back to this instance.
		 *
		 * @param sseHandshakeResult the data provided when the handshake was accepted
		 * @param resourcePath       the Resource Path for this simulated connection
		 * @param requestResult      the initial result of the handshake, as written back to the client
		 * @param simulator          the simulator that owns this connection
		 * @param clientInitializer  optional callback invoked during construction with this connection's unicaster
		 */
		HandshakeAccepted(SseHandshakeResult.@NonNull Accepted sseHandshakeResult,
											@NonNull ResourcePath resourcePath,
											@NonNull HttpRequestResult requestResult,
											@NonNull DefaultSimulator simulator,
											@Nullable Consumer<SseUnicaster> clientInitializer) {
			requireNonNull(sseHandshakeResult);
			requireNonNull(resourcePath);
			requireNonNull(requestResult);
			requireNonNull(simulator);

			this.sseHandshakeResult = sseHandshakeResult;
			this.resourcePath = resourcePath;
			this.requestResult = requestResult;
			this.simulator = simulator;
			this.unicastErrorHandler = simulator.getSseServer()
					.map(sseServer -> sseServer.getUnicastErrorHandler())
					.orElseGet(AtomicReference::new);
			this.eventConsumer = null;
			this.commentConsumer = null;
			this.lock = new ReentrantLock();
			this.clientInitializerEvents = new CopyOnWriteArrayList<>();
			this.clientInitializerComments = new CopyOnWriteArrayList<>();

			// All state the callbacks below rely on has been assigned by this point
			if (clientInitializer != null) {
				clientInitializer.accept(new MockSseUnicaster(
						getResourcePath(),
						(sseEvent) -> handleClientInitializerEvent(sseEvent),
						(sseComment) -> handleClientInitializerComment(sseComment),
						getUnicastErrorHandler(),
						this::safelyLog)
				);
			}
		}

		/**
		 * Registers a {@link SseEvent} "consumer" for this connection - similar to how a real client would listen for Server-Sent Events.
		 * <p>
		 * Each connection may have at most 1 event consumer.
		 * <p>
		 * Any events unicast by the client initializer prior to registration are delivered to the consumer first, in the order they were unicast.
		 * <p>
		 * See documentation at <a href="https://www.soklet.com/docs/testing#server-sent-events">https://www.soklet.com/docs/testing#server-sent-events</a>.
		 *
		 * @param eventConsumer function to be invoked when a Server-Sent Event has been unicast/broadcast on the Resource Path
		 * @throws IllegalStateException            if you attempt to register more than 1 event consumer
		 * @throws java.util.NoSuchElementException if the simulator has no SSE server to register with (no state is modified in this case)
		 */
		public void registerEventConsumer(@NonNull Consumer<SseEvent> eventConsumer) {
			requireNonNull(eventConsumer);

			getLock().lock();

			try {
				if (getEventConsumer().isPresent())
					throw new IllegalStateException(format("You cannot specify more than one event consumer for the same %s", HandshakeAccepted.class.getSimpleName()));

				// Resolve the server before mutating anything so a missing server cannot leave this
				// instance with a consumer that is stored locally but was never registered
				var sseServer = getSimulator().getSseServer().orElseThrow();

				this.eventConsumer = eventConsumer;

				// Send client initializer unicast events immediately, before any broadcasts can make it through
				for (SseEvent event : getClientInitializerEvents()) {
					try {
						eventConsumer.accept(event);
					} catch (Throwable throwable) {
						handleUnicastError(throwable);
					}
				}

				// Register with the mock SSE server broadcaster, preserving client context
				Object clientContext = getSseHandshakeResult().getClientContext().orElse(null);
				sseServer.registerEventConsumer(getResourcePath(), eventConsumer, clientContext);
			} finally {
				getLock().unlock();
			}
		}

		/**
		 * Registers a Server-Sent comment "consumer" for this connection - similar to how a real client would listen for Server-Sent comment payloads.
		 * <p>
		 * Each connection may have at most 1 comment consumer.
		 * <p>
		 * Any comments unicast by the client initializer prior to registration are delivered to the consumer first, in the order they were unicast.
		 * <p>
		 * See documentation at <a href="https://www.soklet.com/docs/testing#server-sent-events">https://www.soklet.com/docs/testing#server-sent-events</a>.
		 *
		 * @param commentConsumer function to be invoked when a Server-Sent comment has been unicast/broadcast on the Resource Path
		 * @throws IllegalStateException            if you attempt to register more than 1 comment consumer
		 * @throws java.util.NoSuchElementException if the simulator has no SSE server to register with (no state is modified in this case)
		 */
		public void registerCommentConsumer(@NonNull Consumer<SseComment> commentConsumer) {
			requireNonNull(commentConsumer);

			getLock().lock();

			try {
				if (getCommentConsumer().isPresent())
					throw new IllegalStateException(format("You cannot specify more than one comment consumer for the same %s", HandshakeAccepted.class.getSimpleName()));

				// Resolve the server before mutating anything so a missing server cannot leave this
				// instance with a consumer that is stored locally but was never registered
				var sseServer = getSimulator().getSseServer().orElseThrow();

				this.commentConsumer = commentConsumer;

				// Send client initializer unicast comments immediately, before any broadcasts can make it through
				for (SseComment comment : getClientInitializerComments()) {
					try {
						commentConsumer.accept(comment);
					} catch (Throwable throwable) {
						handleUnicastError(throwable);
					}
				}

				// Register with the mock SSE server broadcaster, preserving client context
				Object clientContext = getSseHandshakeResult().getClientContext().orElse(null);
				sseServer.registerCommentConsumer(getResourcePath(), commentConsumer, clientContext);
			} finally {
				getLock().unlock();
			}
		}

		/**
		 * Unregisters whichever consumers are currently registered from the mock SSE server.
		 * <p>
		 * The locally-stored consumer references are left in place, so a new consumer cannot be registered afterwards.
		 */
		void unregisterConsumers() {
			getLock().lock();

			try {
				getEventConsumer().ifPresent((eventConsumer ->
						getSimulator().getSseServer().orElseThrow().unregisterEventConsumer(getResourcePath(), eventConsumer)));

				getCommentConsumer().ifPresent((commentConsumer ->
						getSimulator().getSseServer().orElseThrow().unregisterCommentConsumer(getResourcePath(), commentConsumer)));
			} finally {
				getLock().unlock();
			}
		}

		/**
		 * Gets the data provided when the handshake was accepted by the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em>.
		 *
		 * @return the data provided when the handshake was accepted
		 */
		public SseHandshakeResult.@NonNull Accepted getSseHandshakeResult() {
			return this.sseHandshakeResult;
		}

		@Override
		public String toString() {
			return format("%s{sseHandshakeResult=%s}", HandshakeAccepted.class.getSimpleName(), getSseHandshakeResult());
		}

		/**
		 * The initial result of the handshake, as written back to the client (note that the connection remains open).
		 * <p>
		 * Useful for examining headers/cookies written via {@link HttpRequestResult#getMarshaledResponse()}.
		 *
		 * @return the result of this request
		 */
		@NonNull
		public HttpRequestResult getHttpRequestResult() {
			return this.requestResult;
		}

		/**
		 * Routes an event unicast by the client initializer: buffered if no event consumer is registered yet, otherwise delivered immediately.
		 * <p>
		 * The buffer-or-deliver decision is made under the same lock used by {@link #registerEventConsumer(Consumer)}, so an event
		 * cannot be appended to the buffer after registration has already replayed it. The consumer itself is invoked outside the lock.
		 *
		 * @param sseEvent the event unicast by the client initializer
		 */
		private void handleClientInitializerEvent(@NonNull SseEvent sseEvent) {
			requireNonNull(sseEvent);

			Consumer<SseEvent> registeredConsumer;

			getLock().lock();

			try {
				registeredConsumer = this.eventConsumer;

				// No event consumer yet: hold on to the event until one is registered
				if (registeredConsumer == null) {
					getClientInitializerEvents().add(sseEvent);
					return;
				}
			} finally {
				getLock().unlock();
			}

			try {
				registeredConsumer.accept(sseEvent);
			} catch (Throwable throwable) {
				handleUnicastError(throwable);
			}
		}

		/**
		 * Routes a comment unicast by the client initializer: buffered if no comment consumer is registered yet, otherwise delivered immediately.
		 * <p>
		 * The buffer-or-deliver decision is made under the same lock used by {@link #registerCommentConsumer(Consumer)}, so a comment
		 * cannot be appended to the buffer after registration has already replayed it. The consumer itself is invoked outside the lock.
		 *
		 * @param sseComment the comment unicast by the client initializer
		 */
		private void handleClientInitializerComment(@NonNull SseComment sseComment) {
			requireNonNull(sseComment);

			Consumer<SseComment> registeredConsumer;

			getLock().lock();

			try {
				registeredConsumer = this.commentConsumer;

				// No comment consumer yet: hold on to the comment until one is registered
				if (registeredConsumer == null) {
					getClientInitializerComments().add(sseComment);
					return;
				}
			} finally {
				getLock().unlock();
			}

			try {
				registeredConsumer.accept(sseComment);
			} catch (Throwable throwable) {
				handleUnicastError(throwable);
			}
		}

		@NonNull
		private ResourcePath getResourcePath() {
			return this.resourcePath;
		}

		@NonNull
		private DefaultSimulator getSimulator() {
			return this.simulator;
		}

		@NonNull
		private AtomicReference<@Nullable Consumer<Throwable>> getUnicastErrorHandler() {
			return this.unicastErrorHandler;
		}

		/**
		 * Reports a failure thrown by a consumer while a unicast event/comment was being delivered to it.
		 * <p>
		 * If a unicast error handler is currently set, it receives the failure. If no handler is set, or the handler
		 * itself throws, the failure is logged instead.
		 *
		 * @param throwable the failure thrown by the consumer
		 */
		private void handleUnicastError(@NonNull Throwable throwable) {
			requireNonNull(throwable);
			Consumer<Throwable> handler = getUnicastErrorHandler().get();

			if (handler != null) {
				try {
					handler.accept(throwable);
					return;
				} catch (Throwable ignored) {
					// Fall through to default behavior
				}
			}

			safelyLog(LogEvent.with(LogEventType.SSE_SERVER_INTERNAL_ERROR,
							"SSE simulator unicast consumer failed")
					.throwable(throwable)
					.build());
		}

		/**
		 * Forwards a log event to the simulator's SSE server, if one exists; otherwise does nothing.
		 *
		 * @param logEvent the event to log
		 */
		private void safelyLog(@NonNull LogEvent logEvent) {
			requireNonNull(logEvent);

			getSimulator().getSseServer().ifPresent(sseServer -> sseServer.safelyLog(logEvent));
		}

		@NonNull
		private List<@NonNull SseEvent> getClientInitializerEvents() {
			return this.clientInitializerEvents;
		}

		@NonNull
		private List<@NonNull SseComment> getClientInitializerComments() {
			return this.clientInitializerComments;
		}

		// Callers must hold the lock
		@NonNull
		private Optional<Consumer<SseEvent>> getEventConsumer() {
			return Optional.ofNullable(this.eventConsumer);
		}

		// Callers must hold the lock
		@NonNull
		private Optional<Consumer<SseComment>> getCommentConsumer() {
			return Optional.ofNullable(this.commentConsumer);
		}

		@NonNull
		private ReentrantLock getLock() {
			return this.lock;
		}
	}

	/**
	 * Represents the result of an SSE rejected handshake (explicit rejection; connection closed) when simulated by {@link Simulator#performSseRequest(Request)}.
	 * <p>
	 * The data provided when the handshake was rejected is available via {@link #getSseHandshakeResult()}, and the final data sent to the client is available via {@link #getHttpRequestResult()}.
	 */
	@ThreadSafe
	final class HandshakeRejected implements SseRequestResult {
		private final SseHandshakeResult.@NonNull Rejected sseHandshakeResult;
		@NonNull
		private final HttpRequestResult requestResult;

		/**
		 * Creates a rejected-handshake result.
		 *
		 * @param sseHandshakeResult the data provided when the handshake was rejected
		 * @param requestResult      the result of the handshake, as written back to the client
		 */
		HandshakeRejected(SseHandshakeResult.@NonNull Rejected sseHandshakeResult,
											@NonNull HttpRequestResult requestResult) {
			requireNonNull(sseHandshakeResult);
			requireNonNull(requestResult);

			this.sseHandshakeResult = sseHandshakeResult;
			this.requestResult = requestResult;
		}

		/**
		 * Gets the data provided when the handshake was explicitly rejected by the {@link com.soklet.annotation.SseEventSource}-annotated <em>Resource Method</em>.
		 *
		 * @return the data provided when the handshake was rejected
		 */
		public SseHandshakeResult.@NonNull Rejected getSseHandshakeResult() {
			return this.sseHandshakeResult;
		}

		/**
		 * The result of the handshake, as written back to the client (the connection is then closed).
		 *
		 * @return the result of this request
		 */
		@NonNull
		public HttpRequestResult getHttpRequestResult() {
			return this.requestResult;
		}

		@Override
		public String toString() {
			return format("%s{sseHandshakeResult=%s, requestResult=%s}", HandshakeRejected.class.getSimpleName(), getSseHandshakeResult(), getHttpRequestResult());
		}

		@Override
		public boolean equals(@Nullable Object object) {
			if (this == object)
				return true;

			if (!(object instanceof HandshakeRejected handshakeRejected))
				return false;

			return Objects.equals(getSseHandshakeResult(), handshakeRejected.getSseHandshakeResult())
					&& Objects.equals(getHttpRequestResult(), handshakeRejected.getHttpRequestResult());
		}

		@Override
		public int hashCode() {
			return Objects.hash(getSseHandshakeResult(), getHttpRequestResult());
		}
	}

	/**
	 * Represents the result of an SSE request failure (implicit rejection, e.g. an exception occurred; connection closed) when simulated by {@link Simulator#performSseRequest(Request)}.
	 * <p>
	 * The final data sent to the client is available via {@link #getHttpRequestResult()}.
	 */
	@ThreadSafe
	final class RequestFailed implements SseRequestResult {
		@NonNull
		private final HttpRequestResult requestResult;

		/**
		 * Creates a request-failed result.
		 *
		 * @param requestResult the result of the handshake, as written back to the client
		 */
		RequestFailed(@NonNull HttpRequestResult requestResult) {
			requireNonNull(requestResult);
			this.requestResult = requestResult;
		}

		/**
		 * The result of the handshake, as written back to the client (the connection is then closed).
		 *
		 * @return the result of this request
		 */
		@NonNull
		public HttpRequestResult getHttpRequestResult() {
			return this.requestResult;
		}

		@Override
		public String toString() {
			return format("%s{requestResult=%s}", RequestFailed.class.getSimpleName(), getHttpRequestResult());
		}

		@Override
		public boolean equals(@Nullable Object object) {
			if (this == object)
				return true;

			if (!(object instanceof RequestFailed requestFailed))
				return false;

			return Objects.equals(getHttpRequestResult(), requestFailed.getHttpRequestResult());
		}

		@Override
		public int hashCode() {
			return Objects.hash(getHttpRequestResult());
		}
	}
}