
run_agent leaks MessageSender / MessageDispatcher tasks when cancelled #108

@yurishkuro


Summary

run_agent wraps AgentSideConnection.listen() with no try/finally. When the calling task is cancelled mid-listen(), Connection.close() is never invoked. The MessageSender._loop and MessageDispatcher._run tasks that the Connection's TaskSupervisor registered are left orphaned on the event loop. Python's GC later destroys them while they are still pending, and asyncio reports:

ERROR asyncio: Task was destroyed but it is pending!
  task: <Task pending name='acp.Sender.loop' …>
ERROR asyncio: Task was destroyed but it is pending!
  task: <Task pending name='acp.Dispatcher.loop' …>
ERROR root: Send loop failed
  RuntimeError: cannot reuse already awaited coroutine

Reproduction

Any pattern where the caller cancels run_agent mid-flight reproduces this. The minimal case I have is a WebSocket bridge that schedules run_agent as a task and races it against peer tasks via asyncio.wait(FIRST_COMPLETED). When the peer closes the WebSocket, my bridge cancels agent_task. That is a straightforward cancellation that ought to result in a clean shutdown, but instead it leaks the Connection's internal tasks. (Real-world trigger: a backend health-checker that opens a fresh WebSocket every N seconds, sends initialize, and closes.)
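The bridge pattern can be sketched with plain asyncio. This is a minimal, hypothetical stand-in rather than the actual bridge code: fake_run_agent only "listens" until it is cancelled, and peer_reader simulates the peer sending initialize and closing the socket. The sketch only shows where the cancellation that reaches run_agent comes from.

```python
import asyncio


# Hypothetical stand-ins, not the real bridge: fake_run_agent plays run_agent
# ("listens" until cancelled) and peer_reader plays the WebSocket peer task,
# which returns once the peer has sent initialize and closed the socket.
async def fake_run_agent(log: list[str]) -> None:
    try:
        await asyncio.Event().wait()
    except asyncio.CancelledError:
        log.append("agent cancelled")
        raise


async def peer_reader(log: list[str]) -> None:
    await asyncio.sleep(0.01)
    log.append("peer closed")


async def bridge() -> list[str]:
    log: list[str] = []
    agent_task = asyncio.create_task(fake_run_agent(log))
    peer_task = asyncio.create_task(peer_reader(log))
    _done, pending = await asyncio.wait(
        {agent_task, peer_task}, return_when=asyncio.FIRST_COMPLETED
    )
    # The peer finished first, so cancel everything still running
    # (here: agent_task) and wait for the cancellation to be processed.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return log


print(asyncio.run(bridge()))  # ['peer closed', 'agent cancelled']
```

Whatever run_agent does in its own frame when that CancelledError arrives is the only chance it gets to clean up.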

I verified the chain by tracing through the package:

  1. run_agent (in acp/core.py:38-72) constructs AgentSideConnection(..., listening=False) then does a bare await conn.listen().
  2. AgentSideConnection.listen (in acp/agent/connection.py) is just await self._conn.main_loop().
  3. Connection.main_loop (in acp/connection.py:119) is await self._receive_loop().
  4. None of the layers between run_agent and _receive_loop has a try/finally. A cancellation propagates straight through.
  5. The Connection's __init__ (lines 88 and 105 in acp/connection.py) registered MessageSender._loop and MessageDispatcher._run with TaskSupervisor. The cancellation orphans them because Connection.close() (line 109), which would call _dispatcher.stop(), _sender.close(), and _tasks.shutdown(), is never reached.
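The chain can be modeled in a few lines. This is a simplified, hypothetical model, not the acp classes: Connection starts two background tasks in __init__, close() is the only code that stops them, and run_agent_unguarded awaits main_loop() with no try/finally. The connection is passed in (instead of being built inside, as the real run_agent does) only so the sketch can inspect its tasks afterwards.

```python
import asyncio


class Connection:
    """Hypothetical model: two supervised loops, stopped only by close()."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.tasks = [
            asyncio.create_task(self._sender_loop(), name="model.Sender.loop"),
            asyncio.create_task(self._dispatcher_loop(), name="model.Dispatcher.loop"),
        ]

    async def _sender_loop(self) -> None:
        while True:
            await self.queue.get()

    async def _dispatcher_loop(self) -> None:
        await asyncio.Event().wait()

    async def main_loop(self) -> None:
        await asyncio.Event().wait()  # stands in for _receive_loop()

    async def close(self) -> None:
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)


async def run_agent_unguarded(conn: Connection) -> None:
    await conn.main_loop()  # no try/finally: close() never runs on cancel


async def demo() -> list[bool]:
    conn = Connection()
    agent_task = asyncio.create_task(run_agent_unguarded(conn))
    await asyncio.sleep(0)  # let the agent task start listening
    agent_task.cancel()
    await asyncio.gather(agent_task, return_exceptions=True)
    return [task.done() for task in conn.tasks]


print(asyncio.run(demo()))  # [False, False]: both loops are still pending
```

In this toy, asyncio.run cancels the leftover tasks when it shuts down the loop, so no "destroyed but pending" error is printed; in a long-running server the orphaned loops instead linger until the GC collects them.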

Proposed fix

Two options, in order of preference:

Option 1: fix run_agent itself

Wrap the await conn.listen() in acp/core.py:38-72:

async def run_agent(agent, input_stream=None, output_stream=None, *, …):
    …
    conn = AgentSideConnection(agent, input_stream, output_stream, listening=False, …)
    try:
        await conn.listen()
    finally:
        await asyncio.shield(conn._conn.close())

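asyncio.shield keeps the close coroutine running even if run_agent is cancelled again while cleanup is in progress (for example, a second cancel() from the caller or an enclosing timeout firing). The first CancelledError has already been delivered by the time the finally block runs, but without the shield any further cancellation would interrupt the awaits in Connection.close() and re-raise CancelledError before the supervised tasks finish shutting down.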
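A small experiment illustrates when the shield matters. It is a sketch with hypothetical stand-ins (slow_close for Connection.close(), an Event wait for conn.listen()): the worker is cancelled once, which enters the finally block, and then cancelled again while cleanup is still running. It also keeps a strong reference to the shielded task, because the asyncio documentation advises saving a reference to tasks passed to shield() (the event loop only holds weak references to tasks).

```python
import asyncio


# Hypothetical stand-in for Connection.close(): it needs a few event-loop
# turns to finish and records whether it ran to completion.
async def slow_close(state: dict[str, bool]) -> None:
    await asyncio.sleep(0.05)
    state["closed"] = True


async def worker(state: dict[str, bool], use_shield: bool,
                 keep: set[asyncio.Task[None]]) -> None:
    try:
        await asyncio.Event().wait()  # stands in for conn.listen()
    finally:
        if use_shield:
            inner = asyncio.create_task(slow_close(state))
            keep.add(inner)  # strong reference to the shielded task
            await asyncio.shield(inner)
        else:
            await slow_close(state)


async def demo(use_shield: bool) -> bool:
    state = {"closed": False}
    keep: set[asyncio.Task[None]] = set()
    task = asyncio.create_task(worker(state, use_shield, keep))
    await asyncio.sleep(0)      # let the worker reach "listen"
    task.cancel()               # first cancel: enters the finally block
    await asyncio.sleep(0.01)   # cleanup is now in progress
    task.cancel()               # second cancel, e.g. an enclosing timeout
    await asyncio.gather(task, return_exceptions=True)
    await asyncio.gather(*keep, return_exceptions=True)  # let shielded close finish
    return state["closed"]


print(asyncio.run(demo(use_shield=False)))  # False: second cancel cut close() short
print(asyncio.run(demo(use_shield=True)))   # True: close() ran to completion
```

With the shield, the second CancelledError still propagates out of the worker, but the cleanup task keeps running to the end.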

Option 2: expose a public close API on AgentSideConnection

Add close() plus __aenter__/__aexit__ to AgentSideConnection, delegating to self._conn.close(), then either (a) keep run_agent as-is and let callers use async with AgentSideConnection(...), or (b) apply Option 1 too. Either way, callers who don't want run_agent's convenience can do their own cleanup without reaching into _conn.
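Option 2 could look roughly like this. It is a hypothetical sketch: AgentSideConnectionSketch and _Inner are illustrative names rather than the real classes, and the real constructor arguments are omitted. The demo cancels a caller while it is inside the async with block and checks that __aexit__ still ran close().

```python
from __future__ import annotations

import asyncio
from types import TracebackType


class _Inner:
    """Hypothetical stand-in for the private Connection."""

    def __init__(self) -> None:
        self.closed = False
        # One supervised background task, like the sender/dispatcher loops.
        self._task = asyncio.create_task(asyncio.Event().wait())

    async def close(self) -> None:
        self._task.cancel()
        await asyncio.gather(self._task, return_exceptions=True)
        self.closed = True


class AgentSideConnectionSketch:
    """Hypothetical shape of Option 2: public close() plus async with."""

    def __init__(self) -> None:
        self._conn = _Inner()

    @property
    def closed(self) -> bool:
        return self._conn.closed

    async def close(self) -> None:
        await self._conn.close()

    async def __aenter__(self) -> AgentSideConnectionSketch:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        # Runs on normal exit, on errors, and on cancellation of the body.
        await self.close()


async def demo() -> bool:
    seen: list[AgentSideConnectionSketch] = []

    async def caller() -> None:
        async with AgentSideConnectionSketch() as conn:
            seen.append(conn)
            await asyncio.Event().wait()  # stands in for await conn.listen()

    task = asyncio.create_task(caller())
    await asyncio.sleep(0)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return seen[0].closed


print(asyncio.run(demo()))  # True
```

Because __aexit__ returns None, the CancelledError still propagates to the caller after cleanup, which is the behavior cancellation-aware code expects.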

I'd argue for both: Option 1 keeps run_agent self-contained, and Option 2 unblocks callers that want to construct the connection manually.

My downstream workaround

In jaegertracing/jaeger (commit) I'm shipping a wrapper that re-implements run_agent's essentials:

async def _run_agent_with_cleanup(agent, agent_writer, agent_reader):
    conn = AgentSideConnection(agent, agent_writer, agent_reader, listening=False)
    try:
        await conn.listen()
    finally:
        try:
            await asyncio.shield(conn._conn.close())  # noqa: SLF001
        except asyncio.CancelledError:
            raise

This works but reaches into _conn (private) and won't track future additions to run_agent's signature (e.g., **connection_kwargs, use_unstable_protocol). Happy to switch to whatever you ship.

Environment

  • Python 3.14
  • agent-client-protocol 0.8.1 (latest on PyPI as of writing)

Regression test idea

A test that calls asyncio.create_task(run_agent(…)) against a stub stream pair, cancels the task, runs gc.collect(), and asserts that the asyncio loop's exception handler captured no "Task was destroyed but it is pending!" entries.
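A minimal harness for that test might look like the following. It is a sketch under stated assumptions: Connection and run_agent_fixed are hypothetical stand-ins that apply the Option 1 try/finally, and a real test would call the actual run_agent with a stub stream pair. A pending task that gets garbage-collected reports "Task was destroyed but it is pending!" through the loop's exception handler, so the harness installs a handler that records every message and checks that the list is still empty after gc.collect().

```python
import asyncio
import gc


class Connection:
    """Hypothetical stand-in with one supervised background task."""

    def __init__(self) -> None:
        self.tasks = [asyncio.create_task(asyncio.Event().wait(), name="sender")]

    async def listen(self) -> None:
        await asyncio.Event().wait()

    async def close(self) -> None:
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)


async def run_agent_fixed() -> None:
    conn = Connection()
    try:
        await conn.listen()
    finally:
        await asyncio.shield(conn.close())  # Option 1 cleanup


async def check_no_destroyed_pending_tasks() -> list[str]:
    messages: list[str] = []
    loop = asyncio.get_running_loop()
    # Record everything that reaches the loop's exception handler.
    loop.set_exception_handler(lambda _loop, ctx: messages.append(ctx["message"]))
    task = asyncio.create_task(run_agent_fixed())
    await asyncio.sleep(0)  # let run_agent_fixed start listening
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    del task
    gc.collect()            # any still-pending, unreachable task is finalized here
    await asyncio.sleep(0)
    return messages


print(asyncio.run(check_no_destroyed_pending_tasks()))  # []
```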
