[world-vercel] Propagate cancel from streams.get to upstream fetch #1802
VaguelySerious wants to merge 2 commits into `main`
Conversation
When a consumer cancels the `ReadableStream` returned by `streams.get` (e.g. an HTTP client hanging up on an endpoint that pipes `run.getReadable()`), the pull loop could continue running and even trigger a fresh reconnect via `connect()`. The new fetch was never tied to the cancellation, so the request kept running in the background.

Fix:

- Track a `cancelled` flag that `pull` checks before and after each read and before each reconnect.
- Plumb an `AbortController.signal` into `fetch` so the in-flight upstream request (including a pending reconnect) is aborted when the consumer cancels.

Regression tests cover the abort-signal contract, the mid-timeout reconnect race, and cancel-during-read.

Port of #1801 (targeting `stable`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Signed-off-by: Peter Wielander <[email protected]>
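The two mechanisms the description names (a `cancelled` flag checked around each read, plus a shared abort signal on the upstream fetch) can be sketched roughly as follows. This is a minimal illustration of the pattern, not the actual `@workflow/world-vercel` source; `makeCancellableStream` and `connect` are hypothetical names, with `connect` standing in for the upstream `fetch`.

```typescript
// Sketch of the cancellation pattern described in the PR (illustrative only).
function makeCancellableStream(
  connect: (signal: AbortSignal) => Promise<ReadableStreamDefaultReader<Uint8Array>>
): ReadableStream<Uint8Array> {
  let cancelled = false;                 // source of truth for pull
  const abort = new AbortController();   // shared for the whole stream lifetime
  let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      if (cancelled) return;                        // check before read
      try {
        reader ??= await connect(abort.signal);     // (re)connect is abortable
        const { done, value } = await reader.read();
        if (cancelled) return;                      // check after read
        if (done) {
          controller.close();
          return;
        }
        controller.enqueue(value);
      } catch (err) {
        if (cancelled) return;                      // abort-from-cancel: exit silently
        controller.error(err);                      // real error: propagate
      }
    },
    cancel() {
      cancelled = true;                  // pull exits at its next check
      abort.abort();                     // unblocks an in-flight fetch/reconnect
      reader?.cancel().catch(() => {});  // best effort; the flag is the real guarantee
    },
  });
}
```

In the real code, `abort.signal` would be passed as `fetch(url, { signal })`, so a pending reconnect rejects immediately when the consumer cancels.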
🦋 Changeset detected

Latest commit: 9de9271

The changes in this PR will be included in the next version bump. This PR includes changesets to release 18 packages.
🧪 E2E Test Results

❌ Some tests failed

Summary

❌ Failed Tests
- 💻 Local Development (1 failed): vite-stable (1 failed)

Details by Category
- ✅ ▲ Vercel Production
- ❌ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 📋 Other

❌ Some E2E test jobs failed: check the workflow run for details.
📊 Benchmark Results

Each scenario below was measured for 💻 Local Development and ▲ Production (Vercel), with 🔍 Observability links for Express, Nitro, and Next.js (Turbopack). Scenarios:

- workflow with no steps
- workflow with 1 step
- workflow with 10 / 25 / 50 sequential steps
- Promise.all with 10 / 25 / 50 concurrent steps
- Promise.race with 10 / 25 / 50 concurrent steps
- workflow with 10 / 25 / 50 sequential data payload steps (10KB)
- workflow with 10 / 25 / 50 concurrent data payload steps (10KB)
- Stream benchmarks (includes TTFB metrics): workflow with stream; stream pipeline with 5 transform steps (1MB); 10 parallel streams (1MB each); fan-out fan-in 10 streams (1MB each)

Summary: Fastest Framework by World and Fastest World by Framework (winner determined by most benchmark wins).

❌ Some benchmark jobs failed: check the workflow run for details.
Co-authored-by: Peter Wielander <[email protected]>
Signed-off-by: Peter Wielander <[email protected]>
TooTallNate left a comment
Review
Clean, well-scoped fix. Forward-port of #1801 for `main`. The approach, a `cancelled` flag as the source of truth plus a single shared `AbortController` for the whole stream lifetime, is the right pattern for this class of bug.
What looks good
- Cancellation checks at all the right points: before `reader.read()`, after `reader.read()` (covers the case where read completed naturally between abort and flag check), and before reconnect. Combined with the `AbortController` plumbed into `fetch`, this covers all the race windows:
  - Cancel before any pull → initial reader gets cancelled, pull is never invoked
  - Cancel during active read → AbortError swallowed, exits cleanly
  - Cancel during reconnect → in-flight fetch aborted, caught, exits cleanly
  - Cancel between reconnect completion and next read → flag checked at top of loop
- Shared `AbortController` across all reconnects is correct. Once aborted, it stays aborted, which means all future `connect()` calls abort instantly. Right primitive for whole-stream cancellation.
- Reconnect try/catch properly distinguishes abort-from-cancel (return silently) vs. real errors (propagate via `controller.error`).
- `reader.cancel().catch()` on the closure variable: the comment explicitly acknowledges the TOCTOU window and explains why the `cancelled` flag is the real guarantee. Good defense-in-depth comment.
- Changeset correctly scoped to `@workflow/world-vercel` only.
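The TOCTOU window the review mentions (cancel captures one reader while pull is already awaiting a fresh connection) can be reproduced in a standalone sketch of the pre-fix shape. All names, the timeout-frame simulation, and the latency values below are illustrative assumptions, not the package source.

```typescript
// Pre-fix race, illustrated: cancel() only reaches whichever reader is
// currently captured; a reconnect already in flight completes afterwards
// and its connection is never cancelled.
const cancelledConnections: number[] = [];
let connectCount = 0;

const connect = async () => {
  const id = ++connectCount;
  await new Promise((res) => setTimeout(res, 10));   // simulate fetch latency
  return new ReadableStream<string>({
    pull(c) { c.enqueue(`chunk ${id}`); },
    cancel() { cancelledConnections.push(id); },     // record which connections see cancel
  }).getReader();
};

let reader = await connect();                        // connection 1
let reconnected = false;
const stream = new ReadableStream<string>({
  async pull(controller) {
    const { done, value } = await reader.read();
    if (done) return;
    if (!reconnected) {
      reconnected = true;                            // simulate the server's timeout frame
      reader = await connect();                      // connection 2: fresh fetch, in flight...
      return;
    }
    controller.enqueue(value);
  },
  cancel() {
    reader.cancel().catch(() => {});                 // ...while cancel only reaches connection 1
  },
});
```

Cancelling while `connect()` is pending hits the stale reader; connection 2 still opens and is never told to stop, which is exactly why the fix makes the `cancelled` flag and abort signal, not the captured reader, the source of truth.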
Test coverage observations
The PR description calls out test 1 (`aborts the in-flight upstream fetch via AbortSignal`) as failing on `main` without the fix; I agree, that's the strongest test.

Tests 2 and 3 are more like documentation/regression guards than fix verifications:

- Test 2 (`does not reconnect after the consumer cancels mid-timeout`): the test never calls `read()` after the first one, so pull is never re-invoked and no reconnect path is actually exercised. Pre-fix, `fetchCount` would also be `1` simply because there's no trigger for a second pull. To really exercise the race, the test would need to let pull enter the reconnect branch before cancel (e.g. drain all data plus the timeout frame before cancelling). That said, it's still useful as a guard against future regressions where someone might trigger reconnects asynchronously.
- Test 3 (`cancels the active reader when cancel is called during a read`): WHATWG streams semantics already make `reader.cancel()` propagate to the upstream body reader, so this likely passed pre-fix too.
Not blocking — the primary fix is well-verified by test 1.
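The WHATWG behaviour the review leans on for test 3 can be observed in isolation. This is a standalone illustration of the spec's cancel propagation, not the PR's test code:

```typescript
// Cancelling the reader of a piped stream propagates backwards: the pipe
// cancels the upstream underlying source (preventCancel defaults to false).
let upstreamCancelled = false;
const upstream = new ReadableStream<string>({
  pull(controller) {
    controller.enqueue("chunk");
  },
  cancel() {
    upstreamCancelled = true;   // invoked when the pipe reacts to the downstream cancel
  },
});
const reader = upstream.pipeThrough(new TransformStream<string, string>()).getReader();
await reader.read();            // receive one chunk
await reader.cancel("client hung up");
// shortly afterwards, the pipe cancels `upstream`
```

Because this propagation is built into the streams spec, a test asserting only that cancel unblocks a pending read would pass even without the PR's `cancelled` flag.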
Summary
Forward-port of #1801 onto `main`. Same class of bug, different function shape (`streams.get(runId, name, startIndex)` here vs. `readFromStream(name, startIndex)` on `stable`).

When a consumer cancels the `ReadableStream` returned by `streams.get()` (e.g. an HTTP client disconnects from an API route that pipes `run.getReadable()`), the pull loop in the streamer kept running and could trigger further reconnects in the background. The in-flight upstream fetch was never aborted, so the endpoint kept fetching long after the client had gone away.

Root cause

- No `cancelled` flag: `pull` had no way to know the consumer asked to stop.
- No `AbortSignal` on the upstream `fetch()`: even if we noticed, there was no way to unblock a pending request.
- The `cancel` handler only called `reader.cancel()` on whichever reader was captured at that moment. If `pull` was mid-reconnect (`reader = await connect()`), `cancel` cancelled the stale reader; the new fetch then connected and kept streaming.

Fix
- A `cancelled` flag that `pull` checks before and after each `reader.read()` and before each `connect()`.
- An `AbortController.signal` plumbed into `fetch`, aborted from `cancel`, so the in-flight request (including a pending reconnect) unblocks.
- `connect()` failures triggered by abort are swallowed silently when `cancelled`.

Tests
Three new tests under a new `streams.get consumer cancel` describe block:

- `aborts the in-flight upstream fetch via AbortSignal`: fails on `main` without the fix; asserts `fetch` was called with an `AbortSignal` and that the signal is aborted after `stream.cancel()`.
- `does not reconnect after the consumer cancels mid-timeout`: documents that no further fetch is initiated after cancel.
- `cancels the active reader when cancel is called during a read`: documents that cancel unblocks a pending `reader.read()`.

All 32 world-vercel streamer tests pass locally.
Companion

The `stable` counterpart (ships in the 4.2.x line).

Test plan

- `pnpm vitest run`: all world-vercel tests pass

🤖 Generated with Claude Code