Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const queue = new Queue<TPayload, TResult>(config)
| `blockTimeout` | `number` | `5` | Seconds to wait when polling for jobs |
| `visibilityTimeout` | `number` | `30000` | Milliseconds before a processing job is considered stalled |
| `resultTTL` | `number` | `3600000` | Milliseconds to cache job results (1 hour) |
| `afterExecution` | `AfterExecutionHook<TPayload, TResult>` | `undefined` | Hook called after execution and before persisting terminal state |
| `payloadSerde` | `Serde<TPayload>` | `JsonSerde` | Custom serializer for job payloads |
| `resultSerde` | `Serde<TResult>` | `JsonSerde` | Custom serializer for job results |

Expand Down Expand Up @@ -145,6 +146,25 @@ Throws `TimeoutError` if the job doesn't complete within the timeout.
- If omitted, the producer uses the queue default `resultTTL` at enqueue time.
- For duplicate IDs, the first accepted enqueue defines the TTL for that job.

`afterExecution` hook behavior:
- Runs after a successful execution or after the final failed attempt, before terminal state is persisted.
- Receives a mutable context object (passed by reference), including `result`/`error` and `ttl`.
- You can update `context.ttl` to change the stored result/error TTL dynamically.
- You can replace `context.result` / `context.error` to affect what is persisted.
- `id` and `status` are informational; changing them has no effect on persistence.

```typescript
const queue = new Queue<{ url: string }, { body: string; maxAgeMs?: number }>({
storage,
resultTTL: 60_000,
afterExecution: async (context) => {
if (context.status === 'completed' && context.result?.maxAgeMs) {
context.ttl = context.result.maxAgeMs
}
}
})
```

##### `queue.cancel(id): Promise<CancelResult>`

Cancel a queued job.
Expand All @@ -163,6 +183,22 @@ const result = await queue.cancel('job-123')

Get the cached result of a completed job.

##### `queue.updateResultTTL(id, ttlMs): Promise<UpdateResultTTLResult>`

Update TTL for a terminal job payload:
- completed jobs: updates cached result TTL
- failed jobs: updates cached error TTL

```typescript
const update = await queue.updateResultTTL('job-123', 5 * 60_000)

// update.status can be:
// - 'updated'
// - 'not_found'
// - 'not_terminal'
// - 'missing_payload'
```

##### `queue.getStatus(id): Promise<MessageStatus | null>`

Get the current status of a job.
Expand Down
97 changes: 80 additions & 17 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from 'node:events'
import type { Storage } from './storage/types.ts'
import type { Serde } from './serde/index.ts'
import type { QueueMessage, Job, JobHandler } from './types.ts'
import type { QueueMessage, Job, JobHandler, AfterExecutionHook, AfterExecutionContext } from './types.ts'
import { MaxRetriesError } from './errors.ts'
import { createJsonSerde } from './serde/index.ts'

Expand All @@ -15,6 +15,7 @@ interface ConsumerConfig<TPayload, TResult> {
maxRetries?: number
resultTTL?: number
visibilityTimeout?: number
afterExecution?: AfterExecutionHook<TPayload, TResult>
}

