Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/platformatic/job-queue/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Leader election allows you to run multiple Reaper instances for high availability. Only one Reaper (the “leader”) actively monitors and recovers stalled jobs at a time. If the leader fails, another instance automatically takes over.

Why Leader Election?

Without leader election:
  • Single Reaper = single point of failure
  • If Reaper crashes, stalled jobs won’t be recovered
  • No automatic failover
With leader election:
  • Multiple Reapers provide redundancy
  • Automatic failover when leader fails
  • Only one Reaper actively processes at a time (no duplicate recovery)
  • Zero-downtime Reaper deployments

Architecture

┌─────────────────────────────────────────┐
│           Redis Storage                 │
│  ┌─────────────────────────────────┐   │
│  │   reaper:lock (Leader Lock)     │   │
│  └─────────────────────────────────┘   │
└────────┬────────────┬───────────────────┘
         │            │
    ┌────▼─────┐ ┌───▼──────┐
    │ Reaper 1 │ │ Reaper 2 │
    │ (Leader) │ │(Follower)│
    │  Active  │ │  Standby │
    └──────────┘ └──────────┘

         │ Leader crashes

    ┌──────────┐ ┌──────────┐
    │ Reaper 1 │ │ Reaper 2 │
    │   Dead   │ │ (Leader) │
    │          │ │  Active  │
    └──────────┘ └──────────┘

Basic Configuration

Enable leader election when creating a Reaper:
import { Reaper, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,              // Enable leader election (default: false)
    lockTTL: 30000,             // Lock expiry in ms (default: 30s)
    renewalInterval: 10000,     // Renew lock every 10s (default: 1/3 of TTL)
    acquireRetryInterval: 5000  // Followers retry every 5s (default: 5s)
  }
})

reaper.on('leadershipAcquired', () => {
  console.log('This reaper is now the leader')
})

reaper.on('leadershipLost', () => {
  console.log('This reaper lost leadership')
})

await reaper.start()
From the source code (src/reaper.ts:11-16), leader election configuration includes enabled, lockTTL, renewalInterval, and acquireRetryInterval.

Configuration Options

enabled

Type: boolean
Default: false
Enables or disables leader election:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true
  }
})

lockTTL

Type: number (milliseconds)
Default: 30000 (30 seconds)
How long the leader lock is valid. If the leader crashes and doesn’t renew the lock, it expires after this duration:
leaderElection: {
  enabled: true,
  lockTTL: 45000  // Lock expires after 45 seconds
}
Set lockTTL longer than renewalInterval to avoid leadership flapping. A good rule of thumb is lockTTL >= 3 × renewalInterval.

renewalInterval

Type: number (milliseconds)
Default: 10000 (10 seconds, or 1/3 of lockTTL)
How often the leader renews its lock:
leaderElection: {
  enabled: true,
  lockTTL: 30000,
  renewalInterval: 10000  // Renew every 10 seconds
}

acquireRetryInterval

Type: number (milliseconds)
Default: 5000 (5 seconds)
How often followers try to acquire leadership:
leaderElection: {
  enabled: true,
  acquireRetryInterval: 5000  // Check for leadership every 5s
}

Redis SET NX PX Pattern

Leader election uses Redis’s atomic SET NX PX command. From src/reaper.ts:216-225:
async #tryAcquireLock (ttlMs: number): Promise<boolean> {
  if (!this.#storage.acquireLeaderLock) {
    // Storage doesn't support leader election
    this.#emitError(new Error('Storage does not support leader election'))
    return false
  }

  return this.#storage.acquireLeaderLock(LOCK_KEY, this.#reaperId, ttlMs)
}
The Redis implementation uses:
SET reaper:lock <reaper-id> NX PX <ttl-ms>
  • NX - Only set if key doesn’t exist (prevents multiple leaders)
  • PX - Set expiry in milliseconds (automatic failover)
  • reaper-id - Unique identifier for this Reaper instance

How It Works

The leader election process (from src/reaper.ts:159-212):
1

Startup - Try to acquire lock

