Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions benchmark/request-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ ${'─'.repeat(50)}

// Consumer worker code
async function runConsumer (): Promise<void> {
const { redisUrl, keyPrefix } = workerData as { redisUrl: string, keyPrefix: string }
const { redisUrl, keyPrefix } = workerData as { redisUrl: string; keyPrefix: string }

const storage = new RedisStorage({
url: redisUrl,
Expand All @@ -93,12 +93,12 @@ async function runConsumer (): Promise<void> {
concurrency: 100
})

queue.execute(async (job) => {
queue.execute(async job => {
const latency = Date.now() - job.payload.timestamp
return { processed: true, latency }
})

queue.on('error', (err) => {
queue.on('error', err => {
console.error('Consumer error:', err)
})

Expand All @@ -108,7 +108,7 @@ async function runConsumer (): Promise<void> {
parentPort!.postMessage({ type: 'ready' })

// Wait for done signal
await new Promise<void>((resolve) => {
await new Promise<void>(resolve => {
parentPort!.on('message', async (msg: string) => {
if (msg === 'stop') {
await queue.stop()
Expand All @@ -121,11 +121,7 @@ async function runConsumer (): Promise<void> {
}

// Main thread code
async function runBenchmark (
redisUrl: string,
keyPrefix: string,
config: BenchmarkConfig
): Promise<Stats> {
async function runBenchmark (redisUrl: string, keyPrefix: string, config: BenchmarkConfig): Promise<Stats> {
const { requests, concurrency, payloadSize } = config
const payload = 'x'.repeat(payloadSize)
const latencies: number[] = []
Expand All @@ -152,26 +148,32 @@ async function runBenchmark (
const workers: Promise<void>[] = []

for (let i = 0; i < concurrency; i++) {
workers.push((async () => {
while (completed < requests) {
const id = `${config.name}-${jobId++}`
if (jobId > requests) break

const requestStart = performance.now()
try {
await producer.enqueueAndWait(id, {
data: payload,
timestamp: Date.now()
}, { timeout: 30000 })

const latency = performance.now() - requestStart
latencies.push(latency)
completed++
} catch (err) {
console.error(`Request ${id} failed:`, err)
workers.push(
(async () => {
while (completed < requests) {
const id = `${config.name}-${jobId++}`
if (jobId > requests) break

const requestStart = performance.now()
try {
await producer.enqueueAndWait(
id,
{
data: payload,
timestamp: Date.now()
},
{ timeout: 30000 }
)

const latency = performance.now() - requestStart
latencies.push(latency)
completed++
} catch (err) {
console.error(`Request ${id} failed:`, err)
}
}
}
})())
})()
)
}

await Promise.all(workers)
Expand Down Expand Up @@ -257,7 +259,7 @@ Redis URL: ${redisUrl}

// Stop consumer worker
consumerWorker.postMessage('stop')
await new Promise<void>((resolve) => {
await new Promise<void>(resolve => {
consumerWorker.on('message', (msg: WorkerMessage) => {
if (msg.type === 'done') resolve()
})
Expand All @@ -275,12 +277,12 @@ Redis URL: ${redisUrl}

// Entry point
if (isMainThread) {
main().catch((err) => {
main().catch(err => {
console.error('Benchmark failed:', err)
process.exit(1)
})
} else {
runConsumer().catch((err) => {
runConsumer().catch(err => {
parentPort!.postMessage({ type: 'error', error: err.message })
})
}
22 changes: 18 additions & 4 deletions eslint.config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
import eslintPluginPrettier from 'eslint-plugin-prettier'
import { globalIgnores } from 'eslint/config'
import neostandard from 'neostandard'

export default neostandard({
ts: true,
ignores: ['dist/**']
})
const eslint = [
...neostandard({ ts: true }),
globalIgnores(['dist/',]),
{
files: ['**/*.ts'],
rules: {
'@typescript-eslint/consistent-type-imports': ['error', { fixStyle: 'inline-type-imports' }],
'prettier/prettier': 'error'
},
plugins: {
prettier: eslintPluginPrettier
}
}
]

export default eslint
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
"c8": "^10.1.3",
"cleaner-spec-reporter": "^1.0.3",
"eslint": "^9.0.0",
"eslint-plugin-prettier": "^5.5.5",
"neostandard": "^0.12.0",
"prettier-plugin-space-before-function-paren": "^0.0.10",
"semver": "^7.7.4",
"typescript": "^5.7.0"
},
"engines": {
"node": ">=22.19.0"
}
}
}
70 changes: 70 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions prettier.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export default {
printWidth: 120,
semi: false,
singleQuote: true,
bracketSpacing: true,
trailingComma: 'none',
arrowParens: 'avoid',
plugins: ['prettier-plugin-space-before-function-paren']
}
38 changes: 20 additions & 18 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
}

// Don't await - let them run in background
Promise.all(loops).catch((err) => {
Promise.all(loops).catch(err => {
this.emit('error', err)
})
}
Expand All @@ -111,7 +111,7 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
const maxWait = this.#visibilityTimeout
const startTime = Date.now()

while (this.#activeJobs > 0 && (Date.now() - startTime) < maxWait) {
while (this.#activeJobs > 0 && Date.now() - startTime < maxWait) {
await new Promise(resolve => setTimeout(resolve, 100))
}

Expand Down Expand Up @@ -229,15 +229,17 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
} else {
// Max retries exceeded - fail the job
const maxRetriesError = new MaxRetriesError(id, currentAttempts, error)
const serializedError = Buffer.from(JSON.stringify(
typeof error.toJSON === 'function'
? error.toJSON()
: {
message: error.message,
code: error.code,
stack: error.stack
}
))
const serializedError = Buffer.from(
JSON.stringify(
typeof error.toJSON === 'function'
? error.toJSON()
: {
message: error.message,
code: error.code,
stack: error.stack
}
)
)

await this.#storage.failJob(id, message, this.#workerId, serializedError, resultTTL)

Expand All @@ -258,13 +260,13 @@ export class Consumer<TPayload, TResult> extends EventEmitter<ConsumerEvents<TRe
// Check if callback style (handler.length > 1)
if (handler.length > 1) {
return new Promise<TResult>((resolve, reject) => {
(handler as (job: Job<TPayload>, callback: (err: Error | null, result?: TResult) => void) => void)(
job,
(err, result) => {
if (err) reject(err)
else resolve(result as TResult)
}
)
;(handler as (job: Job<TPayload>, callback: (err: Error | null, result?: TResult) => void) => void)(job, (
err,
result
) => {
if (err) reject(err)
else resolve(result as TResult)
})
})
}

Expand Down
Loading