diff --git a/.changeset/stream-cancel-propagation.md b/.changeset/stream-cancel-propagation.md
new file mode 100644
index 0000000000..b14cc31f0c
--- /dev/null
+++ b/.changeset/stream-cancel-propagation.md
@@ -0,0 +1,5 @@
+---
+'@workflow/world-vercel': patch
+---
+
+Propagate consumer cancellation to upstream fetch in `readFromStream`
diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts
index 21b587e5e5..809ea8aecd 100644
--- a/packages/world-vercel/src/streamer.test.ts
+++ b/packages/world-vercel/src/streamer.test.ts
@@ -512,4 +512,109 @@ describe('readFromStream reconnection', () => {
 
     await reader.cancel();
   });
+
+  // Regression tests for consumer-cancel propagation.
+  //
+  // Before the fix, cancelling the ReadableStream returned by
+  // readFromStream() only called reader.cancel() on the currently-captured
+  // fetch body reader. The pull loop kept running, and if the upstream had
+  // emitted a timeout control frame, pull would happily call connect()
+  // again and keep reading — so a consumer disconnect (e.g. an HTTP client
+  // hanging up on `run.getReadable()`) would leave the server still fetching
+  // in the background.
+  describe('consumer cancel', () => {
+    it('aborts the in-flight upstream fetch via AbortSignal', async () => {
+      const capturedSignals: (AbortSignal | null | undefined)[] = [];
+      let resolveFirstFetch: (r: Response) => void;
+      const firstFetchPromise = new Promise<Response>((resolve) => {
+        resolveFirstFetch = resolve;
+      });
+      vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
+        capturedSignals.push(init?.signal);
+        return firstFetchPromise;
+      });
+
+      const streamer = await getStreamer();
+      // readFromStream kicks off the fetch synchronously via connect().
+      const streamPromise = streamer.readFromStream('strm_test');
+      // Give connect() a tick to start the fetch.
+      await new Promise((r) => setTimeout(r, 0));
+
+      // Resolve the fetch so the ReadableStream construction can complete.
+      resolveFirstFetch!(streamResponse(new Uint8Array(0)));
+      const stream = await streamPromise;
+
+      await stream.cancel();
+
+      expect(capturedSignals.length).toBeGreaterThan(0);
+      const signal = capturedSignals[0];
+      expect(signal).toBeDefined();
+      expect(signal?.aborted).toBe(true);
+    });
+
+    it('does not reconnect after the consumer cancels mid-timeout', async () => {
+      const chunk1 = new TextEncoder().encode('hello, world');
+      const timeout = buildControlFrame(false, 1);
+
+      let fetchCount = 0;
+      vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
+        fetchCount++;
+        if (fetchCount === 1) {
+          return streamResponse(chunk1, timeout);
+        }
+        // A reconnect should never happen after cancel. If it does, hang
+        // indefinitely so the test can assert fetchCount === 1.
+        return new Promise<Response>((_, reject) => {
+          init?.signal?.addEventListener('abort', () =>
+            reject(new DOMException('aborted', 'AbortError'))
+          );
+        });
+      });
+
+      const streamer = await getStreamer();
+      const stream = await streamer.readFromStream('strm_test');
+      const reader = stream.getReader();
+
+      // Drain the data bytes — this forces pull to observe the upstream
+      // close and enter the reconnect branch.
+      const first = await reader.read();
+      expect(first.done).toBe(false);
+
+      // Cancel while (or right after) pull is attempting to reconnect.
+      await reader.cancel();
+
+      // Give any lingering pull work a chance to misbehave.
+      await new Promise((r) => setTimeout(r, 50));
+
+      // Only the initial connection should ever have been made.
+      expect(fetchCount).toBe(1);
+    });
+
+    it('cancels the active reader when cancel is called during a read', async () => {
+      // Upstream body that hangs — so pull is blocked in reader.read() when
+      // cancel arrives. A correct implementation cancels the body reader and
+      // lets pull exit cleanly; the broken one would continue spinning.
+      const hangingBody = new ReadableStream({
+        start() {
+          // Never enqueue or close; we want pull to be stuck in read().
+        },
+      });
+      vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(
+        new Response(hangingBody, { status: 200 })
+      );
+
+      const streamer = await getStreamer();
+      const stream = await streamer.readFromStream('strm_test');
+      const reader = stream.getReader();
+
+      // Schedule a read so pull starts and gets parked in reader.read().
+      const readPromise = reader.read();
+      await new Promise((r) => setTimeout(r, 10));
+
+      await reader.cancel();
+
+      const result = await readPromise;
+      expect(result.done).toBe(true);
+    });
+  });
 });
diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts
index eec5e81da2..12303503bc 100644
--- a/packages/world-vercel/src/streamer.ts
+++ b/packages/world-vercel/src/streamer.ts
@@ -263,6 +263,14 @@ export function createStreamer(config?: APIConfig): Streamer {
     const MAX_RECONNECTS = 50;
     let reconnectCount = 0;
 
+    // Propagates consumer cancellation all the way to the upstream fetch.
+    // Without this, a client disconnect mid-reconnect would leave the new
+    // fetch untouched and the pull loop happily reading from it, so the
+    // caller (e.g. an API endpoint returning run.getReadable()) would keep
+    // running well after its own client hung up.
+    let cancelled = false;
+    const abortController = new AbortController();
+
     const connect = async (): Promise<
       ReadableStreamDefaultReader
     > => {
@@ -274,6 +282,7 @@ export function createStreamer(config?: APIConfig): Streamer {
       url.searchParams.set('startIndex', String(currentStartIndex));
       const response = await fetch(url, {
         headers: httpConfig.headers,
+        signal: abortController.signal,
       });
       if (!response.ok) {
         throw new Error(`Failed to fetch stream: ${response.status}`);
@@ -300,10 +309,15 @@ export function createStreamer(config?: APIConfig): Streamer {
         // yielding, rather than returning and relying on the runtime to
         // re-invoke pull when nothing was enqueued.
         for (;;) {
+          if (cancelled) return;
+
           let result: { done: boolean; value?: Uint8Array };
           try {
             result = await reader.read();
           } catch (err) {
+            // A cancel during an in-flight read can surface here as an
+            // AbortError — swallow it, since the consumer asked for it.
+            if (cancelled) return;
             // Network error — not a clean close. Flush any buffered
             // bytes and surface the error so consumers know the stream
             // was truncated.
@@ -315,6 +329,8 @@ export function createStreamer(config?: APIConfig): Streamer {
             return;
           }
 
+          if (cancelled) return;
+
           if (!result.done) {
             // Append new data and forward everything except the last
             // STREAM_CONTROL_FRAME_SIZE bytes.
@@ -356,7 +372,13 @@ export function createStreamer(config?: APIConfig): Streamer {
               return;
             }
             currentStartIndex = control.nextIndex;
-            reader = await connect();
+            try {
+              reader = await connect();
+            } catch (err) {
+              if (cancelled) return;
+              controller.error(err);
+              return;
+            }
             continue;
           }
 
@@ -370,7 +392,15 @@ export function createStreamer(config?: APIConfig): Streamer {
         }
       },
       cancel: async () => {
-        await reader.cancel();
+        cancelled = true;
+        abortController.abort();
+        // Best-effort cancel of the current reader. `reader` is a closure
+        // variable, so this sees whichever connection is active right now;
+        // the `cancelled` flag guards against a concurrent reconnect
+        // swapping in a fresh reader after this runs.
+        await reader.cancel().catch((err) => {
+          console.warn('Failed to cancel reader:', err);
+        });
       },
     });
   },