Skip to content

Commit 080f2a8

Browse files
authored
Harden the dual-era stream loop's era-lock and rejection semantics (#3040)
1 parent d39c68d commit 080f2a8

15 files changed

Lines changed: 652 additions & 125 deletions

File tree

docs/advanced/subscriptions.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ Two more things the stream is *not*:
4646
* **It is not a replay log.** A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about.
4747
* **It is not the 2025 path.** Clients on earlier protocol versions that called `resources/subscribe` are served by `ctx.session.send_resource_updated(uri)` — the `notify_*` methods reach `subscriptions/listen` streams only.
4848

49+
!!! warning "Streamable HTTP only, for now"
50+
`subscriptions/listen` is served on the streamable-HTTP transport. Over stdio (and other
51+
stream-pair transports) a 2026-07-28 connection rejects it with METHOD_NOT_FOUND — the
52+
open-stream semantics haven't been built for that transport yet, even though
53+
`server/discover` still advertises the subscription capabilities there.
54+
4955
## One process is the default. More takes a bus
5056

5157
Publishes travel from your handler to the open streams over a `SubscriptionBus`. The default is in-memory: one process, every stream in it. That is the right answer until you run replicas behind a load balancer — then a client's stream is pinned to one replica, and a publish on another replica has to reach it.

docs/client/protocol-versions.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ Either way you come out connected, and `client.protocol_version` tells you which
2626
That is the whole feature. One `Client`, any era of server, no branching in your code.
2727

2828
!!! info
29-
`MCPServer` answers `server/discover`, so against your own in-memory server `auto` always lands
30-
on `2026-07-28`. The fallback only ever fires against a real pre-2026 server, which is exactly
31-
when you want it to.
29+
`MCPServer` answers `server/discover` on every transport — in-memory, stdio, streamable
30+
HTTP — so against your own server `auto` always lands on `2026-07-28`. The fallback only
31+
ever fires against a real pre-2026 server, which is exactly when you want it to.
3232

3333
## `mode="legacy"`
3434

docs/migration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,8 @@ On the high-level `Client`, `client.server_capabilities`, `client.server_info`,
411411

412412
In v1, connecting to a server always performed the `initialize` handshake. In v2, `Client` defaults to `mode='auto'`: on enter it probes `server/discover` and, if the server doesn't support it, falls back to the `initialize` handshake. Pass `mode='legacy'` to force the initialize handshake and reproduce v1's byte-identical pre-2026 behavior, or pass a modern protocol-version string (e.g. `mode='2026-07-28'`) to pin a version without probing.
413413

414+
The probe is transport-independent: v2 servers answer it over stdio (and any other stream-pair transport) as well as streamable HTTP, so `mode='auto'` lands on `2026-07-28` against a v2 server on every transport. If your stdio workflow relies on server-initiated requests (sampling, push elicitation), pass `mode='legacy'` — a 2026-07-28 connection refuses them on every transport.
415+
414416
For an in-process `Client(server)` (where `server` is a `Server` or `MCPServer` instance), `mode='auto'` dispatches calls directly through `DirectDispatcher` with no JSON-RPC framing. Pass `mode='legacy'` if you need the in-memory JSON-RPC transport that v1 used.
415417

416418
`Client.send_ping()` is deprecated (ping is removed in 2026-07-28); pin `mode='legacy'` if you need it.

