Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stream-cancel-propagation-main.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-vercel': patch
---

Propagate consumer cancellation to upstream fetch in `streams.get`
134 changes: 134 additions & 0 deletions packages/world-vercel/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,137 @@ describe('writeMulti pagination', () => {
]);
});
});

// Regression tests for consumer-cancel propagation on streams.get.
//
// Before the fix, cancelling the ReadableStream returned by streams.get()
// only called reader.cancel() on the currently-captured fetch body reader.
// The pull loop kept running, and if the upstream had emitted a timeout
// control frame, pull would happily call connect() again and keep reading
// — so a consumer disconnect (e.g. an HTTP client hanging up on
// `run.getReadable()`) would leave the server still fetching in the
// background.
describe('streams.get consumer cancel', () => {
  // Dynamic import so the fetch mocks installed by each test are in place
  // before the streamer module's code runs.
  async function loadStreamer() {
    const mod = await import('./streamer.js');
    return mod.createStreamer();
  }

  /** Builds a ReadableStream that yields the given chunks in order, then closes. */
  function chunkedStream(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
    const pending = [...chunks];
    return new ReadableStream({
      pull(controller) {
        const next = pending.shift();
        if (next === undefined) {
          controller.close();
        } else {
          controller.enqueue(next);
        }
      },
    });
  }

  /** Wraps the given chunks in a 200 response with a streaming body. */
  function streamResponse(...chunks: Uint8Array[]): Response {
    return new Response(chunkedStream(chunks), {
      status: 200,
      headers: { 'Content-Type': 'application/octet-stream' },
    });
  }

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it('aborts the in-flight upstream fetch via AbortSignal', async () => {
    const seenSignals: (AbortSignal | null | undefined)[] = [];
    let releaseFetch!: (r: Response) => void;
    const gate = new Promise<Response>((resolve) => {
      releaseFetch = resolve;
    });
    vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
      seenSignals.push(init?.signal);
      return gate;
    });

    const streamer = await loadStreamer();
    // streams.get kicks off the fetch synchronously via connect().
    const pendingStream = streamer.streams.get('run-1', 'my-stream');
    // Give connect() a tick to start the fetch.
    await new Promise((r) => setTimeout(r, 0));

    // Let the fetch settle so the ReadableStream construction can complete.
    releaseFetch(streamResponse(new Uint8Array(0)));
    const stream = await pendingStream;

    await stream.cancel();

    expect(seenSignals.length).toBeGreaterThan(0);
    const firstSignal = seenSignals[0];
    expect(firstSignal).toBeDefined();
    expect(firstSignal?.aborted).toBe(true);
  });

  it('does not reconnect after the consumer cancels mid-timeout', async () => {
    const dataChunk = new TextEncoder().encode('hello, world');
    const timeoutFrame = buildControlFrame(false, 1);

    let connections = 0;
    vi.spyOn(globalThis, 'fetch').mockImplementation(async (_url, init) => {
      connections += 1;
      if (connections > 1) {
        // A reconnect should never happen after cancel. If it does, hang
        // until aborted so the test can assert connections === 1.
        return new Promise<Response>((_, reject) => {
          init?.signal?.addEventListener('abort', () =>
            reject(new DOMException('aborted', 'AbortError'))
          );
        });
      }
      return streamResponse(dataChunk, timeoutFrame);
    });

    const streamer = await loadStreamer();
    const stream = await streamer.streams.get('run-1', 'my-stream');
    const reader = stream.getReader();

    // Drain the data bytes — this forces pull to observe the upstream
    // close and enter the reconnect branch.
    const first = await reader.read();
    expect(first.done).toBe(false);

    // Cancel while (or right after) pull is attempting to reconnect.
    await reader.cancel();

    // Give any lingering pull work a chance to misbehave.
    await new Promise((r) => setTimeout(r, 50));

    // Only the initial connection should ever have been made.
    expect(connections).toBe(1);
  });

  it('cancels the active reader when cancel is called during a read', async () => {
    // Upstream body that never produces data — so pull is parked inside
    // reader.read() when cancel arrives. A correct implementation cancels
    // the body reader and lets pull exit cleanly; the broken one would
    // continue spinning.
    const silentBody = new ReadableStream<Uint8Array>({
      start() {
        // Intentionally empty: never enqueue, never close.
      },
    });
    vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(
      new Response(silentBody, { status: 200 })
    );

    const streamer = await loadStreamer();
    const stream = await streamer.streams.get('run-1', 'my-stream');
    const reader = stream.getReader();

    // Start a read so pull runs and gets parked in reader.read().
    const pendingRead = reader.read();
    await new Promise((r) => setTimeout(r, 10));

    await reader.cancel();

    const result = await pendingRead;
    expect(result.done).toBe(true);
  });
});
34 changes: 32 additions & 2 deletions packages/world-vercel/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ export function createStreamer(config?: APIConfig): Streamer {
const MAX_RECONNECTS = 50;
let reconnectCount = 0;

// Propagates consumer cancellation all the way to the upstream fetch.
// Without this, a client disconnect mid-reconnect would leave the new
// fetch untouched and the pull loop happily reading from it, so the
// caller (e.g. an API endpoint returning run.getReadable()) would keep
// running well after its own client hung up.
let cancelled = false;
const abortController = new AbortController();

const connect = async (): Promise<
ReadableStreamDefaultReader<Uint8Array>
> => {
Expand All @@ -270,6 +278,7 @@ export function createStreamer(config?: APIConfig): Streamer {
url.searchParams.set('startIndex', String(currentStartIndex));
const response = await fetch(url, {
headers: httpConfig.headers,
signal: abortController.signal,
});
if (!response.ok) {
throw new Error(`Failed to fetch stream: ${response.status}`);
Expand All @@ -289,10 +298,15 @@ export function createStreamer(config?: APIConfig): Streamer {
return new ReadableStream<Uint8Array>({
pull: async (controller) => {
for (;;) {
if (cancelled) return;

let result: { done: boolean; value?: Uint8Array };
try {
result = await reader.read();
} catch (err) {
// A cancel during an in-flight read can surface here as an
// AbortError — swallow it, since the consumer asked for it.
if (cancelled) return;
// Network error — not a clean close. Forward any buffered
// data and propagate the error so consumers know the stream
// was truncated.
Expand All @@ -304,6 +318,8 @@ export function createStreamer(config?: APIConfig): Streamer {
return;
}

if (cancelled) return;

if (!result.done) {
// Append new data to tail buffer, forward everything except
// the last STREAM_CONTROL_FRAME_SIZE bytes.
Expand Down Expand Up @@ -351,7 +367,13 @@ export function createStreamer(config?: APIConfig): Streamer {
return;
}
currentStartIndex = control.nextIndex;
reader = await connect();
try {
reader = await connect();
} catch (err) {
if (cancelled) return;
controller.error(err);
return;
}
continue;
}

Expand All @@ -366,7 +388,15 @@ export function createStreamer(config?: APIConfig): Streamer {
}
},
cancel: async () => {
await reader.cancel();
cancelled = true;
abortController.abort();
// Best-effort cancel of the current reader. `reader` is a closure
// variable, so this sees whichever connection is active right now;
// the `cancelled` flag guards against a concurrent reconnect
// swapping in a fresh reader after this runs.
await reader.cancel().catch((err) => {
console.warn('Failed to cancel reader:', err);
});
},
});
},
Expand Down
Loading