Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 91 additions & 19 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ Examples of non-idempotent operations (require external safeguards):
2. **Deduplication**: Prevent duplicate job processing using message IDs
3. **Item Removal**: Cancel pending jobs while maintaining deduplication integrity
4. **Result Cache**: Retrieve processed job results within a configurable TTL
5. **Type Stripping**: Native Node.js TypeScript execution (22.6+)
6. **Flexible Modes**: Producer, consumer, or combined operation
5. **Per-Job TTL Override**: Allow callers to override result/error TTL for specific jobs
6. **Type Stripping**: Native Node.js TypeScript execution (22.6+)
7. **Flexible Modes**: Producer, consumer, or combined operation

## Invocation Modes

Expand Down Expand Up @@ -80,6 +81,18 @@ The request/response mode is optimized for low latency:
3. **Cached results**: If the job was already completed, the cached result is returned immediately without waiting.
4. **Subscribe-first**: Subscription happens before enqueue to eliminate race conditions.

### Per-Job TTL Override

By default, results and errors use `QueueConfig.resultTTL`. Some jobs need different retention (for example, very large results should expire quickly, while expensive jobs may need longer caching).

Proposed behavior:

- Add optional `resultTTL` to `enqueue()` and `enqueueAndWait()` options.
- Persist this TTL inside the queued message (`resultTTL`) so workers can apply it when storing success/error outcomes.
- Resolve TTL at enqueue time: `options.resultTTL ?? QueueConfig.resultTTL`, then persist it in the message.
- Validate TTL at enqueue time (`> 0` and finite integer).
- **Deduplication rule**: for duplicates, the first accepted enqueue defines TTL; later duplicates do not mutate it.

## Architecture

### Redis Data Structures
Expand All @@ -101,6 +114,7 @@ interface QueueMessage<T> {
createdAt: number; // Unix timestamp ms
attempts: number; // Retry count
maxAttempts: number; // Maximum retry attempts
resultTTL?: number; // Resolved TTL (producer default or override); optional for backward compatibility
correlationId?: string; // For request/response pattern
}
```
Expand Down Expand Up @@ -318,7 +332,7 @@ interface Storage {
message: Buffer,
workerId: string,
result: Buffer,
resultTtlMs: number
resultTTL: number
): Promise<void>;

/**
Expand All @@ -333,7 +347,7 @@ interface Storage {
message: Buffer,
workerId: string,
error: Buffer,
errorTtlMs: number
errorTTL: number
): Promise<void>;

/**
Expand Down Expand Up @@ -547,16 +561,51 @@ async notifyJobComplete(id: string, status: 'completed' | 'failed' | 'failing'):
}
```

**Result/Error Storage with Per-Job TTL:**
```typescript
async setResult(id: string, result: Buffer, ttlMs: number): Promise<void> {
const resultPath = this.resultPath(id);
const metaPath = `${resultPath}.meta.json`;

await writeFileAtomic.promise(resultPath, result);
await writeFileAtomic.promise(
metaPath,
Buffer.from(JSON.stringify({ expiresAt: Date.now() + ttlMs }))
);
}

async setError(id: string, error: Buffer, ttlMs: number): Promise<void> {
const errorPath = this.errorPath(id);
const metaPath = `${errorPath}.meta.json`;

await writeFileAtomic.promise(errorPath, error);
await writeFileAtomic.promise(
metaPath,
Buffer.from(JSON.stringify({ expiresAt: Date.now() + ttlMs }))
);
}
```

**TTL Cleanup (Background):**
```typescript
private async cleanupExpired(): Promise<void> {
// Results
for (const file of await fs.readdir(this.resultsDir)) {
const stat = await fs.stat(path.join(this.resultsDir, file));
if (Date.now() - stat.mtimeMs > this.resultTTL) {
await fs.unlink(path.join(this.resultsDir, file));
const cleanupByMeta = async (dir: string, extension: string) => {
for (const file of await fs.readdir(dir)) {
if (!file.endsWith(extension)) continue;

const filePath = path.join(dir, file);
const metaPath = `${filePath}.meta.json`;
const meta = JSON.parse(await fs.readFile(metaPath, 'utf8')) as { expiresAt: number };

if (Date.now() >= meta.expiresAt) {
await fs.unlink(filePath).catch(() => {});
await fs.unlink(metaPath).catch(() => {});
}
}
}
};

await cleanupByMeta(this.resultsDir, '.result');
await cleanupByMeta(this.errorsDir, '.error');

// Worker heartbeats
for (const file of await fs.readdir(this.workersDir)) {
Expand Down Expand Up @@ -654,7 +703,7 @@ interface QueueConfig<TPayload, TResult> {
processingQueueTTL?: number; // TTL for processing queue keys in ms (default: 604800000 = 7 days)

// Result cache options
resultTTL?: number; // TTL for stored results and errors in ms (default: 3600000 = 1 hour)
resultTTL?: number; // Default TTL for stored results and errors in ms (default: 3600000 = 1 hour)
}
```

