Summary
run_agent wraps AgentSideConnection.listen() with no try/finally. When the calling task is cancelled mid-listen(), Connection.close() is never invoked, leaving the MessageSender._loop and MessageDispatcher._run tasks the Connection's TaskSupervisor registered orphaned on the event loop. Python's GC later destroys them while still pending and reports:
ERROR asyncio: Task was destroyed but it is pending!
task: <Task pending name='acp.Sender.loop' …>
ERROR asyncio: Task was destroyed but it is pending!
task: <Task pending name='acp.Dispatcher.loop' …>
ERROR root: Send loop failed
RuntimeError: cannot reuse already awaited coroutine
Reproduction
Any pattern where the caller cancels run_agent mid-flight reproduces this. The minimal case I have is a WebSocket bridge that schedules run_agent as a task and races it against peer tasks via asyncio.wait(FIRST_COMPLETED). When the peer closes the WebSocket, my bridge cancels agent_task — straightforward cancellation that ought to result in clean shutdown but instead leaks the Connection's internal tasks. (Real-world trigger: a backend health-checker that opens a fresh WebSocket every N seconds, sends initialize, and closes.)
I verified the chain by tracing through the package:
run_agent (in acp/core.py:38-72) constructs AgentSideConnection(..., listening=False) then does a bare await conn.listen().
AgentSideConnection.listen (in acp/agent/connection.py) is just await self._conn.main_loop().
Connection.main_loop (in acp/connection.py:119) is await self._receive_loop().
- None of the layers between
run_agent and _receive_loop has a try/finally. A cancellation propagates straight through.
- The Connection's
__init__ (line 88, 105 in acp/connection.py) registered MessageSender._loop and MessageDispatcher._run with TaskSupervisor. These are orphaned by the cancellation because Connection.close() (line 109) — which would call _dispatcher.stop(), _sender.close(), and _tasks.shutdown() — is never reached.
Proposed fix
Two options, in order of preference:
Option 1: fix run_agent itself
Wrap the await conn.listen() in acp/core.py:38-72:
async def run_agent(agent, input_stream=None, output_stream=None, *, …):
…
conn = AgentSideConnection(agent, input_stream, output_stream, listening=False, …)
try:
await conn.listen()
finally:
await asyncio.shield(conn._conn.close())
asyncio.shield keeps the close coroutine running even if run_agent itself is being cancelled — without it, the awaits in Connection.close() would re-raise CancelledError before the supervised tasks finish shutting down.
Option 2: expose a public close API on AgentSideConnection
Add close() / __aexit__ to AgentSideConnection that delegates to self._conn.close(), then either (a) keep run_agent as-is and let callers use async with AgentSideConnection(...), or (b) apply Option 1 too. Either way, callers who don't want run_agent's convenience can do their own cleanup without reaching into _conn.
I'd argue for both: Option 1 keeps run_agent self-contained, and Option 2 unblocks callers that want to construct the connection manually.
My downstream workaround
In jaegertracing/jaeger (commit) I'm shipping a wrapper that re-implements run_agent's essentials:
async def _run_agent_with_cleanup(agent, agent_writer, agent_reader):
conn = AgentSideConnection(agent, agent_writer, agent_reader, listening=False)
try:
await conn.listen()
finally:
try:
await asyncio.shield(conn._conn.close()) # noqa: SLF001
except asyncio.CancelledError:
raise
This works but reaches into _conn (private) and won't track future additions to run_agent's signature (e.g., **connection_kwargs, use_unstable_protocol). Happy to switch to whatever you ship.
Environment
- Python 3.14
agent-client-protocol 0.8.1 (latest on PyPI as of writing)
Regression test idea
A test that calls asyncio.create_task(run_agent(…)) against a stub stream pair, cancels the task, and then gc.collect()s, asserting the asyncio loop's exception handler captured no "Task was destroyed but it is pending!" entries.
Summary
run_agentwrapsAgentSideConnection.listen()with notry/finally. When the calling task is cancelled mid-listen(),Connection.close()is never invoked, leaving theMessageSender._loopandMessageDispatcher._runtasks the Connection'sTaskSupervisorregistered orphaned on the event loop. Python's GC later destroys them while still pending and reports:Reproduction
Any pattern where the caller cancels
run_agentmid-flight reproduces this. The minimal case I have is a WebSocket bridge that schedulesrun_agentas a task and races it against peer tasks viaasyncio.wait(FIRST_COMPLETED). When the peer closes the WebSocket, my bridge cancelsagent_task— straightforward cancellation that ought to result in clean shutdown but instead leaks the Connection's internal tasks. (Real-world trigger: a backend health-checker that opens a fresh WebSocket every N seconds, sendsinitialize, and closes.)I verified the chain by tracing through the package:
run_agent(inacp/core.py:38-72) constructsAgentSideConnection(..., listening=False)then does a bareawait conn.listen().AgentSideConnection.listen(inacp/agent/connection.py) is justawait self._conn.main_loop().Connection.main_loop(inacp/connection.py:119) isawait self._receive_loop().run_agentand_receive_loophas atry/finally. A cancellation propagates straight through.__init__(line 88, 105 inacp/connection.py) registeredMessageSender._loopandMessageDispatcher._runwithTaskSupervisor. These are orphaned by the cancellation becauseConnection.close()(line 109) — which would call_dispatcher.stop(),_sender.close(), and_tasks.shutdown()— is never reached.Proposed fix
Two options, in order of preference:
Option 1: fix
run_agentitselfWrap the
await conn.listen()inacp/core.py:38-72:asyncio.shieldkeeps the close coroutine running even ifrun_agentitself is being cancelled — without it, the awaits inConnection.close()would re-raiseCancelledErrorbefore the supervised tasks finish shutting down.Option 2: expose a public close API on
AgentSideConnectionAdd
close()/__aexit__toAgentSideConnectionthat delegates toself._conn.close(), then either (a) keeprun_agentas-is and let callers useasync with AgentSideConnection(...), or (b) apply Option 1 too. Either way, callers who don't wantrun_agent's convenience can do their own cleanup without reaching into_conn.I'd argue for both: Option 1 keeps
run_agentself-contained, and Option 2 unblocks callers that want to construct the connection manually.My downstream workaround
In
jaegertracing/jaeger(commit) I'm shipping a wrapper that re-implementsrun_agent's essentials:This works but reaches into
_conn(private) and won't track future additions torun_agent's signature (e.g.,**connection_kwargs,use_unstable_protocol). Happy to switch to whatever you ship.Environment
agent-client-protocol0.8.1 (latest on PyPI as of writing)Regression test idea
A test that calls
asyncio.create_task(run_agent(…))against a stub stream pair, cancels the task, and thengc.collect()s, asserting the asyncio loop's exception handler captured no "Task was destroyed but it is pending!" entries.