From ca522cac0e2d081caf72f95d9900fba25b1c84f3 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Fri, 19 Jun 2026 14:48:37 +0200 Subject: [PATCH 1/4] implement orchestrion-based ai v6 instrumentation --- .../vercelai/v6_v7/instrument-with-pii.mjs | 4 + .../tracing/vercelai/v6_v7/instrument.mjs | 4 + .../suites/tracing/vercelai/v6_v7/test.ts | 30 +- ...erimentalUseDiagnosticsChannelInjection.ts | 7 +- .../integrations/tracing-channel/vercel-ai.ts | 47 ++ .../server-utils/src/orchestrion/channels.ts | 12 + .../server-utils/src/orchestrion/config.ts | 26 ++ .../server-utils/src/orchestrion/index.ts | 1 + .../src/vercel-ai/vercel-ai-dc-subscriber.ts | 18 +- .../vercel-ai-orchestrion-v6-subscriber.ts | 411 ++++++++++++++++++ 10 files changed, 547 insertions(+), 13 deletions(-) create mode 100644 packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts create mode 100644 packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs index 7a021162097c..a5a385355c79 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs @@ -1,6 +1,10 @@ import * as Sentry from '@sentry/node'; import { loggingTransport } from '@sentry-internal/node-integration-tests'; +if (process.env.USE_ORCHESTRION) { + Sentry.experimentalUseDiagnosticsChannelInjection(); +} + Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', release: '1.0', diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs index 46a27dd03b74..ce1e1b394c66 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs @@ -1,6 +1,10 @@ import * as Sentry from '@sentry/node'; import { loggingTransport } from '@sentry-internal/node-integration-tests'; +if (process.env.USE_ORCHESTRION) { + Sentry.experimentalUseDiagnosticsChannelInjection(); +} + Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', release: '1.0', diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts index bb579a8f9443..5811020ee218 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts @@ -24,9 +24,11 @@ import { import { cleanupChildProcesses, createEsmAndCjsTests, createEsmTests } from '../../../../utils/runner'; describe.each([ - ['6', '^6.0.0'], - ['7', '7.0.0-beta.179'], -])('Vercel AI integration (version %s)', (version, vercelAiVersion) => { + ['6', {}, '^6.0.0'], + ['6', { USE_ORCHESTRION: 'true' }, '^6.0.0'], + ['7', {}, '7.0.0-beta.179'], + ['7', { USE_ORCHESTRION: 'true' }, '7.0.0-beta.179'], +])('Vercel AI integration (version %s, env %o)', (version, env: Record, vercelAiVersion) => { afterAll(() => { cleanupChildProcesses(); }); @@ -36,9 +38,12 @@ describe.each([ const nodeVersion = NODE_VERSION.major; const failsOnCjs = version === '7' && nodeVersion === 18; - // v6 is instrumented via the OTel processor, v7 via the `ai:telemetry` tracing-channel subscriber, - // so the span origin differs by version. - const expectedOrigin = version === '7' ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; + const useOrchestrion = env.USE_ORCHESTRION === 'true'; + const usesChannels = version === '7' || useOrchestrion; + + // in v7 and orchestrion mode, we use the channel-based integration + // else, we use the OTel processor + const expectedOrigin = usesChannels ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; // We only run this in ESM and CJS to verify full support // Other suites we only run in ESM to simplify the test setup @@ -49,6 +54,7 @@ describe.each([ (createRunner, test) => { test('creates ai spans for dataCollection defaults', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -174,6 +180,7 @@ describe.each([ (createRunner, test) => { test('creates ai spans when dataCollection.genAi has inputs and outputs disabled', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -233,10 +240,10 @@ describe.each([ // On v6, vercel AI natively defaults to recording inputs and outputs by default when telemetry is enabled // On v7, we do not have access to this, so this defaults to false in this case expect(secondInvokeAgentSpan.attributes?.[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, + !usesChannels ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, ); expect(secondInvokeAgentSpan.attributes?.[GEN_AI_OUTPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' + !usesChannels ? '[{"role":"assistant","parts":[{"type":"text","content":"Second span here!"}],"finish_reason":"stop"}]' : undefined, ); @@ -300,6 +307,7 @@ describe.each([ let errorEvent: Event | undefined; await createRunner() + .withEnv(env) .expect({ transaction: transaction => { transactionEvent = transaction; @@ -369,6 +377,7 @@ describe.each([ (createRunner, test) => { test('creates ai related spans', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -411,6 +420,7 @@ describe.each([ (createRunner, test) => { test('creates spans for ToolLoopAgent with tool calls', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -474,6 +484,7 @@ describe.each([ (createRunner, test) => { test('parents concurrent calls that share one model instance correctly', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -529,6 +540,7 @@ describe.each([ 'creates streamText spans with the model call parented to invoke_agent', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -569,6 +581,7 @@ describe.each([ (createRunner, test) => { test('finishes spans with an error status when the operation rejects', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -609,6 +622,7 @@ describe.each([ (createRunner, test) => { test('derives provider-metadata token breakdown, conversation id and system instructions', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index 6bba862cdfc0..247f93cf3526 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -2,6 +2,7 @@ import { mysqlChannelIntegration, lruMemoizerChannelIntegration, detectOrchestrionSetup, + vercelAiChannelIntegration, } from '@sentry/server-utils/orchestrion'; import { registerDiagnosticsChannelInjection } from '@sentry/server-utils/orchestrion/register'; import type { DiagnosticsChannelInjection } from './diagnosticsChannelInjection'; @@ -41,7 +42,11 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject */ export function experimentalUseDiagnosticsChannelInjection(): void { setDiagnosticsChannelInjectionLoader((): DiagnosticsChannelInjection => { - const integrations = [mysqlChannelIntegration(), lruMemoizerChannelIntegration()] as const; + const integrations = [ + mysqlChannelIntegration(), + lruMemoizerChannelIntegration(), + vercelAiChannelIntegration(), + ] as const; const replacedOtelIntegrationNames = integrations.map(i => i.name); return { diff --git a/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts new file mode 100644 index 000000000000..8edff2af5b74 --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts @@ -0,0 +1,47 @@ +import type { IntegrationFn } from '@sentry/core'; +import { defineIntegration, extendIntegration, waitForTracingChannelBinding } from '@sentry/core'; +import { vercelAiIntegration as baseVercelAiIntegration } from '../../vercel-ai'; +import * as dc from 'node:diagnostics_channel'; +import { subscribeVercelAiOrchestrionChannels } from '../../vercel-ai/vercel-ai-orchestrion-v6-subscriber'; + +type VercelAiOptions = Parameters[0]; + +// In channel-based (orchestrion) mode we emit our own `gen_ai.*` spans from the +// diagnostics channels. The `ai` SDK still emits its own native OpenTelemetry +// spans whenever the user enables `experimental_telemetry`, which would be +// duplicates. Every native `ai` span carries an `ai.operationId` attribute +// (e.g. `ai.generateText`, `ai.generateText.doGenerate`, `ai.toolCall`) at span +// start, whereas our channel spans use `vercel.ai.operationId` — so we drop the +// native ones up front via `ignoreSpans`, before any vercel-ai processing runs. +const NATIVE_VERCEL_AI_SPANS = { attributes: { 'ai.operationId': /^ai\./ } }; + +const _vercelAiChannelIntegration = ((options: VercelAiOptions = {}) => { + const parentIntegration = baseVercelAiIntegration(options); + + return extendIntegration(parentIntegration, { + options, + beforeSetup(client) { + // Ensure we drop spans emitted by ai v6 or below + // To avoid double-instrumentation - in this scenario, we only want to rely on our own spans + const options = client.getOptions(); + options.ignoreSpans = [...(options.ignoreSpans || []), NATIVE_VERCEL_AI_SPANS]; + }, + setupOnce() { + // Bail if this is not available + if (!dc.tracingChannel) { + return; + } + + waitForTracingChannelBinding(() => { + subscribeVercelAiOrchestrionChannels(dc.tracingChannel, options); + }); + }, + }); +}) satisfies IntegrationFn; + +/** + * Auto-instrument the `ai` SDK. Supported are: + * - v7 via native `ai:telemetry` tracing channel + * - v6 via orchestrion `orchestrion:ai:*` channels + */ +export const vercelAiChannelIntegration = defineIntegration(_vercelAiChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index ad2d8ccdd4dd..a82d2c00e313 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -14,6 +14,18 @@ export const CHANNELS = { MYSQL_QUERY: 'orchestrion:mysql:query', LRU_MEMOIZER_LOAD: 'orchestrion:lru-memoizer:load', + // Vercel AI (`ai`) v6: orchestrion injects these so the same channel-based + // integration that consumes `ai`'s native `ai:telemetry` channel (v7) can + // also instrument v6. Each maps to a top-level function in `ai`'s bundle. + VERCEL_AI_GENERATE_TEXT: 'orchestrion:ai:generateText', + VERCEL_AI_STREAM_TEXT: 'orchestrion:ai:streamText', + VERCEL_AI_EMBED: 'orchestrion:ai:embed', + VERCEL_AI_EXECUTE_TOOL_CALL: 'orchestrion:ai:executeToolCall', + // `resolveLanguageModel` is the single chokepoint every model call flows + // through; we wrap it to monkey-patch `doGenerate`/`doStream` on the returned + // model (the model-call site itself is an inline call with no injectable + // definition). + VERCEL_AI_RESOLVE_LANGUAGE_MODEL: 'orchestrion:ai:resolveLanguageModel', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index 104df2185386..ba1784f0f977 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -11,6 +11,19 @@ import type { InstrumentationConfig } from '@apm-js-collab/code-transformer'; * `channelName` here is the unprefixed suffix; the actual diagnostics_channel * name is `orchestrion:${module.name}:${channelName}` (see `channels.ts`). */ +/** + * `ai` ships a single bundled entry per module system, so each instrumented + * function needs one config entry per file (the app loads whichever matches its + * module system). This expands a single target into both. + */ +function vercelAiV6Entries(channelName: string, functionName: string, kind: 'Async' | 'Sync'): InstrumentationConfig[] { + return ['dist/index.js', 'dist/index.mjs'].map(filePath => ({ + channelName, + module: { name: 'ai', versionRange: '>=6.0.0 <7.0.0', filePath }, + functionQuery: { functionName, kind }, + })); +} + export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ { channelName: 'query', @@ -38,6 +51,19 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ module: { name: 'lru-memoizer', versionRange: '>=2.1.0 <4', filePath: 'lib/async.js' }, functionQuery: { functionName: 'memoizedFunction', kind: 'Callback' }, }, + // Vercel AI v6: mirror the v7 native `ai:telemetry` channel by injecting + // channels into the top-level entry points. `resolveLanguageModel` is wrapped + // not to span it, but so the subscriber can monkey-patch `doGenerate`/ + // `doStream` on the returned model (the only way to span the model call, + // which is an inline call with no injectable definition in `ai`). + // `streamText` returns its result synchronously (streaming is lazy), so it's + // `Sync`; the subscriber binds the span via `bindTracingChannelToSpan`, which + // ends it when the (synchronous) call returns. + ...vercelAiV6Entries('generateText', 'generateText', 'Async'), + ...vercelAiV6Entries('streamText', 'streamText', 'Sync'), + ...vercelAiV6Entries('embed', 'embed', 'Async'), + ...vercelAiV6Entries('executeToolCall', 'executeToolCall', 'Async'), + ...vercelAiV6Entries('resolveLanguageModel', 'resolveLanguageModel', 'Sync'), ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index 4b182e51ec13..23eb7a7ac4f7 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,3 +1,4 @@ export { detectOrchestrionSetup } from './detect'; export { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; export { lruMemoizerChannelIntegration } from '../integrations/tracing-channel/lru-memoizer'; +export { vercelAiChannelIntegration } from '../integrations/tracing-channel/vercel-ai'; diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts index f2b4b0debff9..74ff4ad6cc9c 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts @@ -95,11 +95,21 @@ export function clearOperationId(data: VercelAiChannelMessage): void { } const callId = asString(data.event.callId); if (callId) { - operationIdByCallId.delete(callId); - toolDescriptionsByCallId.delete(callId); + clearOperationCallId(callId); } } +/** + * Drop the per-operation `callId` maps for a single id. The v6 orchestrion adapter uses this to clear a + * `streamText` operation only after its lazily-run model call settles — the operation's own span ends + * synchronously (when `streamText` returns) but the model call runs later as the stream is consumed, and + * it still needs the operation's `operationId`/`isStream` entry to name itself `ai.streamText.doStream`. + */ +export function clearOperationCallId(callId: string): void { + operationIdByCallId.delete(callId); + toolDescriptionsByCallId.delete(callId); +} + /** Record tool name → description from an event's `tools`, so tool spans can backfill the description. */ function recordToolDescriptions(callId: string | undefined, tools: unknown): void { if (!callId || !Array.isArray(tools)) { @@ -172,10 +182,10 @@ export interface VercelAiChannelMessage { * nested AI SDK operations (model calls, tool calls) become children of the enclosing span without * any manual parent bookkeeping here. */ -type VercelAiTracingChannelFactory = (name: string) => TracingChannel; +export type VercelAiTracingChannelFactory = (name: string) => TracingChannel; /** Integration-level recording options, pinned at subscribe time so we never look the integration up per event. */ -interface VercelAiChannelOptions { +export interface VercelAiChannelOptions { recordInputs?: boolean; recordOutputs?: boolean; enableTruncation?: boolean; diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts new file mode 100644 index 000000000000..dc374314544b --- /dev/null +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -0,0 +1,411 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import type { Span } from '@sentry/core'; +import { debug, getActiveSpan, SPAN_STATUS_ERROR, withActiveSpan } from '@sentry/core'; +import { DEBUG_BUILD } from '../debug-build'; +import { CHANNELS } from '../orchestrion/channels'; +import { bindTracingChannelToSpan, type TracingChannelPayloadWithSpan } from '../tracing-channel'; +import { + clearOperationCallId, + clearOperationId, + createSpanFromMessage, + enrichSpanOnEnd, + type VercelAiChannelMessage, + type VercelAiChannelOptions, + type VercelAiTracingChannelFactory, +} from './vercel-ai-dc-subscriber'; + +/** + * v6 channel adapter for the Vercel AI (`ai`) SDK. + * + * `ai` >= 7 publishes a normalized `ai:telemetry` tracing channel natively + * (consumed by `subscribeVercelAiTracingChannel`). v6 has no such channel, so + * orchestrion injects `orchestrion:ai:*` channels around the top-level + * functions (see `orchestrion/config.ts`). The injected channels carry only the + * wrapped call's `{ arguments, result, error }` — NOT v7's normalized `event` + * object — so this adapter reconstructs an equivalent {@link VercelAiChannelMessage} + * from v6's argument/result shapes and delegates to the SAME span-building core + * (`createSpanFromMessage` / `enrichSpanOnEnd`) the v7 subscriber uses, so the + * emitted spans are identical between v6 and v7. + * + * Like the v7 subscriber, each operation channel is wired up via + * {@link bindTracingChannelToSpan}, which binds the opened span into the runtime's + * async context for the duration of the traced call and ends it when the call + * settles. That binding is what lets the model call below find its enclosing + * `invoke_agent` span via the active context (see {@link resolveModelCallParent}). + * + * The model call (`languageModelCall` / `generate_content` span) has no + * injectable definition in `ai`, so we instead wrap `resolveLanguageModel` (the + * single chokepoint every model call flows through) and monkey-patch + * `doGenerate`/`doStream` on the returned model. + */ + +/** Shape orchestrion's transform attaches to the tracing-channel context. */ +interface OrchestrionContext { + arguments: unknown[]; + result?: unknown; + error?: unknown; +} + +/** Builds the normalized message for a channel from the wrapped call's first-arg options. */ +type MessageBuilder = (options: Record, telemetry: Record) => VercelAiChannelMessage; + +/** A resolved `ai` language model — has `doGenerate`/`doStream` and identity fields. */ +interface ResolvedModel { + modelId?: string; + provider?: string; + doGenerate?: (...args: unknown[]) => Promise; + doStream?: (...args: unknown[]) => Promise; +} + +const PATCHED = Symbol('SentryVercelAiModelPatched'); +const PARENT = Symbol('SentryVercelAiModelParent'); + +/** A resolved model with our patch bookkeeping (idempotency flag + captured parent span). */ +type PatchableModel = ResolvedModel & { [PATCHED]?: boolean; [PARENT]?: Span }; + +// Per-operation correlation id. No Date/random (unavailable / non-deterministic) — a counter is enough. +let callIdCounter = 0; +function nextCallId(): string { + return `v6-${++callIdCounter}`; +} + +// The message built on `start` for each operation, keyed by the (stable-identity) channel context, so +// the `beforeSpanEnd` handler can enrich the span from the settled result and clear the `callId` maps. +const messages = new WeakMap(); +// The spans we opened for top-level operations, and each one's `callId`. A model call resolves its +// parent against this set (so it never mis-attributes to the enclosing `main`/user span) and reads the +// parent's `callId` so its span can be named after the operation (e.g. `ai.streamText.doStream`). +const operationSpans = new WeakSet(); +const callIdBySpan = new WeakMap(); +// Carries the enclosing operation span down to the patched `doGenerate`/`doStream`, where the active +// span is the `ai` SDK's own (ignored) model-call span rather than our operation span. It's bound onto +// the operation channel via `bindStore` (see `bindOperation`), so it's scoped per traced operation and +// propagates across the awaits inside it — which is what lets concurrent operations sharing a single +// model instance each resolve their own parent (the `[PARENT]` slot on the shared model cannot). +const operationParentStore = new AsyncLocalStorage(); + +let subscribed = false; + +/** + * Subscribe the v6 orchestrion channel adapter. Safe to always call: inert on + * `ai` >= 7 (those channels are never published) and when orchestrion injection + * isn't active. Idempotent. + * + * `tracingChannel` is the platform-provided factory (the same one passed to + * `subscribeVercelAiTracingChannel`); `options` pins the recording settings at + * subscribe time so we never look the integration up per event. + */ +export function subscribeVercelAiOrchestrionChannels( + tracingChannel: VercelAiTracingChannelFactory, + options: VercelAiChannelOptions = {}, +): void { + if (subscribed) { + return; + } + subscribed = true; + + try { + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_GENERATE_TEXT, buildTextMessage('generateText'), options); + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_STREAM_TEXT, buildTextMessage('streamText'), options); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EMBED, + (callOptions, telemetry) => ({ + type: 'embed', + event: { + callId: nextCallId(), + ...modelFields(callOptions.model), + maxRetries: callOptions.maxRetries, + value: callOptions.value, + ...recording(telemetry), + }, + }), + options, + ); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EXECUTE_TOOL_CALL, + (callOptions, telemetry) => ({ + type: 'executeTool', + // v6 carries the tool definitions on the executeToolCall args (a record keyed by name); + // the shared core reads the matching tool's `description` for the span. + event: { + callId: nextCallId(), + toolCall: callOptions.toolCall, + tools: callOptions.tools, + ...recording(telemetry), + }, + }), + options, + ); + subscribeResolveLanguageModel(tracingChannel, CHANNELS.VERCEL_AI_RESOLVE_LANGUAGE_MODEL, options); + } catch { + DEBUG_BUILD && debug.log('Vercel AI orchestrion channel subscription failed.'); + } +} + +/** + * Bind one operation channel: `getSpan` opens a span from the message reconstructed out of the wrapped + * call's first argument; `beforeSpanEnd` enriches it from the settled result (tokens, output messages, + * finish reasons, …) before the helper ends the span. + * + * An operation whose `experimental_telemetry.isEnabled` is explicitly `false` is skipped entirely (no + * span): the orchestrion channel fires regardless of that flag, whereas v7's native `ai:telemetry` + * channel is simply not published in that case — so we reproduce v7's "no telemetry → no span". + */ +function bindOperation( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + build: MessageBuilder, + options: VercelAiChannelOptions, +): void { + const channel = tracingChannel(channelName); + + // Bind the operation span into our own async-context store. We bind it BEFORE `bindTracingChannelToSpan` + // so that — bound stores run last-in-first-out — this transform runs AFTER the helper's producer has + // stashed the span on `data._sentrySpan`. `bindStore` activates the store via `runStores` for the + // traced operation, and that propagates across the awaits inside it, so a model call awaited within the + // operation reads ITS operation's span — no leak across sequential calls, no clobbering across + // concurrent ones (which a single mutable slot on the shared model instance cannot achieve). + // `bindStore`'s store type is the channel's data type; our store value is the operation span, so cast + // (the runtime treats the store value opaquely — same as `bindTracingChannelToSpan` does internally). + channel.start.bindStore(operationParentStore as unknown as AsyncLocalStorage, data => { + return (data as TracingChannelPayloadWithSpan)._sentrySpan as unknown as OrchestrionContext; + }); + + bindTracingChannelToSpan( + channel, + (data: TracingChannelPayloadWithSpan) => { + const callOptions = isRecord(data.arguments[0]) ? data.arguments[0] : {}; + const telemetry = isRecord(callOptions.experimental_telemetry) ? callOptions.experimental_telemetry : {}; + if (telemetry.isEnabled === false) { + return undefined; + } + const message = build(callOptions, telemetry); + const span = createSpanFromMessage(message, options); + if (span) { + messages.set(data, message); + operationSpans.add(span); + const callId = asString(message.event.callId); + if (callId) { + callIdBySpan.set(span, callId); + } + } + return span; + }, + { + beforeSpanEnd: (span, data) => { + const message = messages.get(data); + if (!message) { + return; + } + // The helper's `error` handler already set the span status; only enrich from a successful result. + if (!('error' in data)) { + // v6's `executeToolCall` returns the tool result/error object directly, whereas the shared core + // (matching v7) expects it nested under `output`; wrap it so tool-error detection works. + message.result = message.type === 'executeTool' ? { output: data.result } : data.result; + enrichSpanOnEnd(span, message, options); + } + // A `streamText` model call runs lazily, after this (synchronously-returning) operation's span has + // already ended, so its `callId` entry must outlive the operation — it's cleared once the model + // call settles (see `patchModelMethod`). Every other operation can clear here. + if (message.type !== 'streamText') { + clearOperationId(message); + } + messages.delete(data); + }, + }, + ); +} + +/** + * `resolveLanguageModel` returns the model every call flows through. We don't span it — on `end` we + * monkey-patch `doGenerate`/`doStream` on the returned model so each invocation produces a + * `languageModelCall` span parented to the enclosing invoke_agent span. + */ +function subscribeResolveLanguageModel( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + options: VercelAiChannelOptions, +): void { + tracingChannel(channelName).subscribe({ + end(rawCtx) { + const ctx = rawCtx as OrchestrionContext; + if (!isRecord(ctx.result)) { + return; + } + const model = ctx.result as PatchableModel; + // `resolveLanguageModel` runs synchronously inside the operation body, where the operation span is + // the active span. `generateText`/`embed` recover that parent from `operationParentStore` at model + // call time, but `streamText` runs its model call lazily — after the operation's async context (and + // thus `operationParentStore`) has unwound — so stash the operation span on the model as its + // fallback parent here. + const active = getActiveSpan(); + if (active && operationSpans.has(active)) { + model[PARENT] = active; + } + if (!model[PATCHED]) { + model[PATCHED] = true; + patchModelMethod(model, 'doGenerate', options); + patchModelMethod(model, 'doStream', options); + } + }, + start() { + /* no-op */ + }, + asyncStart() { + /* no-op */ + }, + asyncEnd() { + /* no-op */ + }, + error() { + /* no-op */ + }, + }); +} + +/** + * Pick the invoke_agent span a model call should hang under. + * + * Prefer the active span when it is an operation span: for `generateText`/`embed` the model call is + * awaited inside the operation body, so the async context still carries the right operation span — and + * crucially this disambiguates concurrent calls that share one model instance (where the captured + * `model[PARENT]` would be whichever operation resolved the model last). Fall back to the captured + * parent for `streamText`, whose model call runs after the operation returned and the bound context has + * unwound. Returns `undefined` when neither is an operation span — e.g. telemetry was disabled for the + * enclosing call — so the model call is skipped too. + */ +function resolveModelCallParent(model: PatchableModel): Span | undefined { + // The model call is awaited inside the operation body (`generateText`/`embed`), so the operation span + // bound onto `operationParentStore` for that operation is still in scope — and is per-operation, so it + // stays correct even when concurrent operations share one model instance. + const fromContext = operationParentStore.getStore(); + if (fromContext && operationSpans.has(fromContext)) { + return fromContext; + } + // Fallback for `streamText`, whose `doStream` runs after the operation's async context has unwound: + // the parent captured on the model at resolve time. + const captured = model[PARENT]; + return captured && operationSpans.has(captured) ? captured : undefined; +} + +function patchModelMethod( + model: PatchableModel, + method: 'doGenerate' | 'doStream', + options: VercelAiChannelOptions, +): void { + const original = model[method]; + if (typeof original !== 'function') { + return; + } + model[method] = function (this: unknown, ...args: unknown[]): Promise { + const parent = resolveModelCallParent(model); + // No enclosing operation span (e.g. telemetry disabled for the call) → don't open a model-call span. + if (!parent) { + return Promise.resolve(original.apply(this, args)); + } + + const callArgs = isRecord(args[0]) ? args[0] : {}; + // Carry the operation's `callId` so the shared core can name the span after it + // (`ai.generateText.doGenerate` / `ai.streamText.doStream`). + const callId = callIdBySpan.get(parent); + const message: VercelAiChannelMessage = { + type: 'languageModelCall', + event: { + callId, + provider: model.provider, + modelId: model.modelId, + tools: callArgs.tools, + messages: callArgs.prompt, + }, + }; + const span = withActiveSpan(parent, () => createSpanFromMessage(message, options)); + // `languageModelCall` always opens a span; the guard just keeps the wrapper safe if that changes. + if (!span) { + return Promise.resolve(original.apply(this, args)); + } + + // `streamText` ends its operation span synchronously, so its `callId` entry was deliberately left in + // place for this (lazy) model call; drop it now that we've used it. + const clearStreamCallId = (): void => { + if (method === 'doStream' && callId) { + clearOperationCallId(callId); + } + }; + + let result: Promise; + try { + result = Promise.resolve(original.apply(this, args)); + } catch (error) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + } + // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end + // bracket the call) to match the channel timing. + return result.then( + value => { + message.result = value; + enrichSpanOnEnd(span, message, options); + span.end(); + clearStreamCallId(); + return value; + }, + error => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + }, + ); + }; +} + +function buildTextMessage(type: 'generateText' | 'streamText'): MessageBuilder { + return (options, telemetry) => ({ + type, + event: { + callId: nextCallId(), + operationId: type === 'streamText' ? 'ai.streamText' : 'ai.generateText', + functionId: asString(telemetry.functionId), + ...modelFields(options.model), + maxRetries: options.maxRetries, + // Normalize to the message-array shape the shared core (and v7's channel) expects: a bare string + // `prompt` becomes a single user message, matching the SDK's own normalization. + messages: normalizePromptMessages(options), + ...recording(telemetry), + }, + }); +} + +function normalizePromptMessages(options: Record): unknown { + if (Array.isArray(options.messages)) { + return options.messages; + } + if (typeof options.prompt === 'string') { + return [{ role: 'user', content: options.prompt }]; + } + return options.messages ?? options.prompt; +} + +function recording(telemetry: Record): { recordInputs: unknown; recordOutputs: unknown } { + return { recordInputs: telemetry.recordInputs, recordOutputs: telemetry.recordOutputs }; +} + +function modelFields(model: unknown): { provider?: string; modelId?: string } { + return { provider: modelField(model, 'provider'), modelId: modelField(model, 'modelId') }; +} + +function modelField(model: unknown, field: 'modelId' | 'provider'): string | undefined { + return isRecord(model) ? asString(model[field]) : undefined; +} + +function asString(value: unknown): string | undefined { + return typeof value === 'string' ? value : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +} From b203879db07f0e5d7e53a77e1dd6c0aa80ffb16f Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Mon, 29 Jun 2026 14:46:08 +0200 Subject: [PATCH 2/4] cleanup --- .../v6_v7/scenario-concurrent-stream.mjs | 61 ++++++++++++++++++ .../suites/tracing/vercelai/v6_v7/test.ts | 51 +++++++++++++++ .../vercel-ai-orchestrion-v6-subscriber.ts | 62 ++++++++----------- 3 files changed, 137 insertions(+), 37 deletions(-) create mode 100644 dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs new file mode 100644 index 000000000000..e384661b3a4a --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/scenario-concurrent-stream.mjs @@ -0,0 +1,61 @@ +import * as Sentry from '@sentry/node'; +import { streamText } from 'ai'; +import { MockLanguageModelV3, simulateReadableStream } from 'ai/test'; + +function makeStreamModel(text) { + return new MockLanguageModelV3({ + doStream: async () => ({ + stream: simulateReadableStream({ + chunks: [ + { type: 'stream-start', warnings: [] }, + { type: 'text-start', id: '0' }, + { type: 'text-delta', id: '0', delta: text }, + { type: 'text-end', id: '0' }, + { + type: 'finish', + finishReason: { unified: 'stop', raw: 'stop' }, + usage: { + inputTokens: { total: 10, noCache: 10, cached: 0 }, + outputTokens: { total: 20, noCache: 20, cached: 0 }, + totalTokens: { total: 30, noCache: 30, cached: 0 }, + }, + }, + ], + }), + }), + }); +} + +async function consume(result) { + for await (const _part of result.fullStream) { + void _part; + } +} + +async function run() { + // A single model instance shared by two *concurrent* streamText calls. The shared model carries + // only a single captured-parent slot, so naive parent tracking could attribute a model call to + // whichever operation resolved the model last. Each `generate_content` (doStream) must land under + // its own `invoke_agent`, and both invoke_agents under `main`. + const sharedModel = makeStreamModel('shared!'); + + await Sentry.startSpan({ op: 'function', name: 'main' }, async () => { + // Start both operations before consuming either, so both resolve the shared model first. + const streams = [ + streamText({ + experimental_telemetry: { isEnabled: true }, + model: sharedModel, + prompt: 'Concurrent stream A?', + }), + streamText({ + experimental_telemetry: { isEnabled: true }, + model: sharedModel, + prompt: 'Concurrent stream B?', + }), + ]; + + await Promise.all(streams.map(consume)); + }); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts index 5811020ee218..69ce2d91ce32 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts @@ -526,6 +526,57 @@ describe.each([ }, ); + createEsmTests( + __dirname, + 'scenario-concurrent-stream.mjs', + 'instrument.mjs', + (createRunner, test) => { + // A single model instance shared by two concurrent `streamText` calls carries only one + // captured-parent slot, so both model calls must still land under their own `invoke_agent` — not + // collapse onto whichever operation resolved the shared model last. + test.skipIf(version === '7' && nodeVersion === 18)( + 'parents concurrent streamText calls that share one model instance correctly', + async () => { + await createRunner() + .withEnv(env) + .expect({ transaction: { transaction: 'main' } }) + .expect({ + span: container => { + const invokeAgents = container.items.filter( + span => span.attributes?.['sentry.op']?.value === 'gen_ai.invoke_agent', + ); + const generateContents = container.items.filter( + span => span.attributes?.['sentry.op']?.value === 'gen_ai.generate_content', + ); + + // Two concurrent operations -> two invoke_agent + two generate_content spans. + expect(invokeAgents).toHaveLength(2); + expect(generateContents).toHaveLength(2); + + const agentSpanIds = new Set(invokeAgents.map(span => span.span_id)); + + // Each model call lands under an invoke_agent span... + for (const span of generateContents) { + expect(agentSpanIds.has(span.parent_span_id!)).toBe(true); + } + // ...a distinct one each (no cross-attribution despite the shared model instance)... + expect(new Set(generateContents.map(span => span.parent_span_id)).size).toBe(2); + // ...and both operations sit under the same `main` parent. + expect(new Set(invokeAgents.map(span => span.parent_span_id)).size).toBe(1); + }, + }) + .start() + .completed(); + }, + ); + }, + { + additionalDependencies: { + ai: vercelAiVersion, + }, + }, + ); + createEsmTests( __dirname, 'scenario-stream-text.mjs', diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts index dc374314544b..c94d1ab416ab 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -1,6 +1,6 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import type { Span } from '@sentry/core'; -import { debug, getActiveSpan, SPAN_STATUS_ERROR, withActiveSpan } from '@sentry/core'; +import { debug, SPAN_STATUS_ERROR, withActiveSpan } from '@sentry/core'; import { DEBUG_BUILD } from '../debug-build'; import { CHANNELS } from '../orchestrion/channels'; import { bindTracingChannelToSpan, type TracingChannelPayloadWithSpan } from '../tracing-channel'; @@ -58,10 +58,9 @@ interface ResolvedModel { } const PATCHED = Symbol('SentryVercelAiModelPatched'); -const PARENT = Symbol('SentryVercelAiModelParent'); -/** A resolved model with our patch bookkeeping (idempotency flag + captured parent span). */ -type PatchableModel = ResolvedModel & { [PATCHED]?: boolean; [PARENT]?: Span }; +/** A resolved model with our patch bookkeeping (idempotency flag). */ +type PatchableModel = ResolvedModel & { [PATCHED]?: boolean }; // Per-operation correlation id. No Date/random (unavailable / non-deterministic) — a counter is enough. let callIdCounter = 0; @@ -80,8 +79,11 @@ const callIdBySpan = new WeakMap(); // Carries the enclosing operation span down to the patched `doGenerate`/`doStream`, where the active // span is the `ai` SDK's own (ignored) model-call span rather than our operation span. It's bound onto // the operation channel via `bindStore` (see `bindOperation`), so it's scoped per traced operation and -// propagates across the awaits inside it — which is what lets concurrent operations sharing a single -// model instance each resolve their own parent (the `[PARENT]` slot on the shared model cannot). +// propagates across the awaits inside it. This holds for `streamText` too: `ai` initiates the model +// stream synchronously inside `streamText` (within this bound context), so the later `doStream` — even +// though it runs after the operation's span has already ended — still restores this store and reads ITS +// operation's span. That per-operation scoping is the sole parent-resolution mechanism, and it is what +// keeps concurrent operations sharing a single model instance from cross-attributing their model calls. const operationParentStore = new AsyncLocalStorage(); let subscribed = false; @@ -206,7 +208,7 @@ function bindOperation( message.result = message.type === 'executeTool' ? { output: data.result } : data.result; enrichSpanOnEnd(span, message, options); } - // A `streamText` model call runs lazily, after this (synchronously-returning) operation's span has + // A `streamText` model call runs after this (synchronously-returning) operation's span has // already ended, so its `callId` entry must outlive the operation — it's cleared once the model // call settles (see `patchModelMethod`). Every other operation can clear here. if (message.type !== 'streamText') { @@ -235,15 +237,9 @@ function subscribeResolveLanguageModel( return; } const model = ctx.result as PatchableModel; - // `resolveLanguageModel` runs synchronously inside the operation body, where the operation span is - // the active span. `generateText`/`embed` recover that parent from `operationParentStore` at model - // call time, but `streamText` runs its model call lazily — after the operation's async context (and - // thus `operationParentStore`) has unwound — so stash the operation span on the model as its - // fallback parent here. - const active = getActiveSpan(); - if (active && operationSpans.has(active)) { - model[PARENT] = active; - } + // Patch the model's `doGenerate`/`doStream` once. The model call recovers its parent from + // `operationParentStore` at call time (set per-operation by `bindOperation`), which propagates into + // the model call for `streamText` too, so there is nothing to capture on the model here. if (!model[PATCHED]) { model[PATCHED] = true; patchModelMethod(model, 'doGenerate', options); @@ -266,28 +262,20 @@ function subscribeResolveLanguageModel( } /** - * Pick the invoke_agent span a model call should hang under. + * Pick the invoke_agent span a model call should hang under: the operation span bound onto + * `operationParentStore` for the enclosing operation. * - * Prefer the active span when it is an operation span: for `generateText`/`embed` the model call is - * awaited inside the operation body, so the async context still carries the right operation span — and - * crucially this disambiguates concurrent calls that share one model instance (where the captured - * `model[PARENT]` would be whichever operation resolved the model last). Fall back to the captured - * parent for `streamText`, whose model call runs after the operation returned and the bound context has - * unwound. Returns `undefined` when neither is an operation span — e.g. telemetry was disabled for the - * enclosing call — so the model call is skipped too. + * This covers `generateText`/`embed` (whose model call is awaited inside the operation body) and + * `streamText` alike — `ai` initiates the stream synchronously within the operation's bound context, so + * `doStream` restores the same per-operation store even though it runs after the operation's span has + * ended. Being per-operation, the store disambiguates concurrent calls that share one model instance (a + * single mutable slot on the shared model could not — it would hold whichever operation resolved the + * model last). Returns `undefined` when the store doesn't carry an operation span — e.g. telemetry was + * disabled for the enclosing call — so the model call is skipped too. */ -function resolveModelCallParent(model: PatchableModel): Span | undefined { - // The model call is awaited inside the operation body (`generateText`/`embed`), so the operation span - // bound onto `operationParentStore` for that operation is still in scope — and is per-operation, so it - // stays correct even when concurrent operations share one model instance. +function resolveModelCallParent(): Span | undefined { const fromContext = operationParentStore.getStore(); - if (fromContext && operationSpans.has(fromContext)) { - return fromContext; - } - // Fallback for `streamText`, whose `doStream` runs after the operation's async context has unwound: - // the parent captured on the model at resolve time. - const captured = model[PARENT]; - return captured && operationSpans.has(captured) ? captured : undefined; + return fromContext && operationSpans.has(fromContext) ? fromContext : undefined; } function patchModelMethod( @@ -300,7 +288,7 @@ function patchModelMethod( return; } model[method] = function (this: unknown, ...args: unknown[]): Promise { - const parent = resolveModelCallParent(model); + const parent = resolveModelCallParent(); // No enclosing operation span (e.g. telemetry disabled for the call) → don't open a model-call span. if (!parent) { return Promise.resolve(original.apply(this, args)); @@ -327,7 +315,7 @@ function patchModelMethod( } // `streamText` ends its operation span synchronously, so its `callId` entry was deliberately left in - // place for this (lazy) model call; drop it now that we've used it. + // place for this later model call; drop it now that we've used it. const clearStreamCallId = (): void => { if (method === 'doStream' && callId) { clearOperationCallId(callId); From a391fe010dcef97fd3b8e8126affba8cc83262b5 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Mon, 29 Jun 2026 14:55:52 +0200 Subject: [PATCH 3/4] cleanup --- .../vercel-ai-orchestrion-v6-subscriber.ts | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts index c94d1ab416ab..bfdae04ff1ab 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -322,32 +322,23 @@ function patchModelMethod( } }; - let result: Promise; try { - result = Promise.resolve(original.apply(this, args)); + const result = Promise.resolve(original.apply(this, args)); + // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end + // bracket the call) to match the channel timing. + return result.then(value => { + message.result = value; + enrichSpanOnEnd(span, message, options); + span.end(); + clearStreamCallId(); + return value; + }); } catch (error) { span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); span.end(); clearStreamCallId(); throw error; } - // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end - // bracket the call) to match the channel timing. - return result.then( - value => { - message.result = value; - enrichSpanOnEnd(span, message, options); - span.end(); - clearStreamCallId(); - return value; - }, - error => { - span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); - span.end(); - clearStreamCallId(); - throw error; - }, - ); }; } @@ -356,7 +347,7 @@ function buildTextMessage(type: 'generateText' | 'streamText'): MessageBuilder { type, event: { callId: nextCallId(), - operationId: type === 'streamText' ? 'ai.streamText' : 'ai.generateText', + operationId: `ai.${type}`, functionId: asString(telemetry.functionId), ...modelFields(options.model), maxRetries: options.maxRetries, From c3f5431c6b2a6763c20c2f9abb60656c1aaf9520 Mon Sep 17 00:00:00 2001 From: Francesco Novy Date: Mon, 29 Jun 2026 16:09:34 +0200 Subject: [PATCH 4/4] fixes --- .../vercel-ai-orchestrion-v6-subscriber.ts | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts index bfdae04ff1ab..074bf1a1304a 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -322,6 +322,15 @@ function patchModelMethod( } }; + // Both the synchronous throw and the async rejection of the model call must end the span with an + // error status (an `async` `doGenerate`/`doStream` that throws rejects rather than throwing here). + const failSpan = (error: unknown): never => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + }; + try { const result = Promise.resolve(original.apply(this, args)); // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end @@ -332,12 +341,9 @@ function patchModelMethod( span.end(); clearStreamCallId(); return value; - }); + }, failSpan); } catch (error) { - span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); - span.end(); - clearStreamCallId(); - throw error; + return failSpan(error); } }; }