@@ -7,16 +7,11 @@ import { context, type Context } from '@opentelemetry/api';
import type { ReadableSpan, SpanProcessor, SpanExporter } from '@opentelemetry/sdk-trace-base';
import logger from '../utils/logging';

/** Default grace period (ms) to wait for child spans after root span ends */
const DEFAULT_FLUSH_GRACE_MS = 250;

/** Default maximum age (ms) for a trace before forcing flush */
const DEFAULT_MAX_TRACE_AGE_MS = 30000;

/** Guardrails to prevent unbounded memory growth / export bursts */
const DEFAULT_MAX_BUFFERED_TRACES = 1000;
const DEFAULT_MAX_SPANS_PER_TRACE = 5000;
const DEFAULT_MAX_CONCURRENT_EXPORTS = 20;
const DEFAULT_MAX_BATCH_SIZE = 64;

function readEnvInt(name: string, fallback: number): number {
const raw = process.env[name];
@@ -29,252 +24,241 @@ function isRootSpan(span: ReadableSpan): boolean {
return !span.parentSpanContext;
}
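// Note: parentSpanContext is the @opentelemetry/sdk-trace-base 2.x field (1.x exposed
// parentSpanId instead, an assumption about the SDK version in use), so a span with no
// parent context is treated as the local root of its trace.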

type TraceBuffer = {
spans: ReadableSpan[];
openCount: number;
rootEnded: boolean;
rootCtx?: Context; // holds the request Context (with token in ALS)
startedAtMs: number;
rootEndedAtMs?: number;
droppedSpans: number;
type TraceEntry = {
rootCtx?: Context; // request Context (token via ALS)
fallbackCtx?: Context; // first-seen context as fallback
queue: ReadableSpan[]; // small queue for micro-batching
flushScheduled: boolean; // whether a micro-batch flush is scheduled
lastActivityMs: number; // for LRU eviction
exportedCount: number; // total spans exported for this trace
droppedSpans: number; // spans dropped due to maxSpansPerTrace
};

type FlushReason = 'trace_completed' | 'root_ended_grace' | 'max_trace_age' | 'force_flush';

/**
* Buffers spans per trace and exports once the request completes.
* Exports spans on-end with micro-batching to avoid dropping late-ending spans.
* Token is not stored; we export under the saved request Context so that getExportToken()
* can read the token from the active OpenTelemetry Context at export time.
*
* Memory considerations:
* - Trace entries are retained (not deleted after flush) to handle late-ending spans
* - When maxBufferedTraces is reached, LRU eviction removes the least-recently-active entry
* - Expected memory footprint: ~1-2KB per trace entry × maxBufferedTraces (default 1000)
* - Queue sizes are small (bounded by maxBatchSize=64) to minimize buffering
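*
* @example
* // Minimal wiring sketch (assumes @opentelemetry/sdk-trace-node 2.x; `exporter` can be any SpanExporter):
* import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
* const provider = new NodeTracerProvider({
*   spanProcessors: [new PerRequestSpanProcessor(exporter)],
* });
* provider.register();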
*/
export class PerRequestSpanProcessor implements SpanProcessor {
private traces = new Map<string, TraceBuffer>();
private sweepTimer?: NodeJS.Timeout;
private isSweeping = false;
private traces = new Map<string, TraceEntry>();

private readonly maxBufferedTraces: number;
private readonly maxSpansPerTrace: number;
private readonly maxConcurrentExports: number;
private readonly maxBatchSize: number;

private inFlightExports = 0;
private exportWaiters: Array<() => void> = [];
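// Together these back a simple counting semaphore: acquireExportSlot() admits up to
// maxConcurrentExports concurrent exporter.export calls and parks the rest as waiters
// (behaviour inferred from the field names and usage below).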

constructor(
private readonly exporter: SpanExporter,
private readonly flushGraceMs: number = DEFAULT_FLUSH_GRACE_MS,
private readonly maxTraceAgeMs: number = DEFAULT_MAX_TRACE_AGE_MS
_flushGraceMs?: number, // kept for backward compatibility
_maxTraceAgeMs?: number // kept for backward compatibility
) {
// Defaults are intentionally high but bounded; override via env vars if needed.
// Set to 0 (or negative) to disable a guardrail.
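// Example (illustrative values): A365_PER_REQUEST_MAX_SPANS_PER_TRACE=2000 lowers the
// per-trace cap, while A365_PER_REQUEST_MAX_SPANS_PER_TRACE=0 disables it entirely.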
this.maxBufferedTraces = readEnvInt('A365_PER_REQUEST_MAX_TRACES', DEFAULT_MAX_BUFFERED_TRACES);
this.maxSpansPerTrace = readEnvInt('A365_PER_REQUEST_MAX_SPANS_PER_TRACE', DEFAULT_MAX_SPANS_PER_TRACE);
this.maxConcurrentExports = readEnvInt('A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS', DEFAULT_MAX_CONCURRENT_EXPORTS);
this.maxBatchSize = readEnvInt('A365_PER_REQUEST_MAX_BATCH_SIZE', DEFAULT_MAX_BATCH_SIZE);
}

onStart(span: ReadableSpan, ctx: Context): void {
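// The first span of a trace creates its entry (evicting the least-recently-active
// trace if the maxBufferedTraces guardrail is hit); every span refreshes
// lastActivityMs and the Context captured for export (root span context preferred).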
const traceId = span.spanContext().traceId;
let buf = this.traces.get(traceId);
if (!buf) {
if (this.traces.size >= this.maxBufferedTraces) {
logger.warn(
`[PerRequestSpanProcessor] Dropping new trace due to maxBufferedTraces=${this.maxBufferedTraces} traceId=${traceId}`
);
return;
let entry = this.traces.get(traceId);
if (!entry) {
// Enforce maxBufferedTraces with LRU eviction
if (this.traces.size >= this.maxBufferedTraces && this.maxBufferedTraces > 0) {
this.evictLeastRecentlyUsed();
}

buf = {
spans: [],
openCount: 0,
rootEnded: false,
entry = {
queue: [],
flushScheduled: false,
rootCtx: undefined,
startedAtMs: Date.now(),
fallbackCtx: undefined,
lastActivityMs: Date.now(),
exportedCount: 0,
droppedSpans: 0,
};
this.traces.set(traceId, buf);
this.traces.set(traceId, entry);

this.ensureSweepTimer();

logger.info(
`[PerRequestSpanProcessor] Trace started traceId=${traceId} maxTraceAgeMs=${this.maxTraceAgeMs}`
);
logger.info(`[PerRequestSpanProcessor] Trace started traceId=${traceId}`);
}
buf.openCount += 1;

entry.lastActivityMs = Date.now();

// Debug lifecycle: span started
logger.info(
`[PerRequestSpanProcessor] Span start name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` +
` root=${isRootSpan(span)} openCount=${buf.openCount}`
` root=${isRootSpan(span)}`
);

// Capture a context to export under.
// - Use the first seen context as a fallback.
// - If/when the root span starts, prefer its context (contains token via ALS).
// Capture context for export:
// - If this is the root span, use its context (contains token via ALS)
// - Otherwise, use the first seen context as a fallback
// - Both rootCtx and fallbackCtx may be set; rootCtx takes precedence during export
if (isRootSpan(span)) {
buf.rootCtx = ctx;
entry.rootCtx = ctx;
} else {
buf.rootCtx ??= ctx;
entry.fallbackCtx ??= ctx;
}
}

onEnd(span: ReadableSpan): void {
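// Ended spans are queued per trace and exported in micro-batches: flush right away
// once the queue reaches maxBatchSize, otherwise on the next event-loop turn via
// scheduleMicroBatchFlush(). Spans for evicted/unknown traces are exported immediately.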
const traceId = span.spanContext().traceId;
const buf = this.traces.get(traceId);
if (!buf) return;
const entry = this.traces.get(traceId);
if (!entry) {
// Span ended for an evicted or unknown trace; export without saved context
logger.warn(
`[PerRequestSpanProcessor] Span ended for unknown trace (likely evicted) traceId=${traceId} spanId=${span.spanContext().spanId}` +
` - exporting without saved context`
);
void this.exportBatch([span], context.active());
return;
}

entry.lastActivityMs = Date.now();

if (buf.spans.length >= this.maxSpansPerTrace) {
buf.droppedSpans += 1;
if (buf.droppedSpans === 1 || buf.droppedSpans % 100 === 0) {
// Check maxSpansPerTrace guardrail
const totalSpans = entry.exportedCount + entry.queue.length;
if (this.maxSpansPerTrace > 0 && totalSpans >= this.maxSpansPerTrace) {
entry.droppedSpans += 1;
if (entry.droppedSpans === 1 || entry.droppedSpans % 100 === 0) {
logger.warn(
`[PerRequestSpanProcessor] Dropping ended span due to maxSpansPerTrace=${this.maxSpansPerTrace} ` +
`traceId=${traceId} droppedSpans=${buf.droppedSpans}`
`traceId=${traceId} droppedSpans=${entry.droppedSpans}`
);
}
} else {
buf.spans.push(span);
}
buf.openCount -= 1;
if (buf.openCount < 0) {
logger.warn(
`[PerRequestSpanProcessor] openCount underflow traceId=${traceId} spanId=${span.spanContext().spanId} resettingToZero`
);
buf.openCount = 0;
return;
}

// Enqueue the span
entry.queue.push(span);

// Debug lifecycle: span ended
logger.info(
`[PerRequestSpanProcessor] Span end name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` +
` root=${isRootSpan(span)} openCount=${buf.openCount} rootEnded=${buf.rootEnded}`
` root=${isRootSpan(span)} queued=${entry.queue.length}`
);

if (isRootSpan(span)) {
buf.rootEnded = true;
buf.rootEndedAtMs = Date.now();
if (buf.openCount === 0) {
// Trace completed: root ended and no open spans remain.
this.flushTrace(traceId, 'trace_completed');
}
} else if (buf.rootEnded && buf.openCount === 0) {
// Common case: root ends first, then children finish shortly after.
// Flush immediately when the last child ends instead of waiting for grace/max timers.
this.flushTrace(traceId, 'trace_completed');
// Flush immediately if batch size reached or exceeded, otherwise schedule micro-batch
if (entry.queue.length >= this.maxBatchSize) {
// Reset flushScheduled flag before immediate flush
entry.flushScheduled = false;
void this.flushTrace(traceId);
} else if (!entry.flushScheduled) {
this.scheduleMicroBatchFlush(traceId);
}
}

async forceFlush(): Promise<void> {
await Promise.all([...this.traces.keys()].map((id) => this.flushTrace(id, 'force_flush')));
await Promise.all([...this.traces.keys()].map((id) => this.flushTrace(id)));
}

async shutdown(): Promise<void> {
await this.forceFlush();
this.stopSweepTimerIfIdle();
await this.exporter.shutdown?.();
}

private ensureSweepTimer(): void {
if (this.sweepTimer) return;

// Keep one lightweight sweeper. Interval is derived from grace/max-age to keep responsiveness reasonable.
const intervalMs = Math.max(10, Math.min(this.flushGraceMs, 250));
this.sweepTimer = setInterval(() => {
void this.sweep();
}, intervalMs);
private scheduleMicroBatchFlush(traceId: string): void {
const entry = this.traces.get(traceId);
if (!entry) return;

this.sweepTimer.unref?.();
}
// Avoid scheduling duplicate flushes
if (entry.flushScheduled) return;

private stopSweepTimerIfIdle(): void {
if (this.traces.size !== 0) return;
if (!this.sweepTimer) return;
clearInterval(this.sweepTimer);
this.sweepTimer = undefined;
entry.flushScheduled = true;
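// setImmediate defers the flush until the current event-loop turn completes, so sibling
// spans that end synchronously in this turn are queued together and exported as one batch.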
setImmediate(() => {
// Entry may have been evicted before callback executes
if (!this.traces.has(traceId)) return;
void this.flushTrace(traceId);
});
}

private async sweep(): Promise<void> {
if (this.isSweeping) return;
this.isSweeping = true;
try {
if (this.traces.size === 0) {
this.stopSweepTimerIfIdle();
return;
}

const now = Date.now();
const toFlush: Array<{ traceId: string; reason: FlushReason }> = [];
private evictLeastRecentlyUsed(): void {
// Linear scan to find LRU entry. With maxBufferedTraces=1000, this is acceptable.
// For higher limits, consider using a min-heap or doubly-linked list.
let oldestEntry: { traceId: string; lastActivityMs: number } | undefined;

for (const [traceId, trace] of this.traces.entries()) {
// 1) Max age safety flush (clears buffers even if spans never end)
if (now - trace.startedAtMs >= this.maxTraceAgeMs) {
toFlush.push({ traceId, reason: 'max_trace_age' });
continue;
}

// 2) Root ended grace window flush (clears buffers if children never end)
if (trace.rootEnded && trace.openCount > 0 && trace.rootEndedAtMs) {
if (now - trace.rootEndedAtMs >= this.flushGraceMs) {
toFlush.push({ traceId, reason: 'root_ended_grace' });
}
}
for (const [traceId, entry] of this.traces.entries()) {
if (!oldestEntry || entry.lastActivityMs < oldestEntry.lastActivityMs) {
oldestEntry = { traceId, lastActivityMs: entry.lastActivityMs };
}
}

// Flush in parallel; flushTrace removes entries eagerly.
await Promise.all(toFlush.map((x) => this.flushTrace(x.traceId, x.reason)));
this.stopSweepTimerIfIdle();
} finally {
this.isSweeping = false;
if (oldestEntry) {
logger.warn(
`[PerRequestSpanProcessor] Evicting LRU trace traceId=${oldestEntry.traceId} due to maxBufferedTraces=${this.maxBufferedTraces}`
);

// Flush any pending spans before eviction to avoid data loss
void this.flushTrace(oldestEntry.traceId);

// Remove the entry now; flushTrace above has already drained the queue synchronously
// (before its first await), so no queued spans are lost
this.traces.delete(oldestEntry.traceId);
}
}

private async flushTrace(traceId: string, reason: FlushReason): Promise<void> {
const trace = this.traces.get(traceId);
if (!trace) return;
private async flushTrace(traceId: string): Promise<void> {
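// Drain this trace's queue and export it under the saved context. The entry is
// intentionally left in the map so spans that end later can reuse the same context.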
const entry = this.traces.get(traceId);
if (!entry) return;

this.traces.delete(traceId);
this.stopSweepTimerIfIdle();
entry.flushScheduled = false;

const spans = trace.spans;
// Splice the queue to avoid re-exporting the same spans
const spans = entry.queue.splice(0, entry.queue.length);
if (spans.length === 0) return;

entry.exportedCount += spans.length;

logger.info(
`[PerRequestSpanProcessor] Flushing trace traceId=${traceId} reason=${reason} spans=${spans.length} rootEnded=${trace.rootEnded}`
`[PerRequestSpanProcessor] Flushing trace traceId=${traceId} spans=${spans.length} totalExported=${entry.exportedCount}`
);

// Must have captured the root context to access the token
if (!trace.rootCtx) {
logger.error(`[PerRequestSpanProcessor] Missing rootCtx for trace ${traceId}, cannot export spans`);
return;
// Select export context: rootCtx if available, fallback otherwise
const exportCtx = entry.rootCtx ?? entry.fallbackCtx ?? context.active();
if (!entry.rootCtx && !entry.fallbackCtx) {
logger.warn(`[PerRequestSpanProcessor] Missing saved context for trace ${traceId}, using context.active()`);
}

await this.exportBatch(spans, exportCtx);
}

private async exportBatch(spans: ReadableSpan[], exportCtx: Context): Promise<void> {
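// Exports run through the concurrency guardrail; failures and thrown errors are
// logged and swallowed so span processing never blocks on the exporter.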
await this.acquireExportSlot();

try {
// Export under the original request Context so exporter can read the token from context.active()
// Export under the saved Context so exporter can read the token from context.active()
await new Promise<void>((resolve) => {
try {
context.with(trace.rootCtx as Context, () => {
context.with(exportCtx, () => {
try {
this.exporter.export(spans, (result) => {
// Log export failures but still resolve to avoid blocking processor
if (result.code !== 0) {
logger.error(
`[PerRequestSpanProcessor] Export failed traceId=${traceId} reason=${reason} code=${result.code}`,
`[PerRequestSpanProcessor] Export failed spans=${spans.length} code=${result.code}`,
result.error
);
} else {
logger.info(
`[PerRequestSpanProcessor] Export succeeded traceId=${traceId} reason=${reason} spans=${spans.length}`
);
logger.info(`[PerRequestSpanProcessor] Export succeeded spans=${spans.length}`);
}
resolve();
});
} catch (err) {
logger.error(
`[PerRequestSpanProcessor] Export threw traceId=${traceId} reason=${reason} spans=${spans.length}`,
err
);
logger.error(`[PerRequestSpanProcessor] Export threw spans=${spans.length}`, err);
resolve();
}
});
} catch (err) {
logger.error(`[PerRequestSpanProcessor] context.with threw traceId=${traceId} reason=${reason}`, err);
logger.error(`[PerRequestSpanProcessor] context.with threw spans=${spans.length}`, err);
resolve();
}
});