Skip to content

Commit f8f9898

Browse files
committed
fix(core,sdk,cli): handover-hang, buffer-seq desync, MCP legacy filter
Three follow-ups from PR review. `chat-server.ts stitchHandoverStream` was blocking ~60s after turn- complete because control records never enqueue into the SSE data stream, so the `for await` loop had no way to wake up and S2's long- poll held the connection open until it timed out. A dedicated `subscriptionAbort` AbortController is now aborted from `onControl` when turn-complete fires, the for-await exits via AbortError, and the catch treats that as the expected exit path so the surrounding session-state emission and close still run. `SessionStreamManager` was breaking its parallel-array invariant on the (theoretical) `undefined`-seqNum path: a record with NaN-parsing `part.id` would push to `buffer` but skip `bufferSeqNums`, drifting the arrays so the next `once()` / `shiftBuffer` shifted a stale seqNum into `lastDispatchedSeqNum`. `bufferSeqNums` now stores `number | undefined` and always pushes; the shift sites already gate the cursor advance on `seqNum !== undefined`. `cli-v3/mcp/tools/agentChat.ts` picks up the same belt-and-suspenders filter the dashboard's `AgentView.tsx` already carries: skip any data chunk whose `type` starts with `trigger:`. Covers any in-flight session whose `.out` was populated by an older SDK that emitted turn-complete / upgrade-required as data records.
1 parent 0dc7142 commit f8f9898

3 files changed

Lines changed: 50 additions & 16 deletions

File tree

packages/cli-v3/src/mcp/tools/agentChat.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,15 @@ async function collectAgentResponse(
443443
if (value.chunk != null && typeof value.chunk === "object") {
444444
const chunk = value.chunk as Record<string, unknown>;
445445

446+
// Legacy belt-and-suspenders: prior SDK versions emitted
447+
// `trigger:turn-complete` / `trigger:upgrade-required` as
448+
// data records (`chunk.type`) instead of header-form control
449+
// records. Filter them so an in-flight session whose `.out`
450+
// was populated by an older agent doesn't stall this loop.
451+
if (typeof chunk.type === "string" && chunk.type.startsWith("trigger:")) {
452+
continue;
453+
}
454+
446455
if (chunk.type === "text-delta" && typeof chunk.delta === "string") {
447456
text += chunk.delta;
448457
// Accumulate into a text part

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ export class StandardSessionStreamManager implements SessionStreamManager {
4949
// a buffered record into a waiter, the cursor (`lastDispatchedSeqNums`)
5050
// can advance to that record's seq. Kept as a separate map so the
5151
// existing `peek()` shape (returns `unknown`) stays unchanged.
52-
private bufferSeqNums = new Map<string, number[]>();
52+
//
53+
// Entries are `number | undefined` so the array stays length-locked
54+
// with `buffer` even if a record arrives without a parseable seq —
55+
// shifting `undefined` is just a no-op for the cursor advance, but
56+
// the slot still gets consumed. Drifting lengths would map seq_nums
57+
// to the wrong records on subsequent shifts.
58+
private bufferSeqNums = new Map<string, Array<number | undefined>>();
5359
private tails = new Map<string, TailState>();
5460
// Per-stream lower-bound timestamp filter. When set, records whose
5561
// SSE timestamp is <= the bound are dropped before dispatch — used by
@@ -503,14 +509,16 @@ export class StandardSessionStreamManager implements SessionStreamManager {
503509
this.buffer.set(key, buffered);
504510
}
505511
buffered.push(data);
506-
if (seqNum !== undefined) {
507-
let bufferedSeqs = this.bufferSeqNums.get(key);
508-
if (!bufferedSeqs) {
509-
bufferedSeqs = [];
510-
this.bufferSeqNums.set(key, bufferedSeqs);
511-
}
512-
bufferedSeqs.push(seqNum);
512+
let bufferedSeqs = this.bufferSeqNums.get(key);
513+
if (!bufferedSeqs) {
514+
bufferedSeqs = [];
515+
this.bufferSeqNums.set(key, bufferedSeqs);
513516
}
517+
// Always push, even when `seqNum` is undefined (e.g. NaN from a
518+
// malformed `part.id`). Skipping the push here would drift the two
519+
// arrays apart and misattribute seq_nums to records on the next
520+
// shift.
521+
bufferedSeqs.push(seqNum);
514522
}
515523

516524
#invokeHandlers(key: string, data: unknown): void {

packages/trigger-sdk/src/v3/chat-server.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -567,14 +567,21 @@ async function openHandoverSession(opts: {
567567
// legacy chunk shape for the customer-server-to-browser hop).
568568
let latestEventId: string | undefined;
569569
let turnComplete = false;
570+
// Dedicated abort signal for this agent subscription. Aborted
571+
// from `onControl` the moment turn-complete fires so the
572+
// `for await` loop below exits immediately instead of blocking
573+
// until S2's long-poll closes (~60s). Combined with the outer
574+
// `abortController.signal` via `AbortSignal.any` so a request-
575+
// wide abort still tears the subscription down.
576+
const subscriptionAbort = new AbortController();
570577
const agentStream = await apiClient.subscribeToSessionStream<UIMessageChunk>(
571578
chatId,
572579
"out",
573580
{
574581
...(customerLastEventId != null
575582
? { lastEventId: customerLastEventId }
576583
: {}),
577-
signal: abortController.signal,
584+
signal: AbortSignal.any([abortController.signal, subscriptionAbort.signal]),
578585
onPart: (part) => {
579586
if (part.id) latestEventId = part.id;
580587
},
@@ -589,18 +596,28 @@ async function openHandoverSession(opts: {
589596
controller.enqueue({
590597
type: "trigger:turn-complete",
591598
} as unknown as UIMessageChunk);
599+
// Stop the SSE read now. Without this the `for await`
600+
// can't see the control event (control records are
601+
// never enqueued into the data stream) and would idle
602+
// until S2's long-poll timeout closes the connection.
603+
subscriptionAbort.abort();
592604
}
593605
},
594606
}
595607
);
596608

597-
for await (const chunk of agentStream) {
598-
// Data records only — control records are routed via
599-
// `onControl` above. Stop reading as soon as we see the
600-
// turn-complete control event (the loop may have one more
601-
// data record buffered, but that's fine — we break out).
602-
controller.enqueue(chunk);
603-
if (turnComplete) break;
609+
try {
610+
for await (const chunk of agentStream) {
611+
// Data records only — control records are routed via
612+
// `onControl` above and trigger the subscription abort.
613+
controller.enqueue(chunk);
614+
if (turnComplete) break;
615+
}
616+
} catch (err) {
617+
// AbortError from `subscriptionAbort` is the expected exit
618+
// path once turn-complete fires; surface anything else.
619+
const isAbort = err instanceof Error && err.name === "AbortError";
620+
if (!isAbort || !turnComplete) throw err;
604621
}
605622

606623
// Final control chunk: hand the browser transport the

0 commit comments

Comments
 (0)