Each Reaper attempts to acquire the leader lock when it starts:
const acquired = await this.#tryAcquireLock(lockTTL)
if (acquired) {
  this.#isLeader = true
  await this.#becomeActive()  // Subscribe and scan for stalled jobs
  this.emit('leadershipAcquired')
}
2

Leader - Renew lock periodically

The leader renews its lock every renewalInterval:
if (this.#isLeader) {
  const renewed = await this.#tryRenewLock(lockTTL)
  if (!renewed) {
    await this.#transitionToFollower()  // Lost leadership
  }
}
3

Follower - Retry acquisition

Followers try to acquire the lock every acquireRetryInterval:
if (!this.#isLeader) {
  const acquired = await this.#tryAcquireLock(lockTTL)
  if (acquired) {
    await this.#transitionToLeader()  // Become active
  }
}
4

Failover - Leader crashes

If the leader crashes without releasing the lock:
  • Lock expires after lockTTL milliseconds
  • First follower to retry acquisition becomes new leader
  • New leader starts monitoring stalled jobs

Multiple Reaper Setup

Run multiple Reaper instances for high availability:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()
const storage = new RedisStorage({ 
  url: process.env.REDIS_URL,
  logger 
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000,
    acquireRetryInterval: 5000
  },
  logger
})

// Track leadership changes
reaper.on('leadershipAcquired', () => {
  logger.info({ reaperId: reaper.reaperId }, 'Leadership acquired')
})

reaper.on('leadershipLost', () => {
  logger.warn({ reaperId: reaper.reaperId }, 'Leadership lost')
})

reaper.on('stalled', (id) => {
  logger.warn({ jobId: id }, 'Recovered stalled job')
})

await reaper.start()

logger.info({ 
  reaperId: reaper.reaperId,
  isLeader: reaper.isLeader 
}, 'Reaper started')

// Graceful shutdown
process.on('SIGTERM', async () => {
  logger.info('Shutting down reaper')
  await reaper.stop()  // Releases lock immediately
  process.exit(0)
})

Monitoring Leadership

Track which Reaper is currently the leader:
import { Reaper, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()
const storage = new RedisStorage({ url: process.env.REDIS_URL })

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: { enabled: true },
  logger
})

// Check leadership status
setInterval(() => {
  logger.info({
    reaperId: reaper.reaperId,
    isLeader: reaper.isLeader
  }, 'Reaper status')
}, 10000)

reaper.on('leadershipAcquired', () => {
  // Emit metric to monitoring system
  metrics.gauge('reaper.is_leader', 1)
  logger.info({ reaperId: reaper.reaperId }, 'Now leading')
})

reaper.on('leadershipLost', () => {
  metrics.gauge('reaper.is_leader', 0)
  logger.warn({ reaperId: reaper.reaperId }, 'Lost leadership')
})

await reaper.start()

Events

Reaper emits two leadership events (from src/reaper.ts:26-31):
interface ReaperEvents {
  error: [error: Error]
  stalled: [id: string]
  leadershipAcquired: []
  leadershipLost: []
}

leadershipAcquired

Emitted when this Reaper becomes the leader:
reaper.on('leadershipAcquired', () => {
  console.log('I am now the leader')
  // Start custom monitoring, update service registry, etc.
})

leadershipLost

Emitted when this Reaper loses leadership:
reaper.on('leadershipLost', () => {
  console.log('I lost leadership')
  // Stop custom monitoring, remove from service registry, etc.
})
Leadership loss is detected when lock renewal fails. This can happen if:
  • Another Reaper acquired the lock (shouldn’t happen under normal operation)
  • Redis connection issues
  • The Reaper fell behind on renewals

Graceful Shutdown

