Quick Start

This guide walks you through creating a complete working job queue from scratch. You’ll learn how to create a queue, register a handler, enqueue jobs, and handle results.

Prerequisites

  • Node.js 22.19.0 or later installed
  • @platformatic/job-queue package installed (Installation Guide)

Complete Example

Let’s build an email processing queue that demonstrates all core features:

1. Create the queue with storage

First, import the required classes and create a queue with in-memory storage:
index.ts
import { Queue, MemoryStorage } from '@platformatic/job-queue'

// Create a queue with in-memory storage
const storage = new MemoryStorage()
const queue = new Queue<{ email: string }, { sent: boolean }>({
  storage,
  concurrency: 5
})
The generic types <TPayload, TResult> provide full type safety for your job data and results.

2. Register a job handler

Define how your queue should process jobs:
index.ts
// Register a job handler
queue.execute(async (job) => {
  console.log(`Processing job ${job.id}:`, job.payload)
  // Send email...
  return { sent: true }
})
The handler receives a Job object with:
  • id - Unique job identifier
  • payload - The job data (typed as { email: string })
  • attempts - Current attempt number (starts at 1)
  • signal - AbortSignal for cancellation
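
As a rough illustration of how the signal field might be used, the sketch below checks job.signal between units of work and stops early once cancellation has been requested. JobLike and runSteps are hypothetical names defined only for this example; they are not part of @platformatic/job-queue, and the sketch assumes nothing beyond the four fields listed above.

```typescript
// Hypothetical local shape mirroring the four documented Job fields.
interface JobLike<TPayload> {
  id: string
  payload: TPayload
  attempts: number
  signal: AbortSignal
}

// Runs the steps in order and stops before the next step once the
// job's signal reports cancellation. Returns how many steps ran.
function runSteps<TPayload> (job: JobLike<TPayload>, steps: Array<() => void>): number {
  let completed = 0
  for (const step of steps) {
    if (job.signal.aborted) break // cancellation requested: skip remaining work
    step()
    completed++
  }
  return completed
}
```

Inside a real handler, the same check (or job.signal.throwIfAborted()) could sit between expensive operations so that a cancelled job does not keep consuming a worker slot.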

3. Start the queue

Connect to storage and start processing:
index.ts
// Start the queue
await queue.start()
console.log('Queue started and ready to process jobs')

4. Enqueue jobs (fire-and-forget)

Add jobs to the queue without waiting for results:
index.ts
// Enqueue jobs
await queue.enqueue('email-1', { email: 'user@example.com' })
console.log('Job enqueued')

// Optional per-job TTL override for cached result/error
await queue.enqueue('email-ttl', { email: 'ttl@example.com' }, {
  resultTTL: 5 * 60 * 1000 // 5 minutes
})
Jobs are deduplicated by ID. If you enqueue an ID that is already queued, being processed, or still has a cached result, the job is not processed a second time.

5. Enqueue and wait for results

For request/response patterns, use enqueueAndWait():
index.ts
// Or wait for the result
const result = await queue.enqueueAndWait('email-2', { email: 'another@example.com' }, {
  timeout: 30000,
  resultTTL: 24 * 60 * 60 * 1000 // keep this result for 24h
})
console.log('Result:', result) // { sent: true }
This will:
  1. Enqueue the job
  2. Wait for it to be processed
  3. Return the result (or throw if it fails)

6. Graceful shutdown

Stop the queue gracefully, waiting for in-flight jobs to complete:
index.ts
// Graceful shutdown
await queue.stop()
console.log('Queue stopped gracefully')

Complete Working Code

Here’s the full example in one file:
index.ts
import { Queue, MemoryStorage } from '@platformatic/job-queue'

// Create a queue with in-memory storage
const storage = new MemoryStorage()
const queue = new Queue<{ email: string }, { sent: boolean }>({
  storage,
  concurrency: 5
})

// Register a job handler
queue.execute(async (job) => {
  console.log(`Processing job ${job.id}:`, job.payload)
  // Send email...
  return { sent: true }
})

// Start the queue
await queue.start()
console.log('Queue started and ready to process jobs')

// Enqueue jobs
await queue.enqueue('email-1', { email: 'user@example.com' })
console.log('Job enqueued')

// Optional per-job TTL override for cached result/error
await queue.enqueue('email-ttl', { email: 'ttl@example.com' }, {
  resultTTL: 5 * 60 * 1000 // 5 minutes
})

// Or wait for the result
const result = await queue.enqueueAndWait('email-2', { email: 'another@example.com' }, {
  timeout: 30000,
  resultTTL: 24 * 60 * 60 * 1000 // keep this result for 24h
})
console.log('Result:', result) // { sent: true }

// Graceful shutdown
await queue.stop()
console.log('Queue stopped gracefully')

Run the Example

Save the code above as index.ts and run it:
node --experimental-strip-types index.ts

Expected Output

Queue started and ready to process jobs
Processing job email-1: { email: 'user@example.com' }
Job enqueued
Processing job email-ttl: { email: 'ttl@example.com' }
Processing job email-2: { email: 'another@example.com' }
Result: { sent: true }
Queue stopped gracefully

Understanding the Flow

When you call enqueue(id, payload, options?), the library:
  1. Checks if a job with that ID already exists
  2. If it’s new, serializes the payload and adds it to the queue
  3. Returns immediately with status 'queued', 'duplicate', or 'completed'
  4. The consumer picks up the job asynchronously

When you call enqueueAndWait(id, payload, options?), the library:
  1. Enqueues the job (same as enqueue())
  2. Subscribes to completion events for that job ID
  3. Waits until the job completes or times out
  4. Returns the result or throws an error
This is perfect for request/response patterns across services.
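
The subscribe-then-wait steps above can be pictured with a small promise-based sketch. This is not the library's implementation: CompletionSource and waitForCompletion are hypothetical names, the event source is reduced to on/off for a 'completed' event, and a plain Error stands in for the library's timeout error.

```typescript
type CompletionListener = (id: string, result: unknown) => void

// Hypothetical minimal event source: just enough to subscribe and unsubscribe.
interface CompletionSource {
  on(event: 'completed', listener: CompletionListener): void
  off(event: 'completed', listener: CompletionListener): void
}

// Resolves with the result of the matching job, or rejects after timeoutMs.
function waitForCompletion (
  source: CompletionSource,
  jobId: string,
  timeoutMs: number
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const onCompleted: CompletionListener = (id, result) => {
      if (id !== jobId) return // completion of some other job: keep waiting
      cleanup()
      resolve(result)
    }
    const timer = setTimeout(() => {
      cleanup()
      reject(new Error(`Timed out waiting for job ${jobId}`))
    }, timeoutMs)
    // Always release both the timer and the listener, whichever path wins.
    const cleanup = (): void => {
      clearTimeout(timer)
      source.off('completed', onCompleted)
    }
    source.on('completed', onCompleted)
  })
}
```

Removing the listener and clearing the timer on both paths keeps a long-lived process from accumulating subscriptions, one per waiting caller.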

Jobs are identified by their ID. When you enqueue a job:
  • If no job with that ID exists, it’s queued
  • If a job is queued/processing, you get status 'duplicate'
  • If a job completed, you get the cached result immediately
Results are cached for resultTTL milliseconds (default: 1 hour).
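
To make these rules concrete, here is a small in-memory model of the three outcomes. It is a sketch under stated assumptions, not the library's storage logic: DedupModel is a hypothetical class, the clock is injected so the behaviour is easy to reason about, and it assumes that an ID becomes enqueueable again once its cached result expires.

```typescript
type EnqueueStatus = 'queued' | 'duplicate' | 'completed'

type Entry<R> =
  | { state: 'pending' }
  | { state: 'completed', result: R, expiresAt: number }

// Hypothetical model of ID-based deduplication with a result cache.
class DedupModel<R> {
  private readonly entries = new Map<string, Entry<R>>()
  private readonly resultTTL: number
  private readonly now: () => number

  constructor (resultTTL: number, now: () => number = () => Date.now()) {
    this.resultTTL = resultTTL
    this.now = now
  }

  enqueue (id: string): { status: EnqueueStatus, result?: R } {
    const entry = this.entries.get(id)
    if (entry !== undefined && entry.state === 'pending') {
      return { status: 'duplicate' } // already queued or processing
    }
    if (entry !== undefined && entry.state === 'completed' && this.now() < entry.expiresAt) {
      return { status: 'completed', result: entry.result } // cached result
    }
    // Unknown ID, or cached result expired (assumption): accept as new.
    this.entries.set(id, { state: 'pending' })
    return { status: 'queued' }
  }

  complete (id: string, result: R): void {
    this.entries.set(id, { state: 'completed', result, expiresAt: this.now() + this.resultTTL })
  }
}
```

With a 1000 ms TTL, enqueueing 'a' twice yields 'queued' then 'duplicate'; after complete('a', ...) a further enqueue yields 'completed' until the TTL elapses.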

When you call stop():
  1. The queue stops accepting new jobs
  2. In-flight jobs continue processing
  3. The queue waits up to visibilityTimeout for jobs to complete
  4. Any incomplete jobs are returned to the queue
  5. Storage is disconnected
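
Because stop() does real work, it is usually worth making sure it runs only once even when several shutdown signals arrive. The helper below is a hedged sketch: Stoppable and createShutdownHandler are hypothetical names, and the only thing assumed about the queue is that stop() returns a promise, as in the examples above.

```typescript
// Hypothetical minimal view of a queue: anything with an async stop().
interface Stoppable {
  stop(): Promise<void>
}

// Returns a function that triggers stop() on first call and hands back
// the same in-flight promise on every later call.
function createShutdownHandler (queue: Stoppable): () => Promise<void> {
  let stopping: Promise<void> | undefined
  return () => {
    if (stopping === undefined) {
      stopping = queue.stop() // first signal: begin graceful shutdown
    }
    return stopping // repeated signals: reuse the pending shutdown
  }
}
```

A process could wire this up with process.once('SIGTERM', ...) and process.once('SIGINT', ...), both calling the returned function, so a second Ctrl+C does not start a second shutdown.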

Configuration Options

The Queue constructor accepts these options:
Option             Type         Default         Description
storage            Storage      required        Storage backend instance
workerId           string       uuid()          Unique identifier for this worker
concurrency        number       1               Number of jobs to process in parallel
maxRetries         number       3               Maximum retry attempts for failed jobs
blockTimeout       number       5               Seconds to wait when polling for jobs
visibilityTimeout  number       30000           Milliseconds before a processing job is considered stalled
resultTTL          number       3600000         Milliseconds to cache job results (1 hour)
logger             pino.Logger  abstractLogger  Logger instance
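
Note the mixed units: blockTimeout is in seconds, while visibilityTimeout and resultTTL are in milliseconds. The sketch below collects the numeric defaults from the table in one place and shows how overrides combine with them. QueueTuning and resolveTuning are hypothetical names used for illustration; the library applies its own defaults internally, and storage, workerId, and logger are left out here.

```typescript
// Hypothetical subset of the options: only the numeric ones from the table.
interface QueueTuning {
  concurrency: number       // jobs processed in parallel
  maxRetries: number        // retry attempts for failed jobs
  blockTimeout: number      // seconds
  visibilityTimeout: number // milliseconds
  resultTTL: number         // milliseconds
}

// Defaults as documented in the table above.
const DOCUMENTED_DEFAULTS: QueueTuning = {
  concurrency: 1,
  maxRetries: 3,
  blockTimeout: 5,
  visibilityTimeout: 30000,
  resultTTL: 3600000
}

// Overrides win; anything not overridden keeps its documented default.
function resolveTuning (overrides: Partial<QueueTuning> = {}): QueueTuning {
  return { ...DOCUMENTED_DEFAULTS, ...overrides }
}
```

For instance, resolveTuning({ concurrency: 5 }) matches the queue built in the complete example: five parallel jobs, with every other value at its default.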

Error Handling

Handle errors using try-catch with typed error classes:
import { TimeoutError, JobFailedError } from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait('job-1', payload, {
    timeout: 10000
  })
  console.log('Got result:', result)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Request timed out')
  } else if (error instanceof JobFailedError) {
    console.log('Job failed:', error.originalError)
  } else {
    throw error // unexpected error: rethrow instead of silently swallowing it
  }
}
Jobs that fail will be retried up to maxRetries times with exponential backoff. After all retries are exhausted, they enter the 'failed' state.
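
For intuition about exponential backoff, the sketch below doubles the delay on each attempt and caps it. The base delay (1000 ms) and the cap (30000 ms) are illustrative assumptions; this page does not state the values or the exact formula the library uses, and backoffDelay is a hypothetical helper.

```typescript
// Delay before retrying after the given attempt (1-based).
// baseMs and maxMs are assumed values for illustration only.
function backoffDelay (attempt: number, baseMs = 1000, maxMs = 30000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer')
  }
  // attempt 1 -> base, attempt 2 -> 2x base, attempt 3 -> 4x base, ... capped at maxMs
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1))
}
```

Under these assumed values the first three retries would wait 1000, 2000, and 4000 ms, and the delay would stop growing once it reaches the 30000 ms cap.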

Listening to Events

The queue emits events for monitoring:
// Job events
queue.on('enqueued', (id) => {
  console.log(`Job ${id} was enqueued`)
})

queue.on('completed', (id, result) => {
  console.log(`Job ${id} completed:`, result)
})

queue.on('failed', (id, error) => {
  console.log(`Job ${id} failed:`, error.message)
})

queue.on('failing', (id, error, attempt) => {
  console.log(`Job ${id} failed attempt ${attempt}, will retry:`, error.message)
})

// Lifecycle events
queue.on('started', () => {
  console.log('Queue started')
})

queue.on('stopped', () => {
  console.log('Queue stopped')
})

// Error events
queue.on('error', (error) => {
  console.error('Queue error:', error)
})
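
One way these events might feed monitoring is a set of simple counters. The sketch below is self-contained so it can run without the library: EventSourceLike, TinyEmitter, and attachMetrics are hypothetical names, TinyEmitter is only a stand-in for the queue, and the event names are the ones listed above.

```typescript
type Handler = (...args: unknown[]) => void

// Hypothetical minimal view of the queue: anything with on().
interface EventSourceLike {
  on(event: string, handler: Handler): unknown
}

interface QueueMetrics {
  enqueued: number
  completed: number
  failed: number
  retries: number
}

// Subscribes to the job events and keeps running counts.
function attachMetrics (queue: EventSourceLike): QueueMetrics {
  const metrics: QueueMetrics = { enqueued: 0, completed: 0, failed: 0, retries: 0 }
  queue.on('enqueued', () => { metrics.enqueued++ })
  queue.on('completed', () => { metrics.completed++ })
  queue.on('failed', () => { metrics.failed++ })
  queue.on('failing', () => { metrics.retries++ }) // a failed attempt that will be retried
  return metrics
}

// Stand-in emitter used only to exercise the sketch.
class TinyEmitter implements EventSourceLike {
  private readonly handlers = new Map<string, Handler[]>()

  on (event: string, handler: Handler): this {
    const list = this.handlers.get(event) ?? []
    list.push(handler)
    this.handlers.set(event, list)
    return this
  }

  emit (event: string, ...args: unknown[]): void {
    for (const handler of this.handlers.get(event) ?? []) handler(...args)
  }
}

const demoQueue = new TinyEmitter()
const metrics = attachMetrics(demoQueue)
demoQueue.emit('enqueued', 'email-1')
demoQueue.emit('failing', 'email-1', new Error('smtp down'), 1)
demoQueue.emit('completed', 'email-1', { sent: true })
console.log(metrics)
```

Counting 'failing' separately from 'failed' distinguishes attempts that will be retried from jobs that have exhausted their retries.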

Next Steps

Now that you have a working queue, explore more advanced features:

Storage Backends

Learn about Redis, FileStorage, and MemoryStorage

Queue Configuration

Configure retries, timeouts, and concurrency

Stalled Job Recovery

Set up the Reaper for automatic recovery

Producer/Consumer

Separate producers and consumers across services
