Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { EventEmitter } from 'node:events'
import { randomUUID } from 'node:crypto'
import { EventEmitter } from 'node:events'
import type { Logger } from 'pino'
import type { Storage } from './storage/types.ts'
import { Consumer } from './consumer.ts'
import { Producer } from './producer.ts'
import type { Serde } from './serde/index.ts'
import { createJsonSerde } from './serde/index.ts'
import type { Storage } from './storage/types.ts'
import type {
QueueConfig,
EnqueueOptions,
AfterExecutionHook,
CancelResult,
EnqueueAndWaitOptions,
EnqueueOptions,
EnqueueResult,
CancelResult,
MessageStatus,
UpdateResultTTLResult,
JobHandler,
MessageStatus,
QueueConfig,
QueueEvents,
AfterExecutionHook
UpdateResultTTLResult
} from './types.ts'
import { Producer } from './producer.ts'
import { Consumer } from './consumer.ts'
import { createJsonSerde } from './serde/index.ts'
import { abstractLogger, ensureLoggableError } from './utils/logging.ts'

/**
Expand Down Expand Up @@ -204,8 +204,7 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR

// Forward consumer events
this.#consumer.on('error', error => {
this.#logger.error({ err: ensureLoggableError(error) }, 'Consumer emitted error.')
this.emit('error', error)
this.#emitError(error, 'Consumer emitted error.')
})

this.#consumer.on('completed', (id, result) => {
Expand All @@ -230,9 +229,18 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR

this.#consumer.execute(this.#handler)
this.#consumer.start().catch(err => {
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error({ err: ensureLoggableError(error) }, 'Failed to start consumer.')
this.emit('error', error)
this.#emitError(err, 'Failed to start consumer.')
})
}

#emitError (err: unknown, message: string): void {
const error = err instanceof Error ? err : new Error(String(err))

if (this.listenerCount('error') > 0) {
this.emit('error', error)
return
}

this.#logger.error({ err: ensureLoggableError(error) }, message)
}
}
34 changes: 25 additions & 9 deletions src/reaper.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { EventEmitter } from 'node:events'
import { randomUUID } from 'node:crypto'
import type { Storage } from './storage/types.ts'
import { EventEmitter } from 'node:events'
import type { Logger } from 'pino'
import type { Serde } from './serde/index.ts'
import type { QueueMessage } from './types.ts'
import { createJsonSerde } from './serde/index.ts'
import type { Storage } from './storage/types.ts'
import type { QueueMessage } from './types.ts'
import { abstractLogger, ensureLoggableError } from './utils/logging.ts'
import { parseState } from './utils/state.ts'

interface LeaderElectionConfig {
Expand All @@ -18,6 +20,7 @@ interface ReaperConfig<TPayload> {
payloadSerde?: Serde<TPayload>
visibilityTimeout?: number
leaderElection?: LeaderElectionConfig
logger?: Logger
}

interface ReaperEvents {
Expand Down Expand Up @@ -46,6 +49,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
#payloadSerde: Serde<TPayload>
#visibilityTimeout: number
#leaderElection: LeaderElectionConfig
#logger: Logger

#running = false
#unsubscribe: (() => Promise<void>) | null = null
Expand All @@ -63,6 +67,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
this.#visibilityTimeout = config.visibilityTimeout ?? 30000
this.#leaderElection = config.leaderElection ?? { enabled: false }
this.#reaperId = randomUUID()
this.#logger = (config.logger ?? abstractLogger).child({ component: 'reaper', reaperId: this.#reaperId })
}

/**
Expand Down Expand Up @@ -201,7 +206,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
}
}
} catch (err) {
this.emit('error', err as Error)
this.#emitError(err, 'Leadership check failed.')
}
}, interval)
}
Expand All @@ -212,7 +217,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
async #tryAcquireLock (ttlMs: number): Promise<boolean> {
if (!this.#storage.acquireLeaderLock) {
// Storage doesn't support leader election
this.emit('error', new Error('Storage does not support leader election'))
this.#emitError(new Error('Storage does not support leader election'))
return false
}

Expand Down Expand Up @@ -294,7 +299,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
const timer = setTimeout(() => {
this.#processingTimers.delete(id)
this.#checkJob(id).catch(err => {
this.emit('error', err)
this.#emitError(err, 'Failed checking job after visibility timer.')
})
}, this.#visibilityTimeout)

Expand Down Expand Up @@ -337,7 +342,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
const timer = setTimeout(() => {
this.#processingTimers.delete(id)
this.#checkJob(id).catch(err => {
this.emit('error', err)
this.#emitError(err, 'Failed re-checking job after visibility timeout.')
})
}, remaining)
this.#processingTimers.set(id, timer)
Expand All @@ -353,7 +358,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
*/
async #recoverStalledJob (id: string, workerId?: string): Promise<void> {
if (!workerId) {
this.emit('error', new Error(`Cannot recover stalled job ${id}: no workerId in state`))
this.#emitError(new Error(`Cannot recover stalled job ${id}: no workerId in state`))
return
}