interface ConsumerEvents<TResult> {
Expand All @@ -27,6 +28,10 @@ interface ConsumerEvents<TResult> {

type ExtendedError = Error & { code?: string; toJSON?: () => Record<string, any> }

const noopAfterExecution = async <TPayload, TResult>(
_context: AfterExecutionContext<TPayload, TResult>
): Promise<void> => {}

/**
* Consumer handles processing jobs from the queue
*/
Expand All @@ -40,6 +45,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
#maxRetries: number
#resultTTL: number
#visibilityTimeout: number
#afterExecution: AfterExecutionHook<TPayload, TResult>

#handler: JobHandler<TPayload, TResult> | null = null
#running = false
Expand All @@ -58,6 +64,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
this.#maxRetries = config.maxRetries ?? 3
this.#resultTTL = config.resultTTL ?? 3600000
this.#visibilityTimeout = config.visibilityTimeout ?? 30000
this.#afterExecution = config.afterExecution ?? noopAfterExecution
}

/**
Expand Down Expand Up @@ -163,8 +170,8 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
*/
async #processJob (message: Buffer): Promise<void> {
const queueMessage = this.#deserializeMessage(message)
const { id, payload, attempts, maxAttempts } = queueMessage
const resultTTL = queueMessage.resultTTL ?? this.#resultTTL
const { id, payload, attempts, maxAttempts, createdAt } = queueMessage
const resolvedTTL = queueMessage.resultTTL ?? this.#resultTTL

// Check if job was cancelled (deleted from jobs hash)
const state = await this.#storage.getJobState(id)
Expand All @@ -185,16 +192,18 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
jobAbortController.abort()
}, this.#visibilityTimeout)

const currentAttempts = attempts + 1

// Update state to processing
const timestamp = Date.now()
await this.#storage.setJobState(id, `processing:${timestamp}:${this.#workerId}`)
const startedAt = Date.now()
await this.#storage.setJobState(id, `processing:${startedAt}:${this.#workerId}`)
await this.#storage.publishEvent(id, 'processing')

try {
const job: Job<TPayload> = {
id,
payload,
attempts: attempts + 1,
attempts: currentAttempts,
signal: jobAbortController.signal
}

Expand All @@ -203,17 +212,34 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
// Clear visibility timer
clearTimeout(visibilityTimer)

const finishedAt = Date.now()
const context = await this.#runAfterExecution({
id,
payload,
attempts: currentAttempts,
maxAttempts,
createdAt,
status: 'completed',
result,
ttl: resolvedTTL,
workerId: this.#workerId,
startedAt,
finishedAt,
durationMs: finishedAt - startedAt
})

const finalResult = context.result as TResult

// Complete the job
const serializedResult = this.#resultSerde.serialize(result)
await this.#storage.completeJob(id, message, this.#workerId, serializedResult, resultTTL)
const serializedResult = this.#resultSerde.serialize(finalResult)
await this.#storage.completeJob(id, message, this.#workerId, serializedResult, context.ttl)

this.emit('completed', id, result)
this.emit('completed', id, finalResult)
} catch (err) {
// Clear visibility timer
clearTimeout(visibilityTimer)

const error = err as ExtendedError
const currentAttempts = attempts + 1

if (currentAttempts < maxAttempts) {
// Retry - update message with incremented attempts
Expand All @@ -228,20 +254,37 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
this.emit('failing', id, error, currentAttempts)
} else {
// Max retries exceeded - fail the job
const maxRetriesError = new MaxRetriesError(id, currentAttempts, error)
const finishedAt = Date.now()
const context = await this.#runAfterExecution({
id,
payload,
attempts: currentAttempts,
maxAttempts,
createdAt,
status: 'failed',
error,
ttl: resolvedTTL,
workerId: this.#workerId,
startedAt,
finishedAt,
durationMs: finishedAt - startedAt
})

const finalError = (context.error as ExtendedError) ?? error
const maxRetriesError = new MaxRetriesError(id, currentAttempts, finalError)
const serializedError = Buffer.from(
JSON.stringify(
typeof error.toJSON === 'function'
? error.toJSON()
typeof finalError.toJSON === 'function'
? finalError.toJSON()
: {
message: error.message,
code: error.code,
stack: error.stack
message: finalError.message,
code: finalError.code,
stack: finalError.stack
}
)
)

await this.#storage.failJob(id, message, this.#workerId, serializedError, resultTTL)
await this.#storage.failJob(id, message, this.#workerId, serializedError, context.ttl)

this.emit('failed', id, maxRetriesError)
}
Expand All @@ -251,6 +294,26 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
}
}

