diff --git a/src/queue.ts b/src/queue.ts index 49f73a0..b4936a6 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,23 +1,23 @@ -import { EventEmitter } from 'node:events' import { randomUUID } from 'node:crypto' +import { EventEmitter } from 'node:events' import type { Logger } from 'pino' -import type { Storage } from './storage/types.ts' +import { Consumer } from './consumer.ts' +import { Producer } from './producer.ts' import type { Serde } from './serde/index.ts' +import { createJsonSerde } from './serde/index.ts' +import type { Storage } from './storage/types.ts' import type { - QueueConfig, - EnqueueOptions, + AfterExecutionHook, + CancelResult, EnqueueAndWaitOptions, + EnqueueOptions, EnqueueResult, - CancelResult, - MessageStatus, - UpdateResultTTLResult, JobHandler, + MessageStatus, + QueueConfig, QueueEvents, - AfterExecutionHook + UpdateResultTTLResult } from './types.ts' -import { Producer } from './producer.ts' -import { Consumer } from './consumer.ts' -import { createJsonSerde } from './serde/index.ts' import { abstractLogger, ensureLoggableError } from './utils/logging.ts' /** @@ -204,8 +204,7 @@ export class Queue extends EventEmitter { - this.#logger.error({ err: ensureLoggableError(error) }, 'Consumer emitted error.') - this.emit('error', error) + this.#emitError(error, 'Consumer emitted error.') }) this.#consumer.on('completed', (id, result) => { @@ -230,9 +229,18 @@ export class Queue extends EventEmitter { - const error = err instanceof Error ? err : new Error(String(err)) - this.#logger.error({ err: ensureLoggableError(error) }, 'Failed to start consumer.') - this.emit('error', error) + this.#emitError(err, 'Failed to start consumer.') }) } + + #emitError (err: unknown, message: string): void { + const error = err instanceof Error ? err : new Error(String(err)) + + if (this.listenerCount('error') > 0) { + this.emit('error', error) + return + } + + this.#logger.error({ err: ensureLoggableError(error) }, message) + } } diff --git a/src/reaper.ts b/src/reaper.ts index 8dbc55e..48486f0 100644 --- a/src/reaper.ts +++ b/src/reaper.ts @@ -1,9 +1,11 @@ -import { EventEmitter } from 'node:events' import { randomUUID } from 'node:crypto' -import type { Storage } from './storage/types.ts' +import { EventEmitter } from 'node:events' +import type { Logger } from 'pino' import type { Serde } from './serde/index.ts' -import type { QueueMessage } from './types.ts' import { createJsonSerde } from './serde/index.ts' +import type { Storage } from './storage/types.ts' +import type { QueueMessage } from './types.ts' +import { abstractLogger, ensureLoggableError } from './utils/logging.ts' import { parseState } from './utils/state.ts' interface LeaderElectionConfig { @@ -18,6 +20,7 @@ interface ReaperConfig { payloadSerde?: Serde visibilityTimeout?: number leaderElection?: LeaderElectionConfig + logger?: Logger } interface ReaperEvents { @@ -46,6 +49,7 @@ export class Reaper extends EventEmitter { #payloadSerde: Serde #visibilityTimeout: number #leaderElection: LeaderElectionConfig + #logger: Logger #running = false #unsubscribe: (() => Promise) | null = null @@ -63,6 +67,7 @@ export class Reaper extends EventEmitter { this.#visibilityTimeout = config.visibilityTimeout ?? 30000 this.#leaderElection = config.leaderElection ?? { enabled: false } this.#reaperId = randomUUID() + this.#logger = (config.logger ?? abstractLogger).child({ component: 'reaper', reaperId: this.#reaperId }) } /** @@ -201,7 +206,7 @@ export class Reaper extends EventEmitter { } } } catch (err) { - this.emit('error', err as Error) + this.#emitError(err, 'Leadership check failed.') } }, interval) } @@ -212,7 +217,7 @@ export class Reaper extends EventEmitter { async #tryAcquireLock (ttlMs: number): Promise { if (!this.#storage.acquireLeaderLock) { // Storage doesn't support leader election - this.emit('error', new Error('Storage does not support leader election')) + this.#emitError(new Error('Storage does not support leader election')) return false } @@ -294,7 +299,7 @@ export class Reaper extends EventEmitter { const timer = setTimeout(() => { this.#processingTimers.delete(id) this.#checkJob(id).catch(err => { - this.emit('error', err) + this.#emitError(err, 'Failed checking job after visibility timer.') }) }, this.#visibilityTimeout) @@ -337,7 +342,7 @@ export class Reaper extends EventEmitter { const timer = setTimeout(() => { this.#processingTimers.delete(id) this.#checkJob(id).catch(err => { - this.emit('error', err) + this.#emitError(err, 'Failed re-checking job after visibility timeout.') }) }, remaining) this.#processingTimers.set(id, timer) @@ -353,7 +358,7 @@ export class Reaper extends EventEmitter { */ async #recoverStalledJob (id: string, workerId?: string): Promise { if (!workerId) { - this.emit('error', new Error(`Cannot recover stalled job ${id}: no workerId in state`)) + this.#emitError(new Error(`Cannot recover stalled job ${id}: no workerId in state`)) return } @@ -435,7 +440,7 @@ export class Reaper extends EventEmitter { const timer = setTimeout(() => { this.#processingTimers.delete(queueMessage.id) this.#checkJob(queueMessage.id).catch(err => { - this.emit('error', err) + this.#emitError(err, 'Failed checking worker processing job.') }) }, remaining) this.#processingTimers.set(queueMessage.id, timer) @@ -446,4 +451,15 @@ export class Reaper extends EventEmitter { } } } + + #emitError (err: unknown, message = 'Reaper emitted error.'): void { + const error = err instanceof Error ? err : new Error(String(err)) + + if (this.listenerCount('error') > 0) { + this.emit('error', error) + return + } + + this.#logger.error({ err: ensureLoggableError(error) }, message) + } } diff --git a/test/queue.test.ts b/test/queue.test.ts index 48f84a2..fba808f 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -542,6 +542,47 @@ describe('Queue', () => { await localQueue.stop() }) + + it('should log errors when no queue error listeners are registered', async t => { + const { promise, resolve } = Promise.withResolvers() + + const localStorage = new MemoryStorage() + const logs: string[] = [] + const logger: Logger = { + fatal: () => {}, + error: () => { + resolve() + logs.push('error') + }, + warn: () => {}, + info: () => {}, + debug: () => {}, + trace: () => {}, + child () { + return this + } + } as unknown as Logger + + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + visibilityTimeout: 5000, + logger, + afterExecution: () => { + throw new Error('hook-error') + } + }) + + localQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await localQueue.start() + t.after(() => localQueue.stop()) + await localQueue.enqueue('job-1', { value: 21 }) + await promise + + assert.ok(logs.includes('error')) + }) }) describe('updateResultTTL', () => { diff --git a/test/reaper.test.ts b/test/reaper.test.ts index 2204e21..b16995f 100644 --- a/test/reaper.test.ts +++ b/test/reaper.test.ts @@ -1,6 +1,7 @@ import { describe, it, beforeEach, afterEach } from 'node:test' import assert from 'node:assert' import { setTimeout as sleep } from 'node:timers/promises' +import type { Logger } from 'pino' import { Queue, Reaper, MemoryStorage, type Job } from '../src/index.ts' import { once, waitForEvents } from './helpers/events.ts' @@ -51,6 +52,36 @@ describe('Reaper', () => { await reaper.stop() await reaper.stop() }) + + it('should log errors when no reaper error listeners are registered', async () => { + const logs: string[] = [] + const logger: Logger = { + fatal: () => {}, + error: () => { + logs.push('error') + }, + warn: () => {}, + info: () => {}, + debug: () => {}, + trace: () => {}, + child () { + return this + } + } as unknown as Logger + + const localReaper = new Reaper<{ value: number }>({ + storage, + visibilityTimeout: 100, + leaderElection: { enabled: true }, + logger + }) + + await storage.connect() + await localReaper.start() + await localReaper.stop() + + assert.ok(logs.includes('error')) + }) }) describe('stalled job detection', () => {