The Reaper is a monitoring component that detects and recovers stalled jobs. A job becomes “stalled” when a worker crashes or hangs while processing it. The Reaper automatically requeues stalled jobs so they can be processed by healthy workers.
What is a Stalled Job?
A stalled job is one that has been in the “processing” state longer than the visibilityTimeout:
// Worker starts processing job at 12:00:00
// visibilityTimeout = 30000 (30 seconds)
// Worker crashes at 12:00:15 (still processing)
// At 12:00:30, the Reaper detects the job is stalled
// Job is requeued and picked up by another worker
Why jobs stall:
- Worker process crashes (OOM, unhandled exception)
- Worker server dies (hardware failure, power loss)
- Worker hangs (infinite loop, deadlock)
- Network partition (worker loses connection to storage)
How the Reaper Works
The Reaper uses an event-based monitoring approach with per-job timers:
1. Subscribe to Job Events
On startup, the Reaper subscribes to job state change events:
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })
await reaper.start()
// Internally, start() subscribes to all job events.
// Simplified view of that subscription inside the Reaper class:
this.#unsubscribe = await this.#storage.subscribeToEvents((id, event) => {
  if (event === 'processing') {
    this.#startTimer(id) // Start the visibility timeout timer
  } else if (event === 'completed' || event === 'failed') {
    this.#cancelTimer(id) // Cancel the timer: the job has finished
  }
})
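Without the storage details, the callback is just a mapping from event names to timer actions. The following is a minimal sketch that assumes only the three event names shown above. `TimerAction` and `timerActionFor` are hypothetical names and are not part of `@platformatic/job-queue`:

```typescript
// What the Reaper does with a job's timer when a state-change event arrives.
// Hypothetical type: the real Reaper acts on its timers directly.
type TimerAction = 'start' | 'cancel' | 'ignore'

function timerActionFor (event: string): TimerAction {
  switch (event) {
    case 'processing':
      return 'start' // a worker picked the job up: arm the visibility timer
    case 'completed':
    case 'failed':
      return 'cancel' // the job reached a final state: drop its timer
    default:
      return 'ignore' // any other event leaves the timers untouched
  }
}
```

A pure mapping like this can be unit-tested without a storage backend.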
2. Set Per-Job Timers
When a job transitions to “processing”, the Reaper starts a timer:
Implementation (from src/reaper.ts:295):
#startTimer(id: string): void {
this.#cancelTimer(id) // Cancel any existing timer
const timer = setTimeout(() => {
this.#processingTimers.delete(id)
this.#checkJob(id).catch(err => {
this.#emitError(err, 'Failed checking job after visibility timer.')
})
}, this.#visibilityTimeout)
this.#processingTimers.set(id, timer)
}
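The same one-timer-per-job pattern can be written as a small standalone class. This is a sketch, not the library's code. `JobTimers` and its methods are hypothetical, and `onExpire` stands in for `#checkJob`. Because timers are keyed by job id in a `Map`, re-arming a job's timer and clearing all timers are both cheap:

```typescript
type Timer = ReturnType<typeof setTimeout>

// Hypothetical helper mirroring #startTimer / #cancelTimer: at most one timer per job id.
class JobTimers {
  private readonly timers = new Map<string, Timer>()
  private readonly timeoutMs: number
  private readonly onExpire: (id: string) => void

  constructor (timeoutMs: number, onExpire: (id: string) => void) {
    this.timeoutMs = timeoutMs
    this.onExpire = onExpire
  }

  start (id: string): void {
    this.cancel(id) // re-arming replaces any existing timer for this job
    const timer = setTimeout(() => {
      this.timers.delete(id) // forget the timer before handing off
      this.onExpire(id)
    }, this.timeoutMs)
    this.timers.set(id, timer)
  }

  cancel (id: string): void {
    const timer = this.timers.get(id)
    if (timer !== undefined) {
      clearTimeout(timer)
      this.timers.delete(id)
    }
  }

  // Called on shutdown so no pending timer keeps the process alive.
  clearAll (): void {
    for (const timer of this.timers.values()) clearTimeout(timer)
    this.timers.clear()
  }

  get size (): number {
    return this.timers.size
  }
}
```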
3. Check and Recover Stalled Jobs
When the timer fires, the Reaper checks if the job is truly stalled:
Implementation (from src/reaper.ts:323):
async #checkJob(id: string): Promise<void> {
const state = await this.#storage.getJobState(id)
if (!state) return
const { status, timestamp, workerId } = parseState(state)
if (status !== 'processing') {
// Job already finished, nothing to do
return
}
// Check if visibility timeout has elapsed
const elapsed = Date.now() - timestamp
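if (elapsed < this.#visibilityTimeout) {
  // Not yet stalled: re-arm a tracked timer for the remaining time,
  // so stop() can clear it and errors are reported instead of left unhandled
  const remaining = this.#visibilityTimeout - elapsed
  const timer = setTimeout(() => {
    this.#processingTimers.delete(id)
    this.#checkJob(id).catch(err => {
      this.#emitError(err, 'Failed checking job after visibility timer.')
    })
  }, remaining)
  this.#processingTimers.set(id, timer)
  return
}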
// Job is stalled - recover it
await this.#recoverStalledJob(id, workerId)
}
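Separating the decision from the storage calls makes the three outcomes of `#checkJob` easy to test. The sketch below assumes `timestamp` is the epoch time in milliseconds at which the job entered `processing`. `ParsedState`, `CheckDecision` and `decideCheck` are hypothetical names:

```typescript
// Parsed job state; the fields mirror what parseState() returns above.
interface ParsedState {
  status: string
  timestamp: number // ms since epoch when the job entered this status
}

type CheckDecision =
  | { action: 'ignore' } // job is gone or no longer processing
  | { action: 'recheck', delayMs: number } // not stalled yet
  | { action: 'recover' } // visibility timeout has elapsed

function decideCheck (
  state: ParsedState | null,
  now: number,
  visibilityTimeoutMs: number
): CheckDecision {
  if (state === null || state.status !== 'processing') {
    return { action: 'ignore' }
  }
  const elapsed = now - state.timestamp
  if (elapsed < visibilityTimeoutMs) {
    return { action: 'recheck', delayMs: visibilityTimeoutMs - elapsed }
  }
  return { action: 'recover' }
}
```

Take the timeline at the top of this page: the job starts at 12:00:00 with a 30-second timeout. A check at 12:00:10 yields a recheck in 20 seconds, and a check at 12:00:30 yields `recover`.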
4. Requeue Stalled Jobs
Recovery involves finding the job in the worker’s processing queue and requeueing it:
Implementation (from src/reaper.ts:359):
async #recoverStalledJob(id: string, workerId?: string): Promise<void> {
// Get the job from the worker's processing queue
const processingJobs = await this.#storage.getProcessingJobs(workerId)
// Find the message for this job, keeping its attempt count
let jobMessage: Buffer | null = null
let attempts = 0
for (const message of processingJobs) {
  const queueMessage = this.#payloadSerde.deserialize(message)
  if (queueMessage.id === id) {
    jobMessage = message
    attempts = queueMessage.attempts
    break
  }
}
if (!jobMessage) return // Already processed or recovered
// Requeue the job
await this.#storage.requeue(id, jobMessage, workerId)
// Update state to reflect the retry
const newState = `failing:${Date.now()}:${attempts + 1}`
await this.#storage.setJobState(id, newState)
this.emit('stalled', id)
}
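The recovery steps can be replayed against a toy in-memory store. This sketch assumes JSON-serialized messages with `id` and `attempts` fields, and a requeue that appends to the tail of the queue. `MemoryStore`, `QueueMessage` and `requeueStalledJob` are hypothetical. Only the `failing:<timestamp>:<attempts>` state format comes from the code above:

```typescript
// Hypothetical shape of a serialized queue message.
interface QueueMessage {
  id: string
  attempts: number
  payload: unknown
}

// Hypothetical in-memory stand-in for a storage backend.
interface MemoryStore {
  queue: string[] // pending messages; requeued jobs are appended (assumption)
  processing: Map<string, string[]> // workerId -> in-flight messages
  states: Map<string, string> // job id -> state string
}

// Move a stalled job from a worker's processing list back to the queue.
// Returns false if the job is no longer in that worker's processing list.
function requeueStalledJob (store: MemoryStore, id: string, workerId: string, now: number): boolean {
  const inFlight = store.processing.get(workerId) ?? []
  const index = inFlight.findIndex((raw) => (JSON.parse(raw) as QueueMessage).id === id)
  if (index === -1) return false // already acknowledged or recovered elsewhere

  const [raw] = inFlight.splice(index, 1) // remove from the processing list
  store.queue.push(raw) // requeue the original message unchanged
  const message = JSON.parse(raw) as QueueMessage
  store.states.set(id, `failing:${now}:${message.attempts + 1}`) // record the retry
  return true
}
```

The second call in a row returns `false`. This matches the `if (!jobMessage) return` guard, which keeps recovery idempotent.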
Visibility Timeout
The visibilityTimeout determines how long a job can be “in-flight” before it’s considered stalled:
const queue = new Queue({
storage,
visibilityTimeout: 30000 // 30 seconds
})
const reaper = new Reaper({
storage,
visibilityTimeout: 30000 // MUST match the queue's timeout
})
The Reaper’s visibilityTimeout MUST match the Queue’s visibilityTimeout. If they differ, jobs may be recovered prematurely or too late.
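A cheap safeguard is to refuse to start when the two values differ. The sketch below uses hypothetical helper names and also spells out which failure mode each direction of the mismatch produces:

```typescript
type TimeoutMismatch = 'ok' | 'premature' | 'late'

// Classify what a Reaper/Queue timeout mismatch would do to in-flight jobs.
function compareTimeouts (queueTimeoutMs: number, reaperTimeoutMs: number): TimeoutMismatch {
  if (reaperTimeoutMs < queueTimeoutMs) return 'premature' // jobs still within their window get requeued
  if (reaperTimeoutMs > queueTimeoutMs) return 'late' // crashed jobs sit unrecovered for longer
  return 'ok'
}

// Fail fast at startup instead of discovering the mismatch in production.
function assertMatchingTimeouts (queueTimeoutMs: number, reaperTimeoutMs: number): void {
  const result = compareTimeouts(queueTimeoutMs, reaperTimeoutMs)
  if (result !== 'ok') {
    throw new Error(`visibilityTimeout mismatch (${result}): queue=${queueTimeoutMs}ms, reaper=${reaperTimeoutMs}ms`)
  }
}
```

In practice the simplest protection is to have workers and Reapers read the value from the same shared configuration source.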
Choosing a Timeout Value
Base the value on your typical job duration. The three options below are alternatives; pick the one that fits your workload:
// Fast jobs (API calls, cache lookups): 10-30 seconds
const fastReaper = new Reaper({ storage, visibilityTimeout: 30000 })
// Medium jobs (database queries, file processing): 1-5 minutes
const mediumReaper = new Reaper({ storage, visibilityTimeout: 120000 })
// Slow jobs (report generation, video encoding): 5-30 minutes
const slowReaper = new Reaper({ storage, visibilityTimeout: 600000 })
Set the timeout to ~2x your P95 job duration. This gives jobs enough time to complete while detecting stalls quickly.
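Applying the 2x P95 rule requires a P95 computed from real job durations. The sketch below uses the nearest-rank percentile definition. `percentile` and `recommendVisibilityTimeout` are hypothetical helpers, and the durations would come from your own metrics:

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
// p is an integer percentage (e.g. 95) so the rank is computed without float rounding.
function percentile (samplesMs: number[], p: number): number {
  if (samplesMs.length === 0) throw new Error('need at least one sample')
  const sorted = [...samplesMs].sort((a, b) => a - b)
  const rank = Math.ceil((p * sorted.length) / 100) // 1-based rank
  return sorted[Math.max(rank, 1) - 1]
}

// Rule of thumb from above: visibility timeout ~ 2 x P95 job duration.
function recommendVisibilityTimeout (durationsMs: number[]): number {
  return 2 * percentile(durationsMs, 95)
}
```

For 100 jobs taking 100 ms, 200 ms, ... up to 10 000 ms, the P95 is 9 500 ms and the suggested timeout is 19 000 ms.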
When to Run the Reaper
Single Worker (No Reaper Needed)
If you have a single worker, you don’t need the Reaper:
const queue = new Queue({ storage })
queue.execute(async (job) => { /* ... */ })
await queue.start()
// No reaper needed - if this process crashes,
// jobs will be reprocessed on restart
Multiple Workers (Reaper Recommended)
With multiple workers, run a Reaper to handle worker crashes:
// worker-1.ts
const queue1 = new Queue({ storage, workerId: 'worker-1' })
queue1.execute(handler)
await queue1.start()
// worker-2.ts
const queue2 = new Queue({ storage, workerId: 'worker-2' })
queue2.execute(handler)
await queue2.start()
// reaper.ts (separate process)
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })
await reaper.start()
Embedded Reaper
You can also run the Reaper in the same process as a worker:
const queue = new Queue({ storage, workerId: `worker-${process.pid}` })
const reaper = new Reaper({ storage, visibilityTimeout: 30000 })
queue.execute(handler)
await Promise.all([
queue.start(),
reaper.start()
])
process.on('SIGTERM', async () => {
await Promise.all([
queue.stop(),
reaper.stop()
])
process.exit(0)
})
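With `Promise.all`, the first `stop()` to reject makes the handler reject before `process.exit(0)` runs, without waiting for the other component to finish stopping. The variant below waits for every component and reports the failures. It is a sketch that assumes only that Queue and Reaper expose an async `stop()`, as shown above; `Stoppable` and `stopAll` are hypothetical:

```typescript
// Anything with an async stop(), such as a Queue or a Reaper (hypothetical interface).
interface Stoppable {
  stop: () => Promise<void>
}

// Stop every component, wait for all of them, and collect the failures
// instead of abandoning the rest at the first rejection.
async function stopAll (components: Stoppable[]): Promise<unknown[]> {
  const results = await Promise.allSettled(components.map((component) => component.stop()))
  return results
    .filter((result): result is PromiseRejectedResult => result.status === 'rejected')
    .map((result) => result.reason)
}
```

In the SIGTERM handler, `const errors = await stopAll([queue, reaper])` followed by `process.exit(errors.length === 0 ? 0 : 1)` would replace the `Promise.all` block.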
Leader Election for High Availability
Run multiple Reaper instances with leader election enabled. Only one Reaper is active at a time:
const reaper = new Reaper({
storage,
visibilityTimeout: 30000,
leaderElection: {
enabled: true,
lockTTL: 30000, // Lock expires after 30s
renewalInterval: 10000, // Leader renews every 10s
acquireRetryInterval: 5000 // Followers retry every 5s
}
})
reaper.on('leadershipAcquired', () => {
console.log(`Reaper ${reaper.reaperId} is now the leader`)
})
reaper.on('leadershipLost', () => {
console.log(`Reaper ${reaper.reaperId} lost leadership`)
})
await reaper.start()
Leader Election Configuration
- enabled (boolean): Enables leader election. When disabled, every Reaper instance is active, so leave it disabled only when you run a single Reaper.
- lockTTL (number, ms): Lock expiry time. If the leader crashes, the lock expires and a follower takes over.
- renewalInterval (number, ms): How often the leader renews the lock. It should be about 1/3 of lockTTL.
- acquireRetryInterval (number, ms): How often followers attempt to acquire leadership.
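These guidelines can be turned into a startup check. The lock lives lockTTL ms past the last successful renewal. With renewals every renewalInterval ms, one missed renewal is survivable only if 2 × renewalInterval < lockTTL. The sketch below flags settings that break this rule. It also gives a rough upper bound, ignoring network latency, on how long jobs go unmonitored after the leader crashes. `LeaderElectionOptions`, `checkLeaderElection` and `worstCaseFailoverMs` are hypothetical, not library functions:

```typescript
// Mirrors the leaderElection options above (durations in milliseconds).
interface LeaderElectionOptions {
  enabled: boolean
  lockTTL: number
  renewalInterval: number
  acquireRetryInterval: number
}

// Returns human-readable problems; an empty array means the settings look sane.
function checkLeaderElection (opts: LeaderElectionOptions): string[] {
  const problems: string[] = []
  if (!opts.enabled) return problems
  if (opts.renewalInterval >= opts.lockTTL) {
    problems.push('renewalInterval must be shorter than lockTTL, or the lock expires between renewals')
  } else if (opts.renewalInterval * 2 >= opts.lockTTL) {
    problems.push('a single missed renewal lets the lock expire; about lockTTL / 3 is recommended')
  }
  return problems
}

// After a crash the lock survives up to lockTTL past the last renewal, and a follower
// may wait up to one more acquireRetryInterval before its next attempt.
function worstCaseFailoverMs (opts: LeaderElectionOptions): number {
  return opts.lockTTL + opts.acquireRetryInterval
}
```

With the values used in this page (30000 / 10000 / 5000), the check passes and failover takes at most about 35 seconds.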
Leader Election Flow
Leader lifecycle:
1. Reaper starts → Try to acquire lock
2. If acquired → Become leader
- Subscribe to job events
- Start monitoring timers
- Renew lock every renewalInterval
3. If renewal fails → Lose leadership
- Unsubscribe from events
- Clear all timers
- Switch to follower mode
4. Follower → Retry acquisition every acquireRetryInterval
Implementation (from src/reaper.ts:194):
if (this.#isLeader) {
// Renew the lock
const renewed = await this.#tryRenewLock(lockTTL)
if (!renewed) {
// Lost leadership - transition to follower
await this.#transitionToFollower()
}
} else {
// Try to acquire lock
const acquired = await this.#tryAcquireLock(lockTTL)
if (acquired) {
// Became leader - transition to leader
await this.#transitionToLeader()
}
}
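The loop above relies on only two lock operations: acquire the lock if it is free or expired, and renew it only while still the owner. The in-memory sketch below models those semantics with an explicit clock, so the failover sequence can be replayed deterministically. `LeaseLock` and `tick` are hypothetical; they stand in for the Redis or file-based lock, not for the library's internals:

```typescript
// Hypothetical single-key lease lock with explicit timestamps (ms).
class LeaseLock {
  private owner: string | null = null
  private expiresAt = 0

  // Succeeds only if nobody holds a live lease (like SET NX PX).
  tryAcquire (id: string, now: number, ttlMs: number): boolean {
    if (this.owner !== null && now < this.expiresAt) return false // held by a live leader
    this.owner = id
    this.expiresAt = now + ttlMs
    return true
  }

  // Succeeds only if the caller still owns an unexpired lease (like the Lua check).
  tryRenew (id: string, now: number, ttlMs: number): boolean {
    if (this.owner !== id || now >= this.expiresAt) return false // lost or expired
    this.expiresAt = now + ttlMs
    return true
  }
}

// One iteration of the leader/follower loop: returns the new leadership status.
function tick (lock: LeaseLock, id: string, isLeader: boolean, now: number, ttlMs: number): boolean {
  return isLeader ? lock.tryRenew(id, now, ttlMs) : lock.tryAcquire(id, now, ttlMs)
}
```

In the sequence below, reaper A becomes leader and renews once, then stops renewing as if it had crashed. Reaper B takes over as soon as the lease expires, and A's next renewal attempt fails, which corresponds to `#transitionToFollower()`.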
Storage Backend Support
RedisStorage - Full support using SET NX PX pattern:
// Acquire: SET key value NX PX ttl
await redis.set('jq:reaper:lock', reaperId, 'NX', 'PX', 30000)
-- Renew (Lua script): extend the TTL only if this Reaper still owns the lock
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
redis.call('PEXPIRE', KEYS[1], ARGV[2])
return 1
end
return 0
FileStorage - Supported using exclusive file creation:
// Acquire: write lock file with wx flag (fails if exists)
await writeFile(lockPath, JSON.stringify({ ownerId, expiresAt }), { flag: 'wx' })
MemoryStorage - Not supported (single process, no distributed locking needed)
Initial Scan for Stalled Jobs
On startup, the Reaper scans all workers’ processing queues to catch jobs that were stalled before the Reaper started:
Implementation (from src/reaper.ts:403):
async #checkStalledJobs(): Promise<void> {
const workers = await this.#storage.getWorkers()
for (const workerId of workers) {
await this.#checkWorkerProcessingQueue(workerId)
}
}
async #checkWorkerProcessingQueue(workerId: string): Promise<void> {
const processingJobs = await this.#storage.getProcessingJobs(workerId)
for (const message of processingJobs) {
const queueMessage = this.#payloadSerde.deserialize(message)
const state = await this.#storage.getJobState(queueMessage.id)
if (!state) continue
const { status, timestamp } = parseState(state)
if (status === 'processing') {
const elapsed = Date.now() - timestamp
if (elapsed >= this.#visibilityTimeout) {
// Job is stalled
await this.#recoverStalledJob(queueMessage.id, workerId)
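} else {
  // Not stalled yet: arm a tracked timer for the remaining time
  const id = queueMessage.id
  const remaining = this.#visibilityTimeout - elapsed
  this.#cancelTimer(id) // avoid a duplicate if an event already started one
  const timer = setTimeout(() => {
    this.#processingTimers.delete(id)
    this.#checkJob(id).catch(err => {
      this.#emitError(err, 'Failed checking job after visibility timer.')
    })
  }, remaining)
  this.#processingTimers.set(id, timer)
}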
}
}
}
This ensures no stalled jobs are missed, even if they stalled before the Reaper started.
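The startup scan applies the same elapsed-time comparison to every worker's processing queue. It must also remember which worker owned each job, because recovery reads that worker's queue. The sketch below covers the planning step over already-loaded data. `InFlightJob`, `ScanPlan` and `planInitialScan` are hypothetical:

```typescript
// One in-flight message as seen during the scan (state already parsed).
interface InFlightJob {
  id: string
  status: string | null // null when no state is stored for the job
  timestamp: number // ms since epoch when the job entered its current status
}

interface ScanPlan {
  recoverNow: Array<{ id: string, workerId: string }> // past the visibility timeout
  recheckLater: Array<{ id: string, delayMs: number }> // still inside the window
}

// Decide, across all workers' processing queues, which jobs to recover at startup.
function planInitialScan (
  processingByWorker: Map<string, InFlightJob[]>,
  now: number,
  visibilityTimeoutMs: number
): ScanPlan {
  const plan: ScanPlan = { recoverNow: [], recheckLater: [] }
  for (const [workerId, jobs] of processingByWorker) {
    for (const job of jobs) {
      if (job.status !== 'processing') continue // missing state or already finished
      const elapsed = now - job.timestamp
      if (elapsed >= visibilityTimeoutMs) {
        plan.recoverNow.push({ id: job.id, workerId }) // recovery needs the owning worker
      } else {
        plan.recheckLater.push({ id: job.id, delayMs: visibilityTimeoutMs - elapsed })
      }
    }
  }
  return plan
}
```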
Reaper Configuration
const reaper = new Reaper<TPayload>({
storage, // Storage backend (required)
visibilityTimeout: 30000, // Milliseconds before job is stalled (default: 30000)
payloadSerde, // Custom payload serializer (default: JSON)
logger, // Pino logger (default: no-op)
leaderElection: { // Optional leader election config
enabled: false,
lockTTL: 30000,
renewalInterval: 10000,
acquireRetryInterval: 5000
}
})
Reaper Events
Monitor Reaper activity with events:
reaper.on('stalled', (id: string) => {
console.log(`Job ${id} was stalled and requeued`)
})
reaper.on('error', (error: Error) => {
console.error('Reaper error:', error)
})
reaper.on('leadershipAcquired', () => {
console.log('This reaper is now the leader')
})
reaper.on('leadershipLost', () => {
console.log('This reaper lost leadership')
})
Event definitions (from src/reaper.ts:26):
interface ReaperEvents {
error: [error: Error]
stalled: [id: string]
leadershipAcquired: []
leadershipLost: []
}
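One way to act on the `stalled` event is to count events over a sliding window and alert when the count crosses a threshold. The following is a minimal sketch. `StalledRateMonitor` is a hypothetical helper and assumes events are recorded with non-decreasing timestamps:

```typescript
// Counts 'stalled' events in a sliding time window.
class StalledRateMonitor {
  private readonly windowMs: number
  private readonly timestamps: number[] = []

  constructor (windowMs: number) {
    this.windowMs = windowMs
  }

  record (now: number): void {
    this.timestamps.push(now)
  }

  // Number of events in the window (now - windowMs, now].
  count (now: number): number {
    while (this.timestamps.length > 0 && this.timestamps[0] <= now - this.windowMs) {
      this.timestamps.shift() // drop events that fell out of the window
    }
    return this.timestamps.length
  }
}
```

It would be wired up with `reaper.on('stalled', () => monitor.record(Date.now()))`, and `monitor.count(Date.now())` would be checked periodically against an alert threshold.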
Resource Cleanup
The Reaper maintains per-job timers, so proper cleanup is important:
await reaper.start()
// Creates timers for processing jobs
await reaper.stop()
// 1. Clears all timers
// 2. Unsubscribes from events
// 3. Releases leader lock (if leader)
From src/reaper.ts:141:
async #becomeInactive(): Promise<void> {
// Clear all processing timers
for (const timer of this.#processingTimers.values()) {
clearTimeout(timer)
}
this.#processingTimers.clear()
// Unsubscribe from events
if (this.#unsubscribe) {
await this.#unsubscribe()
this.#unsubscribe = null
}
}
Always call reaper.stop() during graceful shutdown to clean up timers and release locks.
Example: Production Deployment
Deploy 3 workers and 2 Reapers with leader election:
// worker.ts (run 3 instances)
import { Queue, RedisStorage } from '@platformatic/job-queue'
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({
storage,
workerId: `worker-${process.env.HOSTNAME}-${process.pid}`,
concurrency: 10,
visibilityTimeout: 60000 // 1 minute
})
queue.execute(async (job) => {
  // Process the job here and return its result, if any
})
await queue.start()
process.on('SIGTERM', async () => {
await queue.stop()
process.exit(0)
})
// reaper.ts (run 2 instances for HA)
import { Reaper, RedisStorage } from '@platformatic/job-queue'
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const reaper = new Reaper({
storage,
visibilityTimeout: 60000, // MUST match worker timeout
leaderElection: {
enabled: true,
lockTTL: 30000,
renewalInterval: 10000,
acquireRetryInterval: 5000
}
})
reaper.on('leadershipAcquired', () => {
console.log('Became leader')
})
reaper.on('stalled', (id) => {
console.warn(`Recovered stalled job: ${id}`)
})
await reaper.start()
process.on('SIGTERM', async () => {
await reaper.stop()
process.exit(0)
})
Deployment:
# docker-compose.yml
services:
redis:
image: redis:7-alpine
worker:
build: .
command: node worker.js
environment:
REDIS_URL: redis://redis:6379
deploy:
replicas: 3
reaper:
build: .
command: node reaper.js
environment:
REDIS_URL: redis://redis:6379
deploy:
replicas: 2 # High availability
Best Practices
Match visibility timeouts: Always set the Reaper’s visibilityTimeout to match your Queue’s timeout.
Run multiple Reapers with leader election: For production, deploy 2-3 Reaper instances with leader election for high availability.
Monitor stalled events: Track the stalled event rate. High rates indicate worker instability or timeout misconfiguration.
Don’t set timeout too short: If jobs regularly exceed the timeout, they’ll be incorrectly marked as stalled and requeued multiple times.
Don’t run Reaper with MemoryStorage in multi-process setups: MemoryStorage is single-process only. Use RedisStorage or FileStorage for distributed setups.