async #runAfterExecution (
context: AfterExecutionContext<TPayload, TResult>
): Promise<AfterExecutionContext<TPayload, TResult>> {
const originalTTL = context.ttl

try {
await this.#afterExecution(context)
} catch (err) {
this.emit('error', err as Error)
context.ttl = originalTTL
}

if (!Number.isFinite(context.ttl) || !Number.isInteger(context.ttl) || context.ttl <= 0) {
this.emit('error', new TypeError('resultTTL must be a positive integer in milliseconds'))
context.ttl = originalTTL
}

return context
}

/**
* Execute the job handler (supports both async and callback styles)
*/
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ export type {
QueueMessage,
MessageState,
MessageStatus,
UpdateResultTTLResult,
EnqueueOptions,
EnqueueAndWaitOptions,
EnqueueResult,
CancelResult,
Job,
AfterExecutionContext,
AfterExecutionHook,
JobHandler,
QueueConfig,
QueueEvents
Expand Down
37 changes: 36 additions & 1 deletion src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import type {
EnqueueResult,
MessageStatus,
QueueMessage,
SerializedError
SerializedError,
UpdateResultTTLResult
} from './types.ts'
import { parseState } from './utils/state.ts'

Expand Down Expand Up @@ -172,6 +173,40 @@ export class Producer<TPayload, TResult> {
return this.#resultSerde.deserialize(resultBuffer)
}

/**
* Update TTL for a terminal job payload (result for completed jobs, error for failed jobs).
*/
async updateResultTTL (id: string, ttlMs: number): Promise<UpdateResultTTLResult> {
this.#validateResultTTL(ttlMs)

const state = await this.#storage.getJobState(id)
if (!state) {
return { status: 'not_found' }
}

const { status } = parseState(state)

if (status !== 'completed' && status !== 'failed') {
return { status: 'not_terminal' }
}

if (status === 'completed') {
const existingResult = await this.#storage.getResult(id)
if (!existingResult) {
return { status: 'missing_payload' }
}
await this.#storage.setResult(id, existingResult, ttlMs)
return { status: 'updated' }
}

const existingError = await this.#storage.getError(id)
if (!existingError) {
return { status: 'missing_payload' }
}
await this.#storage.setError(id, existingError, ttlMs)
return { status: 'updated' }
}

/**
* Get the status of a job
*/
Expand Down
16 changes: 14 additions & 2 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import type {
EnqueueResult,
CancelResult,
MessageStatus,
UpdateResultTTLResult,
JobHandler,
QueueEvents
QueueEvents,
AfterExecutionHook
} from './types.ts'
import { Producer } from './producer.ts'
import { Consumer } from './consumer.ts'
Expand All @@ -34,6 +36,7 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
#maxRetries: number
#resultTTL: number
#visibilityTimeout: number
#afterExecution: AfterExecutionHook<TPayload, TResult> | undefined

constructor (config: QueueConfig<TPayload, TResult>) {
super()
Expand All @@ -47,6 +50,7 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
this.#maxRetries = config.maxRetries ?? 3
this.#resultTTL = config.resultTTL ?? 3600000
this.#visibilityTimeout = config.visibilityTimeout ?? 30000
this.#afterExecution = config.afterExecution

this.#producer = new Producer<TPayload, TResult>({
storage: this.#storage,
Expand Down Expand Up @@ -138,6 +142,13 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
return this.#producer.getResult(id)
}

/**
* Update TTL for a terminal job payload (result or error).
*/
async updateResultTTL (id: string, ttlMs: number): Promise<UpdateResultTTLResult> {
return this.#producer.updateResultTTL(id, ttlMs)
}

/**
* Get the status of a job
*/
Expand All @@ -157,7 +168,8 @@ export class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TR
blockTimeout: this.#blockTimeout,
maxRetries: this.#maxRetries,
resultTTL: this.#resultTTL,
visibilityTimeout: this.#visibilityTimeout
visibilityTimeout: this.#visibilityTimeout,
afterExecution: this.#afterExecution
})

// Forward consumer events
Expand Down
Loading