From 8b4340cebd4c065139f550ab1f59d9fa991730af Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Fri, 20 Feb 2026 11:27:50 +0100 Subject: [PATCH] feat: Revisit queue start and stop. Signed-off-by: Paolo Insogna --- src/consumer.ts | 40 ++++++++++++++++++++++++++-------------- src/queue.ts | 1 + test/queue.test.ts | 23 +++++++++++++++++++++++ 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index c223f73..fd834ef 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -55,6 +55,8 @@ export class Consumer extends EventEmitter[] = [] + #loopsDone: Promise | null = null #jobAbortControllers: Map = new Map() constructor (config: ConsumerConfig) { @@ -92,6 +94,13 @@ export class Consumer extends EventEmitter extends EventEmitter[] = [] + const abortSignal = this.#abortController.signal + this.#workerLoops = [] for (let i = 0; i < this.#concurrency; i++) { - loops.push(this.#workerLoop()) + const loopPromise = this.#workerLoop(abortSignal).catch(err => { + const error = err instanceof Error ? err : new Error(String(err)) + this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.') + this.emit('error', error) + }) + this.#workerLoops.push(loopPromise) } - // Don't await - let them run in background - Promise.all(loops).catch(err => { - const error = err instanceof Error ? err : new Error(String(err)) - this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.') - this.emit('error', error) - }) + this.#loopsDone = Promise.allSettled(this.#workerLoops).then(() => {}) this.#logger.debug('Consumer started.') } @@ -129,9 +139,9 @@ export class Consumer extends EventEmitter extends EventEmitter extends EventEmitter { - while (this.#running) { + async #workerLoop (abortSignal: AbortSignal): Promise { + while (this.#running && !abortSignal.aborted) { try { const message = await this.#storage.dequeue(this.#workerId, this.#blockTimeout) @@ -172,7 +184,7 @@ export class Consumer extends EventEmitter extends EventEmitter { await queue.stop() await queue.stop() }) + + it('should restart and keep processing jobs', async () => { + queue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await queue.start() + await queue.stop() + await queue.start() + + const completedPromise = Promise.race([ + once(queue, 'completed'), + sleep(1000).then(() => { + throw new Error('Timed out waiting for completed event after restart') + }) + ]) + + await queue.enqueue('job-after-restart', { value: 21 }) + const [completedId, completedResult] = await completedPromise + + assert.strictEqual(completedId, 'job-after-restart') + assert.deepStrictEqual(completedResult, { result: 42 }) + }) }) describe('enqueue', () => {