Skip to content

Commit bbf59b5

Browse files
d-cs and claude committed
fix(webapp): stale-sweep stop() awaits in-flight tick before closing Redis
Devin review on PR #3754: `stop()` previously called `deps.state.close()` immediately after `clearInterval`, but `tick()` only checks `stopped` at entry. A tick that was already past that guard would keep making `state.*` calls against an ioredis client that `stop()` had already `quit()`ed — those calls would throw, the tick's own try/catch would swallow them as `mollifier.stale_sweep.failed` warnings, and every graceful shutdown would emit spurious noise.

Track the current tick promise as `currentTick`. `stop()` awaits it (if present) before invoking `state.close()`, so the tick's last state call lands BEFORE the Redis client is quit. The tick's own try/catch handles the (unexpected) case where it rejects; the await in `stop()` is solely for ordering.

Also drop the `instanceof MollifierStaleSweepState` guard around `state.close()` — `close()` is part of the `StaleSweepStateStore` contract, so unconditional invocation is correct. Test fake states implement `close()` as a no-op.

Test: `stop() waits for an in-flight tick to finish before closing the state` — gates a fake state's `readCursor` on a Deferred, kicks off the interval, waits for the tick to start, then races `stop()` against the gate. Asserts the stop promise stays unresolved while the tick is mid-flight and that the tick's final state operation lands before `close()`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 70611c2 commit bbf59b5

2 files changed

Lines changed: 158 additions & 16 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts

Lines changed: 38 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -198,20 +198,33 @@ export function startStaleSweepInterval(
198198
): StaleSweepIntervalHandle {
199199
let stopped = false;
200200
let inFlight = false;
201+
// Tracks the current tick so `stop()` can await it before closing the
202+
// state's Redis client. Without this, a tick that's already past the
203+
// `stopped` guard at entry would continue making `state.*` calls
204+
// against an ioredis client that `stop()` has already `quit()`ed,
205+
// raising errors that the tick's own try/catch then logs as
206+
// `mollifier.stale_sweep.failed` warnings — spurious noise on every
207+
// graceful shutdown.
208+
let currentTick: Promise<void> | null = null;
201209

202210
const tick = async () => {
203211
if (stopped || inFlight) return;
204212
inFlight = true;
205-
try {
206-
await runStaleSweepOnce(config, deps);
207-
} catch (err) {
208-
const log = deps.logger ?? defaultLogger;
209-
log.warn("mollifier.stale_sweep.failed", {
210-
err: err instanceof Error ? err.message : String(err),
211-
});
212-
} finally {
213-
inFlight = false;
214-
}
213+
const run = (async () => {
214+
try {
215+
await runStaleSweepOnce(config, deps);
216+
} catch (err) {
217+
const log = deps.logger ?? defaultLogger;
218+
log.warn("mollifier.stale_sweep.failed", {
219+
err: err instanceof Error ? err.message : String(err),
220+
});
221+
} finally {
222+
inFlight = false;
223+
currentTick = null;
224+
}
225+
})();
226+
currentTick = run;
227+
await run;
215228
};
216229

217230
const timer = setInterval(() => {
@@ -222,12 +235,22 @@ export function startStaleSweepInterval(
222235
stop: async () => {
223236
stopped = true;
224237
clearInterval(timer);
225-
// Close the durable-state Redis client if the deps own a real
226-
// `MollifierStaleSweepState`. Tests may inject a fake without a
227-
// `close()`; guard accordingly.
228-
if (deps.state instanceof MollifierStaleSweepState) {
229-
await deps.state.close();
238+
// Drain any tick that started before `stopped` flipped. Its
239+
// `state.*` calls must land before we close the Redis client.
240+
if (currentTick) {
241+
try {
242+
await currentTick;
243+
} catch {
244+
// tick has its own catch — this await is just to ensure
245+
// ordering, not to surface errors that have already been
246+
// logged inside the tick.
247+
}
230248
}
249+
// Close the state's underlying resource. The `close()` method is
250+
// part of the `StaleSweepStateStore` contract — production's
251+
// `MollifierStaleSweepState` shuts down its ioredis client; fake
252+
// test states implement a no-op.
253+
await deps.state.close();
231254
},
232255
};
233256
}

apps/webapp/test/mollifierStaleSweep.test.ts

Lines changed: 120 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,10 @@ import { MollifierBuffer } from "@trigger.dev/redis-worker";
44

55
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
66

7-
import { runStaleSweepOnce } from "~/v3/mollifier/mollifierStaleSweep.server";
7+
import {
8+
runStaleSweepOnce,
9+
startStaleSweepInterval,
10+
} from "~/v3/mollifier/mollifierStaleSweep.server";
811
import { MollifierStaleSweepState } from "~/v3/mollifier/mollifierStaleSweepState.server";
912

1013
const SNAPSHOT = {
@@ -799,3 +802,119 @@ describe("MollifierStaleSweepState — direct unit tests", () => {
799802
},
800803
);
801804
});
805+
806+
describe("startStaleSweepInterval — lifecycle", () => {
807+
it("stop() waits for an in-flight tick to finish before closing the state", async () => {
808+
// Devin's BUG report on PR #3754: `stop()` previously called
809+
// `deps.state.close()` immediately after `clearInterval`, but the
810+
// `tick` function only checks `stopped` at entry. A tick that was
811+
// already past that check would keep making `state.*` Redis calls
812+
// against a now-closed ioredis client, throw, get caught by tick's
813+
// own try/catch, and log a `mollifier.stale_sweep.failed` warning
814+
// for every graceful shutdown.
815+
//
816+
// The fix tracks the current tick promise so `stop()` can await it
817+
// before closing. This test pins that order by gating one of the
818+
// tick's state calls on a Deferred — until we resolve it, the tick
819+
// can't progress, and `stop()` must hang in the meantime.
820+
let resolveGate: () => void = () => {};
821+
const gate = new Promise<void>((r) => {
822+
resolveGate = r;
823+
});
824+
825+
const callOrder: string[] = [];
826+
let closeCalled = false;
827+
const state = {
828+
readCursor: async () => {
829+
callOrder.push("readCursor:start");
830+
await gate;
831+
callOrder.push("readCursor:end");
832+
return 0;
833+
},
834+
writeCursor: async () => {
835+
callOrder.push("writeCursor");
836+
},
837+
rebuildOrgList: async () => {
838+
callOrder.push("rebuildOrgList");
839+
},
840+
readOrgListSlice: async () => {
841+
callOrder.push("readOrgListSlice");
842+
// Return zero orgs so the org loop is a no-op — we only care
843+
// about ordering of state calls vs close, not the work.
844+
return { orgs: [] as string[], total: 0 };
845+
},
846+
setEnvStaleCount: async () => {
847+
callOrder.push("setEnvStaleCount");
848+
},
849+
readAllEnvStaleCounts: async () => {
850+
callOrder.push("readAllEnvStaleCounts");
851+
return new Map<string, number>();
852+
},
853+
markEnvVisited: async () => {
854+
callOrder.push("markEnvVisited");
855+
},
856+
reconcileVisited: async () => {
857+
callOrder.push("reconcileVisited");
858+
},
859+
clearAll: async () => {
860+
callOrder.push("clearAll");
861+
},
862+
close: async () => {
863+
callOrder.push("close");
864+
closeCalled = true;
865+
},
866+
};
867+
868+
const fakeBuffer = {
869+
listOrgs: async () => [],
870+
listEnvsForOrg: async () => [],
871+
listEntriesForEnv: async () => [],
872+
} as any;
873+
874+
const handle = startStaleSweepInterval(
875+
{
876+
intervalMs: 20,
877+
staleThresholdMs: 60_000,
878+
maxOrgsPerPass: 10,
879+
},
880+
{
881+
state,
882+
getBuffer: () => fakeBuffer,
883+
recordStaleEntry: () => {},
884+
reportStaleEntrySnapshot: () => {},
885+
logger: { warn: () => {} },
886+
now: () => Date.now(),
887+
},
888+
);
889+
890+
// Wait for the interval to fire one tick. The tick will start, call
891+
// readCursor, and then block on `gate`.
892+
await new Promise((r) => setTimeout(r, 80));
893+
expect(callOrder).toContain("readCursor:start");
894+
expect(closeCalled).toBe(false);
895+
896+
// Call stop() concurrently — its promise MUST NOT resolve while the
897+
// tick is still mid-flight.
898+
let stopResolved = false;
899+
const stopPromise = handle.stop().then(() => {
900+
stopResolved = true;
901+
});
902+
await new Promise((r) => setTimeout(r, 50));
903+
expect(stopResolved).toBe(false);
904+
expect(closeCalled).toBe(false);
905+
906+
// Release the gate. The tick can now finish, and only then should
907+
// stop() resolve and close the state.
908+
resolveGate();
909+
await stopPromise;
910+
expect(stopResolved).toBe(true);
911+
expect(closeCalled).toBe(true);
912+
913+
// The tick's readCursor:end MUST appear before the close — otherwise
914+
// we closed the Redis client out from under an in-flight tick.
915+
expect(callOrder.indexOf("readCursor:end")).toBeGreaterThan(-1);
916+
expect(callOrder.indexOf("close")).toBeGreaterThan(
917+
callOrder.indexOf("readCursor:end"),
918+
);
919+
});
920+
});

0 commit comments

Comments
 (0)