From 45465375c3bd3c9dcb38a0cdd0c7933a29be1096 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Thu, 19 Feb 2026 07:39:20 +0100 Subject: [PATCH] feat: afterExecution hook. Signed-off-by: Paolo Insogna --- README.md | 36 ++++++++++ src/consumer.ts | 97 +++++++++++++++++++++----- src/index.ts | 3 + src/producer.ts | 37 +++++++++- src/queue.ts | 16 ++++- src/types.ts | 38 ++++++++++ test/queue.test.ts | 169 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 376 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 2cf34f1..c98483b 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ const queue = new Queue(config) | `blockTimeout` | `number` | `5` | Seconds to wait when polling for jobs | | `visibilityTimeout` | `number` | `30000` | Milliseconds before a processing job is considered stalled | | `resultTTL` | `number` | `3600000` | Milliseconds to cache job results (1 hour) | +| `afterExecution` | `AfterExecutionHook` | `undefined` | Hook called after execution and before persisting terminal state | | `payloadSerde` | `Serde` | `JsonSerde` | Custom serializer for job payloads | | `resultSerde` | `Serde` | `JsonSerde` | Custom serializer for job results | @@ -145,6 +146,25 @@ Throws `TimeoutError` if the job doesn't complete within the timeout. - If omitted, the producer uses the queue default `resultTTL` at enqueue time. - For duplicate IDs, the first accepted enqueue defines the TTL for that job. +`afterExecution` hook behavior: +- Runs after a successful execution or after the final failed attempt, before terminal state is persisted. +- Receives a mutable context object (passed by reference), including `result`/`error` and `ttl`. +- You can update `context.ttl` to change the stored result/error TTL dynamically. +- You can replace `context.result` / `context.error` to affect what is persisted. +- `id` and `status` are informational; changing them has no effect on persistence. + +```typescript +const queue = new Queue<{ url: string }, { body: string; maxAgeMs?: number }>({ + storage, + resultTTL: 60_000, + afterExecution: async (context) => { + if (context.status === 'completed' && context.result?.maxAgeMs) { + context.ttl = context.result.maxAgeMs + } + } +}) +``` + ##### `queue.cancel(id): Promise` Cancel a queued job. @@ -163,6 +183,22 @@ const result = await queue.cancel('job-123') Get the cached result of a completed job. +##### `queue.updateResultTTL(id, ttlMs): Promise` + +Update TTL for a terminal job payload: +- completed jobs: updates cached result TTL +- failed jobs: updates cached error TTL + +```typescript +const update = await queue.updateResultTTL('job-123', 5 * 60_000) + +// update.status can be: +// - 'updated' +// - 'not_found' +// - 'not_terminal' +// - 'missing_payload' +``` + ##### `queue.getStatus(id): Promise` Get the current status of a job. diff --git a/src/consumer.ts b/src/consumer.ts index 8cc9a64..b0891ef 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'node:events' import type { Storage } from './storage/types.ts' import type { Serde } from './serde/index.ts' -import type { QueueMessage, Job, JobHandler } from './types.ts' +import type { QueueMessage, Job, JobHandler, AfterExecutionHook, AfterExecutionContext } from './types.ts' import { MaxRetriesError } from './errors.ts' import { createJsonSerde } from './serde/index.ts' @@ -15,6 +15,7 @@ interface ConsumerConfig { maxRetries?: number resultTTL?: number visibilityTimeout?: number + afterExecution?: AfterExecutionHook } interface ConsumerEvents { @@ -27,6 +28,10 @@ interface ConsumerEvents { type ExtendedError = Error & { code?: string; toJSON?: () => Record } +const noopAfterExecution = async ( + _context: AfterExecutionContext +): Promise => {} + /** * Consumer handles processing jobs from the queue */ @@ -40,6 +45,7 @@ export class Consumer extends EventEmitter #handler: JobHandler | null = null #running = false @@ -58,6 +64,7 @@ export class Consumer extends EventEmitter extends EventEmitter { const queueMessage = this.#deserializeMessage(message) - const { id, payload, attempts, maxAttempts } = queueMessage - const resultTTL = queueMessage.resultTTL ?? this.#resultTTL + const { id, payload, attempts, maxAttempts, createdAt } = queueMessage + const resolvedTTL = queueMessage.resultTTL ?? this.#resultTTL // Check if job was cancelled (deleted from jobs hash) const state = await this.#storage.getJobState(id) @@ -185,16 +192,18 @@ export class Consumer extends EventEmitter = { id, payload, - attempts: attempts + 1, + attempts: currentAttempts, signal: jobAbortController.signal } @@ -203,17 +212,34 @@ export class Consumer extends EventEmitter extends EventEmitter extends EventEmitter + ): Promise> { + const originalTTL = context.ttl + + try { + await this.#afterExecution(context) + } catch (err) { + this.emit('error', err as Error) + context.ttl = originalTTL + } + + if (!Number.isFinite(context.ttl) || !Number.isInteger(context.ttl) || context.ttl <= 0) { + this.emit('error', new TypeError('resultTTL must be a positive integer in milliseconds')) + context.ttl = originalTTL + } + + return context + } + /** * Execute the job handler (supports both async and callback styles) */ diff --git a/src/index.ts b/src/index.ts index 9275feb..a3f6e9e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,11 +3,14 @@ export type { QueueMessage, MessageState, MessageStatus, + UpdateResultTTLResult, EnqueueOptions, EnqueueAndWaitOptions, EnqueueResult, CancelResult, Job, + AfterExecutionContext, + AfterExecutionHook, JobHandler, QueueConfig, QueueEvents diff --git a/src/producer.ts b/src/producer.ts index a289e02..9f431c5 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -9,7 +9,8 @@ import type { EnqueueResult, MessageStatus, QueueMessage, - SerializedError + SerializedError, + UpdateResultTTLResult } from './types.ts' import { parseState } from './utils/state.ts' @@ -172,6 +173,40 @@ export class Producer { return this.#resultSerde.deserialize(resultBuffer) } + /** + * Update TTL for a terminal job payload (result for completed jobs, error for failed jobs). + */ + async updateResultTTL (id: string, ttlMs: number): Promise { + this.#validateResultTTL(ttlMs) + + const state = await this.#storage.getJobState(id) + if (!state) { + return { status: 'not_found' } + } + + const { status } = parseState(state) + + if (status !== 'completed' && status !== 'failed') { + return { status: 'not_terminal' } + } + + if (status === 'completed') { + const existingResult = await this.#storage.getResult(id) + if (!existingResult) { + return { status: 'missing_payload' } + } + await this.#storage.setResult(id, existingResult, ttlMs) + return { status: 'updated' } + } + + const existingError = await this.#storage.getError(id) + if (!existingError) { + return { status: 'missing_payload' } + } + await this.#storage.setError(id, existingError, ttlMs) + return { status: 'updated' } + } + /** * Get the status of a job */ diff --git a/src/queue.ts b/src/queue.ts index c5f8fe8..da65514 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -9,8 +9,10 @@ import type { EnqueueResult, CancelResult, MessageStatus, + UpdateResultTTLResult, JobHandler, - QueueEvents + QueueEvents, + AfterExecutionHook } from './types.ts' import { Producer } from './producer.ts' import { Consumer } from './consumer.ts' @@ -34,6 +36,7 @@ export class Queue extends EventEmitter | undefined constructor (config: QueueConfig) { super() @@ -47,6 +50,7 @@ export class Queue extends EventEmitter({ storage: this.#storage, @@ -138,6 +142,13 @@ export class Queue extends EventEmitter { + return this.#producer.updateResultTTL(id, ttlMs) + } + /** * Get the status of a job */ @@ -157,7 +168,8 @@ export class Queue extends EventEmitter { error?: SerializedError } +/** + * Result of updating TTL for a terminal job payload (result/error) + */ +export type UpdateResultTTLResult = + | { status: 'updated' } + | { status: 'not_found' } + | { status: 'not_terminal' } + | { status: 'missing_payload' } + /** * Options for enqueue operation */ @@ -82,6 +91,32 @@ export interface Job { signal: AbortSignal } +/** + * Context passed to afterExecution hook. + */ +export interface AfterExecutionContext { + id: string + payload: TPayload + attempts: number + maxAttempts: number + createdAt: number + status: 'completed' | 'failed' + result?: TResult + error?: Error + ttl: number + workerId: string + startedAt: number + finishedAt: number + durationMs: number +} + +/** + * Hook executed after handler execution and before writing terminal state. + */ +export type AfterExecutionHook = ( + context: AfterExecutionContext +) => void | Promise + /** * Job handler function */ @@ -96,6 +131,9 @@ export interface QueueConfig { /** Storage backend (required) */ storage: Storage + /** Hook called after execution and before persisting terminal state */ + afterExecution?: AfterExecutionHook + /** Payload serializer (default: JSON) */ payloadSerde?: Serde diff --git a/test/queue.test.ts b/test/queue.test.ts index deef685..f5ae9fd 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -446,6 +446,175 @@ describe('Queue', () => { }) }) + describe('afterExecution hook', () => { + it('should allow overriding TTL and replacing result in afterExecution', async () => { + const localStorage = new MemoryStorage() + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + resultTTL: 5000, + visibilityTimeout: 5000, + afterExecution: context => { + assert.strictEqual(context.status, 'completed') + assert.strictEqual(context.id, 'job-1') + assert.strictEqual(context.payload.value, 21) + assert.strictEqual(context.attempts, 1) + assert.strictEqual(context.maxAttempts, 3) + assert.ok(context.durationMs >= 0) + + context.ttl = 20 + context.result = { result: 777 } + } + }) + + localQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await localQueue.start() + await localQueue.enqueue('job-1', { value: 21 }) + const [, completedResult] = await once(localQueue, 'completed') + + assert.deepStrictEqual(completedResult, { result: 777 }) + assert.deepStrictEqual(await localQueue.getResult('job-1'), { result: 777 }) + + await sleep(60) + assert.strictEqual(await localQueue.getResult('job-1'), null) + + await localQueue.stop() + }) + + it('should support async afterExecution hook on failed jobs', async () => { + const localStorage = new MemoryStorage() + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + resultTTL: 20, + visibilityTimeout: 5000, + afterExecution: async context => { + await sleep(5) + assert.strictEqual(context.status, 'failed') + context.ttl = 200 + context.error = new Error('updated boom') + } + }) + + localQueue.execute(async () => { + throw new Error('boom') + }) + + await localQueue.start() + await localQueue.enqueue('job-1', { value: 21 }, { maxAttempts: 1 }) + await once(localQueue, 'failed') + + await sleep(60) + const error = await localStorage.getError('job-1') + assert.ok(error) + assert.match(error.toString(), /updated boom/) + + await localQueue.stop() + }) + }) + + describe('updateResultTTL', () => { + it('should update TTL for completed jobs', async () => { + const localStorage = new MemoryStorage() + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + resultTTL: 20, + visibilityTimeout: 5000 + }) + + localQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await localQueue.start() + await localQueue.enqueue('job-1', { value: 21 }) + await once(localQueue, 'completed') + + const updateResult = await localQueue.updateResultTTL('job-1', 200) + assert.deepStrictEqual(updateResult, { status: 'updated' }) + + await sleep(60) + assert.deepStrictEqual(await localQueue.getResult('job-1'), { result: 42 }) + + await localQueue.stop() + }) + + it('should update TTL for failed jobs', async () => { + const localStorage = new MemoryStorage() + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + resultTTL: 20, + visibilityTimeout: 5000 + }) + + localQueue.execute(async () => { + throw new Error('boom') + }) + + await localQueue.start() + await localQueue.enqueue('job-1', { value: 1 }, { maxAttempts: 1 }) + await once(localQueue, 'failed') + + const updateResult = await localQueue.updateResultTTL('job-1', 200) + assert.deepStrictEqual(updateResult, { status: 'updated' }) + + await sleep(60) + const error = await localStorage.getError('job-1') + assert.ok(error) + + await localQueue.stop() + }) + + it('should return not_found when job does not exist', async () => { + await queue.start() + + const updateResult = await queue.updateResultTTL('missing-job', 100) + assert.deepStrictEqual(updateResult, { status: 'not_found' }) + }) + + it('should return not_terminal for queued jobs', async () => { + await queue.start() + await queue.enqueue('job-1', { value: 21 }) + + const updateResult = await queue.updateResultTTL('job-1', 100) + assert.deepStrictEqual(updateResult, { status: 'not_terminal' }) + }) + + it('should return missing_payload when terminal payload has expired', async () => { + const localStorage = new MemoryStorage() + const localQueue = new Queue<{ value: number }, { result: number }>({ + storage: localStorage, + resultTTL: 20, + visibilityTimeout: 5000 + }) + + localQueue.execute(async (job: Job<{ value: number }>) => { + return { result: job.payload.value * 2 } + }) + + await localQueue.start() + await localQueue.enqueue('job-1', { value: 21 }) + await once(localQueue, 'completed') + + await sleep(60) + + const updateResult = await localQueue.updateResultTTL('job-1', 100) + assert.deepStrictEqual(updateResult, { status: 'missing_payload' }) + + await localQueue.stop() + }) + + it('should reject invalid TTL values', async () => { + await queue.start() + await assert.rejects(queue.updateResultTTL('job-1', 0), (err: Error) => { + assert.strictEqual(err.name, 'TypeError') + assert.match(err.message, /resultTTL must be a positive integer/) + return true + }) + }) + }) + describe('concurrency', () => { it('should process multiple jobs concurrently', async () => { const concurrentQueue = new Queue<{ value: number }, { result: number }>({