Expand Down Expand Up @@ -739,10 +788,11 @@ class Reaper<TPayload> extends EventEmitter {
```typescript
interface EnqueueOptions {
maxAttempts?: number;
resultTTL?: number; // Per-job result/error TTL override in ms
}

interface EnqueueAndWaitOptions extends EnqueueOptions {
timeout?: number; // Max wait time in ms (default: 30000)
timeout?: number; // Max wait time in ms (default: 30000)
}

type EnqueueResult =
Expand Down Expand Up @@ -823,8 +873,10 @@ closeWithGrace({ delay: 10000 }, async () => {

### Enqueue Flow

`enqueue()` resolves and serializes `resultTTL` into the message (`options.resultTTL ?? config.resultTTL`).

```
enqueue(id, payload)
enqueue(id, payload, { resultTTL? })
┌─────────────────────────────────┐
Expand Down Expand Up @@ -853,6 +905,22 @@ enqueue(id, payload)
Return { status: 'queued' }
```

### TTL Resolution

At processing time, worker resolves TTL as:

```typescript
const ttlMs = message.resultTTL ?? config.resultTTL; // fallback for backward compatibility
```

This `ttlMs` is used for both `completeJob(..., resultTTL)` and `failJob(..., errorTTL)` so success and terminal failure follow the same retention policy. New messages always carry `resultTTL`, so producer defaults are preserved even with separate producer/consumer instances.

Validation happens at enqueue time:

- `resultTTL` must be an integer number of milliseconds
- `resultTTL > 0`
- Reject invalid values with a validation error

### Consumer Flow

```
Expand Down Expand Up @@ -895,10 +963,11 @@ start()
│ │
│ ▼
│ ┌─────────────────────────────────┐
│ │ MULTI │
│ │ SET {prefix}:results:{id} │
│ │ {result} EX {resultTTL} │
│ │ HSET {prefix}:jobs {id} │
│ │ MULTI │
│ │ SET {prefix}:results:{id} │
│ │ {result} EX {message.resultTTL ?? │
│ │ config.resultTTL} │
│ │ HSET {prefix}:jobs {id} │
│ │ "completed:{timestamp}" │
│ │ LREM {prefix}:processing: │
│ │ {workerId} 1 {msg} │
Expand Down Expand Up @@ -935,7 +1004,7 @@ start()
Optimized for minimal latency — pure pub/sub, no polling.

```
enqueueAndWait(id, payload, { timeout })
enqueueAndWait(id, payload, { timeout, resultTTL? })
┌─────────────────────────────────────────────┐
Expand Down Expand Up @@ -1350,7 +1419,10 @@ const queue = new Queue<ImageJob, ImageResult>({
const result = await queue.enqueueAndWait(
'img-abc123',
{ url: 'https://example.com/photo.jpg', width: 200, height: 200 },
{ timeout: 60000 }
{
timeout: 60000,
resultTTL: 24 * 60 * 60 * 1000, // Keep expensive result for 24h
}
);
console.log('Thumbnail:', result.thumbnailUrl);

Expand Down
21 changes: 18 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,15 @@ await queue.start()
// Enqueue jobs
await queue.enqueue('email-1', { email: 'user@example.com' })

// Optional per-job TTL override for cached result/error
await queue.enqueue('email-ttl', { email: 'ttl@example.com' }, {
resultTTL: 5 * 60 * 1000 // 5 minutes
})

// Or wait for the result
const result = await queue.enqueueAndWait('email-2', { email: 'another@example.com' }, {
timeout: 30000
timeout: 30000,
resultTTL: 24 * 60 * 60 * 1000 // keep this result for 24h
})
console.log('Result:', result) // { sent: true }

Expand Down Expand Up @@ -109,7 +115,10 @@ queue.execute(async (job) => {
Enqueue a job (fire-and-forget).

```typescript
const result = await queue.enqueue('job-123', { data: 'value' })
const result = await queue.enqueue('job-123', { data: 'value' }, {
maxAttempts: 5,
resultTTL: 60_000 // optional per-job TTL override (ms)
})

// result.status can be:
// - 'queued': Job was added to the queue
Expand All @@ -124,12 +133,18 @@ Enqueue a job and wait for the result.
```typescript
const result = await queue.enqueueAndWait('job-123', payload, {
timeout: 30000, // Timeout in milliseconds
maxAttempts: 5 // Override default max retries
maxAttempts: 5, // Override default max retries
resultTTL: 300000 // Optional per-job TTL override (ms)
})
```

Throws `TimeoutError` if the job doesn't complete within the timeout.

`resultTTL` behavior:
- If provided in `enqueue()` / `enqueueAndWait()`, it overrides the queue default for that job.
- If omitted, the producer uses the queue default `resultTTL` at enqueue time.
- For duplicate IDs, the first accepted enqueue defines the TTL for that job.

##### `queue.cancel(id): Promise<CancelResult>`

Cancel a queued job.
Expand Down
5 changes: 3 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
async #processJob (message: Buffer): Promise<void> {
const queueMessage = this.#deserializeMessage(message)
const { id, payload, attempts, maxAttempts } = queueMessage
const resultTTL = queueMessage.resultTTL ?? this.#resultTTL

// Check if job was cancelled (deleted from jobs hash)
const state = await this.#storage.getJobState(id)
Expand Down Expand Up @@ -204,7 +205,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe

// Complete the job
const serializedResult = this.#resultSerde.serialize(result)
await this.#storage.completeJob(id, message, this.#workerId, serializedResult, this.#resultTTL)
await this.#storage.completeJob(id, message, this.#workerId, serializedResult, resultTTL)

this.emit('completed', id, result)
} catch (err) {
Expand Down Expand Up @@ -238,7 +239,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
}
))

