Skip to content

Commit 76ad59f

Browse files
fix(stream): Avoid bun memory leak bug from TransformStream (#4255)
* Avoid bun memory leak bug from TransformStream

* fix(executor): skip content persistence when stream consumer exits early

Previously, if the onStream consumer caught an internal error without re-throwing, the block-executor would treat the shortened accumulator as the complete response, persist a truncated string to memory via appendToMemory, and set it as executionOutput.content.

Track whether the source ReadableStream actually closed (done=true) in the pull handler. If onStream returns before the source drains, skip content persistence and log a warning — the old tee()-based flow was immune to this because the executor branch drained independently of the client branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>

* fix lint

* fix(executor): early-return when no streamed content to make onFullContent symmetric

Previously, executionOutput.content was guarded by `if (fullContent)` but `onFullContent` fired regardless. The agent-handler implementor defensively bails on empty/whitespace content, but that's a callee contract, not enforced at the call site — future implementors could spuriously persist empty assistant turns to memory. Hoist the `!fullContent` check to a single early return, so both the output write and the callback share the same precondition.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
1 parent c32c1cb commit 76ad59f

4 files changed

Lines changed: 99 additions & 127 deletions

File tree

apps/sim/executor/execution/block-executor.ts

Lines changed: 84 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
type ExecutionContext,
3535
getNextExecutionOrder,
3636
type NormalizedBlockOutput,
37+
type StreamingExecution,
3738
} from '@/executor/types'
3839
import { streamingResponseFormatProcessor } from '@/executor/utils'
3940
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
@@ -140,7 +141,7 @@ export class BlockExecutor {
140141

141142
let normalizedOutput: NormalizedBlockOutput
142143
if (isStreamingExecution) {
143-
const streamingExec = output as { stream: ReadableStream; execution: any }
144+
const streamingExec = output as StreamingExecution
144145

145146
if (ctx.onStream) {
146147
await this.handleStreamingExecution(
@@ -602,7 +603,7 @@ export class BlockExecutor {
602603
ctx: ExecutionContext,
603604
node: DAGNode,
604605
block: SerializedBlock,
605-
streamingExec: { stream: ReadableStream; execution: any },
606+
streamingExec: StreamingExecution,
606607
resolvedInputs: Record<string, any>,
607608
selectedOutputs: string[]
608609
): Promise<void> {
@@ -613,129 +614,115 @@ export class BlockExecutor {
613614
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
614615
(block.config as Record<string, any> | undefined)?.responseFormat
615616

616-
const stream = streamingExec.stream
617-
if (typeof stream.tee !== 'function') {
618-
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
619-
return
620-
}
617+
const sourceReader = streamingExec.stream.getReader()
618+
const decoder = new TextDecoder()
619+
const accumulated: string[] = []
620+
let drainError: unknown
621+
let sourceFullyDrained = false
621622

622-
const [clientStream, executorStream] = stream.tee()
623+
const clientSource = new ReadableStream<Uint8Array>({
624+
async pull(controller) {
625+
try {
626+
const { done, value } = await sourceReader.read()
627+
if (done) {
628+
const tail = decoder.decode()
629+
if (tail) accumulated.push(tail)
630+
sourceFullyDrained = true
631+
controller.close()
632+
return
633+
}
634+
accumulated.push(decoder.decode(value, { stream: true }))
635+
controller.enqueue(value)
636+
} catch (error) {
637+
drainError = error
638+
controller.error(error)
639+
}
640+
},
641+
async cancel(reason) {
642+
try {
643+
await sourceReader.cancel(reason)
644+
} catch {}
645+
},
646+
})
623647

624648
const processedClientStream = streamingResponseFormatProcessor.processStream(
625-
clientStream,
626-
blockId,
627-
selectedOutputs,
628-
responseFormat
629-
)
630-
631-
const clientStreamingExec = {
632-
...streamingExec,
633-
stream: processedClientStream,
634-
}
635-
636-
const executorConsumption = this.consumeExecutorStream(
637-
executorStream,
638-
streamingExec,
639-
blockId,
640-
responseFormat
641-
)
642-
643-
const clientConsumption = (async () => {
644-
try {
645-
await ctx.onStream?.(clientStreamingExec)
646-
} catch (error) {
647-
this.execLogger.error('Error in onStream callback', { blockId, error })
648-
// Cancel the client stream to release the tee'd buffer
649-
await processedClientStream.cancel().catch(() => {})
650-
}
651-
})()
652-
653-
await Promise.all([clientConsumption, executorConsumption])
654-
}
655-
656-
private async forwardStream(
657-
ctx: ExecutionContext,
658-
blockId: string,
659-
streamingExec: { stream: ReadableStream; execution: any },
660-
stream: ReadableStream,
661-
responseFormat: any,
662-
selectedOutputs: string[]
663-
): Promise<void> {
664-
const processedStream = streamingResponseFormatProcessor.processStream(
665-
stream,
649+
clientSource,
666650
blockId,
667651
selectedOutputs,
668652
responseFormat
669653
)
670654

671655
try {
672656
await ctx.onStream?.({
673-
...streamingExec,
674-
stream: processedStream,
657+
stream: processedClientStream,
658+
execution: streamingExec.execution,
675659
})
676660
} catch (error) {
677661
this.execLogger.error('Error in onStream callback', { blockId, error })
678-
await processedStream.cancel().catch(() => {})
679-
}
680-
}
681-
682-
private async consumeExecutorStream(
683-
stream: ReadableStream,
684-
streamingExec: { execution: any },
685-
blockId: string,
686-
responseFormat: any
687-
): Promise<void> {
688-
const reader = stream.getReader()
689-
const decoder = new TextDecoder()
690-
const chunks: string[] = []
691-
692-
try {
693-
while (true) {
694-
const { done, value } = await reader.read()
695-
if (done) break
696-
chunks.push(decoder.decode(value, { stream: true }))
697-
}
698-
const tail = decoder.decode()
699-
if (tail) chunks.push(tail)
700-
} catch (error) {
701-
this.execLogger.error('Error reading executor stream for block', { blockId, error })
662+
await processedClientStream.cancel().catch(() => {})
702663
} finally {
703664
try {
704-
await reader.cancel().catch(() => {})
665+
sourceReader.releaseLock()
705666
} catch {}
706667
}
707668

708-
const fullContent = chunks.join('')
669+
if (drainError) {
670+
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
671+
return
672+
}
673+
674+
// If the onStream consumer exited before the source drained (e.g. it caught
675+
// an internal error and returned normally), `accumulated` holds a truncated
676+
// response. Persisting that to memory or setting it as the block output
677+
// would corrupt downstream state — skip and log instead.
678+
if (!sourceFullyDrained) {
679+
this.execLogger.warn(
680+
'Stream consumer exited before source drained; skipping content persistence',
681+
{
682+
blockId,
683+
}
684+
)
685+
return
686+
}
687+
688+
const fullContent = accumulated.join('')
709689
if (!fullContent) {
710690
return
711691
}
712692

713693
const executionOutput = streamingExec.execution?.output
714-
if (!executionOutput || typeof executionOutput !== 'object') {
715-
return
694+
if (executionOutput && typeof executionOutput === 'object') {
695+
let parsedForFormat = false
696+
if (responseFormat) {
697+
try {
698+
const parsed = JSON.parse(fullContent.trim())
699+
streamingExec.execution.output = {
700+
...parsed,
701+
tokens: executionOutput.tokens,
702+
toolCalls: executionOutput.toolCalls,
703+
providerTiming: executionOutput.providerTiming,
704+
cost: executionOutput.cost,
705+
model: executionOutput.model,
706+
}
707+
parsedForFormat = true
708+
} catch (error) {
709+
this.execLogger.warn('Failed to parse streamed content for response format', {
710+
blockId,
711+
error,
712+
})
713+
}
714+
}
715+
if (!parsedForFormat) {
716+
executionOutput.content = fullContent
717+
}
716718
}
717719

718-
if (responseFormat) {
720+
if (streamingExec.onFullContent) {
719721
try {
720-
const parsed = JSON.parse(fullContent.trim())
721-
722-
streamingExec.execution.output = {
723-
...parsed,
724-
tokens: executionOutput.tokens,
725-
toolCalls: executionOutput.toolCalls,
726-
providerTiming: executionOutput.providerTiming,
727-
cost: executionOutput.cost,
728-
model: executionOutput.model,
729-
}
730-
return
722+
await streamingExec.onFullContent(fullContent)
731723
} catch (error) {
732-
this.execLogger.warn('Failed to parse streamed content for response format', {
733-
blockId,
734-
error,
735-
})
724+
this.execLogger.error('onFullContent callback failed', { blockId, error })
736725
}
737726
}
738-
739-
executionOutput.content = fullContent
740727
}
741728
}

apps/sim/executor/handlers/agent/agent-handler.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
958958
streamingExec: StreamingExecution
959959
): StreamingExecution {
960960
return {
961-
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
961+
stream: streamingExec.stream,
962962
execution: streamingExec.execution,
963+
onFullContent: async (content: string) => {
964+
if (!content.trim()) return
965+
try {
966+
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
967+
} catch (error) {
968+
logger.error('Failed to persist streaming response:', error)
969+
}
970+
},
963971
}
964972
}
965973

apps/sim/executor/handlers/agent/memory.ts

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -111,35 +111,6 @@ export class Memory {
111111
})
112112
}
113113

114-
wrapStreamForPersistence(
115-
stream: ReadableStream<Uint8Array>,
116-
ctx: ExecutionContext,
117-
inputs: AgentInputs
118-
): ReadableStream<Uint8Array> {
119-
const chunks: string[] = []
120-
const decoder = new TextDecoder()
121-
122-
const transformStream = new TransformStream<Uint8Array, Uint8Array>({
123-
transform: (chunk, controller) => {
124-
controller.enqueue(chunk)
125-
const decoded = decoder.decode(chunk, { stream: true })
126-
chunks.push(decoded)
127-
},
128-
129-
flush: () => {
130-
const content = chunks.join('')
131-
if (content.trim()) {
132-
this.appendToMemory(ctx, inputs, {
133-
role: 'assistant',
134-
content,
135-
}).catch((error) => logger.error('Failed to persist streaming response:', error))
136-
}
137-
},
138-
})
139-
140-
return stream.pipeThrough(transformStream)
141-
}
142-
143114
private requireWorkspaceId(ctx: ExecutionContext): string {
144115
if (!ctx.workspaceId) {
145116
throw new Error('workspaceId is required for memory operations')

apps/sim/executor/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,12 @@ export interface ExecutionResult {
359359
export interface StreamingExecution {
360360
stream: ReadableStream
361361
execution: ExecutionResult & { isStreaming?: boolean }
362+
/**
363+
* Invoked with the assembled response text after the stream drains. Lets agent
364+
* blocks persist the full response without interposing a TransformStream on a
365+
* fetch-backed source — that pattern amplifies memory on Bun via #28035.
366+
*/
367+
onFullContent?: (content: string) => void | Promise<void>
362368
}
363369

364370
export interface BlockExecutor {

0 commit comments

Comments (0)