Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
#running = false
#activeJobs = 0
#abortController: AbortController | null = null
#workerLoops: Promise<void>[] = []
#loopsDone: Promise<void> | null = null
#jobAbortControllers: Map<string, AbortController> = new Map()

constructor (config: ConsumerConfig<TPayload, TResult>) {
Expand Down Expand Up @@ -92,6 +94,13 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
throw new Error('No handler registered. Call execute() first.')
}

// Ensure any previous worker loops have exited before starting a new run.
if (this.#loopsDone) {
await this.#loopsDone
this.#workerLoops = []
this.#loopsDone = null
}

this.#running = true
this.#abortController = new AbortController()

Expand All @@ -101,17 +110,18 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
await this.#storage.registerWorker(this.#workerId, this.#visibilityTimeout * 2)

// Start worker loops based on concurrency
const loops: Promise<void>[] = []
const abortSignal = this.#abortController.signal
this.#workerLoops = []
for (let i = 0; i < this.#concurrency; i++) {
loops.push(this.#workerLoop())
const loopPromise = this.#workerLoop(abortSignal).catch(err => {
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.')
this.emit('error', error)
})
this.#workerLoops.push(loopPromise)
}

// Don't await - let them run in background
Promise.all(loops).catch(err => {
const error = err instanceof Error ? err : new Error(String(err))
this.#logger.error({ err: ensureLoggableError(error) }, 'Worker loop terminated with error.')
this.emit('error', error)
})
this.#loopsDone = Promise.allSettled(this.#workerLoops).then(() => {})

this.#logger.debug('Consumer started.')
}
Expand All @@ -129,9 +139,9 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
this.#running = false

// Signal abort to worker loops
if (this.#abortController) {
this.#abortController.abort()
}
const abortController = this.#abortController
this.#abortController = null
abortController?.abort()

// Wait for active jobs to complete (with timeout)
const maxWait = this.#visibilityTimeout
Expand All @@ -151,6 +161,8 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
}
this.#jobAbortControllers.clear()

// Worker loops are awaited on the next start() call.

// Unregister worker
await this.#storage.unregisterWorker(this.#workerId)
this.#logger.debug('Consumer stopped.')
Expand All @@ -159,8 +171,8 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
/**
* Worker loop that continuously dequeues and processes jobs
*/
async #workerLoop (): Promise<void> {
while (this.#running) {
async #workerLoop (abortSignal: AbortSignal): Promise<void> {
while (this.#running && !abortSignal.aborted) {
try {
const message = await this.#storage.dequeue(this.#workerId, this.#blockTimeout)

Expand All @@ -172,7 +184,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
this.#logger.trace('Dequeued job message.')

// Check if aborted
if (this.#abortController?.signal.aborted) {
if (abortSignal.aborted) {
// Put message back
const queueMessage = this.#deserializeMessage(message)
this.#logger.warn({ id: queueMessage.id }, 'Consumer aborted while holding job, requeueing.')
Expand Down
1 change: 1 addition & 0 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR

if (this.#consumer) {
await this.#consumer.stop()
this.#consumer = null
}

await this.#storage.disconnect()
Expand Down
23 changes: 23 additions & 0 deletions test/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ describe('Queue', () => {
await queue.stop()
await queue.stop()
})

it('should restart and keep processing jobs', async () => {
queue.execute(async (job: Job<{ value: number }>) => {
return { result: job.payload.value * 2 }
})

await queue.start()
await queue.stop()
await queue.start()

const completedPromise = Promise.race([
once(queue, 'completed'),
sleep(1000).then(() => {
throw new Error('Timed out waiting for completed event after restart')
})
])

await queue.enqueue('job-after-restart', { value: 21 })
const [completedId, completedResult] = await completedPromise

assert.strictEqual(completedId, 'job-after-restart')
assert.deepStrictEqual(completedResult, { result: 42 })
})
})

describe('enqueue', () => {
Expand Down