Overview

Graceful shutdown ensures that workers complete their current jobs before terminating. This prevents:
  • Lost work from interrupted jobs
  • Inconsistent state in external systems
  • Wasted resources from partial processing
  • Unnecessary retries of almost-complete work

How It Works

When queue.stop() is called:
  1. Stop accepting new jobs - The consumer stops dequeuing new jobs from the queue.
  2. Wait for in-flight jobs - The queue waits for all currently processing jobs to complete, up to the visibilityTimeout duration.
  3. Requeue incomplete jobs - Any jobs that don't complete within the timeout are returned to the queue for retry.
  4. Disconnect from storage - Finally, the storage connection is closed cleanly.
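
The four steps above can be sketched with a toy in-memory worker. This is not the library's implementation: ToyWorker, ToyJob, accept, and the completed/requeued arrays are hypothetical names used only to illustrate the order of operations, and the storage disconnect is left as a comment.

```typescript
// Toy model of the stop sequence. All names here are hypothetical.
type ToyJob = { id: string, run: (signal: AbortSignal) => Promise<void> }

class ToyWorker {
  #accepting = true
  #inFlight = new Map<string, Promise<void>>()
  #abort = new AbortController()
  #timeoutMs: number
  readonly completed: string[] = []
  readonly requeued: string[] = []

  constructor (visibilityTimeoutMs: number) {
    this.#timeoutMs = visibilityTimeoutMs
  }

  // Starts a job unless the worker is already stopping.
  accept (job: ToyJob): boolean {
    if (!this.#accepting) return false
    const done = job.run(this.#abort.signal)
      .then(() => {
        // Only count jobs that finished before they were requeued.
        if (this.#inFlight.has(job.id)) this.completed.push(job.id)
      })
      .catch(() => {})
      .finally(() => { this.#inFlight.delete(job.id) })
    this.#inFlight.set(job.id, done)
    return true
  }

  async stop (): Promise<void> {
    this.#accepting = false // 1. stop accepting new jobs

    // 2. wait for in-flight jobs, but no longer than the timeout
    let timer: ReturnType<typeof setTimeout> | undefined
    const deadline = new Promise<void>((resolve) => {
      timer = setTimeout(resolve, this.#timeoutMs)
    })
    await Promise.race([Promise.all(this.#inFlight.values()), deadline])
    clearTimeout(timer)

    // 3. signal cancellation and hand unfinished jobs back for retry
    this.#abort.abort()
    this.requeued.push(...this.#inFlight.keys())
    this.#inFlight.clear()

    // 4. a real worker would close its storage connection here
  }
}
```

In this sketch a job that is still running when the deadline passes is recorded as requeued and never counted as completed, which mirrors the retry behaviour described in step 3.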

Basic Example

The stop() method handles graceful shutdown automatically:
import { Queue, MemoryStorage } from '@platformatic/job-queue'

const storage = new MemoryStorage()
const queue = new Queue({
  storage,
  visibilityTimeout: 30000  // Jobs have 30s to complete
})

queue.execute(async (job) => {
  // Long-running work...
  const result = await doWork()
  return result
})

await queue.start()

// Handle shutdown signals
process.on('SIGTERM', async () => {
  console.log('Shutting down...')
  await queue.stop()  // Waits for in-flight jobs
  process.exit(0)
})
From the source code (src/consumer.ts:132-169), the consumer waits up to visibilityTimeout milliseconds for active jobs to complete before forcing shutdown.

Cancellation Detection

Jobs can detect when a shutdown is in progress using job.signal:
queue.execute(async (job) => {
  // Check for cancellation
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }

  // Long-running work...
  const result = await doWork()

  // Check again before committing
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }

  return result
})
The job.signal is an AbortSignal that gets triggered in two scenarios:
  1. During graceful shutdown - When stop() is called and jobs need to be requeued
  2. When visibility timeout expires - When a job runs longer than expected
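
Checking job.signal.aborted before and after a step still leaves the step itself uninterruptible. Because job.signal is a standard AbortSignal, it can also be passed to APIs that accept one. The sketch below uses Node's promise-based setTimeout as the abortable operation and a standalone AbortController as a stand-in for job.signal; processInSteps and demo are hypothetical names, and signal.throwIfAborted() assumes Node 17.3 or later.

```typescript
import { setTimeout as sleep } from 'node:timers/promises'

// Hypothetical handler body: performs up to `steps` units of work and
// stops as soon as the signal fires.
async function processInSteps (signal: AbortSignal, steps: number, stepMs: number): Promise<number> {
  let done = 0
  for (let i = 0; i < steps; i++) {
    signal.throwIfAborted() // cheap check at a natural boundary
    await sleep(stepMs, undefined, { signal }) // rejects if aborted mid-wait
    done++
  }
  return done
}

// Stand-in for job.signal: an AbortController that is aborted after 25ms.
async function demo (): Promise<string> {
  const controller = new AbortController()
  setTimeout(() => controller.abort(), 25)
  try {
    await processInSteps(controller.signal, 10, 20)
    return 'completed'
  } catch (err) {
    return (err as Error).name // 'AbortError' when the work was cancelled
  }
}
```

Inside a real handler, the same pattern would pass job.signal instead of controller.signal.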

Complete Shutdown Example

A production-ready shutdown handler that handles multiple signals:
import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()
const storage = new RedisStorage({ url: process.env.REDIS_URL })

const queue = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10,
  visibilityTimeout: 30000,
  logger
})

const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000,
  logger
})

queue.execute(async (job) => {
  // Check for cancellation
  if (job.signal.aborted) {
    throw new Error('Job cancelled')
  }

  // Long-running work...
  const result = await doWork()

  return result
})

await queue.start()
await reaper.start()

logger.info('Worker started')

// Graceful shutdown function
let isShuttingDown = false

async function shutdown(signal: string) {
  if (isShuttingDown) {
    logger.warn('Already shutting down, please wait...')
    return
  }
  
  isShuttingDown = true
  logger.info({ signal }, 'Received shutdown signal')
  
  try {
    // Stop accepting new jobs
    await queue.stop()
    logger.info('Queue stopped')
    
    // Stop the reaper
    await reaper.stop()
    logger.info('Reaper stopped')
    
    logger.info('Shutdown complete')
    process.exit(0)
  } catch (error) {
    logger.error({ err: error }, 'Error during shutdown')
    process.exit(1)
  }
}

// Handle shutdown signals
process.on('SIGTERM', () => shutdown('SIGTERM'))
process.on('SIGINT', () => shutdown('SIGINT'))

// Handle uncaught errors
process.on('uncaughtException', (error) => {
  logger.error({ err: error }, 'Uncaught exception')
  shutdown('uncaughtException')
})

process.on('unhandledRejection', (reason) => {
  logger.error({ reason }, 'Unhandled rejection')
  shutdown('unhandledRejection')
})

Visibility Timeout Relationship

The visibilityTimeout determines how long stop() will wait:
const queue = new Queue({
  storage,
  visibilityTimeout: 30000  // Wait up to 30s during shutdown
})

// Jobs typically complete in < 5s
const queue = new Queue({
  storage,
  concurrency: 20,
  visibilityTimeout: 10000  // 10s timeout
})

queue.execute(async (job) => {
  // Quick work
  return await processQuickly(job.payload)
})

Job Requeuing

Jobs that don’t complete during shutdown are automatically requeued:
// From src/consumer.ts:186-194
if (abortSignal.aborted) {
  // Put message back
  const queueMessage = this.#deserializeMessage(message)
  this.#logger.warn({ id: queueMessage.id }, 'Consumer aborted while holding job, requeueing.')
  await this.#storage.requeue(queueMessage.id, message, this.#workerId)
  this.emit('requeued', queueMessage.id)
  break
}
Listen for the requeued event to track this:
queue.on('requeued', (id) => {
  console.log(`Job ${id} was returned to queue (e.g., during graceful shutdown)`)
})
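
A requeue during shutdown is expected, but the same job coming back many times usually is not. The sketch below counts requeued events per job id and reports an id once it reaches a threshold. trackRequeues and its parameters are hypothetical, and a plain EventEmitter stands in for the queue, on the assumption that the queue exposes the same on(event, listener) interface shown above. The counts live in process memory only, so they reset when the worker restarts.

```typescript
import { EventEmitter } from 'node:events'

// Counts 'requeued' events per job id and calls `onSuspect` the moment an id
// has been requeued `threshold` times.
function trackRequeues (
  emitter: EventEmitter,
  threshold: number,
  onSuspect: (id: string, count: number) => void
): Map<string, number> {
  const counts = new Map<string, number>()
  emitter.on('requeued', (id: string) => {
    const count = (counts.get(id) ?? 0) + 1
    counts.set(id, count)
    if (count === threshold) onSuspect(id, count)
  })
  return counts
}
```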

Kubernetes/Docker Integration

Configure your deployment to allow time for graceful shutdown:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: job-worker
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: worker
        image: my-worker:latest
        lifecycle:
          preStop:
            exec:
              # Give workers time to finish
              command: ["/bin/sh", "-c", "sleep 5"]
      # Must be longer than visibilityTimeout
      terminationGracePeriodSeconds: 45
Ensure terminationGracePeriodSeconds is longer than your visibilityTimeout plus any preStop delay, because the preStop hook runs inside the grace period. Otherwise Kubernetes may forcefully terminate in-flight jobs.
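
The arithmetic behind that value can be made explicit. Kubernetes starts the grace-period countdown when termination begins, so the time spent in a preStop hook counts against it. The helper below is a hypothetical sketch, and the 5-second default margin is an assumption rather than a recommendation from the library.

```typescript
// Smallest terminationGracePeriodSeconds that still covers the preStop delay,
// the full visibilityTimeout wait, and a safety margin.
function minGracePeriodSeconds (
  visibilityTimeoutMs: number,
  preStopSeconds = 0,
  marginSeconds = 5 // assumed headroom for logging and disconnecting
): number {
  return Math.ceil(visibilityTimeoutMs / 1000) + preStopSeconds + marginSeconds
}

// For the manifest above: 30s timeout + 5s preStop + 5s margin.
const required = minGracePeriodSeconds(30000, 5)
```

With these inputs the result is 40, so the 45 seconds configured above leaves a little extra room.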

Testing Graceful Shutdown

Test your shutdown logic during development:
import { Queue, MemoryStorage } from '@platformatic/job-queue'
import { setTimeout } from 'node:timers/promises'

const storage = new MemoryStorage()
const queue = new Queue({
  storage,
  concurrency: 1,
  visibilityTimeout: 10000
})

let jobsCompleted = 0
let jobsCancelled = 0

queue.execute(async (job) => {
  console.log(`Started job ${job.id}`)
  
  // Simulate work with cancellation checks
  for (let i = 0; i < 10; i++) {
    if (job.signal.aborted) {
      console.log(`Job ${job.id} was cancelled at step ${i}`)
      jobsCancelled++
      throw new Error('Cancelled')
    }
    await setTimeout(500) // 500ms per step
  }
  
  console.log(`Completed job ${job.id}`)
  jobsCompleted++
  return { success: true }
})

queue.on('requeued', (id) => {
  console.log(`Job ${id} was requeued`)
})

await queue.start()

// Enqueue some jobs
for (let i = 0; i < 5; i++) {
  await queue.enqueue(`job-${i}`, { data: i })
}

// Wait a bit, then shutdown
await setTimeout(3000)
console.log('Initiating shutdown...')

await queue.stop()

console.log(`Completed: ${jobsCompleted}, Cancelled: ${jobsCancelled}`)

Best Practices

Check job.signal.aborted before expensive operations. This allows jobs to exit quickly during shutdown instead of wasting resources on work that will be discarded.
  • Set visibilityTimeout to be longer than your longest expected job
  • Use job.signal.aborted checks at natural boundaries in your job logic
  • Configure infrastructure timeout longer than visibilityTimeout
  • Log shutdown events for debugging and monitoring
  • Test graceful shutdown in development
  • Handle SIGTERM and SIGINT signals
  • Avoid long-running synchronous operations that can’t be interrupted
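
The last point deserves a concrete illustration: an abort cannot be observed while a synchronous loop holds the event loop. One way around this, sketched below under the assumption that the work can be split into chunks, is to yield between chunks and check the signal at each boundary. sumInChunks is a hypothetical stand-in for CPU-bound job logic, and signal.throwIfAborted() assumes Node 17.3 or later.

```typescript
import { setImmediate as yieldToEventLoop } from 'node:timers/promises'

// Sums `values` in chunks, yielding between chunks so that timers, I/O,
// and abort listeners get a chance to run.
async function sumInChunks (values: number[], signal: AbortSignal, chunkSize = 1000): Promise<number> {
  let total = 0
  for (let start = 0; start < values.length; start += chunkSize) {
    signal.throwIfAborted() // exit early if cancellation was requested
    const end = Math.min(start + chunkSize, values.length)
    for (let i = start; i < end; i++) total += values[i]
    await yieldToEventLoop()
  }
  return total
}
```

In a job handler the second argument would be job.signal. A smaller chunkSize reacts to cancellation sooner at the cost of more scheduling overhead.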

Common Issues

Jobs Keep Getting Requeued

Problem: Jobs are repeatedly requeued during shutdown.

Solution: Increase visibilityTimeout to give jobs more time to complete:
const queue = new Queue({
  storage,
  visibilityTimeout: 60000 // Increase from 30s to 60s
})

Process Won’t Exit

Problem: Process hangs after calling stop().

Solution: Check for lingering resources (see src/CLAUDE.md guidelines):
// Ensure all timers and connections are cleaned up
await queue.stop()
await storage.disconnect()

// If using custom resources, clean them up
clearInterval(myInterval)
myConnection.close()
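
If the cause of a hang is not yet known, a hard deadline around the shutdown work keeps the process from waiting indefinitely. The helper below is a minimal sketch: withDeadline is a hypothetical name, and the usage in the trailing comment assumes a queue instance like the ones above.

```typescript
// Resolves to 'done' if `task` settles first, or to 'timeout' after `ms`
// milliseconds. A rejection from `task` is passed through to the caller.
async function withDeadline (task: Promise<unknown>, ms: number): Promise<'done' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const deadline = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), ms)
  })
  try {
    return await Promise.race([task.then(() => 'done' as const), deadline])
  } finally {
    clearTimeout(timer) // do not leave the timer holding the event loop open
  }
}

// Possible usage inside a signal handler (queue is assumed to exist):
//   const outcome = await withDeadline(queue.stop(), 35_000)
//   process.exit(outcome === 'done' ? 0 : 1)
```

Choosing a deadline slightly longer than visibilityTimeout gives stop() its full waiting period before the process gives up.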

Jobs Lost During Shutdown

Problem: Jobs disappear when the worker shuts down.

Solution: Ensure you're running a Reaper to recover stalled jobs:
const reaper = new Reaper({
  storage,
  visibilityTimeout: 30000 // Match queue timeout
})

await reaper.start()

// Shutdown both
process.on('SIGTERM', async () => {
  await queue.stop()
  await reaper.stop() // Don't forget this!
})
