The Reaper class is a background service that monitors jobs and automatically recovers stalled ones. A job is considered stalled when it has been in the “processing” state longer than the visibility timeout, which typically happens when a worker crashes or loses connection.
Class Signature
class Reaper<TPayload> extends EventEmitter<ReaperEvents>
Type Parameters
TPayload - The type of the job payload
How It Works
The Reaper operates in two modes:
- Single Instance Mode (default): One Reaper instance actively monitors jobs
- Leader Election Mode: Multiple Reaper instances can run for high availability, with only one active at a time
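To make the "only one active at a time" guarantee concrete, here is a minimal sketch of a lease-style lock of the kind leader election commonly relies on. It is an illustration under assumptions, not the library's implementation: `LeaseLock`, `tryAcquire`, `release`, and `leader` are hypothetical names, and the lock state lives in memory rather than in shared storage.

```typescript
type Clock = () => number

// Hypothetical in-memory lease. A real deployment would keep this state in
// shared storage so that every Reaper instance sees the same lock.
class LeaseLock {
  private holder: string | null = null
  private expiresAt = 0
  private readonly ttlMs: number
  private readonly now: Clock

  constructor (ttlMs: number, now: Clock = Date.now) {
    this.ttlMs = ttlMs
    this.now = now
  }

  // Succeeds when the lock is free, expired, or already held by `id` (a renewal).
  tryAcquire (id: string): boolean {
    const t = this.now()
    if (this.holder === null || this.expiresAt <= t || this.holder === id) {
      this.holder = id
      this.expiresAt = t + this.ttlMs
      return true
    }
    return false
  }

  // Only the current holder can release the lock.
  release (id: string): void {
    if (this.holder === id) this.holder = null
  }

  // The current leader, or null if nobody holds a live lease.
  leader (): string | null {
    return this.holder !== null && this.expiresAt > this.now() ? this.holder : null
  }
}
```

In this sketch a second instance calling `tryAcquire` is refused for as long as the holder keeps renewing. Once renewals stop, the lease runs out after `ttlMs` and the next caller becomes leader.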
Stall Detection
The Reaper detects stalled jobs in four steps:
- It subscribes to job state change events to learn when a job starts processing.
- It starts a timer equal to the visibility timeout for each processing job.
- When the timer expires, it checks whether the job is still in the “processing” state.
- If the job is still processing, it treats the job as stalled and recovers it by requeueing it.
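The timer-per-job approach above can be sketched as follows. This is a simplified model, not the Reaper's actual code: `StallWatcher`, `watch`, `cancel`, and the in-memory `states` map are hypothetical stand-ins for the storage backend and its state change events.

```typescript
type JobState = 'queued' | 'processing' | 'completed'

// Hypothetical watcher: one timer per processing job, checked on expiry.
class StallWatcher {
  private readonly timers = new Map<string, ReturnType<typeof setTimeout>>()
  private readonly states: Map<string, JobState>
  private readonly visibilityTimeout: number
  private readonly onStalled: (id: string) => void

  constructor (
    states: Map<string, JobState>,
    visibilityTimeout: number,
    onStalled: (id: string) => void
  ) {
    this.states = states
    this.visibilityTimeout = visibilityTimeout
    this.onStalled = onStalled
  }

  // Call when a job enters the "processing" state.
  watch (id: string): void {
    this.cancel(id)
    const timer = setTimeout(() => {
      this.timers.delete(id)
      // Only jobs still processing when the timer fires count as stalled.
      if (this.states.get(id) === 'processing') {
        this.states.set(id, 'queued') // requeue the job
        this.onStalled(id)
      }
    }, this.visibilityTimeout)
    this.timers.set(id, timer)
  }

  // Call when a job leaves "processing" normally.
  cancel (id: string): void {
    const timer = this.timers.get(id)
    if (timer !== undefined) {
      clearTimeout(timer)
      this.timers.delete(id)
    }
  }

  // Clear every pending timer, as a graceful stop would.
  stop (): void {
    this.timers.forEach((timer) => clearTimeout(timer))
    this.timers.clear()
  }
}
```

A job that completes before its timer fires is left alone, either because `cancel` removed the timer or because the state check on expiry fails.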
Initial Scan
When the Reaper starts (or becomes leader), it performs an initial scan of all workers’ processing queues to catch any jobs that were stalled before the Reaper started.
Constructor
new Reaper<TPayload>(config: ReaperConfig<TPayload>)
Creates a new Reaper instance with the specified configuration.
Configuration
storage
The storage backend instance (must be the same instance used by the Queue).
Custom serializer/deserializer for job payloads. Must match the Queue’s serializer. Defaults to JSON serialization.
visibilityTimeout
Maximum processing time in milliseconds before a job is considered stalled. Should match the Queue’s visibilityTimeout setting.
leaderElection
Leader election configuration for high availability deployments.
leaderElection.enabled
Whether to enable leader election. Set to true for multi-instance deployments.
leaderElection.lockTTL
Time-to-live for the leader lock in milliseconds. The leader must renew the lock before this expires.
leaderElection.renewalInterval
How often the leader renews its lock in milliseconds. Should be significantly less than lockTTL.
leaderElection.acquireRetryInterval
How often followers attempt to acquire leadership in milliseconds.
logger
Pino logger instance. Defaults to an abstract no-op logger.
Example
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  logger: pino()
})
Example with Leader Election
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger: pino()
})
Properties
reaperId
The unique identifier for this Reaper instance.
Example
console.log('Reaper ID:', reaper.reaperId)
isLeader
Indicates whether this Reaper instance is currently the active leader (only relevant when leader election is enabled).
Example
if (reaper.isLeader) {
  console.log('This reaper is the active leader')
}
Methods
start()
Starts the Reaper. If leader election is disabled, begins monitoring immediately. If leader election is enabled, attempts to acquire leadership.
Resolves when the Reaper has started (but not necessarily acquired leadership).
Example
await reaper.start()
console.log('Reaper started')
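Because `start()` can resolve before leadership is acquired, code that must run only on the leader may want to wait for the `leadershipAcquired` event. The helper below is a hedged sketch: `waitForLeadership` is a hypothetical name, and it relies only on the `isLeader` property and the events documented on this page, plus Node's `events.once` and `AbortSignal.timeout`.

```typescript
import { EventEmitter, once } from 'node:events'

// Hypothetical helper, not part of the library. Resolves true once the
// instance is leader, or false if `timeoutMs` elapses first.
async function waitForLeadership (
  reaper: EventEmitter & { isLeader: boolean },
  timeoutMs: number
): Promise<boolean> {
  if (reaper.isLeader) return true

  const signal = AbortSignal.timeout(timeoutMs)
  try {
    // events.once also rejects if the emitter emits 'error' while waiting.
    await once(reaper, 'leadershipAcquired', { signal })
    return true
  } catch (err) {
    if (signal.aborted) return false // timed out while still a follower
    throw err // a genuine error event from the emitter
  }
}
```

A follower may never become leader while the current leader stays healthy, so the timeout is there to keep callers from waiting indefinitely.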
stop()
Stops the Reaper gracefully. Clears all timers, unsubscribes from events, and releases leadership if held.
Resolves when the Reaper has stopped completely.
Example
await reaper.stop()
console.log('Reaper stopped')
Events
The Reaper class emits the following events.
stalled
Emitted when a stalled job is detected and recovered.
reaper.on('stalled', (id: string) => {
  console.log(`Recovered stalled job: ${id}`)
})
leadershipAcquired
Emitted when this Reaper instance acquires leadership (only emitted when leader election is enabled).
reaper.on('leadershipAcquired', () => {
  console.log(`Reaper ${reaper.reaperId} became leader`)
})
leadershipLost
Emitted when this Reaper instance loses leadership (only emitted when leader election is enabled).
reaper.on('leadershipLost', () => {
  console.warn(`Reaper ${reaper.reaperId} lost leadership`)
})
error
Emitted when an error occurs in the Reaper’s internal operations.
reaper.on('error', (error: Error) => {
  console.error('Reaper error:', error)
})
Usage Patterns
Single Instance Deployment
For development or simple deployments with a single worker:
import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })
const queue = new Queue({
  storage,
  visibilityTimeout: 30000
})
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000 // Must match queue's setting
})

// Register handler
queue.execute(async (job) => {
  // Process job
})

// Monitor for stalled jobs; attach before start() so that
// recoveries from the initial scan are not missed
reaper.on('stalled', (id) => {
  console.log(`Job ${id} was stalled and recovered`)
})

// Start both
await queue.start()
await reaper.start()
High Availability Deployment
For production deployments with multiple workers and Reaper instances:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const storage = new RedisStorage({ url: 'redis://localhost:6379' })
const logger = pino()
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,            // Leader lock expires after 30s
    renewalInterval: 10000,    // Leader renews every 10s
    acquireRetryInterval: 5000 // Followers check every 5s
  },
  logger
})

// Monitor leadership changes
reaper.on('leadershipAcquired', () => {
  logger.info({ reaperId: reaper.reaperId }, 'Became leader')
})
reaper.on('leadershipLost', () => {
  logger.warn({ reaperId: reaper.reaperId }, 'Lost leadership')
})
reaper.on('stalled', (id) => {
  logger.info({ jobId: id }, 'Recovered stalled job')
})
reaper.on('error', (error) => {
  logger.error({ err: error }, 'Reaper error')
})

await reaper.start()
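// In this configuration:
// - Multiple Reaper instances can run safely
// - Only one will be active (the leader)
// - The leader renews its lock every 10s
// - The lock expires after 30s without renewal
// - If the leader stops gracefully, it releases the lock and a follower
//   can take over within ~5s
// - If the leader crashes, a follower can take over only once the lock
//   expires, so failover can take roughly 20-35s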
Separate Reaper Process
You can run the Reaper in a separate process from your workers:
// reaper-service.ts
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()
const storage = new RedisStorage({
  url: process.env.REDIS_URL || 'redis://localhost:6379'
})
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger
})

reaper.on('stalled', (id) => {
  logger.info({ jobId: id }, 'Recovered stalled job')
})
reaper.on('error', (error) => {
  logger.error({ err: error }, 'Reaper error')
})

async function main() {
  await storage.connect()
  await reaper.start()
  logger.info('Reaper service started')
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  logger.info('Shutting down reaper service')
  await reaper.stop()
  await storage.disconnect()
  process.exit(0)
})

main().catch((error) => {
  logger.error({ err: error }, 'Failed to start reaper service')
  process.exit(1)
})
Best Practices
Visibility Timeout
Set the visibilityTimeout to a value somewhat longer than your longest expected job duration, so that slow but healthy jobs are not requeued while they are still running:
// If the longest jobs take about 2 minutes
const visibilityTimeout = 180000 // 3 minutes
const queue = new Queue({ storage, visibilityTimeout })
const reaper = new Reaper({ storage, visibilityTimeout })
Leader Election Timing
For leader election, ensure proper timing relationships:
// Good configuration:
const leaderElection = {
  enabled: true,
  lockTTL: 30000,            // 30 seconds
  renewalInterval: 10000,    // Renew every 10s (1/3 of TTL)
  acquireRetryInterval: 5000 // Check every 5s
}
// renewalInterval should be < lockTTL / 2 so that a single missed renewal does not cost the lock
// acquireRetryInterval sets how often followers retry, and so how soon one takes over once the lock is free
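The timing rules can be checked mechanically before a Reaper is constructed. The sketch below is a hypothetical helper, not something the library provides: `checkLeaderElectionTiming` and `failoverWindowMs` are illustrative names, and the failover estimate assumes that a crashed leader's lock is freed only when it expires and that followers retry at a fixed interval.

```typescript
interface LeaderElectionTiming {
  lockTTL: number
  renewalInterval: number
  acquireRetryInterval: number
}

// Returns a list of human-readable problems; an empty list means the timings look sane.
function checkLeaderElectionTiming (t: LeaderElectionTiming): string[] {
  const problems: string[] = []
  for (const [name, value] of Object.entries(t)) {
    if (!(value > 0)) problems.push(`${name} must be a positive number of milliseconds`)
  }
  if (t.renewalInterval >= t.lockTTL / 2) {
    problems.push('renewalInterval should be less than lockTTL / 2')
  }
  return problems
}

// Estimated time between a leader crash and a follower taking over.
// Best case: the crash happens just before a renewal was due.
// Worst case: the crash happens just after a renewal, and a follower's
// retry lands just before the lock expires.
function failoverWindowMs (t: LeaderElectionTiming): { minMs: number, maxMs: number } {
  return {
    minMs: t.lockTTL - t.renewalInterval,
    maxMs: t.lockTTL + t.acquireRetryInterval
  }
}
```

With the configuration shown above (30000 / 10000 / 5000), the check reports no problems and the estimated window is 20000 to 35000 ms. Shortening lockTTL narrows the window but leaves less slack for a slow renewal.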
Error Handling
Always listen for errors:
reaper.on('error', (error) => {
  // Log to monitoring system
  logger.error({ err: error }, 'Reaper encountered an error')
  // Optionally alert on critical errors
  // (alertOps is a placeholder for your own alerting hook)
  if (error.message.includes('connection')) {
    alertOps('Reaper lost connection to storage')
  }
})
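The reason this matters is standard Node.js behaviour: an EventEmitter that emits 'error' with no listener attached throws the error instead. Since the Reaper extends EventEmitter, the same would be expected to apply to it. The sketch below demonstrates the behaviour with plain emitters standing in for a Reaper; `emitError` is a hypothetical helper used only for the demonstration.

```typescript
import { EventEmitter } from 'node:events'

// Reports whether emitting 'error' was handled by a listener or thrown.
function emitError (emitter: EventEmitter, error: Error): 'handled' | 'thrown' {
  try {
    emitter.emit('error', error)
    return 'handled'
  } catch {
    return 'thrown'
  }
}

// No 'error' listener: emit() throws the error.
const bare = new EventEmitter()
console.log(emitError(bare, new Error('storage connection lost'))) // "thrown"

// With a listener: the error is delivered to it and nothing is thrown.
const guarded = new EventEmitter()
guarded.on('error', (err: Error) => console.error('handled:', err.message))
console.log(emitError(guarded, new Error('storage connection lost'))) // "handled"
```

In a real process the thrown error would surface from whichever internal callback emitted it, which typically means an uncaught exception.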
Storage Connection
Ensure storage is connected before starting the Reaper:
const storage = new RedisStorage({ url: 'redis://localhost:6379' })
await storage.connect()
const reaper = new Reaper({ storage })
await reaper.start()
Graceful Shutdown
Always stop the Reaper gracefully:
process.on('SIGTERM', async () => {
  await reaper.stop()
  await storage.disconnect()
  process.exit(0)
})
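A process often receives more than one signal (for example SIGINT followed by SIGTERM), so it can help to make shutdown idempotent. The following is one possible sketch: `createShutdown` and `registerSignalHandlers` are hypothetical helpers, and the `Stoppable` and `Disconnectable` interfaces only describe the `stop()` and `disconnect()` calls already shown on this page.

```typescript
interface Stoppable { stop(): Promise<void> }
interface Disconnectable { disconnect(): Promise<void> }

// Returns a shutdown function that runs the stop sequence at most once;
// repeated calls share the same in-flight promise.
function createShutdown (reaper: Stoppable, storage: Disconnectable): () => Promise<void> {
  let pending: Promise<void> | null = null
  return () => {
    if (pending === null) {
      pending = (async () => {
        await reaper.stop()        // stop the Reaper while storage is still connected
        await storage.disconnect() // then close the storage connection
      })()
    }
    return pending
  }
}

// Wires the shutdown function to common termination signals.
function registerSignalHandlers (shutdown: () => Promise<void>): void {
  for (const signal of ['SIGTERM', 'SIGINT'] as const) {
    process.once(signal, () => {
      shutdown().then(() => process.exit(0), () => process.exit(1))
    })
  }
}
```

Calling `registerSignalHandlers(createShutdown(reaper, storage))` once at startup would cover both signals. The process exits with a non-zero code if either step fails.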
Monitoring
Track Reaper health and effectiveness:
const metrics = {
  stalledJobsRecovered: 0,
  leadershipChanges: 0,
  errors: 0
}

reaper.on('stalled', (id) => {
  metrics.stalledJobsRecovered++
  logger.info({
    jobId: id,
    total: metrics.stalledJobsRecovered
  }, 'Stalled job recovered')
})
reaper.on('leadershipAcquired', () => {
  metrics.leadershipChanges++
})
reaper.on('leadershipLost', () => {
  metrics.leadershipChanges++
})
reaper.on('error', (error) => {
  metrics.errors++
  logger.error({ err: error, total: metrics.errors }, 'Reaper error')
})

// Periodically log metrics
setInterval(() => {
  logger.info({
    metrics,
    isLeader: reaper.isLeader,
    reaperId: reaper.reaperId
  }, 'Reaper metrics')
}, 60000)