Skip to content

Commit 02d2334

Browse files
authored
fix(webapp): fix Redis connection leak in realtime streams and broken abort signal propagation (#3399)
Pool Redis connections for non-blocking ops (ingestData, appendPart, getLastChunkIndex) using a shared singleton instead of new Redis() per request. Use redis.disconnect() for immediate teardown in streamResponse cleanup. Lower the default inactivity timeout from 60s to 15s as a fallback. Fix broken request.signal in Remix/Express by wiring Express res.on('close') to an AbortController via httpAsyncStorage. All SSE/streaming routes now use getRequestAbortSignal(), which fires reliably on client disconnect, bypassing the Node.js undici GC bug (nodejs/node#55428) that severs the signal chain.
1 parent f5b4d34 commit 02d2334

File tree

13 files changed

+85
-37
lines changed

13 files changed

+85
-37
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix Redis connection leak in realtime streams and broken abort signal propagation.
7+
8+
**Redis connections**: Non-blocking methods (ingestData, appendPart, getLastChunkIndex) now share a single Redis connection instead of creating one per request. streamResponse still uses dedicated connections (required for XREAD BLOCK) but now tears them down immediately via disconnect() instead of graceful quit(), and the default inactivity timeout is lowered from 60s to 15s as a fallback.
9+
10+
**Abort signal**: request.signal is broken in Remix/Express due to a Node.js undici GC bug (nodejs/node#55428) that severs the signal chain when Remix clones the Request internally. Added getRequestAbortSignal() wired to Express res.on("close") via httpAsyncStorage, which fires reliably on client disconnect. All SSE/streaming routes updated to use it.

apps/webapp/CLAUDE.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ Use the `chrome-devtools` MCP server to visually verify local dashboard changes.
5959
Routes use Remix flat-file convention with dot-separated segments:
6060
`api.v1.tasks.$taskId.trigger.ts` -> `/api/v1/tasks/:taskId/trigger`
6161

62+
## Abort Signals
63+
64+
**Never use `request.signal`** for detecting client disconnects. It is broken due to a Node.js bug ([nodejs/node#55428](https://github.com/nodejs/node/issues/55428)) where the AbortSignal chain is severed when Remix internally clones the Request object. Instead, use `getRequestAbortSignal()` from `app/services/httpAsyncStorage.server.ts`, which is wired directly to Express `res.on("close")` and fires reliably. When called outside an HTTP request context (e.g. in tests), it returns a fallback signal that never aborts.
65+
66+
```typescript
67+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
68+
69+
// In route handlers, SSE streams, or any server-side code:
70+
const signal = getRequestAbortSignal();
71+
```
72+
6273
## Environment Variables
6374

6475
Access via `env` export from `app/env.server.ts`. **Never use `process.env` directly.**

apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type TaskRunAttempt } from "@trigger.dev/database";
22
import { eventStream } from "remix-utils/sse/server";
33
import { type PrismaClient, prisma } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import { logger } from "~/services/logger.server";
56
import { projectPubSub } from "~/v3/services/projectPubSub.server";
67

@@ -63,7 +64,9 @@ export class TasksStreamPresenter {
6364

6465
const subscriber = await projectPubSub.subscribe(`project:${project.id}:*`);
6566

66-
return eventStream(request.signal, (send, close) => {
67+
const signal = getRequestAbortSignal();
68+
69+
return eventStream(signal, (send, close) => {
6770
const safeSend = (args: { event?: string; data: string }) => {
6871
try {
6972
send(args);
@@ -95,7 +98,7 @@ export class TasksStreamPresenter {
9598
});
9699

97100
pinger = setInterval(() => {
98-
if (request.signal.aborted) {
101+
if (signal.aborted) {
99102
return close();
100103
}
101104

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
56
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
67
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -129,7 +130,7 @@ export const loader = createLoaderApiRoute(
129130
run.realtimeStreamsVersion
130131
);
131132

132-
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, {
133+
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), {
133134
lastEventId,
134135
timeoutInSeconds,
135136
});

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import {
56
getInputStreamWaitpoint,
67
deleteInputStreamWaitpoint,
@@ -162,7 +163,7 @@ const loader = createLoaderApiRoute(
162163
request,
163164
run.friendlyId,
164165
`$trigger.input:${params.streamId}`,
165-
request.signal,
166+
getRequestAbortSignal(),
166167
{
167168
lastEventId,
168169
timeoutInSeconds,

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { $replica } from "~/db.server";
2121
import { useEnvironment } from "~/hooks/useEnvironment";
2222
import { useOrganization } from "~/hooks/useOrganizations";
2323
import { useProject } from "~/hooks/useProject";
24+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
2425
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
2526
import { requireUserId } from "~/services/session.server";
2627
import { cn } from "~/utils/cn";
@@ -89,7 +90,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8990
run.realtimeStreamsVersion
9091
);
9192

92-
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, {
93+
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), {
9394
lastEventId,
9495
});
9596
};

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { z } from "zod";
55
import { env } from "~/env.server";
66
import { findProjectBySlug } from "~/models/project.server";
77
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
8+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
89
import { requireUserId } from "~/services/session.server";
910
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
1011
import { inflate } from "node:zlib";
@@ -92,7 +93,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
9293
const result = streamText({
9394
model: openai(env.AI_RUN_FILTER_MODEL ?? "gpt-5-mini"),
9495
temperature: 1,
95-
abortSignal: request.signal,
96+
abortSignal: getRequestAbortSignal(),
9697
system: systemPrompt,
9798
prompt,
9899
tools: {

apps/webapp/app/services/httpAsyncStorage.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export type HttpLocalStorage = {
55
path: string;
66
host: string;
77
method: string;
8+
abortController: AbortController;
89
};
910

1011
const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
@@ -18,3 +19,15 @@ export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T
1819
export function getHttpContext(): HttpLocalStorage | undefined {
1920
return httpLocalStorage.getStore();
2021
}
22+
23+
// Fallback signal that is never aborted, safe for tests and non-Express contexts.
24+
const neverAbortedSignal = new AbortController().signal;
25+
26+
/**
27+
* Returns an AbortSignal wired to the Express response's "close" event.
28+
* This bypasses the broken request.signal chain in @remix-run/express
29+
* (caused by Node.js undici GC bug nodejs/node#55428).
30+
*/
31+
export function getRequestAbortSignal(): AbortSignal {
32+
return httpLocalStorage.getStore()?.abortController.signal ?? neverAbortedSignal;
33+
}

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export type RealtimeStreamsOptions = {
77
redis: RedisOptions | undefined;
88
logger?: Logger;
99
logLevel?: LogLevel;
10-
inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 60000)
10+
inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 15000)
1111
};
1212

1313
// Legacy constant for backward compatibility (no longer written, but still recognized when reading)
@@ -23,10 +23,23 @@ type StreamChunk =
2323
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
2424
private logger: Logger;
2525
private inactivityTimeoutMs: number;
26+
// Shared connection for short-lived non-blocking operations (XADD, XREVRANGE, EXPIRE).
27+
// Lazily created on first use so we don't open a connection if only streamResponse is called.
28+
private _sharedRedis: Redis | undefined;
2629

2730
constructor(private options: RealtimeStreamsOptions) {
2831
this.logger = options.logger ?? new Logger("RedisRealtimeStreams", options.logLevel ?? "info");
29-
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds
32+
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 15000; // Default: 15 seconds
33+
}
34+
35+
private get sharedRedis(): Redis {
36+
if (!this._sharedRedis) {
37+
this._sharedRedis = new Redis({
38+
...this.options.redis,
39+
connectionName: "realtime:shared",
40+
});
41+
}
42+
return this._sharedRedis;
3043
}
3144

3245
async initializeStream(
@@ -43,7 +56,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
4356
signal: AbortSignal,
4457
options?: StreamResponseOptions
4558
): Promise<Response> {
46-
const redis = new Redis(this.options.redis ?? {});
59+
const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" });
4760
const streamKey = `stream:${runId}:${streamId}`;
4861
let isCleanedUp = false;
4962

@@ -269,7 +282,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
269282
async function cleanup() {
270283
if (isCleanedUp) return;
271284
isCleanedUp = true;
272-
await redis.quit().catch(console.error);
285+
// disconnect() tears down the TCP socket immediately, which causes any
286+
// pending XREAD BLOCK to reject right away instead of waiting for the
287+
// block timeout to elapse. quit() would queue behind the blocking command.
288+
redis.disconnect();
273289
}
274290

275291
signal.addEventListener("abort", cleanup, { once: true });
@@ -290,22 +306,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
290306
clientId: string,
291307
resumeFromChunk?: number
292308
): Promise<Response> {
293-
const redis = new Redis(this.options.redis ?? {});
309+
const redis = this.sharedRedis;
294310
const streamKey = `stream:${runId}:${streamId}`;
295311
const startChunk = resumeFromChunk ?? 0;
296312
// Start counting from the resume point, not from 0
297313
let currentChunkIndex = startChunk;
298314

299-
const self = this;
300-
301-
async function cleanup() {
302-
try {
303-
await redis.quit();
304-
} catch (error) {
305-
self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
306-
}
307-
}
308-
309315
try {
310316
const textStream = stream.pipeThrough(new TextDecoderStream());
311317
const reader = textStream.getReader();
@@ -361,13 +367,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
361367
this.logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error });
362368

363369
return new Response(null, { status: 500 });
364-
} finally {
365-
await cleanup();
366370
}
367371
}
368372

369373
async appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void> {
370-
const redis = new Redis(this.options.redis ?? {});
374+
const redis = this.sharedRedis;
371375
const streamKey = `stream:${runId}:${streamId}`;
372376

373377
await redis.xadd(
@@ -386,12 +390,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
386390

387391
// Set TTL for cleanup when stream is done
388392
await redis.expire(streamKey, env.REALTIME_STREAM_TTL);
389-
390-
await redis.quit();
391393
}
392394

393395
async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
394-
const redis = new Redis(this.options.redis ?? {});
396+
const redis = this.sharedRedis;
395397
const streamKey = `stream:${runId}:${streamId}`;
396398

397399
try {
@@ -460,10 +462,6 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
460462
});
461463
// Return -1 to indicate we don't know what the server has
462464
return -1;
463-
} finally {
464-
await redis.quit().catch((err) => {
465-
this.logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err });
466-
});
467465
}
468466
}
469467

apps/webapp/app/utils/sse.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { eventStream } from "remix-utils/sse/server";
22
import { env } from "~/env.server";
3+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
34
import { logger } from "~/services/logger.server";
45

56
type SseProps = {
@@ -22,6 +23,8 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
2223
return new Response("SSE disabled", { status: 200 });
2324
}
2425

26+
const signal = getRequestAbortSignal();
27+
2528
let pinger: NodeJS.Timeout | undefined = undefined;
2629
let updater: NodeJS.Timeout | undefined = undefined;
2730
let timeout: NodeJS.Timeout | undefined = undefined;
@@ -32,7 +35,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
3235
clearTimeout(timeout);
3336
};
3437

35-
return eventStream(request.signal, (send, close) => {
38+
return eventStream(signal, (send, close) => {
3639
const safeSend = (args: { event?: string; data: string }) => {
3740
try {
3841
send(args);
@@ -60,15 +63,15 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
6063
};
6164

6265
pinger = setInterval(() => {
63-
if (request.signal.aborted) {
66+
if (signal.aborted) {
6467
return abort();
6568
}
6669

6770
safeSend({ event: "ping", data: new Date().toISOString() });
6871
}, pingInterval);
6972

7073
updater = setInterval(() => {
71-
if (request.signal.aborted) {
74+
if (signal.aborted) {
7275
return abort();
7376
}
7477

0 commit comments

Comments
 (0)