diff --git a/src/telemetry/export/otlp.ts b/src/telemetry/export/otlp.ts new file mode 100644 index 000000000..233369a1c --- /dev/null +++ b/src/telemetry/export/otlp.ts @@ -0,0 +1,334 @@ +import type { TelemetryEvent } from "../event"; + +type JsonPrimitive = string | number | boolean | null; +type JsonValue = + | JsonPrimitive + | JsonValue[] + | { readonly [key: string]: JsonValue }; +type JsonObject = Record; + +const STATUS_CODE_UNSET = 0; +const STATUS_CODE_OK = 1; +const STATUS_CODE_ERROR = 2; +const SPAN_KIND_INTERNAL = 1; + +const SEVERITY_NUMBER_INFO = 9; +const SEVERITY_NUMBER_ERROR = 17; + +const AGGREGATION_TEMPORALITY_DELTA = 1; + +const METRIC_EVENT_NAMES = new Set([ + "http.requests", + "ssh.network.info", + "ssh.network.sampled", +]); + +export function isMetricEvent(event: TelemetryEvent): boolean { + return METRIC_EVENT_NAMES.has(event.eventName); +} + +export function toOtlpLogResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeLogs: [ + { + scope: instrumentationScope(), + logRecords: [toLogRecord(event)], + }, + ], + }; +} + +export function toOtlpSpanResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeSpans: [ + { + scope: instrumentationScope(), + spans: [toSpan(event)], + }, + ], + }; +} + +export function toOtlpMetricResource(event: TelemetryEvent): JsonObject { + return { + resource: { attributes: resourceAttributes(event) }, + scopeMetrics: [ + { + scope: instrumentationScope(), + metrics: toMetrics(event), + }, + ], + }; +} + +function toLogRecord(event: TelemetryEvent): JsonObject { + const timeUnixNano = toUnixNano(event.timestamp); + return { + timeUnixNano, + observedTimeUnixNano: timeUnixNano, + severityNumber: + event.error === undefined ? SEVERITY_NUMBER_INFO : SEVERITY_NUMBER_ERROR, + severityText: event.error === undefined ? "INFO" : "ERROR", + body: { stringValue: event.eventName }, + attributes: eventAttributes(event), + }; +} + +function toSpan(event: TelemetryEvent): JsonObject { + const endTimeUnixNano = toUnixNano(event.timestamp); + const startTimeUnixNano = toSpanStartUnixNano(event, endTimeUnixNano); + return { + traceId: event.traceId ?? "", + spanId: event.eventId, + ...(event.parentEventId !== undefined && { + parentSpanId: event.parentEventId, + }), + name: spanName(event.eventName), + kind: SPAN_KIND_INTERNAL, + startTimeUnixNano, + endTimeUnixNano, + attributes: spanAttributes(event), + status: spanStatus(event), + ...(event.error !== undefined && { + events: [exceptionSpanEvent(event, endTimeUnixNano)], + }), + }; +} + +function toMetrics(event: TelemetryEvent): JsonObject[] { + if (event.eventName === "http.requests") { + return toHttpRequestMetrics(event); + } + return toGaugeMetrics(event, Object.entries(event.measurements)); +} + +function toHttpRequestMetrics(event: TelemetryEvent): JsonObject[] { + const windowSeconds = event.measurements.window_seconds; + const measurements = Object.entries(event.measurements).filter( + ([name]) => name !== "window_seconds", + ); + const countMetrics = measurements.filter(([name]) => + name.startsWith("count_"), + ); + const gaugeMetrics = measurements.filter( + ([name]) => !name.startsWith("count_"), + ); + const timeUnixNano = toUnixNano(event.timestamp); + return [ + ...countMetrics.map(([name, value]) => + toSumMetric(event, name, value, timeUnixNano, windowSeconds), + ), + ...toGaugeMetrics(event, gaugeMetrics, { + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + }), + ]; +} + +function toSumMetric( + event: TelemetryEvent, + measurementName: string, + value: number, + timeUnixNano: string, + windowSeconds: number | undefined, +): JsonObject { + return { + name: `${event.eventName}.${measurementName}`, + description: event.eventName, + unit: "{request}", + sum: { + aggregationTemporality: AGGREGATION_TEMPORALITY_DELTA, + isMonotonic: true, + dataPoints: [ + { + attributes: metricAttributes(event), + startTimeUnixNano: windowStartUnixNano(timeUnixNano, windowSeconds), + timeUnixNano, + asInt: String(Math.trunc(value)), + }, + ], + }, + }; +} + +function toGaugeMetrics( + event: TelemetryEvent, + measurements: Array<[string, number]>, + times: { + readonly startTimeUnixNano?: string; + readonly timeUnixNano: string; + } = { + timeUnixNano: toUnixNano(event.timestamp), + }, +): JsonObject[] { + return measurements.map(([name, value]) => ({ + name: `${event.eventName}.${name}`, + description: event.eventName, + unit: metricUnit(name), + gauge: { + dataPoints: [ + { + attributes: metricAttributes(event), + ...(times.startTimeUnixNano !== undefined && { + startTimeUnixNano: times.startTimeUnixNano, + }), + timeUnixNano: times.timeUnixNano, + asDouble: value, + }, + ], + }, + })); +} + +function resourceAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "service.name": "coder-vscode-extension", + "service.version": event.context.extensionVersion, + "coder.machine.id": event.context.machineId, + "coder.session.id": event.context.sessionId, + "os.type": event.context.osType, + "os.version": event.context.osVersion, + "host.arch": event.context.hostArch, + "vscode.platform.name": event.context.platformName, + "vscode.platform.version": event.context.platformVersion, + "coder.deployment.url": event.context.deploymentUrl, + }); +} + +function eventAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + ...event.properties, + ...event.measurements, + ...(event.error !== undefined && { + "exception.message": event.error.message, + ...(event.error.type !== undefined && { + "exception.type": event.error.type, + }), + ...(event.error.code !== undefined && { + "exception.code": event.error.code, + }), + }), + }); +} + +function spanAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + ...Object.fromEntries( + Object.entries(event.measurements).filter( + ([name]) => name !== "durationMs", + ), + ), + }); +} + +function metricAttributes(event: TelemetryEvent): JsonObject[] { + return keyValues({ + "coder.event_name": event.eventName, + ...event.properties, + }); +} + +function keyValues( + values: Readonly>, +): JsonObject[] { + return Object.entries(values).map(([key, value]) => { + const otlpValue: JsonObject = + typeof value === "number" + ? { doubleValue: value } + : { stringValue: value }; + return { key, value: otlpValue }; + }); +} + +function instrumentationScope(): JsonObject { + return { + name: "coder.vscode-coder.telemetry.export", + }; +} + +function spanStatus(event: TelemetryEvent): JsonObject { + if (event.properties.result === "success") { + return { code: STATUS_CODE_OK }; + } + if (event.properties.result === "error" || event.error !== undefined) { + return { + code: STATUS_CODE_ERROR, + ...(event.error !== undefined && { message: event.error.message }), + }; + } + return { code: STATUS_CODE_UNSET }; +} + +function exceptionSpanEvent( + event: TelemetryEvent, + timeUnixNano: string, +): JsonObject { + const error = event.error; + if (error === undefined) { + throw new Error("Cannot build exception event without an error."); + } + return { + name: "exception", + timeUnixNano, + attributes: keyValues({ + "exception.message": error.message, + ...(error.type !== undefined && { "exception.type": error.type }), + ...(error.code !== undefined && { "exception.code": error.code }), + }), + }; +} + +function toSpanStartUnixNano( + event: TelemetryEvent, + endTimeUnixNano: string, +): string { + const durationMs = event.measurements.durationMs; + if (durationMs === undefined) { + return endTimeUnixNano; + } + return String(BigInt(endTimeUnixNano) - msToNanos(durationMs)); +} + +function windowStartUnixNano( + timeUnixNano: string, + windowSeconds: number | undefined, +): string { + if (windowSeconds === undefined) { + return timeUnixNano; + } + return String(BigInt(timeUnixNano) - secondsToNanos(windowSeconds)); +} + +function toUnixNano(timestamp: string): string { + const ms = Date.parse(timestamp); + if (!Number.isFinite(ms)) { + throw new Error(`Invalid telemetry timestamp '${timestamp}'.`); + } + return String(BigInt(ms) * 1_000_000n); +} + +function msToNanos(ms: number): bigint { + return BigInt(Math.max(0, Math.round(ms * 1_000_000))); +} + +function secondsToNanos(seconds: number): bigint { + return BigInt(Math.max(0, Math.round(seconds * 1_000_000_000))); +} + +function spanName(eventName: string): string { + return eventName.split(".").at(-1) ?? eventName; +} + +function metricUnit(measurementName: string): string { + if (measurementName.endsWith("_ms") || measurementName.endsWith("Ms")) { + return "ms"; + } + if (measurementName.endsWith("Mbits")) { + return "Mbit/s"; + } + return "1"; +} diff --git a/src/telemetry/export/writers.ts b/src/telemetry/export/writers.ts index 4c6e97af8..1d9b50860 100644 --- a/src/telemetry/export/writers.ts +++ b/src/telemetry/export/writers.ts @@ -1,17 +1,34 @@ +import { zip } from "fflate"; import { createWriteStream } from "node:fs"; +import * as fs from "node:fs/promises"; +import * as path from "node:path"; import { Readable } from "node:stream"; import { pipeline } from "node:stream/promises"; import { writeAtomically } from "../../util/fs"; import { serializeTelemetryEvent } from "../wireFormat"; +import { + isMetricEvent, + toOtlpLogResource, + toOtlpMetricResource, + toOtlpSpanResource, +} from "./otlp"; + import type { TelemetryEvent } from "../event"; +export interface OtlpExportCounts { + readonly events: number; + readonly logs: number; + readonly traces: number; + readonly metrics: number; +} + /** * Writes `events` as a JSON array to `outputPath` via a temp file + atomic * rename, so a partial write never replaces the destination. Streams chunks - * with backpressure so memory stays flat even for large exports. - * Returns the number of events written. + * with backpressure so memory stays flat even for large exports. Returns + * the number of events written. */ export async function writeJsonArrayExport( outputPath: string, @@ -35,3 +52,139 @@ export async function writeJsonArrayExport( }); return count; } + +/** + * Writes `events` as an OTLP/JSON zip (`logs.json`, `traces.json`, + * `metrics.json`) to `outputPath`. Events stream into three JSON files in a + * sibling staging directory; once all are written, the files are packed into + * the zip and the result is renamed onto `outputPath`. + */ +export async function writeOtlpZipExport( + outputPath: string, + events: AsyncIterable, +): Promise { + return writeAtomically(outputPath, async (zipPath) => { + const stagingDir = await fs.mkdtemp(`${outputPath}.staging-`); + try { + const counts = await writeOtlpJsonFiles(stagingDir, events); + await packZip(zipPath, stagingDir, [ + "logs.json", + "traces.json", + "metrics.json", + ]); + return counts; + } finally { + await fs.rm(stagingDir, { recursive: true, force: true }); + } + }); +} + +async function writeOtlpJsonFiles( + dir: string, + events: AsyncIterable, +): Promise { + const logs = openEnvelope( + path.join(dir, "logs.json"), + '{"resourceLogs":[', + "]}\n", + ); + const traces = openEnvelope( + path.join(dir, "traces.json"), + '{"resourceSpans":[', + "]}\n", + ); + const metrics = openEnvelope( + path.join(dir, "metrics.json"), + '{"resourceMetrics":[', + "]}\n", + ); + + let total = 0; + try { + for await (const event of events) { + total += 1; + if (isMetricEvent(event)) { + await metrics.write(toOtlpMetricResource(event)); + } else if (event.traceId !== undefined) { + await traces.write(toOtlpSpanResource(event)); + } else { + await logs.write(toOtlpLogResource(event)); + } + } + } finally { + await Promise.all([logs.close(), traces.close(), metrics.close()]); + } + + return { + events: total, + logs: logs.count, + traces: traces.count, + metrics: metrics.count, + }; +} + +interface EnvelopeWriter { + readonly count: number; + write(value: unknown): Promise; + close(): Promise; +} + +/** Streams a `v1,v2,...` JSON envelope to disk. */ +function openEnvelope( + filePath: string, + prefix: string, + suffix: string, +): EnvelopeWriter { + const stream = createWriteStream(filePath, { encoding: "utf8" }); + const opened = writeChunk(stream, prefix); + let count = 0; + + return { + get count() { + return count; + }, + async write(value) { + await opened; + await writeChunk( + stream, + (count === 0 ? "" : ",") + JSON.stringify(value), + ); + count += 1; + }, + async close() { + await opened; + await writeChunk(stream, suffix); + await new Promise((resolve, reject) => { + stream.end((err?: Error | null) => (err ? reject(err) : resolve())); + }); + }, + }; +} + +function writeChunk( + stream: NodeJS.WritableStream, + chunk: string, +): Promise { + return new Promise((resolve, reject) => { + stream.write(chunk, "utf8", (err) => (err ? reject(err) : resolve())); + }); +} + +async function packZip( + outputPath: string, + sourceDir: string, + names: readonly string[], +): Promise { + const entries = await Promise.all( + names.map( + async (name) => + [name, await fs.readFile(path.join(sourceDir, name))] as const, + ), + ); + const archive = await new Promise((resolve, reject) => { + zip(Object.fromEntries(entries), (err, data) => + err ? reject(err) : resolve(data), + ); + }); + await fs.writeFile(outputPath, archive); +} diff --git a/test/unit/telemetry/export/writers.test.ts b/test/unit/telemetry/export/writers.test.ts index dd07c2df6..2cd05adca 100644 --- a/test/unit/telemetry/export/writers.test.ts +++ b/test/unit/telemetry/export/writers.test.ts @@ -1,7 +1,11 @@ +import { unzipSync } from "fflate"; import { vol } from "memfs"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { writeJsonArrayExport } from "@/telemetry/export/writers"; +import { + writeJsonArrayExport, + writeOtlpZipExport, +} from "@/telemetry/export/writers"; import { serializeTelemetryEvent } from "@/telemetry/wireFormat"; import { createTelemetryEventFactory } from "../../../mocks/telemetry"; @@ -90,6 +94,143 @@ describe("writeJsonArrayExport", () => { }); }); +describe("writeOtlpZipExport", () => { + it("writes a zip with OTLP/JSON logs, traces, and metrics files", async () => { + const outputPath = `${DIR}/telemetry.otlp.zip`; + const events = [ + makeEvent({ + eventName: "log.info", + properties: { source: "unit" }, + }), + makeEvent({ + eventName: "remote.setup.workspace_ready", + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + properties: { result: "success" }, + measurements: { durationMs: 250 }, + }), + makeEvent({ + eventName: "http.requests", + properties: { method: "GET", route: "/api/v2/workspaces/{id}" }, + measurements: { + window_seconds: 60, + count_2xx: 2, + count_5xx: 1, + p95_duration_ms: 42, + }, + }), + makeEvent({ + eventName: "ssh.network.sampled", + properties: { p2p: "true" }, + measurements: { latencyMs: 35, downloadMbits: 10, uploadMbits: 5 }, + }), + ]; + + const counts = await writeOtlpZipExport(outputPath, asyncIterable(events)); + + expect(counts).toEqual({ events: 4, logs: 1, traces: 1, metrics: 2 }); + const entries = unzipSync(vol.readFileSync(outputPath) as Uint8Array); + expect(Object.keys(entries).sort()).toEqual([ + "logs.json", + "metrics.json", + "traces.json", + ]); + + const logs = decodeJson(entries["logs.json"]) as { + resourceLogs: Array<{ + scopeLogs: Array<{ + logRecords: Array<{ body: { stringValue: string } }>; + }>; + }>; + }; + const traces = decodeJson(entries["traces.json"]) as { + resourceSpans: Array<{ + scopeSpans: Array<{ + spans: Array<{ + traceId: string; + spanId: string; + name: string; + kind: number; + status: { code: number }; + startTimeUnixNano: string; + endTimeUnixNano: string; + }>; + }>; + }>; + }; + const metrics = decodeJson(entries["metrics.json"]) as { + resourceMetrics: Array<{ + scopeMetrics: Array<{ + metrics: Array<{ + name: string; + sum?: { dataPoints: Array<{ asInt: string }> }; + gauge?: { + dataPoints: Array<{ + asDouble: number; + startTimeUnixNano?: string; + timeUnixNano: string; + }>; + }; + }>; + }>; + }>; + }; + + expect( + logs.resourceLogs[0].scopeLogs[0].logRecords[0].body.stringValue, + ).toBe("log.info"); + const span = traces.resourceSpans[0].scopeSpans[0].spans[0]; + expect(span).toMatchObject({ + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + name: "workspace_ready", + kind: 1, + status: { code: 1 }, + }); + expect(BigInt(span.endTimeUnixNano) - BigInt(span.startTimeUnixNano)).toBe( + 250_000_000n, + ); + const metricNames = metrics.resourceMetrics.flatMap((resource) => + resource.scopeMetrics.flatMap((scope) => + scope.metrics.map((metric) => metric.name), + ), + ); + expect(metricNames).toEqual([ + "http.requests.count_2xx", + "http.requests.count_5xx", + "http.requests.p95_duration_ms", + "ssh.network.sampled.latencyMs", + "ssh.network.sampled.downloadMbits", + "ssh.network.sampled.uploadMbits", + ]); + const httpP95 = metrics.resourceMetrics[0].scopeMetrics[0].metrics[2]; + expect( + metrics.resourceMetrics[0].scopeMetrics[0].metrics[0].sum?.dataPoints[0] + .asInt, + ).toBe("2"); + expect( + BigInt(httpP95.gauge?.dataPoints[0].timeUnixNano ?? "0") - + BigInt(httpP95.gauge?.dataPoints[0].startTimeUnixNano ?? "0"), + ).toBe(60_000_000_000n); + }); + + it("cleans up the staging directory when zipping fails", async () => { + const outputPath = `${DIR}/telemetry.otlp.zip`; + + const events = (async function* () { + yield makeEvent(); + await Promise.resolve(); + throw new Error("boom"); + })(); + + await expect(writeOtlpZipExport(outputPath, events)).rejects.toThrow( + /boom/, + ); + + // Output file never appears, and no staging dir leaks in the parent. + expect(vol.existsSync(outputPath)).toBe(false); + expect(vol.readdirSync(DIR)).toEqual([]); + }); +}); + async function* asyncIterable( events: readonly TelemetryEvent[], ): AsyncGenerator { @@ -102,3 +243,7 @@ async function* asyncIterable( function readJson(filePath: string): unknown { return JSON.parse(vol.readFileSync(filePath, "utf8") as string); } + +function decodeJson(bytes: Uint8Array): unknown { + return JSON.parse(Buffer.from(bytes).toString()); +}