Expand Down Expand Up @@ -435,7 +440,7 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
const timer = setTimeout(() => {
this.#processingTimers.delete(queueMessage.id)
this.#checkJob(queueMessage.id).catch(err => {
this.emit('error', err)
this.#emitError(err, 'Failed checking worker processing job.')
})
}, remaining)
this.#processingTimers.set(queueMessage.id, timer)
Expand All @@ -446,4 +451,15 @@ export class Reaper<TPayload> extends EventEmitter<ReaperEvents> {
}
}
}

#emitError (err: unknown, message = 'Reaper emitted error.'): void {
const error = err instanceof Error ? err : new Error(String(err))

if (this.listenerCount('error') > 0) {
this.emit('error', error)
return
}

this.#logger.error({ err: ensureLoggableError(error) }, message)
}
}
41 changes: 41 additions & 0 deletions test/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,47 @@ describe('Queue', () => {

await localQueue.stop()
})

it('should log errors when no queue error listeners are registered', async t => {
const { promise, resolve } = Promise.withResolvers<void>()

const localStorage = new MemoryStorage()
const logs: string[] = []
const logger: Logger = {
fatal: () => {},
error: () => {
resolve()
logs.push('error')
},
warn: () => {},
info: () => {},
debug: () => {},
trace: () => {},
child () {
return this
}
} as unknown as Logger

const localQueue = new Queue<{ value: number }, { result: number }>({
storage: localStorage,
visibilityTimeout: 5000,
logger,
afterExecution: () => {
throw new Error('hook-error')
}
})

localQueue.execute(async (job: Job<{ value: number }>) => {
return { result: job.payload.value * 2 }
})

await localQueue.start()
t.after(() => localQueue.stop())
await localQueue.enqueue('job-1', { value: 21 })
await promise

assert.ok(logs.includes('error'))
})
})

describe('updateResultTTL', () => {
Expand Down
31 changes: 31 additions & 0 deletions test/reaper.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, it, beforeEach, afterEach } from 'node:test'
import assert from 'node:assert'
import { setTimeout as sleep } from 'node:timers/promises'
import type { Logger } from 'pino'
import { Queue, Reaper, MemoryStorage, type Job } from '../src/index.ts'
import { once, waitForEvents } from './helpers/events.ts'

Expand Down Expand Up @@ -51,6 +52,36 @@ describe('Reaper', () => {
await reaper.stop()
await reaper.stop()
})

it('should log errors when no reaper error listeners are registered', async () => {
const logs: string[] = []
const logger: Logger = {
fatal: () => {},
error: () => {
logs.push('error')
},
warn: () => {},
info: () => {},
debug: () => {},
trace: () => {},
child () {
return this
}
} as unknown as Logger

const localReaper = new Reaper<{ value: number }>({
storage,
visibilityTimeout: 100,
leaderElection: { enabled: true },
logger
})

await storage.connect()
await localReaper.start()
await localReaper.stop()

assert.ok(logs.includes('error'))
})
})

describe('stalled job detection', () => {
Expand Down