001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import org.jspecify.annotations.NonNull;
020import org.jspecify.annotations.Nullable;
021
022import javax.annotation.concurrent.NotThreadSafe;
023import javax.annotation.concurrent.ThreadSafe;
024import java.io.InputStream;
025import java.io.Reader;
026import java.nio.ByteBuffer;
027import java.nio.charset.Charset;
028import java.nio.charset.CharsetEncoder;
029import java.nio.charset.CodingErrorAction;
030import java.util.concurrent.Flow;
031import java.util.function.Supplier;
032
033import static java.util.Objects.requireNonNull;
034
035/**
036 * A streaming HTTP response body.
037 * <p>
038 * This type describes how response bytes are produced; it is not itself responsible for writing to a transport.
039 * Descriptors are immutable and thread-safe, but caller-supplied writers, publishers, readers, input streams, and
040 * suppliers are responsible for their own behavior.
041 *
042 * @author <a href="https://www.revetkn.com">Mark Allen</a>
043 */
044@ThreadSafe
045public sealed interface StreamingResponseBody permits StreamingResponseBody.PublisherBody, StreamingResponseBody.InputStreamBody, StreamingResponseBody.ReaderBody, StreamingResponseBody.WriterBody {
046        @NonNull
047        Integer DEFAULT_INPUT_STREAM_BUFFER_SIZE_IN_BYTES = 1_024 * 16;
048        @NonNull
049        Integer DEFAULT_READER_BUFFER_SIZE_IN_CHARACTERS = 8_192;
050        @NonNull
051        CodingErrorAction DEFAULT_MALFORMED_INPUT_ACTION = CodingErrorAction.REPORT;
052        @NonNull
053        CodingErrorAction DEFAULT_UNMAPPABLE_CHARACTER_ACTION = CodingErrorAction.REPORT;
054
055        /**
056         * Creates a streaming response body backed by a writer callback.
057         *
058         * @param writer the callback that writes the response
059         * @return a streaming response body
060         */
061        @NonNull
062        static StreamingResponseBody fromWriter(@NonNull StreamingResponseWriter writer) {
063                return new WriterBody(writer);
064        }
065
066        /**
067         * Creates a streaming response body backed by a {@link Flow.Publisher} of byte buffers.
068         * <p>
069         * Soklet requests one item at a time and writes each item through its bounded streaming queue. If the queue is
070         * full, the subscriber's {@code onNext} path may block until space is available.
071         * <p>
072         * If the response is canceled before the publisher terminates, Soklet cancels the publisher subscription.
073         *
074         * @param publisher the publisher that emits response bytes
075         * @return a streaming response body
076         */
077        @NonNull
078        static StreamingResponseBody fromPublisher(java.util.concurrent.Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher) {
079                return new PublisherBody(publisher);
080        }
081
082        /**
083         * Creates a streaming response body backed by an input stream supplier.
084         * <p>
085         * This adapter is intended for special cases where an existing streaming source is already exposed as an
086         * {@link InputStream}. Dynamic application streaming should usually use {@link #fromWriter(StreamingResponseWriter)}
087         * or {@link #fromPublisher(Flow.Publisher)}.
088         * <p>
089         * If the response is canceled while a read is blocked, Soklet closes the supplied input stream.
090         *
091         * @param inputStreamSupplier supplies the input stream to copy
092         * @return a streaming response body
093         */
094        @NonNull
095        static StreamingResponseBody fromInputStream(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
096                return withInputStream(inputStreamSupplier).build();
097        }
098
099        /**
100         * Acquires a builder for an input-stream-backed response body.
101         *
102         * @param inputStreamSupplier supplies the input stream to copy
103         * @return the builder
104         */
105        @NonNull
106        static InputStreamBuilder withInputStream(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
107                return new InputStreamBuilder(inputStreamSupplier);
108        }
109
110        /**
111         * Creates a streaming response body backed by a reader supplier.
112         * <p>
113         * The charset is required. Encoding errors default to {@link CodingErrorAction#REPORT}; use
114         * {@link #withReader(Supplier, Charset)} to override the JDK encoder actions explicitly.
115         * If the response is canceled while a read is blocked, Soklet closes the supplied reader.
116         *
117         * @param readerSupplier supplies the reader to copy
118         * @param charset        charset used to encode characters to response bytes
119         * @return a streaming response body
120         */
121        @NonNull
122        static StreamingResponseBody fromReader(@NonNull Supplier<? extends Reader> readerSupplier,
123                                                                                                                                                                        @NonNull Charset charset) {
124                return withReader(readerSupplier, charset).build();
125        }
126
127        /**
128         * Acquires a builder for a reader-backed response body.
129         *
130         * @param readerSupplier supplies the reader to copy
131         * @param charset        charset used to encode characters to response bytes
132         * @return the builder
133         */
134        @NonNull
135        static ReaderBuilder withReader(@NonNull Supplier<? extends Reader> readerSupplier,
136                                                                                                                                        @NonNull Charset charset) {
137                return new ReaderBuilder(readerSupplier, charset);
138        }
139
140        /**
141         * A streaming body backed by a {@link Flow.Publisher}.
142         */
143        final class PublisherBody implements StreamingResponseBody {
144                private final java.util.concurrent.Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher;
145
146                private PublisherBody(java.util.concurrent.Flow.@NonNull Publisher<@NonNull ByteBuffer> publisher) {
147                        this.publisher = requireNonNull(publisher);
148                }
149
150                /**
151                 * The publisher that emits byte buffers for this response.
152                 *
153                 * @return the byte-buffer publisher
154                 */
155                public java.util.concurrent.Flow.@NonNull Publisher<@NonNull ByteBuffer> getPublisher() {
156                        return this.publisher;
157                }
158        }
159
160        /**
161         * A streaming body backed by an {@link InputStream} supplier.
162         */
163        final class InputStreamBody implements StreamingResponseBody {
164                @NonNull
165                private final Supplier<? extends InputStream> inputStreamSupplier;
166                @NonNull
167                private final Integer bufferSizeInBytes;
168
169                private InputStreamBody(@NonNull InputStreamBuilder builder) {
170                        requireNonNull(builder);
171
172                        this.inputStreamSupplier = builder.inputStreamSupplier;
173                        this.bufferSizeInBytes = builder.bufferSizeInBytes == null
174                                        ? DEFAULT_INPUT_STREAM_BUFFER_SIZE_IN_BYTES
175                                        : builder.bufferSizeInBytes;
176
177                        if (this.bufferSizeInBytes < 1)
178                                throw new IllegalArgumentException("Input stream buffer size must be > 0");
179                }
180
181                /**
182                 * The supplier that opens the source input stream.
183                 *
184                 * @return the input stream supplier
185                 */
186                @NonNull
187                public Supplier<? extends InputStream> getInputStreamSupplier() {
188                        return this.inputStreamSupplier;
189                }
190
191                /**
192                 * The adapter buffer size.
193                 *
194                 * @return the adapter buffer size in bytes
195                 */
196                @NonNull
197                public Integer getBufferSizeInBytes() {
198                        return this.bufferSizeInBytes;
199                }
200        }
201
202        /**
203         * A streaming body backed by a {@link Reader} supplier.
204         */
205        final class ReaderBody implements StreamingResponseBody {
206                @NonNull
207                private final Supplier<? extends Reader> readerSupplier;
208                @NonNull
209                private final Charset charset;
210                @NonNull
211                private final Integer bufferSizeInCharacters;
212                @NonNull
213                private final CodingErrorAction malformedInputAction;
214                @NonNull
215                private final CodingErrorAction unmappableCharacterAction;
216
217                private ReaderBody(@NonNull ReaderBuilder builder) {
218                        requireNonNull(builder);
219
220                        this.readerSupplier = builder.readerSupplier;
221                        this.charset = builder.charset;
222                        this.bufferSizeInCharacters = builder.bufferSizeInCharacters == null
223                                        ? DEFAULT_READER_BUFFER_SIZE_IN_CHARACTERS
224                                        : builder.bufferSizeInCharacters;
225                        this.malformedInputAction = builder.malformedInputAction == null
226                                        ? DEFAULT_MALFORMED_INPUT_ACTION
227                                        : builder.malformedInputAction;
228                        this.unmappableCharacterAction = builder.unmappableCharacterAction == null
229                                        ? DEFAULT_UNMAPPABLE_CHARACTER_ACTION
230                                        : builder.unmappableCharacterAction;
231
232                        if (this.bufferSizeInCharacters < 1)
233                                throw new IllegalArgumentException("Reader buffer size must be > 0");
234                }
235
236                /**
237                 * The supplier that opens the source reader.
238                 *
239                 * @return the reader supplier
240                 */
241                @NonNull
242                public Supplier<? extends Reader> getReaderSupplier() {
243                        return this.readerSupplier;
244                }
245
246                /**
247                 * The charset used to encode characters to response bytes.
248                 *
249                 * @return the charset
250                 */
251                @NonNull
252                public Charset getCharset() {
253                        return this.charset;
254                }
255
256                /**
257                 * The adapter buffer size.
258                 *
259                 * @return the adapter buffer size in characters
260                 */
261                @NonNull
262                public Integer getBufferSizeInCharacters() {
263                        return this.bufferSizeInCharacters;
264                }
265
266                /**
267                 * Encoder behavior for malformed input.
268                 *
269                 * @return the malformed-input behavior
270                 */
271                @NonNull
272                public CodingErrorAction getMalformedInputAction() {
273                        return this.malformedInputAction;
274                }
275
276                /**
277                 * Encoder behavior for unmappable characters.
278                 *
279                 * @return the unmappable-character behavior
280                 */
281                @NonNull
282                public CodingErrorAction getUnmappableCharacterAction() {
283                        return this.unmappableCharacterAction;
284                }
285
286                /**
287                 * Creates a new charset encoder configured with this body's error actions.
288                 *
289                 * @return a newly configured charset encoder
290                 */
291                @NonNull
292                public CharsetEncoder newEncoder() {
293                        return getCharset().newEncoder()
294                                        .onMalformedInput(getMalformedInputAction())
295                                        .onUnmappableCharacter(getUnmappableCharacterAction());
296                }
297        }
298
299        /**
300         * A streaming body backed by a writer callback.
301         */
302        final class WriterBody implements StreamingResponseBody {
303                @NonNull
304                private final StreamingResponseWriter writer;
305
306                private WriterBody(@NonNull StreamingResponseWriter writer) {
307                        this.writer = requireNonNull(writer);
308                }
309
310                /**
311                 * The callback that writes the response body.
312                 *
313                 * @return the streaming response writer
314                 */
315                @NonNull
316                public StreamingResponseWriter getWriter() {
317                        return this.writer;
318                }
319        }
320
321        /**
322         * Builder for input-stream-backed response bodies.
323         * <p>
324         * This class is intended for use by a single thread.
325         */
326        @NotThreadSafe
327        final class InputStreamBuilder {
328                @NonNull
329                private final Supplier<? extends InputStream> inputStreamSupplier;
330                @Nullable
331                private Integer bufferSizeInBytes;
332
333                private InputStreamBuilder(@NonNull Supplier<? extends InputStream> inputStreamSupplier) {
334                        this.inputStreamSupplier = requireNonNull(inputStreamSupplier);
335                }
336
337                /**
338                 * Sets the input stream copy buffer size.
339                 *
340                 * @param bufferSizeInBytes buffer size in bytes, or {@code null} for the default
341                 * @return this builder
342                 */
343                @NonNull
344                public InputStreamBuilder bufferSizeInBytes(@Nullable Integer bufferSizeInBytes) {
345                        this.bufferSizeInBytes = bufferSizeInBytes;
346                        return this;
347                }
348
349                /**
350                 * Builds an input-stream-backed response body.
351                 *
352                 * @return a streaming response body
353                 */
354                @NonNull
355                public StreamingResponseBody build() {
356                        return new InputStreamBody(this);
357                }
358        }
359
360        /**
361         * Builder for reader-backed response bodies.
362         * <p>
363         * This class is intended for use by a single thread.
364         */
365        @NotThreadSafe
366        final class ReaderBuilder {
367                @NonNull
368                private final Supplier<? extends Reader> readerSupplier;
369                @NonNull
370                private final Charset charset;
371                @Nullable
372                private Integer bufferSizeInCharacters;
373                @Nullable
374                private CodingErrorAction malformedInputAction;
375                @Nullable
376                private CodingErrorAction unmappableCharacterAction;
377
378                private ReaderBuilder(@NonNull Supplier<? extends Reader> readerSupplier,
379                                                                                                        @NonNull Charset charset) {
380                        this.readerSupplier = requireNonNull(readerSupplier);
381                        this.charset = requireNonNull(charset);
382                }
383
384                /**
385                 * Sets the reader copy buffer size.
386                 *
387                 * @param bufferSizeInCharacters buffer size in characters, or {@code null} for the default
388                 * @return this builder
389                 */
390                @NonNull
391                public ReaderBuilder bufferSizeInCharacters(@Nullable Integer bufferSizeInCharacters) {
392                        this.bufferSizeInCharacters = bufferSizeInCharacters;
393                        return this;
394                }
395
396                /**
397                 * Sets encoder behavior for malformed input.
398                 *
399                 * @param malformedInputAction malformed-input behavior, or {@code null} for the default
400                 * @return this builder
401                 */
402                @NonNull
403                public ReaderBuilder malformedInputAction(@Nullable CodingErrorAction malformedInputAction) {
404                        this.malformedInputAction = malformedInputAction;
405                        return this;
406                }
407
408                /**
409                 * Sets encoder behavior for unmappable characters.
410                 *
411                 * @param unmappableCharacterAction unmappable-character behavior, or {@code null} for the default
412                 * @return this builder
413                 */
414                @NonNull
415                public ReaderBuilder unmappableCharacterAction(@Nullable CodingErrorAction unmappableCharacterAction) {
416                        this.unmappableCharacterAction = unmappableCharacterAction;
417                        return this;
418                }
419
420                /**
421                 * Builds a reader-backed response body.
422                 *
423                 * @return a streaming response body
424                 */
425                @NonNull
426                public StreamingResponseBody build() {
427                        return new ReaderBody(this);
428                }
429        }
430}