001/* 002 * Copyright 2022-2026 Revetware LLC. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.soklet; 018 019import org.jspecify.annotations.NonNull; 020import org.jspecify.annotations.Nullable; 021 022import javax.annotation.concurrent.ThreadSafe; 023import java.util.List; 024import java.util.Objects; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicReference; 028import java.util.concurrent.locks.ReentrantLock; 029import java.util.function.Consumer; 030 031import static java.lang.String.format; 032import static java.util.Objects.requireNonNull; 033 034/** 035 * Result of simulator-mode MCP request handling. 036 * 037 * @author <a href="https://www.revetkn.com">Mark Allen</a> 038 */ 039public sealed interface McpRequestResult permits McpRequestResult.ResponseCompleted, McpRequestResult.StreamOpened { 040 /** 041 * Provides the low-level Soklet request result associated with the simulated MCP request. 042 * 043 * @return the low-level request result 044 */ 045 @NonNull 046 HttpRequestResult getHttpRequestResult(); 047 048 /** 049 * MCP request completed without leaving an open stream. 050 */ 051 @ThreadSafe 052 final class ResponseCompleted implements McpRequestResult { 053 @NonNull 054 private final HttpRequestResult requestResult; 055 056 ResponseCompleted(@NonNull HttpRequestResult requestResult) { 057 requireNonNull(requestResult); 058 this.requestResult = requestResult; 059 } 060 061 @NonNull 062 @Override 063 public HttpRequestResult getHttpRequestResult() { 064 return this.requestResult; 065 } 066 } 067 068 /** 069 * MCP request left an open simulated stream. 070 */ 071 @ThreadSafe 072 final class StreamOpened implements McpRequestResult { 073 @NonNull 074 private final HttpRequestResult requestResult; 075 @NonNull 076 private final AtomicReference<@Nullable Consumer<Throwable>> streamErrorHandler; 077 @NonNull 078 private final ReentrantLock lock; 079 @NonNull 080 private final List<@NonNull McpObject> bufferedMessages; 081 @NonNull 082 private final Boolean closeAfterBufferedReplay; 083 @NonNull 084 private final AtomicBoolean closed; 085 @NonNull 086 private final AtomicReference<@Nullable Runnable> onClose; 087 @Nullable 088 private volatile Consumer<McpObject> messageConsumer; 089 090 StreamOpened(@NonNull HttpRequestResult requestResult, 091 @Nullable AtomicReference<@Nullable Consumer<Throwable>> streamErrorHandler, 092 @NonNull Boolean closeAfterBufferedReplay) { 093 requireNonNull(requestResult); 094 requireNonNull(closeAfterBufferedReplay); 095 096 this.requestResult = requestResult; 097 this.streamErrorHandler = streamErrorHandler == null ? new AtomicReference<>() : streamErrorHandler; 098 this.lock = new ReentrantLock(); 099 this.bufferedMessages = new CopyOnWriteArrayList<>(); 100 this.closeAfterBufferedReplay = closeAfterBufferedReplay; 101 this.closed = new AtomicBoolean(false); 102 this.onClose = new AtomicReference<>(); 103 this.messageConsumer = null; 104 } 105 106 /** 107 * Registers the consumer that should receive MCP messages from the open simulated stream. 108 * 109 * @param messageConsumer the message consumer 110 */ 111 public void registerMessageConsumer(@NonNull Consumer<McpObject> messageConsumer) { 112 requireNonNull(messageConsumer); 113 114 this.lock.lock(); 115 116 try { 117 if (this.messageConsumer != null) 118 throw new IllegalStateException(format("You cannot specify more than one message consumer for the same %s", StreamOpened.class.getSimpleName())); 119 120 this.messageConsumer = messageConsumer; 121 122 for (McpObject bufferedMessage : this.bufferedMessages) { 123 try { 124 messageConsumer.accept(bufferedMessage); 125 } catch (Throwable throwable) { 126 handleMessageConsumerError(throwable); 127 } 128 } 129 130 this.bufferedMessages.clear(); 131 132 if (this.closeAfterBufferedReplay) 133 terminate(); 134 } finally { 135 this.lock.unlock(); 136 } 137 } 138 139 /** 140 * Simulates client-side closure of the open MCP stream. 141 */ 142 public void close() { 143 if (!this.closed.compareAndSet(false, true)) 144 return; 145 146 Runnable onClose = this.onClose.get(); 147 148 if (onClose != null) 149 onClose.run(); 150 } 151 152 /** 153 * Indicates whether the simulated stream has been closed. 154 * 155 * @return {@code true} if the stream is closed 156 */ 157 @NonNull 158 public Boolean isClosed() { 159 return this.closed.get(); 160 } 161 162 @NonNull 163 @Override 164 public HttpRequestResult getHttpRequestResult() { 165 return this.requestResult; 166 } 167 168 void emitMessage(@NonNull McpObject message) { 169 requireNonNull(message); 170 171 if (isClosed()) 172 return; 173 174 Consumer<McpObject> messageConsumer = this.messageConsumer; 175 176 if (messageConsumer == null) { 177 this.bufferedMessages.add(message); 178 return; 179 } 180 181 try { 182 messageConsumer.accept(message); 183 } catch (Throwable throwable) { 184 handleMessageConsumerError(throwable); 185 } 186 } 187 188 void terminate() { 189 this.closed.set(true); 190 } 191 192 void onClose(@Nullable Runnable onClose) { 193 this.onClose.set(onClose); 194 } 195 196 private void handleMessageConsumerError(@NonNull Throwable throwable) { 197 requireNonNull(throwable); 198 199 Consumer<Throwable> onStreamError = this.streamErrorHandler.get(); 200 201 if (onStreamError != null) 202 onStreamError.accept(throwable); 203 else 204 throw new IllegalStateException("Unhandled exception thrown by MCP stream consumer", throwable); 205 } 206 207 @Override 208 public boolean equals(@Nullable Object other) { 209 if (this == other) 210 return true; 211 212 if (!(other instanceof StreamOpened streamOpened)) 213 return false; 214 215 return getHttpRequestResult().equals(streamOpened.getHttpRequestResult()) 216 && isClosed().equals(streamOpened.isClosed()); 217 } 218 219 @Override 220 public int hashCode() { 221 return Objects.hash(getHttpRequestResult(), isClosed()); 222 } 223 } 224}