/*
 * Copyright 2022-2026 Revetware LLC.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.soklet;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

/**
 * Describes the body of a streaming HTTP response.
 * <p>
 * An instance only captures <em>how</em> the response bytes are produced; writing them to a transport is done
 * elsewhere. The descriptors themselves are immutable and thread-safe. The writers, publishers, readers,
 * input streams, and suppliers handed in by callers remain responsible for their own behavior.
 *
 * @author <a href="https://www.revetkn.com">Mark Allen</a>
 */
@ThreadSafe
public sealed interface StreamingResponseBody permits
    StreamingResponseBody.PublisherBody,
    StreamingResponseBody.InputStreamBody,
    StreamingResponseBody.ReaderBody,
    StreamingResponseBody.WriterBody {
  /**
   * Buffer size, in bytes, applied to input-stream-backed bodies when the builder does not specify one.
   */
  @NonNull
  Integer DEFAULT_INPUT_STREAM_BUFFER_SIZE_IN_BYTES = 16 * 1_024;
  /**
   * Buffer size, in characters, applied to reader-backed bodies when the builder does not specify one.
   */
  @NonNull
  Integer DEFAULT_READER_BUFFER_SIZE_IN_CHARACTERS = 8_192;
  /**
   * Malformed-input action applied to reader-backed bodies when the builder does not specify one.
   */
  @NonNull
  CodingErrorAction DEFAULT_MALFORMED_INPUT_ACTION = CodingErrorAction.REPORT;
  /**
   * Unmappable-character action applied to reader-backed bodies when the builder does not specify one.
   */
  @NonNull
  CodingErrorAction DEFAULT_UNMAPPABLE_CHARACTER_ACTION = CodingErrorAction.REPORT;

  /**
   * Wraps a writer callback as a streaming response body.
   *
   * @param writer the callback that writes the response
   * @return a streaming response body
   */
  @NonNull
  static StreamingResponseBody fromWriter(@NonNull StreamingResponseWriter writer) {
    return new WriterBody(writer);
  }

  /**
   * Wraps a {@link Flow.Publisher} of byte buffers as a streaming response body.
   * <p>
   * Soklet requests one item at a time and writes each item through its bounded streaming queue. When that
   * queue is full, the subscriber's {@code onNext} path may block until space becomes available.
   * <p>
   * Should the response be canceled before the publisher terminates, Soklet cancels the publisher subscription.
   *
   * @param publisher the publisher that emits response bytes
   * @return a streaming response body
   */
  @NonNull
  static StreamingResponseBody fromPublisher(Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher) {
    return new PublisherBody(publisher);
  }

  /**
   * Wraps an input stream supplier as a streaming response body, using default settings.
   * <p>
   * This adapter targets special cases in which an existing streaming source is already exposed as an
   * {@link InputStream}. Dynamic application streaming should usually go through
   * {@link #fromWriter(StreamingResponseWriter)} or {@link #fromPublisher(Flow.Publisher)} instead.
   * <p>
   * If the response is canceled while a read is blocked, Soklet closes the supplied input stream.
   *
   * @param inputStreamSupplier supplies the input stream to copy
   * @return a streaming response body
   */
  @NonNull
  static StreamingResponseBody fromInputStream(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
    InputStreamBuilder defaultsOnlyBuilder = withInputStream(inputStreamSupplier);
    return defaultsOnlyBuilder.build();
  }

  /**
   * Starts building an input-stream-backed response body.
   *
   * @param inputStreamSupplier supplies the input stream to copy
   * @return the builder
   */
  @NonNull
  static InputStreamBuilder withInputStream(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
    return new InputStreamBuilder(inputStreamSupplier);
  }

  /**
   * Wraps a reader supplier as a streaming response body, using default settings.
   * <p>
   * A charset must be provided. Encoding errors default to {@link CodingErrorAction#REPORT}; go through
   * {@link #withReader(Supplier, Charset)} to choose the JDK encoder actions explicitly.
   * If the response is canceled while a read is blocked, Soklet closes the supplied reader.
   *
   * @param readerSupplier supplies the reader to copy
   * @param charset        charset used to encode characters to response bytes
   * @return a streaming response body
   */
  @NonNull
  static StreamingResponseBody fromReader(@NonNull Supplier<? extends Reader> readerSupplier,
                                          @NonNull Charset charset) {
    ReaderBuilder defaultsOnlyBuilder = withReader(readerSupplier, charset);
    return defaultsOnlyBuilder.build();
  }

  /**
   * Starts building a reader-backed response body.
   *
   * @param readerSupplier supplies the reader to copy
   * @param charset        charset used to encode characters to response bytes
   * @return the builder
   */
  @NonNull
  static ReaderBuilder withReader(@NonNull Supplier<? extends Reader> readerSupplier,
                                  @NonNull Charset charset) {
    return new ReaderBuilder(readerSupplier, charset);
  }

  /**
   * Streaming body whose bytes come from a writer callback.
   */
  final class WriterBody implements StreamingResponseBody {
    @NonNull
    private final StreamingResponseWriter writer;

    private WriterBody(@NonNull StreamingResponseWriter writer) {
      this.writer = requireNonNull(writer);
    }

    /**
     * Exposes the callback that writes the response body.
     *
     * @return the streaming response writer
     */
    @NonNull
    public StreamingResponseWriter getWriter() {
      return this.writer;
    }
  }

  /**
   * Streaming body whose bytes come from a {@link Flow.Publisher}.
   */
  final class PublisherBody implements StreamingResponseBody {
    private final Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher;

    private PublisherBody(Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher) {
      this.publisher = requireNonNull(publisher);
    }

    /**
     * Exposes the publisher that emits byte buffers for this response.
     *
     * @return the byte-buffer publisher
     */
    public Flow.@NonNull Publisher<@NonNull ByteBuffer> getPublisher() {
      return this.publisher;
    }
  }

  /**
   * Streaming body whose bytes come from an {@link InputStream} supplier.
   */
  final class InputStreamBody implements StreamingResponseBody {
    @NonNull
    private final Supplier<? extends InputStream> inputStreamSupplier;
    @NonNull
    private final Integer bufferSizeInBytes;

    private InputStreamBody(@NonNull InputStreamBuilder builder) {
      requireNonNull(builder);

      // An unset builder value means "use the default".
      Integer resolvedBufferSize = builder.bufferSizeInBytes;

      if (resolvedBufferSize == null)
        resolvedBufferSize = DEFAULT_INPUT_STREAM_BUFFER_SIZE_IN_BYTES;

      if (resolvedBufferSize <= 0)
        throw new IllegalArgumentException("Input stream buffer size must be > 0");

      this.inputStreamSupplier = builder.inputStreamSupplier;
      this.bufferSizeInBytes = resolvedBufferSize;
    }

    /**
     * Exposes the supplier that opens the source input stream.
     *
     * @return the input stream supplier
     */
    @NonNull
    public Supplier<? extends InputStream> getInputStreamSupplier() {
      return this.inputStreamSupplier;
    }

    /**
     * Exposes the adapter buffer size.
     *
     * @return the adapter buffer size in bytes
     */
    @NonNull
    public Integer getBufferSizeInBytes() {
      return this.bufferSizeInBytes;
    }
  }

  /**
   * Builder for input-stream-backed response bodies.
   * <p>
   * Instances are meant to be used from a single thread.
   */
  @NotThreadSafe
  final class InputStreamBuilder {
    @NonNull
    private final Supplier<? extends InputStream> inputStreamSupplier;
    @Nullable
    private Integer bufferSizeInBytes;

    private InputStreamBuilder(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
      this.inputStreamSupplier = requireNonNull(inputStreamSupplier);
    }

    /**
     * Chooses the input stream copy buffer size.
     *
     * @param bufferSizeInBytes buffer size in bytes, or {@code null} for the default
     * @return this builder
     */
    @NonNull
    public InputStreamBuilder bufferSizeInBytes(@Nullable Integer bufferSizeInBytes) {
      this.bufferSizeInBytes = bufferSizeInBytes;
      return this;
    }

    /**
     * Produces the input-stream-backed response body.
     *
     * @return a streaming response body
     * @throws IllegalArgumentException if the configured buffer size is less than 1
     */
    @NonNull
    public StreamingResponseBody build() {
      return new InputStreamBody(this);
    }
  }

  /**
   * Streaming body whose bytes come from encoding the characters of a {@link Reader} supplier.
   */
  final class ReaderBody implements StreamingResponseBody {
    @NonNull
    private final Supplier<? extends Reader> readerSupplier;
    @NonNull
    private final Charset charset;
    @NonNull
    private final Integer bufferSizeInCharacters;
    @NonNull
    private final CodingErrorAction malformedInputAction;
    @NonNull
    private final CodingErrorAction unmappableCharacterAction;

    private ReaderBody(@NonNull ReaderBuilder builder) {
      requireNonNull(builder);

      // Each optional builder value falls back to its interface-level default when unset.
      Integer resolvedBufferSize = builder.bufferSizeInCharacters;

      if (resolvedBufferSize == null)
        resolvedBufferSize = DEFAULT_READER_BUFFER_SIZE_IN_CHARACTERS;

      CodingErrorAction resolvedMalformedAction = builder.malformedInputAction;

      if (resolvedMalformedAction == null)
        resolvedMalformedAction = DEFAULT_MALFORMED_INPUT_ACTION;

      CodingErrorAction resolvedUnmappableAction = builder.unmappableCharacterAction;

      if (resolvedUnmappableAction == null)
        resolvedUnmappableAction = DEFAULT_UNMAPPABLE_CHARACTER_ACTION;

      if (resolvedBufferSize <= 0)
        throw new IllegalArgumentException("Reader buffer size must be > 0");

      this.readerSupplier = builder.readerSupplier;
      this.charset = builder.charset;
      this.bufferSizeInCharacters = resolvedBufferSize;
      this.malformedInputAction = resolvedMalformedAction;
      this.unmappableCharacterAction = resolvedUnmappableAction;
    }

    /**
     * Exposes the supplier that opens the source reader.
     *
     * @return the reader supplier
     */
    @NonNull
    public Supplier<? extends Reader> getReaderSupplier() {
      return this.readerSupplier;
    }

    /**
     * Exposes the charset used to encode characters to response bytes.
     *
     * @return the charset
     */
    @NonNull
    public Charset getCharset() {
      return this.charset;
    }

    /**
     * Exposes the adapter buffer size.
     *
     * @return the adapter buffer size in characters
     */
    @NonNull
    public Integer getBufferSizeInCharacters() {
      return this.bufferSizeInCharacters;
    }

    /**
     * Exposes the encoder behavior for malformed input.
     *
     * @return the malformed-input behavior
     */
    @NonNull
    public CodingErrorAction getMalformedInputAction() {
      return this.malformedInputAction;
    }

    /**
     * Exposes the encoder behavior for unmappable characters.
     *
     * @return the unmappable-character behavior
     */
    @NonNull
    public CodingErrorAction getUnmappableCharacterAction() {
      return this.unmappableCharacterAction;
    }

    /**
     * Obtains a fresh encoder from this body's charset and applies this body's error actions to it.
     *
     * @return a newly configured charset encoder
     */
    @NonNull
    public CharsetEncoder newEncoder() {
      CharsetEncoder encoder = this.charset.newEncoder();
      encoder.onMalformedInput(this.malformedInputAction);
      encoder.onUnmappableCharacter(this.unmappableCharacterAction);
      return encoder;
    }
  }

  /**
   * Builder for reader-backed response bodies.
   * <p>
   * Instances are meant to be used from a single thread.
   */
  @NotThreadSafe
  final class ReaderBuilder {
    @NonNull
    private final Supplier<? extends Reader> readerSupplier;
    @NonNull
    private final Charset charset;
    @Nullable
    private Integer bufferSizeInCharacters;
    @Nullable
    private CodingErrorAction malformedInputAction;
    @Nullable
    private CodingErrorAction unmappableCharacterAction;

    private ReaderBuilder(@NonNull Supplier<? extends Reader> readerSupplier,
                          @NonNull Charset charset) {
      this.readerSupplier = requireNonNull(readerSupplier);
      this.charset = requireNonNull(charset);
    }

    /**
     * Chooses the reader copy buffer size.
     *
     * @param bufferSizeInCharacters buffer size in characters, or {@code null} for the default
     * @return this builder
     */
    @NonNull
    public ReaderBuilder bufferSizeInCharacters(@Nullable Integer bufferSizeInCharacters) {
      this.bufferSizeInCharacters = bufferSizeInCharacters;
      return this;
    }

    /**
     * Chooses the encoder behavior for malformed input.
     *
     * @param malformedInputAction malformed-input behavior, or {@code null} for the default
     * @return this builder
     */
    @NonNull
    public ReaderBuilder malformedInputAction(@Nullable CodingErrorAction malformedInputAction) {
      this.malformedInputAction = malformedInputAction;
      return this;
    }

    /**
     * Chooses the encoder behavior for unmappable characters.
     *
     * @param unmappableCharacterAction unmappable-character behavior, or {@code null} for the default
     * @return this builder
     */
    @NonNull
    public ReaderBuilder unmappableCharacterAction(@Nullable CodingErrorAction unmappableCharacterAction) {
      this.unmappableCharacterAction = unmappableCharacterAction;
      return this;
    }

    /**
     * Produces the reader-backed response body.
     *
     * @return a streaming response body
     * @throws IllegalArgumentException if the configured buffer size is less than 1
     */
    @NonNull
    public StreamingResponseBody build() {
      return new ReaderBody(this);
    }
  }
}