src/mcp/client/_probe.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
the same path. Any non-``MCPError`` exception (network/connection errors,
1212
anyio cancellation, the ``RuntimeError`` from ``adopt()`` on no-mutual)
1313
propagates to the caller; an outage or in-process bug is never an era verdict.
14+
15+
The fallback handshake itself can be answered with ``-32022`` — e.g. a probe
16+
that timed out client-side but succeeded on a slow-starting server locked the
17+
connection modern before the pipelined ``initialize`` arrived. That code is
18+
itself positive modern evidence (it names the server's versions), so it
19+
triggers one re-probe at a mutual version instead of failing the connect.
1420
"""
1521

1622
from __future__ import annotations
@@ -49,7 +55,8 @@ async def negotiate_auto(session: ClientSession) -> None:
4955
5056
Raises:
5157
MCPError: The server is modern-only and shares no version with this
52-
client (-32022 with a disjoint ``supported`` list).
58+
client (-32022 with a disjoint ``supported`` list), or the
59+
fallback handshake failed and one corrective re-probe did too.
5360
Exception: Any transport/network error from the probe propagates as-is.
5461
"""
5562
version = LATEST_MODERN_VERSION
@@ -65,7 +72,22 @@ async def negotiate_auto(session: ClientSession) -> None:
6572
continue
6673
if supported is not None and not any(v in HANDSHAKE_PROTOCOL_VERSIONS for v in supported):
6774
raise # server is modern-only and disjoint — real incompatibility
68-
await session.initialize() # every other rpc-error → legacy (the denylist)
75+
try:
76+
await session.initialize() # every other rpc-error → legacy (the denylist)
77+
except MCPError as handshake_exc:
78+
if handshake_exc.code != UNSUPPORTED_PROTOCOL_VERSION or attempt != 0:
79+
raise
80+
# -32022 from the handshake is itself modern evidence: a probe
81+
# that timed out client-side but succeeded on the server locked
82+
# the connection modern before this initialize arrived. Re-probe
83+
# once at a version the server names; the era is already
84+
# settled, so the second probe answers without the slow start.
85+
supported = _parse_supported(handshake_exc.error.data)
86+
mutual = [v for v in MODERN_PROTOCOL_VERSIONS if v in (supported or ())]
87+
if not mutual:
88+
raise
89+
version = mutual[-1]
90+
continue
6991
return
7092
# any other exception (httpx.TransportError, ConnectionError, anyio errors,
7193
# RuntimeError from adopt) → propagate

src/mcp/server/_streamable_http_modern.py

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,18 @@
2222
import logging
2323
from collections.abc import Awaitable, Mapping
2424
from dataclasses import dataclass, field
25-
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
25+
from typing import TYPE_CHECKING, Any, Final, cast
2626

2727
import anyio
2828
from anyio.streams.memory import MemoryObjectSendStream
2929
from mcp_types import (
3030
CLIENT_CAPABILITIES_META_KEY,
3131
CLIENT_INFO_META_KEY,
3232
HEADER_MISMATCH,
33-
INTERNAL_ERROR,
3433
INVALID_REQUEST,
3534
PARSE_ERROR,
3635
PROTOCOL_VERSION_META_KEY,
37-
ClientCapabilities,
3836
ErrorData,
39-
Implementation,
4037
JSONRPCError,
4138
JSONRPCNotification,
4239
JSONRPCRequest,
@@ -45,13 +42,13 @@
4542
RequestId,
4643
)
4744
from mcp_types import methods as _methods
48-
from pydantic import BaseModel, ValidationError
45+
from pydantic import ValidationError
4946
from starlette.requests import Request
5047
from starlette.responses import Response
5148
from starlette.types import Receive, Scope, Send
5249

5350
from mcp.server.connection import Connection
54-
from mcp.server.runner import serve_one
51+
from mcp.server.runner import modern_error_data, serve_one
5552
from mcp.server.streamable_http import check_accept_headers
5653
from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
5754
from mcp.shared.dispatcher import CallOptions
@@ -65,7 +62,7 @@
6562
find_duplicated_routing_header,
6663
validate_mcp_param_headers,
6764
)
68-
from mcp.shared.jsonrpc_dispatcher import handler_exception_to_error_data, progress_token_from_params
65+
from mcp.shared.jsonrpc_dispatcher import progress_token_from_params
6966
from mcp.shared.message import MessageMetadata, ServerMessageMetadata
7067
from mcp.shared.transport_context import TransportContext
7168

@@ -74,7 +71,6 @@
7471

7572
logger = logging.getLogger(__name__)
7673

77-
_ModelT = TypeVar("_ModelT", bound=BaseModel)
7874

7975
_OK_STATUS = 200
8076

