Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion apps/bot/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

import { execFileSync } from 'child_process'
import { Bot, InlineKeyboard, type Context } from 'grammy'
import { JOB_STATUS, type Job, type JobEvent } from '@wright/shared'
import { JOB_STATUS, REAPER_INTERVAL_MS, STALE_HEARTBEAT_MS, STALE_CLAIMED_MS, type Job, type JobEvent } from '@wright/shared'
import {
getSupabase,
insertJob,
getJob,
getJobByPrefix,
Expand Down Expand Up @@ -889,6 +890,138 @@ bot.catch((err) => {
console.error('Unhandled bot error:', err)
})

// ---------------------------------------------------------------------------
// Stale job reaper — detects dead workers via heartbeat expiry
// ---------------------------------------------------------------------------

/**
 * Starts the stale-job reaper: a periodic sweep that detects workers that
 * died mid-job (heartbeat expired, or never written and started_at is old)
 * and either re-queues the job for retry or marks it permanently failed
 * once max_attempts is exhausted. Also resets jobs stuck in 'claimed'.
 *
 * Every status transition is a compare-and-swap (`.eq('status', …)` on the
 * update plus `.select('id')` to observe whether a row actually changed),
 * so a worker that resumes at the last moment cannot be clobbered.
 */
function startReaper(): void {
  const sb = getSupabase()

  // Re-entrancy guard: a sweep that outruns REAPER_INTERVAL_MS (many stale
  // jobs + slow Telegram sends) must not overlap the next tick.
  let sweeping = false

  setInterval(async () => {
    if (sweeping) return
    sweeping = true
    try {
      const cutoff = new Date(Date.now() - STALE_HEARTBEAT_MS).toISOString()

      // Find running jobs with stale or missing heartbeats. A null heartbeat
      // only counts as stale once started_at is past the cutoff, giving
      // freshly started jobs a grace period before their first beat.
      const { data: staleJobs, error } = await sb
        .from('job_queue')
        .select('id, attempt, max_attempts, worker_id, telegram_chat_id, task, heartbeat_at, started_at')
        .eq('status', 'running')
        .or(`heartbeat_at.lt.${cutoff},and(heartbeat_at.is.null,started_at.lt.${cutoff})`)

      if (error) {
        console.error('[reaper] Query error:', error.message)
        return
      }

      if (!staleJobs || staleJobs.length === 0) return

      console.log(`[reaper] Found ${staleJobs.length} stale running job(s)`)

      for (const job of staleJobs) {
        if (job.attempt < job.max_attempts) {
          // Re-queue for retry
          const { data: updated, error: updateError } = await sb
            .from('job_queue')
            .update({
              status: 'queued',
              worker_id: null,
              claimed_at: null,
              started_at: null,
              heartbeat_at: null,
              attempt: job.attempt + 1,
              error: `Re-queued by reaper: worker stopped responding (attempt ${job.attempt + 1}/${job.max_attempts})`,
            })
            .eq('id', job.id)
            .eq('status', 'running') // CAS: only if still running
            .select('id')

          if (updateError) {
            console.error(`[reaper] Failed to re-queue job ${job.id}:`, updateError.message)
          } else if (updated && updated.length > 0) {
            console.log(`[reaper] Re-queued job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)

            if (job.telegram_chat_id) {
              try {
                await bot.api.sendMessage(
                  job.telegram_chat_id,
                  `<b>[${job.id.slice(0, 8)}]</b> Worker stopped responding. `
                    + `Re-queuing automatically (attempt ${job.attempt + 1}/${job.max_attempts}).`,
                  { parse_mode: 'HTML' },
                )
              } catch {
                // Best effort notification
              }
            }

            wakeWorker()
          }
        } else {
          // Max attempts exceeded — mark as permanently failed
          const { data: updated, error: updateError } = await sb
            .from('job_queue')
            .update({
              status: 'failed',
              completed_at: new Date().toISOString(),
              heartbeat_at: null,
              error: `Failed: worker stopped responding after ${job.max_attempts} attempts`,
            })
            .eq('id', job.id)
            .eq('status', 'running') // CAS
            .select('id')

          if (updateError) {
            console.error(`[reaper] Failed to mark job ${job.id} failed:`, updateError.message)
          } else if (updated && updated.length > 0) {
            console.log(`[reaper] Job ${job.id} permanently failed (max attempts)`)

            if (job.telegram_chat_id) {
              try {
                await bot.api.sendMessage(
                  job.telegram_chat_id,
                  `<b>[${job.id.slice(0, 8)}]</b> Worker stopped responding. `
                    + `Job has failed permanently after ${job.max_attempts} attempts.`,
                  { parse_mode: 'HTML' },
                )
              } catch {
                // Best effort notification
              }
            }
          }
        }
      }

      // Also check for stale claimed jobs (worker died before transitioning to running)
      const claimedCutoff = new Date(Date.now() - STALE_CLAIMED_MS).toISOString()
      const { data: staleClaimed, error: claimedError } = await sb
        .from('job_queue')
        .select('id, worker_id')
        .eq('status', 'claimed')
        .lt('claimed_at', claimedCutoff)

      if (claimedError) {
        console.error('[reaper] Claimed-jobs query error:', claimedError.message)
        return
      }

      if (staleClaimed && staleClaimed.length > 0) {
        let resetCount = 0
        for (const job of staleClaimed) {
          // CAS + select: only log (and later wake) for rows actually reset;
          // the worker may have transitioned to 'running' since the query.
          const { data: reset, error: resetError } = await sb
            .from('job_queue')
            .update({
              status: 'queued',
              worker_id: null,
              claimed_at: null,
              heartbeat_at: null,
              error: `Re-queued by reaper: claimed by ${job.worker_id} but never started`,
            })
            .eq('id', job.id)
            .eq('status', 'claimed') // CAS
            .select('id')

          if (resetError) {
            console.error(`[reaper] Failed to reset claimed job ${job.id}:`, resetError.message)
          } else if (reset && reset.length > 0) {
            resetCount += 1
            console.log(`[reaper] Reset stale claimed job ${job.id}`)
          }
        }
        // Only wake the worker if at least one job really went back to 'queued'.
        if (resetCount > 0) wakeWorker()
      }
    } catch (err) {
      console.error('[reaper] Unexpected error:', err)
    } finally {
      sweeping = false
    }
  }, REAPER_INTERVAL_MS)

  console.log(
    `[reaper] Stale job reaper started (interval: ${REAPER_INTERVAL_MS}ms, staleness: ${STALE_HEARTBEAT_MS}ms)`,
  )
}

// ---------------------------------------------------------------------------
// Startup
// ---------------------------------------------------------------------------
Expand All @@ -901,6 +1034,9 @@ async function main(): Promise<void> {
// intentional -- we want a loud failure at startup.
startRealtimeBridge()

// Start the stale job reaper — detects dead workers via heartbeat expiry
startReaper()

// Start long polling. This will block until the process is stopped.
console.log('Bot is now polling for updates.')
await bot.start({
Expand Down
65 changes: 63 additions & 2 deletions apps/worker/src/queue-poller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createClient, type SupabaseClient } from '@supabase/supabase-js'
import type { Job } from '@wright/shared'
import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS } from '@wright/shared'
import { POLL_INTERVAL_MS, STALE_CLAIMED_MS, STALE_RUNNING_MS, HEARTBEAT_INTERVAL_MS } from '@wright/shared'
import { runDevLoop } from './dev-loop.js'

// Worker identity — use Fly machine ID if available, otherwise hostname
Expand Down Expand Up @@ -120,6 +120,7 @@ export async function requeueCurrentJob(): Promise<string | null> {
worker_id: null,
claimed_at: null,
started_at: null,
heartbeat_at: null,
attempt: job.attempt + 1,
error: `Re-queued: worker shutdown (SIGTERM), attempt ${job.attempt + 1}/${job.max_attempts}`,
})
Expand All @@ -141,6 +142,7 @@ export async function requeueCurrentJob(): Promise<string | null> {
.update({
status: 'failed',
completed_at: new Date().toISOString(),
heartbeat_at: null,
error: `Failed after ${job.max_attempts} attempts (worker restarts)`,
})
.eq('id', job.id)
Expand Down Expand Up @@ -237,13 +239,26 @@ async function processJob(job: Job): Promise<void> {
`[queue-poller] Processing job ${job.id} (attempt ${job.attempt})`,
)

// Start heartbeat interval — proves this worker is alive while processing
const heartbeatTimer = setInterval(async () => {
try {
await supabase!
.from('job_queue')
.update({ heartbeat_at: new Date().toISOString() })
.eq('id', job.id)
} catch (err) {
console.error(`[heartbeat] Failed to update heartbeat for ${job.id}:`, err)
}
}, HEARTBEAT_INTERVAL_MS)

try {
// Mark as running
// Mark as running with initial heartbeat
await supabase
.from('job_queue')
.update({
status: 'running',
started_at: new Date().toISOString(),
heartbeat_at: new Date().toISOString(),
})
.eq('id', job.id)

Expand Down Expand Up @@ -297,6 +312,7 @@ async function processJob(job: Job): Promise<void> {
})
.eq('id', job.id)
} finally {
clearInterval(heartbeatTimer)
if (onJobEnd) onJobEnd(job.id)
currentJob = null
currentAbortController = null
Expand Down Expand Up @@ -329,11 +345,53 @@ async function startupCleanup(): Promise<void> {
status: 'queued',
worker_id: null,
claimed_at: null,
heartbeat_at: null,
})
.eq('id', job.id)
}
}

// 1b. Reset jobs still 'running' for this worker (interrupted by crash)
const { data: staleRunningThisWorker } = await supabase
.from('job_queue')
.select('id, attempt, max_attempts')
.eq('status', 'running')
.eq('worker_id', WORKER_ID)

if (staleRunningThisWorker && staleRunningThisWorker.length > 0) {
console.log(
`[queue-poller] Found ${staleRunningThisWorker.length} running job(s) from this worker (crash recovery)`,
)
for (const job of staleRunningThisWorker) {
if (job.attempt < job.max_attempts) {
await supabase
.from('job_queue')
.update({
status: 'queued',
worker_id: null,
claimed_at: null,
started_at: null,
heartbeat_at: null,
attempt: job.attempt + 1,
error: `Re-queued: worker crash recovery on startup (attempt ${job.attempt + 1}/${job.max_attempts})`,
})
.eq('id', job.id)
console.log(`[queue-poller] Re-queued running job ${job.id} (attempt ${job.attempt + 1}/${job.max_attempts})`)
} else {
await supabase
.from('job_queue')
.update({
status: 'failed',
completed_at: new Date().toISOString(),
heartbeat_at: null,
error: `Failed: worker crashed after ${job.max_attempts} attempts`,
})
.eq('id', job.id)
console.log(`[queue-poller] Job ${job.id} permanently failed (max attempts exceeded)`)
}
}
}

// 2. Reset jobs claimed by ANY worker for too long
const staleClaimedCutoff = new Date(
Date.now() - STALE_CLAIMED_MS,
Expand All @@ -355,6 +413,7 @@ async function startupCleanup(): Promise<void> {
status: 'queued',
worker_id: null,
claimed_at: null,
heartbeat_at: null,
error: `Reset: claimed by ${job.worker_id} but never started`,
})
.eq('id', job.id)
Expand Down Expand Up @@ -384,6 +443,7 @@ async function startupCleanup(): Promise<void> {
worker_id: null,
claimed_at: null,
started_at: null,
heartbeat_at: null,
attempt: job.attempt + 1,
error: `Re-queued: abandoned running job (attempt ${job.attempt + 1}/${job.max_attempts})`,
})
Expand All @@ -394,6 +454,7 @@ async function startupCleanup(): Promise<void> {
.update({
status: 'failed',
completed_at: new Date().toISOString(),
heartbeat_at: null,
error: `Failed: abandoned after ${job.max_attempts} attempts`,
})
.eq('id', job.id)
Expand Down
19 changes: 19 additions & 0 deletions packages/shared/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@ export const STALE_CLAIMED_MS = 2 * 60 * 1000 // 2 minutes
*/
export const STALE_RUNNING_MS = 30 * 60 * 1000 // 30 minutes

/**
 * Heartbeat cadence: while a job is being processed, the worker refreshes
 * its heartbeat timestamp this often (ms).
 */
export const HEARTBEAT_INTERVAL_MS = 30 * 1000 // 30 seconds

/**
 * A running job whose heartbeat is older than this (ms) is treated as
 * abandoned. Must exceed HEARTBEAT_INTERVAL_MS; three missed beats are
 * tolerated so transient delays (slow DB writes, GC pauses, etc.) do not
 * trigger a false positive.
 */
export const STALE_HEARTBEAT_MS = 3 * HEARTBEAT_INTERVAL_MS // 90 seconds

/**
 * Cadence of the bot-side reaper sweep for stale running jobs (ms).
 */
export const REAPER_INTERVAL_MS = 60 * 1000 // 60 seconds

/**
* Supabase table names.
*/
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface Job {
claimed_at?: string
started_at?: string
completed_at?: string
heartbeat_at?: string

// Error details on failure
error?: string
Expand Down
5 changes: 5 additions & 0 deletions supabase/migrations/20260319000000_add_heartbeat_at.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add heartbeat_at column for worker liveness detection.
-- Workers update this timestamp every 30s while processing a job.
-- The bot-side reaper checks for stale heartbeats every 60s and
-- re-queues jobs whose workers have stopped responding.
-- Intentionally nullable with no default: NULL means the worker has not
-- yet written a heartbeat, and the reaper treats NULL heartbeat plus a
-- stale started_at as a dead worker.
ALTER TABLE job_queue ADD COLUMN heartbeat_at TIMESTAMPTZ;
Loading