001/*
002 * Copyright 2022-2026 Revetware LLC.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.soklet;
018
019import org.jspecify.annotations.NonNull;
020import org.jspecify.annotations.Nullable;
021
022import javax.annotation.concurrent.ThreadSafe;
023import java.util.List;
024import java.util.Objects;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicReference;
028import java.util.concurrent.locks.ReentrantLock;
029import java.util.function.Consumer;
030
031import static java.lang.String.format;
032import static java.util.Objects.requireNonNull;
033
034/**
035 * Result of simulator-mode MCP request handling.
036 *
037 * @author <a href="https://www.revetkn.com">Mark Allen</a>
038 */
039public sealed interface McpRequestResult permits McpRequestResult.ResponseCompleted, McpRequestResult.StreamOpened {
        /**
         * Provides the low-level Soklet request result associated with the simulated MCP request.
         * <p>
         * Every permitted implementation of this interface exposes this accessor, regardless of whether
         * the request completed or left a stream open.
         *
         * @return the low-level request result (annotated as never {@code null})
         */
        @NonNull
        HttpRequestResult getHttpRequestResult();
047
048        /**
049         * MCP request completed without leaving an open stream.
050         */
051        @ThreadSafe
052        final class ResponseCompleted implements McpRequestResult {
053                @NonNull
054                private final HttpRequestResult requestResult;
055
056                ResponseCompleted(@NonNull HttpRequestResult requestResult) {
057                        requireNonNull(requestResult);
058                        this.requestResult = requestResult;
059                }
060
061                @NonNull
062                @Override
063                public HttpRequestResult getHttpRequestResult() {
064                        return this.requestResult;
065                }
066        }
067
068        /**
069         * MCP request left an open simulated stream.
070         */
071        @ThreadSafe
072        final class StreamOpened implements McpRequestResult {
073                @NonNull
074                private final HttpRequestResult requestResult;
075                @NonNull
076                private final AtomicReference<@Nullable Consumer<Throwable>> streamErrorHandler;
077                @NonNull
078                private final ReentrantLock lock;
079                @NonNull
080                private final List<@NonNull McpObject> bufferedMessages;
081                @NonNull
082                private final Boolean closeAfterBufferedReplay;
083                @NonNull
084                private final AtomicBoolean closed;
085                @NonNull
086                private final AtomicReference<@Nullable Runnable> onClose;
087                @Nullable
088                private volatile Consumer<McpObject> messageConsumer;
089
090                StreamOpened(@NonNull HttpRequestResult requestResult,
091                                                                 @Nullable AtomicReference<@Nullable Consumer<Throwable>> streamErrorHandler,
092                                                                 @NonNull Boolean closeAfterBufferedReplay) {
093                        requireNonNull(requestResult);
094                        requireNonNull(closeAfterBufferedReplay);
095
096                        this.requestResult = requestResult;
097                        this.streamErrorHandler = streamErrorHandler == null ? new AtomicReference<>() : streamErrorHandler;
098                        this.lock = new ReentrantLock();
099                        this.bufferedMessages = new CopyOnWriteArrayList<>();
100                        this.closeAfterBufferedReplay = closeAfterBufferedReplay;
101                        this.closed = new AtomicBoolean(false);
102                        this.onClose = new AtomicReference<>();
103                        this.messageConsumer = null;
104                }
105
106                /**
107                 * Registers the consumer that should receive MCP messages from the open simulated stream.
108                 *
109                 * @param messageConsumer the message consumer
110                 */
111                public void registerMessageConsumer(@NonNull Consumer<McpObject> messageConsumer) {
112                        requireNonNull(messageConsumer);
113
114                        this.lock.lock();
115
116                        try {
117                                if (this.messageConsumer != null)
118                                        throw new IllegalStateException(format("You cannot specify more than one message consumer for the same %s", StreamOpened.class.getSimpleName()));
119
120                                this.messageConsumer = messageConsumer;
121
122                                for (McpObject bufferedMessage : this.bufferedMessages) {
123                                        try {
124                                                messageConsumer.accept(bufferedMessage);
125                                        } catch (Throwable throwable) {
126                                                handleMessageConsumerError(throwable);
127                                        }
128                                }
129
130                                this.bufferedMessages.clear();
131
132                                if (this.closeAfterBufferedReplay)
133                                        terminate();
134                        } finally {
135                                this.lock.unlock();
136                        }
137                }
138
139                /**
140                 * Simulates client-side closure of the open MCP stream.
141                 */
142                public void close() {
143                        if (!this.closed.compareAndSet(false, true))
144                                return;
145
146                        Runnable onClose = this.onClose.get();
147
148                        if (onClose != null)
149                                onClose.run();
150                }
151
152                /**
153                 * Indicates whether the simulated stream has been closed.
154                 *
155                 * @return {@code true} if the stream is closed
156                 */
157                @NonNull
158                public Boolean isClosed() {
159                        return this.closed.get();
160                }
161
162                @NonNull
163                @Override
164                public HttpRequestResult getHttpRequestResult() {
165                        return this.requestResult;
166                }
167
168                void emitMessage(@NonNull McpObject message) {
169                        requireNonNull(message);
170
171                        if (isClosed())
172                                return;
173
174                        Consumer<McpObject> messageConsumer = this.messageConsumer;
175
176                        if (messageConsumer == null) {
177                                this.bufferedMessages.add(message);
178                                return;
179                        }
180
181                        try {
182                                messageConsumer.accept(message);
183                        } catch (Throwable throwable) {
184                                handleMessageConsumerError(throwable);
185                        }
186                }
187
188                void terminate() {
189                        this.closed.set(true);
190                }
191
192                void onClose(@Nullable Runnable onClose) {
193                        this.onClose.set(onClose);
194                }
195
196                private void handleMessageConsumerError(@NonNull Throwable throwable) {
197                        requireNonNull(throwable);
198
199                        Consumer<Throwable> onStreamError = this.streamErrorHandler.get();
200
201                        if (onStreamError != null)
202                                onStreamError.accept(throwable);
203                        else
204                                throw new IllegalStateException("Unhandled exception thrown by MCP stream consumer", throwable);
205                }
206
207                @Override
208                public boolean equals(@Nullable Object other) {
209                        if (this == other)
210                                return true;
211
212                        if (!(other instanceof StreamOpened streamOpened))
213                                return false;
214
215                        return getHttpRequestResult().equals(streamOpened.getHttpRequestResult())
216                                        && isClosed().equals(streamOpened.isClosed());
217                }
218
219                @Override
220                public int hashCode() {
221                        return Objects.hash(getHttpRequestResult(), isClosed());
222                }
223        }
224}