Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,17 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
logger.debug("Resumption GET SSE connection established")

async for sse in event_source.aiter_sse(): # pragma: no branch
is_complete = await self._handle_sse_event(
await self._handle_sse_event(
sse,
ctx.read_stream_writer,
original_request_id,
ctx.metadata.on_resumption_token_update if ctx.metadata else None,
)
if is_complete:
await event_source.response.aclose()
break
# Drain to EOF rather than breaking on completion (see
# _handle_sse_response): breaking leaves the enclosing
# `async with aconnect_sse(...)` to close an un-drained stream,
# which can stall the next request that reuses the connection.
# Cancellation still tears the stream down promptly.

async def _handle_post_request(self, ctx: RequestContext) -> None:
"""Handle a POST request with response processing."""
Expand Down Expand Up @@ -340,6 +342,8 @@ async def _handle_sse_response(
assert isinstance(ctx.session_message.message, JSONRPCRequest)
original_request_id = ctx.session_message.message.id

response_complete = False

try:
event_source = EventSource(response)
async for sse in event_source.aiter_sse(): # pragma: no branch
Expand All @@ -358,16 +362,20 @@ async def _handle_sse_response(
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
is_initialization=is_initialization,
)
# If the SSE event indicates completion, like returning response/error
# break the loop
# Once the response/error arrives the caller is unblocked, but we keep
# iterating so the SSE body is drained to EOF before the connection is
# released. Closing the response early (await response.aclose()) leaves
# the keepalive connection un-drained, which can stall the next request
# that reuses it. Cancellation still tears down promptly: CancelledError
# propagates out of aiter_sse() and the caller's
# `async with client.stream(...)` closes the response.
if is_complete:
await response.aclose()
return # Normal completion, no reconnect needed
response_complete = True
except Exception:
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover

# Stream ended without response - reconnect if we received an event with ID
if last_event_id is not None: # pragma: no branch
# Stream ended without a response - reconnect if we received an event with ID
if not response_complete and last_event_id is not None:
logger.info("SSE stream disconnected, reconnecting...")
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms)

Expand Down Expand Up @@ -404,6 +412,7 @@ async def _handle_reconnection(
# Track for potential further reconnection
reconnect_last_event_id: str = last_event_id
reconnect_retry_ms = retry_interval_ms
response_complete = False

async for sse in event_source.aiter_sse():
if sse.id: # pragma: no branch
Expand All @@ -417,9 +426,12 @@ async def _handle_reconnection(
original_request_id,
ctx.metadata.on_resumption_token_update if ctx.metadata else None,
)
# Drain to EOF instead of closing early (see _handle_sse_response).
if is_complete:
await event_source.response.aclose()
return
response_complete = True

if response_complete:
return

# Stream ended again without response - reconnect again (reset attempt counter)
logger.info("SSE stream disconnected, reconnecting...")
Expand Down
Loading