@@ -125,37 +121,20 @@ async def progress(self, progress: float, total: float | None = None, message: s
125121
await self.notify("notifications/progress", params)
126122

127123

128-
def _typed(model: type[_ModelT], raw: Any) -> _ModelT | None:
129-
"""Validate the classifier's raw envelope value into a typed model.
130-
131-
Rung 1 guarantees the envelope key was present; a ``null`` or mis-shaped
132-
value falls through to ``ValidationError`` and is treated as not supplied
133-
so the request still routes.
134-
"""
135-
try:
136-
return model.model_validate(raw, by_name=False)
137-
except ValidationError:
138-
return None
139-
140-
141124
async def _to_jsonrpc_response(
142125
request_id: RequestId, coro: Awaitable[dict[str, Any]]
143126
) -> JSONRPCResponse | JSONRPCError:
144127
"""Await ``coro`` and wrap its outcome as the JSON-RPC reply for ``request_id``.
145128
146129
The exception-to-wire boundary for the modern HTTP entry, composed around
147-
`serve_one`. `MCPError` and `ValidationError` map via the shared
148-
`handler_exception_to_error_data` ladder; any other exception is logged and
149-
surfaced as `INTERNAL_ERROR` so handler internals never reach the wire.
130+
`serve_one`: `modern_error_data` maps the shared ladder and surfaces
131+
anything else as a generic `INTERNAL_ERROR` so handler internals never
132+
reach the wire.
150133
"""
151134
try:
152135
result = await coro
153136
except Exception as exc:
154-
error = handler_exception_to_error_data(exc)
155-
if error is None:
156-
logger.exception("request handler raised")
157-
error = ErrorData(code=INTERNAL_ERROR, message="Internal server error")
158-
return JSONRPCError(jsonrpc="2.0", id=request_id, error=error)
137+
return JSONRPCError(jsonrpc="2.0", id=request_id, error=modern_error_data(exc))
159138
return JSONRPCResponse(jsonrpc="2.0", id=request_id, result=result)
160139

161140

@@ -251,16 +230,16 @@ async def _tool_input_schema(
251230
logger.debug("Mcp-Param header validation skipped: the request envelope fails tools/list validation")
252231
return None
253232
seen_cursors: set[str] = set()
254-
client_info = _typed(Implementation, verdict.client_info)
255-
client_capabilities = _typed(ClientCapabilities, verdict.client_capabilities)
256233
dctx = _SingleExchangeDispatchContext(
257234
transport=TransportContext(kind="streamable-http", can_send_request=False, headers=request.headers),
258235
request_id=request_id,
259236
message_metadata=ServerMessageMetadata(request_context=request),
260237
)
261238
for _ in range(_MCP_PARAM_LIST_PAGE_CAP):
262239
# Fresh Connection per page: serve_one tears down the connection's exit stack on the way out.
263-
connection = Connection.from_envelope(verdict.protocol_version, client_info, client_capabilities)
240+
connection = Connection.from_envelope(
241+
verdict.protocol_version, verdict.client_info, verdict.client_capabilities
242+
)
264243
try:
265244
result = await serve_one(
266245
app, dctx, "tools/list", list_params, connection=connection, lifespan_state=lifespan_state
@@ -409,8 +388,8 @@ async def handle_modern_request(
409388

410389
connection = Connection.from_envelope(
411390
verdict.protocol_version,
412-
_typed(Implementation, verdict.client_info),
413-
_typed(ClientCapabilities, verdict.client_capabilities),
391+
verdict.client_info,
392+
verdict.client_capabilities,
414393
)
415394
dctx = _SingleExchangeDispatchContext(
416395
transport=TransportContext(kind="streamable-http", can_send_request=False, headers=request.headers),

src/mcp/server/connection.py

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
)
4343
from mcp_types import methods as _methods
4444
from mcp_types.version import LATEST_HANDSHAKE_VERSION
45-
from pydantic import BaseModel
45+
from pydantic import BaseModel, ValidationError
4646
from typing_extensions import deprecated
4747

4848
from mcp.shared.dispatcher import CallOptions, Outbound
@@ -68,6 +68,23 @@
6868
}
6969

7070

71+
_ModelT = TypeVar("_ModelT", bound=BaseModel)
72+
73+
74+
def _typed(model: type[_ModelT], raw: Any) -> _ModelT | None:
75+
"""Validate a raw envelope value into a typed model.
76+
77+
A missing, null or mis-shaped value falls through to `ValidationError`
78+
and is treated as not supplied so the request still routes. Spec methods
79+
are separately re-validated by the kernel's per-version params surface,
80+
which types the reserved `_meta` keys strictly.
81+
"""
82+
try:
83+
return model.model_validate(raw, by_name=False)
84+
except ValidationError:
85+
return None
86+
87+
7188
def _notification_params(payload: dict[str, Any] | None, meta: Meta | None) -> dict[str, Any] | None:
7289
if not meta:
7390
return payload
@@ -100,26 +117,18 @@ async def notify(self, method: str, params: Mapping[str, Any] | None, opts: Call
100117
_NO_CHANNEL = _NoChannelOutbound()
101118

102119

103-
class NotifyOnlyOutbound:
120+
class NotifyOnlyOutbound(_NoChannelOutbound):
104121
"""Connection-scoped `Outbound` that forwards notifications and refuses requests.
105122
106123
Installed by `serve_dual_era_loop` for modern (2026-07-28+) connections
107124
over duplex stream transports: the pipe is real, so server notifications
108125
ride it, but the modern protocol forbids server-initiated JSON-RPC
109-
requests, so `send_raw_request` refuses by construction.
126+
requests, so `send_raw_request` (inherited) refuses by construction.
110127
"""
111128

112129
def __init__(self, outbound: Outbound) -> None:
113130
self._outbound = outbound
114131

115-
async def send_raw_request(
116-
self,
117-
method: str,
118-
params: Mapping[str, Any] | None,
119-
opts: CallOptions | None = None,
120-
) -> dict[str, Any]:
121-
raise NoBackChannelError(method)
122-
123132
async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
124133
await self._outbound.notify(method, params, opts)
125134

@@ -180,26 +189,34 @@ def __init__(
180189
def from_envelope(
181190
cls,
182191
protocol_version: str,
183-
client_info: Implementation | None,
184-
client_capabilities: ClientCapabilities | None,
192+
client_info: Any,
193+
client_capabilities: Any,
185194
*,
186195
outbound: Outbound = _NO_CHANNEL,
187196
) -> Connection:
188197
"""A born-ready connection populated from a request's `_meta` envelope.
189198
190-
`initialized` is set and the envelope's client info/capabilities (when
191-
both supplied) are recorded as `client_params` so capability checks
192-
work. `outbound` defaults to the no-channel sentinel for the
193-
single-exchange HTTP path; duplex modern transports (e.g. stdio) pass
194-
a notify-only wrapper around the dispatcher so server notifications
195-
ride the pipe while server-initiated requests stay refused.
199+
`protocol_version` must be an already-validated version string - the
200+
inbound classification ladder owns rejecting non-string or unsupported
201+
values. `client_info` and `client_capabilities` are the raw envelope
202+
values: this constructor owns turning them into connection identity,
203+
identically on every modern entry, so a mis-shaped value degrades to
204+
not-supplied rather than failing the request. `initialized`
205+
is set and the info/capabilities (when both supplied and well-formed)
206+
are recorded as `client_params` so capability checks work. `outbound`
207+
defaults to the no-channel sentinel for the single-exchange HTTP path;
208+
duplex modern transports (e.g. stdio) pass a notify-only wrapper
209+
around the dispatcher so server notifications ride the pipe while
210+
server-initiated requests stay refused.
196211
"""
212+
info = _typed(Implementation, client_info)
213+
capabilities = _typed(ClientCapabilities, client_capabilities)
197214
client_params = None
198-
if client_info is not None and client_capabilities is not None:
215+
if info is not None and capabilities is not None:
199216
client_params = InitializeRequestParams(
200217
protocol_version=protocol_version,
201-
capabilities=client_capabilities,
202-
client_info=client_info,
218+
capabilities=capabilities,
219+
client_info=info,
203220
)
204221
connection = cls(outbound, protocol_version=protocol_version, client_params=client_params)
205222
connection.initialized.set()
@@ -230,7 +247,12 @@ def for_loop(
230247
def has_standalone_channel(self) -> bool:
231248
"""Whether this connection has a real back-channel for server-initiated
232249
messages. Derived from `outbound` - the no-channel sentinel is the only
233-
case that doesn't."""
250+
case that doesn't.
251+
252+
Channel presence, not request permission: a modern (2026-07-28+)
253+
duplex connection has a channel that carries notifications while
254+
`send_raw_request` still refuses, because the protocol forbids
255+
server-initiated requests."""
234256
return self.outbound is not _NO_CHANNEL
235257

236258
@property
@@ -255,7 +277,9 @@ async def send_raw_request(
255277
256278
Raises:
257279
MCPError: The peer responded with an error.
258-
NoBackChannelError: `has_standalone_channel` is `False`.
280+
NoBackChannelError: no back-channel for server-initiated requests -
281+
`has_standalone_channel` is `False`, or a modern (2026-07-28+)
282+
connection, where the protocol forbids them.
259283
"""
260284
return await self.outbound.send_raw_request(method, params, opts)
261285

@@ -316,7 +340,9 @@ async def ping(self, *, meta: Meta | None = None, opts: CallOptions | None = Non
316340
317341
Raises:
318342
MCPError: The peer responded with an error.
319-
NoBackChannelError: `has_standalone_channel` is `False`.
343+
NoBackChannelError: no back-channel for server-initiated requests -
344+
`has_standalone_channel` is `False`, or a modern (2026-07-28+)
345+
connection, where the protocol forbids them.
320346
"""
321347
await self.send_raw_request("ping", dump_params(None, meta), opts)
322348

src/mcp/server/lowlevel/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,8 +691,8 @@ async def run(
691691
692692
Thin wrapper over `serve_dual_era_loop`: enters the server lifespan,
693693
then drives the loop, serving the legacy handshake era and the modern
694-
per-request-envelope era (the first era-distinctive message locks the
695-
connection). Transports with their own lifespan owner (the
694+
per-request-envelope era (the first era-distinctive message to succeed
695+
locks the connection). Transports with their own lifespan owner (the
696696
streamable-HTTP manager) call `serve_loop` directly instead.
697697
"""
698698
async with self.lifespan(self) as lifespan_context:

0 commit comments

Comments
 (0)