await this.#storage.failJob(id, message, this.#workerId, serializedError, this.#resultTTL)
await this.#storage.failJob(id, message, this.#workerId, serializedError, resultTTL)

this.emit('failed', id, maxRetriesError)
}
Expand Down
24 changes: 17 additions & 7 deletions src/producer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import type { Storage } from './storage/types.ts'
import { JobFailedError, TimeoutError } from './errors.ts'
import type { Serde } from './serde/index.ts'
import { createJsonSerde } from './serde/index.ts'
import type { Storage } from './storage/types.ts'
import type {
QueueMessage,
EnqueueOptions,
CancelResult,
EnqueueAndWaitOptions,
EnqueueOptions,
EnqueueResult,
CancelResult,
MessageStatus,
QueueMessage,
SerializedError
} from './types.ts'
import { TimeoutError, JobFailedError } from './errors.ts'
import { createJsonSerde } from './serde/index.ts'
import { parseState } from './utils/state.ts'

interface ProducerConfig<TPayload, TResult> {
Expand Down Expand Up @@ -49,13 +49,17 @@ export class Producer<TPayload, TResult> {
): Promise<EnqueueResult<TResult>> {
const timestamp = Date.now()
const maxAttempts = options?.maxAttempts ?? this.#maxRetries
const resultTTL = options?.resultTTL ?? this.#resultTTL

this.#validateResultTTL(resultTTL)

const message: QueueMessage<TPayload> = {
id,
payload,
createdAt: timestamp,
attempts: 0,
maxAttempts
maxAttempts,
resultTTL
}

const serialized = this.#payloadSerde.serialize(message as unknown as TPayload)
Expand Down Expand Up @@ -213,4 +217,10 @@ export class Producer<TPayload, TResult> {

return messageStatus
}

#validateResultTTL (resultTTL: number): void {
if (!Number.isFinite(resultTTL) || !Number.isInteger(resultTTL) || resultTTL <= 0) {
throw new TypeError('resultTTL must be a positive integer in milliseconds')
}
}
}
8 changes: 4 additions & 4 deletions src/storage/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,15 +611,15 @@ export class FileStorage implements Storage {
message: Buffer,
workerId: string,
result: Buffer,
resultTtlMs: number
resultTTL: number
): Promise<void> {
const timestamp = Date.now()

// Set state to completed
await this.setJobState(id, `completed:${timestamp}`)

// Store result
await this.setResult(id, result, resultTtlMs)
await this.setResult(id, result, resultTTL)

// Remove from processing queue
await this.ack(id, message, workerId)
Expand All @@ -636,15 +636,15 @@ export class FileStorage implements Storage {
message: Buffer,
workerId: string,
error: Buffer,
errorTtlMs: number
errorTTL: number
): Promise<void> {
const timestamp = Date.now()

// Set state to failed
await this.setJobState(id, `failed:${timestamp}`)

// Store error
await this.setError(id, error, errorTtlMs)
await this.setError(id, error, errorTTL)

// Remove from processing queue
await this.ack(id, message, workerId)
Expand Down
Loading