Add the client-side subscriptions/listen driver#3047
Conversation
client.listen() opens one subscription as an async context manager: entering sends the request and waits for the server's acknowledgment (the honored filter subset is on the handle before the first event), iteration yields the same four typed events the server publishes, a graceful server close simply ends the loop, and an abrupt drop raises SubscriptionLost. Exiting the context ends the subscription with the transport's own cancellation spelling (aborting the request's stream over streamable HTTP, notifications/cancelled on stream transports). There is no replay and no automatic re-listen. Pending events deduplicate - every kind is a level trigger - so the backlog is bounded by the filter's width by construction. The event vocabulary moves to mcp/shared/subscriptions.py (re-exported from the server module) so both sides speak the same types. The session demultiplexes stream frames by the _meta subscription id: acks in the driver's id namespace are consumed (raw escape-hatch listens keep observing theirs through message_handler), change events are delivered after the message_handler tee so cache eviction always completes before an event-triggered refetch can run, and session teardown settles every open route so a watcher task in a sibling task group cannot hang when the client closes. A per-request SSE stream that drops without ever carrying an event id now resolves its request with CONNECTION_CLOSED instead of leaving it pending for the session's lifetime. The legacy subscribe_resource/unsubscribe_resource verbs are deprecated on both Client and ClientSession with a pointer at the replacement.
📚 Documentation preview
|
The docs-example checker pipes snippets to ruff in the platform encoding; an em-dash in a code comment arrives as invalid UTF-8 on Windows runners.
There was a problem hiding this comment.
3 issues found across 23 files
Reply with feedback, questions, or to request a fix.
Fix all with cubic | Re-trigger cubic
Move the client's listen-stream demux to a synchronous notification intercept on the dispatcher's receive path, where cancelled/progress interception already lives: acks, events, and teardown signals now advance in wire order relative to the listen result, so a graceful close can no longer outrun the events that preceded it or clobber the acknowledged filter with the fabricated empty one. Ack consumption keys on the live route registry alone (the per-session id set is gone), route admission enforces the honored filter - loose on URIs, which the spec lets be sub-resources, with a capped backlog settling the route lost against floods - and the response-cache eviction barrier moved to the consumption point, held pending until it completes so a cancelled consumer cannot lose a level trigger. On the transport, a request whose SSE stream can never answer - a non-resumable drop or an exhausted reconnection budget - now resolves with a synthesized error, contained against teardown races. Docs: a graceful close is not "stop watching" (servers shed load by closing gracefully, including this SDK's ListenHandler); the watch loop re-listens on both endings, and sub.honored is the delivery contract, not catalog state.
There was a problem hiding this comment.
2 issues found across 16 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="docs/advanced/subscriptions.md">
<violation number="1" location="docs/advanced/subscriptions.md:91">
P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</violation>
</file>
<file name="src/mcp/client/subscriptions.py">
<violation number="1" location="src/mcp/client/subscriptions.py:153">
P2: A subscription to one resource URI currently accepts `ResourceUpdated` events for any URI, not just acknowledged ones. In `ListenRoute.deliver`, `ResourceUpdated` admission is `bool(self._honored_uris)`, so once at least one URI is honored, unrelated updates are queued and can wake consumers or even drive backlog-loss on noise. Aligning this branch with URI-based matching (same predicate used server-side) would keep delivery scoped to the acknowledged filter.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Fix all with cubic | Re-trigger cubic
| ``` | ||
|
|
||
| * `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event. | ||
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. |
There was a problem hiding this comment.
P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/advanced/subscriptions.md, line 91:
<comment>The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</comment>
<file context>
@@ -88,9 +88,9 @@ Consuming a subscription is one context manager:
* `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
-* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
+* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
* Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports `notifications/cancelled` is sent.
-* The stream's two endings are control flow. The server closing gracefully simply ends the `async for`; an abrupt drop raises `SubscriptionLost`. There is no replay and no automatic re-listen — a client that reconnects refetches what it depends on:
</file context>
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. | |
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. |
| if self.end is not None or self.honored is None: | ||
| return | ||
| if isinstance(event, ResourceUpdated): | ||
| admitted = bool(self._honored_uris) |
There was a problem hiding this comment.
P2: A subscription to one resource URI currently accepts ResourceUpdated events for any URI, not just acknowledged ones. In ListenRoute.deliver, ResourceUpdated admission is bool(self._honored_uris), so once at least one URI is honored, unrelated updates are queued and can wake consumers or even drive backlog-loss on noise. Aligning this branch with URI-based matching (same predicate used server-side) would keep delivery scoped to the acknowledged filter.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/mcp/client/subscriptions.py, line 153:
<comment>A subscription to one resource URI currently accepts `ResourceUpdated` events for any URI, not just acknowledged ones. In `ListenRoute.deliver`, `ResourceUpdated` admission is `bool(self._honored_uris)`, so once at least one URI is honored, unrelated updates are queued and can wake consumers or even drive backlog-loss on noise. Aligning this branch with URI-based matching (same predicate used server-side) would keep delivery scoped to the acknowledged filter.</comment>
<file context>
@@ -89,36 +101,71 @@ class SubscriptionLost(RuntimeError):
+ if self.end is not None or self.honored is None:
+ return
+ if isinstance(event, ResourceUpdated):
+ admitted = bool(self._honored_uris)
+ else:
+ admitted = event_matches(self.honored, self._honored_uris, event)
</file context>
Comment-only pass over the branch's additions: keep the non-inferable invariants and motivations (receive-order rationale, sub-resource admission, the wake-snapshot race, peek/commit semantics) at one to three lines each, tighten docstrings to Google style with Raises sections kept, and drop narration the code already states. No code changes.
Stacked on #3046 (the request-id and cancellation plumbing) — review this one for the driver only; the base branch carries the transport work.
The client half of
subscriptions/listen(2026-07-28, SEP-2575), shaped after the ecosystem's own streaming idioms: one context manager,async forconsumption, the acknowledgment as state on the handle rather than an event in the stream.Motivation and Context
On the 2026 wire the response to
subscriptions/listenis the notification stream. The server half shipped in #3035; until now a client had to hand-roll amessage_handler, park a rawsend_requestin a task, and demultiplex_metasubscription ids by hand (see the oldexamples/stories/subscriptions/client.pyfor what that looked like). The driver folds all of it into structured concurrency:sub.honored— the filter subset the server agreed to deliver — is always populated, making the spec's "client SHOULD check the acknowledged filter" a plain attribute read. Pre-ack failures raise from enter (server rejection asMCPError, a stream that dies unacknowledged asSubscriptionLost); nothing degrades silently.mcp/shared/subscriptions.pyand re-exported from the server module (no import breaks). Events are level triggers with no payload beyond identity, so unconsumed duplicates deduplicate and the backlog is bounded by the filter's width by construction — no cap, no overflow policy.async for; a drop raisesSubscriptionLost. No replay, no automatic re-listen — a reconnecting client refetches, which the docs show as a four-line loop.notifications/cancelledis sent on stream transports. The driver itself is transport-blind.Client.listen()on a pre-2026 connection raises a typedListenNotSupportedErrorsteering to the legacy verbs (TypeScript SDK parity), andsubscribe_resource/unsubscribe_resourcenow carry deprecation warnings on bothClientandClientSession, with a migration-guide entry.Session-level mechanics worth a reviewer's attention: acks are consumed only for ids in the driver's minted namespace, so raw escape-hatch listens (driving
send_requestdirectly) still observe their acks throughmessage_handler; delivered events are handed to the consumer after themessage_handlertee, so the caching layer's eviction always completes before an event-triggered refetch can run; and session teardown settles every open route, so a watcher task in a sibling task group getsSubscriptionLostinstead of hanging when the client closes. One transport fix rides along: a per-request SSE stream that drops without ever carrying an event id now resolves its request withCONNECTION_CLOSEDinstead of leaving the waiter pending for the session's lifetime.How Has This Been Tested?
Client: honored filter round-trip, both event kinds, exact-URI filter silence, exit-frees-slot proven by a second listen on the same session, refetch-after-change, and the legacy steer.Breaking Changes
None. The event types moved to
mcp/shared/subscriptions.pybut remain importable frommcp.server.subscriptions; the deprecated subscription verbs keep working against 2025-era servers.Types of changes
Checklist
Additional context
The conformance suite has no client-role listen scenarios — the upstream SEP traceability file explicitly excludes the two client-side obligations (subscription-id correlation is client-internal demux, not wire-observable; the ack check has no wire-observable definition) and the stdio client harness is tracked separately upstream. Coverage here is therefore the interaction suite plus TypeScript-SDK behavioral parity; flagging per the repo's conformance rule so the exclusion is a deliberate acceptance, not an oversight.
Deliberate omissions, for the record: no callbacks layer (the TypeScript SDK routes listen events into its notification-handler registrations; here
message_handlerstill sees every teed frame, so a handler-style consumer remains constructible), no auto-open-at-connect, no auto-re-listen, and noclose()method — the context manager is the lifecycle.