Skip to content

Commit 7d47c84

Browse files
committed
fix(webapp): dedupe realtimeStreams array push on stream create
The PUT handler at /realtime/v1/streams/:runId/:target/:streamId unconditionally appended streamId to TaskRun.realtimeStreams on every call. SDK call patterns that re-initialize the same stream key on every chunk turned that into a per-write row UPDATE, bloating the array and contending on the row lock. Read the array first and only push when the streamId isn't already present, matching the existing append handler. First-time inits behave identically; repeat inits short-circuit to a single indexed read.
1 parent a8280f1 commit 7d47c84

2 files changed

Lines changed: 25 additions & 9 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler.

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,39 +62,49 @@ const { action } = createActionApiRoute(
6262

6363
if (request.method === "PUT") {
6464
// This is the "create" endpoint
65-
const updatedRun = await prisma.taskRun.update({
65+
const target = await prisma.taskRun.findFirst({
6666
where: {
6767
friendlyId: targetId,
6868
runtimeEnvironmentId: authentication.environment.id,
6969
},
70-
data: {
71-
realtimeStreams: {
72-
push: params.streamId,
73-
},
74-
},
7570
select: {
71+
id: true,
72+
realtimeStreams: true,
7673
realtimeStreamsVersion: true,
7774
completedAt: true,
7875
},
7976
});
8077

81-
if (updatedRun.completedAt) {
78+
if (!target) {
79+
return new Response("Run not found", { status: 404 });
80+
}
81+
82+
if (target.completedAt) {
8283
return new Response("Cannot initialize a realtime stream on a completed run", {
8384
status: 400,
8485
});
8586
}
8687

88+
if (!target.realtimeStreams.includes(params.streamId)) {
89+
await prisma.taskRun.update({
90+
where: { id: target.id },
91+
data: {
92+
realtimeStreams: { push: params.streamId },
93+
},
94+
});
95+
}
96+
8797
const realtimeStream = getRealtimeStreamInstance(
8898
authentication.environment,
89-
updatedRun.realtimeStreamsVersion,
99+
target.realtimeStreamsVersion,
90100
basinContext
91101
);
92102

93103
const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);
94104

95105
return json(
96106
{
97-
version: updatedRun.realtimeStreamsVersion,
107+
version: target.realtimeStreamsVersion,
98108
},
99109
{ status: 202, headers: responseHeaders }
100110
);

0 commit comments

Comments
 (0)