Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/advanced/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,44 @@ Down on the low-level `Server` there is no pre-wired anything — and the same p
* `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it.
* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects.

## The client side

Consuming a subscription is one context manager:

```python title="client.py" hl_lines="9 10"
--8<-- "docs_src/subscriptions/tutorial003.py"
```

* `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.

@cubic-dev-ai cubic-dev-ai Bot Jul 1, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/advanced/subscriptions.md, line 91:

<comment>The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</comment>

<file context>
@@ -88,9 +88,9 @@ Consuming a subscription is one context manager:
 
 * `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
-* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
+* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
 * Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports `notifications/cancelled` is sent.
-* The stream's two endings are control flow. The server closing gracefully simply ends the `async for`; an abrupt drop raises `SubscriptionLost`. There is no replay and no automatic re-listen — a client that reconnects refetches what it depends on:
</file context>
Suggested change
* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
Fix with cubic

* Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports `notifications/cancelled` is sent.
* The stream's two endings are control flow. The server closing gracefully simply ends the `async for`; an abrupt drop raises `SubscriptionLost`. The distinction is diagnostic — a clean end versus a connection worth suspecting — not a difference in what to do next: either way the stream is gone, nothing is replayed, and a watcher that still cares re-listens and refetches. Servers close streams gracefully for their own reasons — shutdown, or shedding a subscriber whose backlog grew past bounds, as this SDK's `ListenHandler` does — so a graceful close is not a signal to stop watching:

```python
async def watch(client: Client, uri: str) -> None:
while True:
try:
async with client.listen(resource_subscriptions=[uri]) as sub:
await client.read_resource(uri) # refetch: no replay across streams
async for _event in sub:
await client.read_resource(uri)
except SubscriptionLost:
pass
# Graceful close or abrupt drop, the stream is gone either way. Back
# off before re-listening - a graceful close may be the server
# shedding load, and reconnecting instantly recreates the pressure.
await anyio.sleep(1)
```

* Checking the acknowledgment (the spec's client SHOULD) is reading `sub.honored` — the kinds this stream will actually receive. A server may narrow the filter it agrees to honor (a multi-tenant server declining a URI, say), and `sub.honored` is that delivery contract — it says nothing about what exists in the catalog. Multiple subscriptions may be open concurrently; each demultiplexes by its own subscription id.
* Tool calls and other requests run freely beside an open stream — from the same task between events, or from sibling tasks sharing the client. A watcher task that refetches inside its event loop is the intended pattern, not a re-entrancy hazard.
* `listen()` requires a 2026-07-28 connection and raises `ListenNotSupportedError` on older ones, steering to the deprecated `subscribe_resource` and `message_handler` spelling those wires use.

## Recap

* A client opts in with one `subscriptions/listen` request; the response is the stream. There is nothing to configure server-side — serving it is built in.
* You publish: `await ctx.notify_resource_updated(uri)`, `notify_tools_changed()`, `notify_prompts_changed()`, `notify_resources_changed()`. Idle servers make these free.
* Streams receive only what their filter requested; URIs match exactly; nothing is replayed.
* Scaling out means implementing `SubscriptionBus` — two methods — over your own pub/sub, and passing it as `MCPServer(subscriptions=...)`.
* The low-level spelling is the same machinery held in your hands: a bus, `ListenHandler(bus)`, one constructor argument.
* Consuming is `async with client.listen(...)` and `async for event in sub` — typed events, honored filter on the handle, clean end vs `SubscriptionLost`.
17 changes: 17 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,23 @@ Tasks are expected to return as a separate MCP extension in a future release.

## Deprecations

### Client resource-subscription methods deprecated (SEP-2575)

[SEP-2575](https://github.com/modelcontextprotocol/modelcontextprotocol/issues/2575) removes `resources/subscribe` and `resources/unsubscribe` from the 2026-07-28 wire; per-URI subscriptions travel in the `subscriptions/listen` filter instead. The client verbs now carry `typing_extensions.deprecated`:

- `Client.subscribe_resource()` / `Client.unsubscribe_resource()`
- `ClientSession.subscribe_resource()` / `ClientSession.unsubscribe_resource()`

They keep working against 2025-era servers; a 2026-07-28 server answers them with `-32601` (method not found). Migrate to the listen driver:

```python
async with client.listen(resource_subscriptions=["note://todo"]) as sub:
async for event in sub: # ResourceUpdated(uri="note://todo")
...
```

See the [Subscriptions](advanced/subscriptions.md) page for the full client-side contract (typed events, the honored filter, clean end vs `SubscriptionLost`).

### Roots, Sampling, and Logging methods deprecated (SEP-2577)

[SEP-2577](https://github.com/modelcontextprotocol/modelcontextprotocol/pull/2577) deprecates the Roots, Sampling, and Logging features as of the 2026-07-28 spec. The deprecation is advisory only: there are no wire-level changes, capability negotiation is unchanged, and every method keeps working for sessions negotiating 2025-11-25 and earlier.
Expand Down
14 changes: 14 additions & 0 deletions docs_src/subscriptions/tutorial003.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from mcp import Client
from mcp.client.subscriptions import ResourceUpdated

from .tutorial001 import mcp


async def watch_todo() -> str:
"""Wait for the todo note to change once, then stop listening."""
async with Client(mcp) as client:
async with client.listen(resource_subscriptions=["note://todo"]) as sub:
async for event in sub:
assert isinstance(event, ResourceUpdated)
return f"changed: {event.uri}"
return "the server closed the stream before any change"
80 changes: 13 additions & 67 deletions examples/stories/subscriptions/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,88 +4,34 @@
import mcp_types as types

from mcp.client import Client
from mcp.client.subscriptions import ResourceUpdated, ToolsListChanged
from stories._harness import Target, run_client

SUBSCRIPTION_ID = "io.modelcontextprotocol/subscriptionId"


async def main(target: Target, *, mode: str = "auto") -> None:
# Stream frames arrive as ordinary server notifications; `message_handler`
# is constructor-only on `Client`, so the list it fills exists first.
received: list[types.ServerNotification] = []
arrival = anyio.Event()

async def on_message(message: object) -> None:
nonlocal arrival
if isinstance(
message,
types.SubscriptionsAcknowledgedNotification
| types.ResourceUpdatedNotification
| types.ToolListChangedNotification,
):
received.append(message)
arrival.set()
arrival = anyio.Event()

async def wait_for(count: int) -> None:
with anyio.fail_after(10):
while len(received) < count:
await arrival.wait()

async with Client(target, mode=mode, message_handler=on_message) as client:
async with Client(target, mode=mode) as client:
before = await client.list_tools()
assert "search" not in {tool.name for tool in before.tools}

async with anyio.create_task_group() as tg:
# There is no client-side listen API yet, so the story drops to the
# `client.session` escape hatch. The request parks for the stream's
# lifetime, so it runs as a task; cancelling it releases the local
# awaiting scope. In-memory that also ends the server's stream; over
# HTTP today nothing aborts the POST, so the server-side stream ends
# when the connection closes (the `Client` exit right below).
async def listen() -> None:
request = types.SubscriptionsListenRequest(
params=types.SubscriptionsListenRequestParams(
notifications=types.SubscriptionFilter(
tools_list_changed=True, resource_subscriptions=["note://todo"]
)
)
)
await client.session.send_request(request, types.SubscriptionsListenResult)

tg.start_soon(listen)

# ── the ack is the first frame: it echoes the honored filter, tagged ──
await wait_for(1)
ack = received[0]
assert isinstance(ack, types.SubscriptionsAcknowledgedNotification), ack
assert ack.params.notifications.tools_list_changed is True
assert ack.params.notifications.resource_subscriptions == ["note://todo"]
assert ack.params.meta is not None and SUBSCRIPTION_ID in ack.params.meta
async with client.listen(tools_list_changed=True, resource_subscriptions=["note://todo"]) as sub:
# ── entering waited for the ack: the honored filter is already in hand ──
assert sub.honored.tools_list_changed is True
assert sub.honored.resource_subscriptions == ["note://todo"]

# ── exact-URI filtering: an unsubscribed note edit stays silent ──
await client.call_tool("edit_note", {"name": "journal", "text": "day two"})
# ── the subscribed URI delivers, carrying the same subscription id ──
# ── the subscribed URI delivers ──
await client.call_tool("edit_note", {"name": "todo", "text": "water plants"})
await wait_for(2)
updated = received[1]
assert isinstance(updated, types.ResourceUpdatedNotification), updated
assert updated.params.uri == "note://todo"
assert updated.params.meta is not None
assert updated.params.meta[SUBSCRIPTION_ID] == ack.params.meta[SUBSCRIPTION_ID]
assert len(received) == 2, "the journal edit must not have been delivered"
with anyio.fail_after(10):
event = await anext(sub)
assert event == ResourceUpdated(uri="note://todo"), "the journal edit must not have been delivered"

# ── a runtime tool registration announces itself ──
await client.call_tool("enable_search", {})
await wait_for(3)
assert isinstance(received[2], types.ToolListChangedNotification), received[2]

# The client is done listening: cancel the parked request and let
# the connection teardown below end the stream server-side.
tg.cancel_scope.cancel()
with anyio.fail_after(10):
assert await anext(sub) == ToolsListChanged()

# list_changed told us to re-fetch - the new tool is callable, and the
# session outlives the closed stream.
# ── leaving the block closed the stream; the session lives on ──
tools = await client.list_tools()
assert "search" in {tool.name for tool in tools.tools}
result = await client.call_tool("search", {"query": "water"})
Expand Down
68 changes: 63 additions & 5 deletions src/mcp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import uuid
from collections.abc import Awaitable, Callable, Mapping, Sequence
from contextlib import AsyncExitStack
from contextlib import AbstractAsyncContextManager, AsyncExitStack
from dataclasses import KW_ONLY, dataclass, field
from typing import Any, Literal, TypeVar, cast

Expand Down Expand Up @@ -58,6 +58,8 @@
SamplingFnT,
)
from mcp.client.streamable_http import streamable_http_client
from mcp.client.subscriptions import ServerEvent, Subscription
from mcp.client.subscriptions import listen as _listen
from mcp.server import Server
from mcp.server.mcpserver import MCPServer
from mcp.server.runner import modern_on_request
Expand All @@ -67,6 +69,7 @@
from mcp.shared.extension import validate_extension_identifier
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher
from mcp.shared.session import RequestResponder
from mcp.shared.subscriptions import event_to_notification

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -662,13 +665,68 @@ async def retry(r: InputResponses | None, s: str | None) -> ReadResourceResult |
# Driver rounds carry inputResponses, so a terminal result reached through them is never cached (spec MUST).
return await self._drive_input_required(first, retry)

def listen(
self,
*,
tools_list_changed: bool = False,
prompts_list_changed: bool = False,
resources_list_changed: bool = False,
resource_subscriptions: Sequence[str] = (),
) -> AbstractAsyncContextManager[Subscription]:
"""Open a `subscriptions/listen` stream of typed change events (2026-07-28 only).

Keyword args mirror the wire `SubscriptionFilter`; entering waits for the ack (honored subset: `sub.honored`):

async with client.listen(tools_list_changed=True) as sub:
async for event in sub:
tools = await client.list_tools() # refetch on change

A graceful close ends the loop; an abrupt drop raises `SubscriptionLost`. No replay: re-listen and refetch.

Raises:
ListenNotSupportedError: The negotiated protocol version predates 2026-07-28.
MCPError: The server rejected the request or the connection failed first.
SubscriptionLost: The stream ended before it was acknowledged.
TimeoutError: The read timeout elapsed before the acknowledgment.
"""
return _listen(
self.session,
tools_list_changed=tools_list_changed,
prompts_list_changed=prompts_list_changed,
resources_list_changed=resources_list_changed,
resource_subscriptions=resource_subscriptions,
on_event=self._evict_for_listen_event if self._response_cache is not None else None,
)

async def _evict_for_listen_event(self, event: ServerEvent) -> None:
"""Finish response-cache eviction before a listen consumer can refetch.

Without it the iterator wakes first and refetches a still-warm entry, with no
corrective wake (events are deduplicated level triggers). The tee path repeats
the eviction; deliberate: idempotent, and it covers non-iterating consumers.
"""
cache = self._response_cache
assert cache is not None # installed as the event barrier only when a cache exists
try:
await cache.evict_for_notification(event_to_notification(event, {}))
except Exception: # boundary: eviction reaches user store code; a cache fault must not block delivery
logger.exception("Response cache eviction failed; the event is still delivered")

@deprecated(
"resources/subscribe is removed as of 2026-07-28; use Client.listen() instead.",
category=MCPDeprecationWarning,
)
async def subscribe_resource(self, uri: str, *, meta: RequestParamsMeta | None = None) -> EmptyResult:
"""Subscribe to resource updates."""
return await self.session.subscribe_resource(uri, meta=meta)
"""Subscribe to resource updates (2025-era servers only)."""
return await self.session.subscribe_resource(uri, meta=meta) # pyright: ignore[reportDeprecated]

@deprecated(
"resources/unsubscribe is removed as of 2026-07-28; use Client.listen() instead.",
category=MCPDeprecationWarning,
)
async def unsubscribe_resource(self, uri: str, *, meta: RequestParamsMeta | None = None) -> EmptyResult:
"""Unsubscribe from resource updates."""
return await self.session.unsubscribe_resource(uri, meta=meta)
"""Unsubscribe from resource updates (2025-era servers only)."""
return await self.session.unsubscribe_resource(uri, meta=meta) # pyright: ignore[reportDeprecated]

async def call_tool(
self,
Expand Down
Loading
Loading