Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/mollifier-drainer-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs.
2 changes: 2 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
Expand Down Expand Up @@ -228,6 +229,7 @@ Worker.init().catch((error) => {
});

initMollifierDrainerWorker();
initMollifierStaleSweepWorker();

bootstrap().catch((error) => {
logError(error);
Expand Down
37 changes: 30 additions & 7 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
// can win any given entry — but inefficient: polling load (SMEMBERS +
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
// is per-process so engine load also multiplies. Splitting the drainer
// onto a dedicated worker keeps that traffic off the request-serving
// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
Expand Down Expand Up @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Periodic sweep that scans buffer queue ZSETs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
// ops. Defaults: enabled when the mollifier is enabled, run every
// 5 minutes, alert on anything that's been dwelling for 5+ minutes
// (matches the sweep interval — "anything still here when we
// check" is the simplest threshold that converges).
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
.number()
.int()
.positive()
.default(5 * 60_000),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down
27 changes: 27 additions & 0 deletions apps/webapp/app/runEngine/services/triggerFailedTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { getEventRepository } from "~/v3/eventRepository/index.server";
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
import { DefaultQueueManager } from "../concerns/queues.server";
import type { TriggerTaskRequest } from "../types";

Expand Down Expand Up @@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
event.setAttribute("runId", failedRunFriendlyId);
event.failWithError(taskRunError);

// `emitRunFailedEvent: false` because this call site owns the
// trace-event lifecycle via the outer `traceEvent({
// incomplete: false, isError: true })`. Letting the engine
// emit `runFailed` here would race the
// `completeFailedRunEvent` listener against the outer trace
// event's own completion write for the same (traceId, spanId).
// We re-trigger the alerts side directly after the trace
// event closes, below.
return await this.engine.createFailedTaskRun({
friendlyId: failedRunFriendlyId,
environment: {
Expand All @@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
spanId: event.spanId,
traceContext: traceContext as Record<string, unknown>,
taskEventStore: store,
emitRunFailedEvent: false,
...(queueName !== undefined && { queue: queueName }),
...(lockedQueueId !== undefined && { lockedQueueId }),
});
}
);

// Alerts side of `runFailed` — the engine emit was suppressed
// above so the trace-event completion isn't double-written; we
// still need the alert pipeline to fire so customers' ERROR
// channels see the failure. Best-effort: a failed enqueue logs
// but doesn't block returning the friendlyId, mirroring the
// engine handler's behaviour at runEngineHandlers.server.ts:81.
try {
await PerformTaskRunAlertsService.enqueue(failedRun.id);
} catch (alertsError) {
logger.warn("TriggerFailedTaskService: alert enqueue failed", {
taskId: request.taskId,
friendlyId: failedRun.friendlyId,
error:
alertsError instanceof Error ? alertsError.message : String(alertsError),
});
}

return failedRun.friendlyId;
} catch (createError) {
const createErrorMsg =
Expand Down
48 changes: 13 additions & 35 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { createHash } from "node:crypto";
import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
import { MollifierDrainer } from "@trigger.dev/redis-worker";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { engine as runEngine } from "~/v3/runEngine.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { getMollifierBuffer } from "./mollifierBuffer.server";
import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";
import {
createDrainerHandler,
isRetryablePgError,
} from "./mollifierDrainerHandler.server";
import type { MollifierSnapshot } from "./mollifierSnapshot.server";

// Distinct error class for the deterministic "fail loud at boot" throws
// below. The bootstrap in `mollifierDrainerWorker.server.ts` catches
Expand All @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error {
}
}

function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
const buffer = getMollifierBuffer();
if (!buffer) {
// Unreachable in normal config: getMollifierDrainer() gates on the
Expand Down Expand Up @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
});

// Phase 1 handler: no-op ack. The trigger has ALREADY been written to
// Postgres via engine.trigger (dual-write at the call site). Popping +
// acking here proves the dequeue mechanism works end-to-end without
// duplicating the work. Phase 2 will replace this with an engine.trigger
// replay that performs the actual Postgres write.
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
const drainer = new MollifierDrainer<MollifierSnapshot>({
buffer,
handler: async (input) => {
// Hash the (re-serialised, canonical) payload on the drain side rather
// than on the trigger hot path. Burst-time CPU stays with engine.trigger;
// the drainer is the natural place for the audit-equivalence checksum.
// Re-serialisation is identity for the BufferedTriggerPayload shape
// (only strings/numbers/plain objects), so this hash matches what the
// call site wrote into Redis.
const reserialised = serialiseSnapshot(input.payload);
const payloadHash = createHash("sha256").update(reserialised).digest("hex");
logger.info("mollifier.drained", {
runId: input.runId,
envId: input.envId,
orgId: input.orgId,
taskId: input.payload.taskId,
attempts: input.attempts,
ageMs: Date.now() - input.createdAt.getTime(),
payloadBytes: reserialised.length,
payloadHash,
});
},
handler: createDrainerHandler({ engine: runEngine, prisma }),
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
// A no-op handler shouldn't throw, but if something does (e.g. an
// unexpected deserialise failure), don't loop — let it FAIL terminally
// so the entry is observable in metrics.
isRetryable: () => false,
isRetryable: isRetryablePgError,
});

return drainer;
Expand All @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
// handler registration, leaving a narrow window where a SIGTERM landing
// between `start()` and `process.once("SIGTERM", ...)` would skip the
// graceful stop. The split is intentional.
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
export function getMollifierDrainer(): MollifierDrainer<MollifierSnapshot> | null {
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
return singleton("mollifierDrainer", initializeMollifierDrainer);
}
Loading