When a leader shuts down gracefully, it releases the lock immediately (from src/reaper.ts:106-125):
async stop (): Promise<void> {
  if (!this.#running) return

  this.#running = false

  // Stop leadership loop
  if (this.#leadershipTimer) {
    clearInterval(this.#leadershipTimer)
    this.#leadershipTimer = null
  }

  // Release lock if we're leader
  if (this.#isLeader && this.#leaderElection.enabled) {
    await this.#releaseLeadership()
    this.#isLeader = false
  }

  // Become inactive (cleanup)
  await this.#becomeInactive()
}
This allows immediate failover without waiting for lock expiry:
process.on('SIGTERM', async () => {
  console.log('Shutting down...')
  await reaper.stop()  // Releases lock immediately
  console.log('Lock released, another Reaper can take over')
  process.exit(0)
})

Storage Backend Support

Leader election requires RedisStorage. Other storage backends (MemoryStorage, FileStorage) don’t support leader election and will emit an error.
From src/reaper.ts:216-222:
if (!this.#storage.acquireLeaderLock) {
  // Storage doesn't support leader election
  this.#emitError(new Error('Storage does not support leader election'))
  return false
}
Only use leader election with Redis:
import { Reaper, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const reaper = new Reaper({
  storage,  // Must be RedisStorage
  leaderElection: { enabled: true }
})

Best Practices

Run at least 3 Reapers in production. This provides resilience against single instance failures while avoiding split-brain scenarios.
  • Set lockTTL to 3-5× the renewalInterval
  • Run Reapers in different availability zones for true HA
  • Monitor the leadershipAcquired and leadershipLost events
  • Set acquireRetryInterval low enough for quick failover (5-10s)
  • Use the same visibilityTimeout for Queue and Reaper
  • Handle leadership events in monitoring/alerting systems
  • Test failover scenarios during development
  • Ensure graceful shutdown releases lock quickly

Common Patterns

Combined Worker and Reaper

Run worker and Reaper in the same process:
import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

// Worker processes jobs
const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10,
  visibilityTimeout: 30000
})

// Reaper monitors stalled jobs with HA
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: {
    enabled: true,
    lockTTL: 30000,
    renewalInterval: 10000
  }
})

queue.execute(async (job) => {
  return await processJob(job.payload)
})

await queue.start()
await reaper.start()

// Shutdown both
process.on('SIGTERM', async () => {
  await Promise.all([
    queue.stop(),
    reaper.stop()
  ])
  process.exit(0)
})

Health Check Endpoint

Expose Reaper status via HTTP:
import express from 'express'
import { Reaper, RedisStorage } from '@platformatic/job-queue'

const app = express()
const storage = new RedisStorage({ url: process.env.REDIS_URL })

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  leaderElection: { enabled: true }
})

await reaper.start()

// Health check endpoint
app.get('/health', (req, res) => {
  res.json({
    status: 'healthy',
    reaperId: reaper.reaperId,
    isLeader: reaper.isLeader
  })
})

// Readiness check (only ready if leader)
app.get('/ready', (req, res) => {
  if (reaper.isLeader) {
    res.json({ status: 'ready' })
  } else {
    res.status(503).json({ status: 'standby' })
  }
})

app.listen(3000)

Troubleshooting

Multiple Leaders

Problem: Multiple Reapers think they’re the leader. Solution: This shouldn’t happen with RedisStorage. Check:
  • Redis connection is stable
  • No clock skew between instances
  • lockTTL > renewalInterval

Leadership Flapping

Problem: Leadership constantly switches between Reapers. Solution: Increase lockTTL and renewalInterval:
leaderElection: {
  enabled: true,
  lockTTL: 60000,        // Increase from 30s to 60s
  renewalInterval: 20000  // Increase from 10s to 20s
}

Slow Failover

Problem: Takes too long for a follower to become leader after leader crashes. Solution: Decrease lockTTL and acquireRetryInterval:
leaderElection: {
  enabled: true,
  lockTTL: 15000,              // Faster expiry
  renewalInterval: 5000,       // Renew more frequently
  acquireRetryInterval: 2000   // Check more frequently
}

No Leader

Problem: All Reapers report isLeader: false. Solution: Check Redis connectivity and logs:
import pino from 'pino'

const logger = pino({ level: 'debug' })

const reaper = new Reaper({
  storage: new RedisStorage({ url: process.env.REDIS_URL, logger }),
  leaderElection: { enabled: true },
  logger
})

reaper.on('error', (error) => {
  console.error('Reaper error:', error)
})

See Also

Build docs developers (and LLMs) love