From 8dcf0d55ed6a116fddf7d44fb4f156cc8bd7a23a Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 17 Apr 2026 12:32:06 -0700 Subject: [PATCH 1/2] [world-vercel] Propagate cancel from streams.get to upstream fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a consumer cancels the ReadableStream returned by streams.get (e.g. an HTTP client hanging up on an endpoint that pipes `run.getReadable()`), the pull loop could continue running and even trigger a fresh reconnect via `connect()` — the new fetch was never tied to the cancellation, so the request kept running in the background. Fix: - Track a `cancelled` flag that `pull` checks before and after each read and before each reconnect. - Plumb an `AbortController.signal` into `fetch` so the in-flight upstream request (including a pending reconnect) is aborted when the consumer cancels. Regression tests cover the abort-signal contract, the mid-timeout reconnect race, and cancel-during-read. Port of #1801 (targeting `stable`). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Peter Wielander --- .changeset/stream-cancel-propagation-main.md | 5 + packages/world-vercel/src/streamer.test.ts | 134 +++++++++++++++++++ packages/world-vercel/src/streamer.ts | 32 ++++- 3 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 .changeset/stream-cancel-propagation-main.md diff --git a/.changeset/stream-cancel-propagation-main.md b/.changeset/stream-cancel-propagation-main.md new file mode 100644 index 0000000000..4e40f90d1b --- /dev/null +++ b/.changeset/stream-cancel-propagation-main.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-vercel': patch +--- + +Propagate consumer cancellation to upstream fetch in `streams.get`. Previously, cancelling a stream (e.g. a client disconnecting from an API endpoint returning `run.getReadable()`) could leave the pull loop reconnecting in the background. diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index 4666dbfbd2..29ab6152ba 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -503,3 +503,137 @@ describe('writeMulti pagination', () => { ]); }); }); + +// Regression tests for consumer-cancel propagation on streams.get. +// +// Before the fix, cancelling the ReadableStream returned by streams.get() +// only called reader.cancel() on the currently-captured fetch body reader. +// The pull loop kept running, and if the upstream had emitted a timeout +// control frame, pull would happily call connect() again and keep reading +// — so a consumer disconnect (e.g. an HTTP client hanging up on +// `run.getReadable()`) would leave the server still fetching in the +// background. +describe('streams.get consumer cancel', () => { + async function getStreamer() { + const { createStreamer } = await import('./streamer.js'); + return createStreamer(); + } + + function chunkedStream(chunks: Uint8Array[]): ReadableStream { + let i = 0; + return new ReadableStream({ + pull(controller) { + if (i < chunks.length) { + controller.enqueue(chunks[i++]); + } else { + controller.close(); + } + }, + }); + } + + function streamResponse(...chunks: Uint8Array[]): Response { + return new Response(chunkedStream(chunks), { + status: 200, + headers: { 'Content-Type': 'application/octet-stream' }, + }); + } + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('aborts the in-flight upstream fetch via AbortSignal', async () => { + const capturedSignals: (AbortSignal | null | undefined)[] = []; + let resolveFirstFetch: (r: Response) => void; + const firstFetchPromise = new Promise((resolve) => { + resolveFirstFetch = resolve; + }); + vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => { + capturedSignals.push(init?.signal); + return firstFetchPromise; + }); + + const streamer = await getStreamer(); + // streams.get kicks off the fetch synchronously via connect(). + const streamPromise = streamer.streams.get('run-1', 'my-stream'); + // Give connect() a tick to start the fetch. + await new Promise((r) => setTimeout(r, 0)); + + // Resolve the fetch so the ReadableStream construction can complete. + resolveFirstFetch!(streamResponse(new Uint8Array(0))); + const stream = await streamPromise; + + await stream.cancel(); + + expect(capturedSignals.length).toBeGreaterThan(0); + const signal = capturedSignals[0]; + expect(signal).toBeDefined(); + expect(signal?.aborted).toBe(true); + }); + + it('does not reconnect after the consumer cancels mid-timeout', async () => { + const chunk1 = new TextEncoder().encode('hello, world'); + const timeout = buildControlFrame(false, 1); + + let fetchCount = 0; + vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => { + fetchCount++; + if (fetchCount === 1) { + return streamResponse(chunk1, timeout); + } + // A reconnect should never happen after cancel. If it does, hang + // indefinitely so the test can assert fetchCount === 1. + return new Promise((_, reject) => { + init?.signal?.addEventListener('abort', () => + reject(new DOMException('aborted', 'AbortError')) + ); + }); + }); + + const streamer = await getStreamer(); + const stream = await streamer.streams.get('run-1', 'my-stream'); + const reader = stream.getReader(); + + // Drain the data bytes — this forces pull to observe the upstream + // close and enter the reconnect branch. + const first = await reader.read(); + expect(first.done).toBe(false); + + // Cancel while (or right after) pull is attempting to reconnect. + await reader.cancel(); + + // Give any lingering pull work a chance to misbehave. + await new Promise((r) => setTimeout(r, 50)); + + // Only the initial connection should ever have been made. + expect(fetchCount).toBe(1); + }); + + it('cancels the active reader when cancel is called during a read', async () => { + // Upstream body that hangs — so pull is blocked in reader.read() when + // cancel arrives. A correct implementation cancels the body reader and + // lets pull exit cleanly; the broken one would continue spinning. + const hangingBody = new ReadableStream({ + start() { + // Never enqueue or close; we want pull to be stuck in read(). + }, + }); + vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce( + new Response(hangingBody, { status: 200 }) + ); + + const streamer = await getStreamer(); + const stream = await streamer.streams.get('run-1', 'my-stream'); + const reader = stream.getReader(); + + // Schedule a read so pull starts and gets parked in reader.read(). + const readPromise = reader.read(); + await new Promise((r) => setTimeout(r, 10)); + + await reader.cancel(); + + const result = await readPromise; + expect(result.done).toBe(true); + }); +}); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 259f441d4f..4cbae3a01d 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -262,6 +262,14 @@ export function createStreamer(config?: APIConfig): Streamer { const MAX_RECONNECTS = 50; let reconnectCount = 0; + // Propagates consumer cancellation all the way to the upstream fetch. + // Without this, a client disconnect mid-reconnect would leave the new + // fetch untouched and the pull loop happily reading from it, so the + // caller (e.g. an API endpoint returning run.getReadable()) would keep + // running well after its own client hung up. + let cancelled = false; + const abortController = new AbortController(); + const connect = async (): Promise< ReadableStreamDefaultReader > => { @@ -270,6 +278,7 @@ export function createStreamer(config?: APIConfig): Streamer { url.searchParams.set('startIndex', String(currentStartIndex)); const response = await fetch(url, { headers: httpConfig.headers, + signal: abortController.signal, }); if (!response.ok) { throw new Error(`Failed to fetch stream: ${response.status}`); @@ -289,10 +298,15 @@ export function createStreamer(config?: APIConfig): Streamer { return new ReadableStream({ pull: async (controller) => { for (;;) { + if (cancelled) return; + let result: { done: boolean; value?: Uint8Array }; try { result = await reader.read(); } catch (err) { + // A cancel during an in-flight read can surface here as an + // AbortError — swallow it, since the consumer asked for it. + if (cancelled) return; // Network error — not a clean close. Forward any buffered // data and propagate the error so consumers know the stream // was truncated. @@ -304,6 +318,8 @@ export function createStreamer(config?: APIConfig): Streamer { return; } + if (cancelled) return; + if (!result.done) { // Append new data to tail buffer, forward everything except // the last STREAM_CONTROL_FRAME_SIZE bytes. @@ -351,7 +367,13 @@ export function createStreamer(config?: APIConfig): Streamer { return; } currentStartIndex = control.nextIndex; - reader = await connect(); + try { + reader = await connect(); + } catch (err) { + if (cancelled) return; + controller.error(err); + return; + } continue; } @@ -366,7 +388,13 @@ export function createStreamer(config?: APIConfig): Streamer { } }, cancel: async () => { - await reader.cancel(); + cancelled = true; + abortController.abort(); + // Best-effort cancel of the current reader. `reader` is a closure + // variable, so this sees whichever connection is active right now; + // the `cancelled` flag guards against a concurrent reconnect + // swapping in a fresh reader after this runs. + await reader.cancel().catch(() => {}); }, }); }, From 9de92719beac6325a60a1879faaccdbc76d120de Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 17 Apr 2026 12:34:10 -0700 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Peter Wielander Signed-off-by: Peter Wielander --- .changeset/stream-cancel-propagation-main.md | 2 +- packages/world-vercel/src/streamer.ts | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.changeset/stream-cancel-propagation-main.md b/.changeset/stream-cancel-propagation-main.md index 4e40f90d1b..9322616a54 100644 --- a/.changeset/stream-cancel-propagation-main.md +++ b/.changeset/stream-cancel-propagation-main.md @@ -2,4 +2,4 @@ '@workflow/world-vercel': patch --- -Propagate consumer cancellation to upstream fetch in `streams.get`. Previously, cancelling a stream (e.g. a client disconnecting from an API endpoint returning `run.getReadable()`) could leave the pull loop reconnecting in the background. +Propagate consumer cancellation to upstream fetch in `streams.get` diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 4cbae3a01d..f490da47d9 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -394,7 +394,9 @@ export function createStreamer(config?: APIConfig): Streamer { // variable, so this sees whichever connection is active right now; // the `cancelled` flag guards against a concurrent reconnect // swapping in a fresh reader after this runs. - await reader.cancel().catch(() => {}); + await reader.cancel().catch((err) => { + console.warn('Failed to cancel reader:', err); + }); }, }); },