Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions src/storage/redis.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { type Redis } from 'iovalkey'
import { EventEmitter } from 'node:events'
import { readFileSync } from 'node:fs'
import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
import { join } from 'node:path'
import type { Storage } from './types.ts'
import { loadOptionalDependency } from './utils.ts'

const __dirname = dirname(fileURLToPath(import.meta.url))
const EXPIRING_VALUE_HEADER_SIZE = 8 // First 8 bytes are the expireAt timestamp in milliseconds (Uint64 BE)

interface RedisStorageConfig {
url?: string
Expand Down Expand Up @@ -75,6 +74,29 @@ export class RedisStorage implements Storage {
return this.#key('workers')
}

#encodeExpiringValue (value: Buffer, ttlMs: number): Buffer {
const buffer = Buffer.allocUnsafe(EXPIRING_VALUE_HEADER_SIZE + value.length)
buffer.writeBigInt64BE(BigInt(Date.now() + ttlMs))
value.copy(buffer, EXPIRING_VALUE_HEADER_SIZE)
return buffer
}

#decodeExpiringValue (value: Buffer): { payload: Buffer; expiresAt: number } | null {
if (value.length < EXPIRING_VALUE_HEADER_SIZE) {
return null
}

const expiresAt = Number(value.readBigInt64BE(0))
if (!Number.isFinite(expiresAt) || expiresAt <= 0) {
return null
}

return {
payload: value.subarray(EXPIRING_VALUE_HEADER_SIZE),
expiresAt
}
}

async connect (): Promise<void> {
if (this.#client) return

Expand Down Expand Up @@ -119,7 +141,7 @@ export class RedisStorage implements Storage {
}

async #loadScripts (): Promise<void> {
const scriptsDir = join(__dirname, '..', '..', 'redis-scripts')
const scriptsDir = join(import.meta.dirname, '..', '..', 'redis-scripts')

const enqueueScript = readFileSync(join(scriptsDir, 'enqueue.lua'), 'utf8')
const completeScript = readFileSync(join(scriptsDir, 'complete.lua'), 'utf8')
Expand Down Expand Up @@ -240,23 +262,51 @@ export class RedisStorage implements Storage {
}

async setResult (id: string, result: Buffer, ttlMs: number): Promise<void> {
await this.#client!.hset(this.#resultsKey(), id, result)
// Note: HEXPIRE is not widely supported, so we set TTL on the whole hash
// For production, consider using separate keys per result
await this.#client!.hset(this.#resultsKey(), id, this.#encodeExpiringValue(result, ttlMs))
}

async getResult (id: string): Promise<Buffer | null> {
const result = await this.#client!.hgetBuffer(this.#resultsKey(), id)
return result
if (!result) {
return null
}

const decoded = this.#decodeExpiringValue(result)
if (!decoded) {
// Backward compatibility for legacy entries stored without envelope
return result
}

if (Date.now() > decoded.expiresAt) {
await this.#client!.hdel(this.#resultsKey(), id)
return null
}

return decoded.payload
}

async setError (id: string, error: Buffer, ttlMs: number): Promise<void> {
await this.#client!.hset(this.#errorsKey(), id, error)
await this.#client!.hset(this.#errorsKey(), id, this.#encodeExpiringValue(error, ttlMs))
}

async getError (id: string): Promise<Buffer | null> {
const result = await this.#client!.hgetBuffer(this.#errorsKey(), id)
return result
if (!result) {
return null
}

const decoded = this.#decodeExpiringValue(result)
if (!decoded) {
// Backward compatibility for legacy entries stored without envelope
return result
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled in a more resilient way. This can happen but it won’t be legacy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fair. How do you want to handle when it fails? Throw an error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think we can. I’d say to log it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we're still lacking #14.
Do we want to revisit this later?


if (Date.now() > decoded.expiresAt) {
await this.#client!.hdel(this.#errorsKey(), id)
return null
}

return decoded.payload
}

async registerWorker (workerId: string, ttlMs: number): Promise<void> {
Expand Down Expand Up @@ -328,6 +378,7 @@ export class RedisStorage implements Storage {
async completeJob (id: string, message: Buffer, workerId: string, result: Buffer, resultTTL: number): Promise<void> {
const timestamp = Date.now()
const state = `completed:${timestamp}`
const encodedResult = this.#encodeExpiringValue(result, resultTTL)

await this.#client!.evalsha(
this.#scriptSHAs!.complete,
Expand All @@ -338,7 +389,7 @@ export class RedisStorage implements Storage {
id,
message,
state,
result,
encodedResult,
resultTTL.toString()
)

Expand All @@ -350,6 +401,7 @@ export class RedisStorage implements Storage {
async failJob (id: string, message: Buffer, workerId: string, error: Buffer, errorTTL: number): Promise<void> {
const timestamp = Date.now()
const state = `failed:${timestamp}`
const encodedError = this.#encodeExpiringValue(error, errorTTL)

await this.#client!.evalsha(
this.#scriptSHAs!.fail,
Expand All @@ -360,7 +412,7 @@ export class RedisStorage implements Storage {
id,
message,
state,
error,
encodedError,
errorTTL.toString()
)

Expand Down
20 changes: 20 additions & 0 deletions test/redis-storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ describe('RedisStorage', () => {
const result = await storage.getResult('non-existent')
assert.strictEqual(result, null)
})

it('should expire results per job without affecting other entries', async () => {
await storage.setResult('job-short', Buffer.from('short'), 20)
await storage.setResult('job-long', Buffer.from('long'), 1000)

await sleep(30)

assert.strictEqual(await storage.getResult('job-short'), null)
assert.deepStrictEqual(await storage.getResult('job-long'), Buffer.from('long'))
})
})

describe('errors', () => {
Expand All @@ -199,6 +209,16 @@ describe('RedisStorage', () => {
const error = await storage.getError('non-existent')
assert.strictEqual(error, null)
})

it('should expire errors per job without affecting other entries', async () => {
await storage.setError('job-short', Buffer.from('short-error'), 20)
await storage.setError('job-long', Buffer.from('long-error'), 1000)

await sleep(30)

assert.strictEqual(await storage.getError('job-short'), null)
assert.deepStrictEqual(await storage.getError('job-long'), Buffer.from('long-error'))
})
})

describe('workers', () => {
Expand Down