RedisStorage is a production-ready storage backend that uses Redis or Valkey for distributed, persistent job queue storage with atomic operations and pub/sub notifications.

Constructor

import { RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

// Any Pino logger instance works here
const pinoLogger = pino()

const storage = new RedisStorage({
  url: 'redis://localhost:6379',
  keyPrefix: 'jq:',
  logger: pinoLogger
})

Configuration

url (string, default: "redis://localhost:6379")
Redis connection URL. Falls back to process.env.REDIS_URL if not provided. Supports standard Redis URL formats:
  • redis://localhost:6379
  • redis://user:pass@host:port/db
  • rediss:// for TLS connections
keyPrefix (string, default: "jq:")
Prefix for all Redis keys used by the job queue. Allows multiple queue instances to share the same Redis database. All keys follow the pattern: {keyPrefix}jobs, {keyPrefix}queue, {keyPrefix}processing:{workerId}, etc.
logger (Logger, default: abstract logger)
Optional Pino logger instance for debugging and monitoring. If not provided, a no-op logger is used.
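
The url fallback and the supported URL formats can be illustrated with a small sketch. This is not the library's code: resolveRedisUrl and describeRedisUrl are hypothetical helpers, and the precedence shown (explicit option, then REDIS_URL, then the documented default) is an assumption. Parsing uses the standard URL class available in Node.js.

```javascript
// Hypothetical helpers, not part of @platformatic/job-queue.
const DEFAULT_REDIS_URL = 'redis://localhost:6379'

// Assumed precedence: explicit option, then REDIS_URL, then the default.
function resolveRedisUrl (options = {}, env = process.env) {
  return options.url ?? env.REDIS_URL ?? DEFAULT_REDIS_URL
}

// Break a Redis URL into its parts using the built-in URL class.
function describeRedisUrl (url) {
  const parsed = new URL(url)
  return {
    tls: parsed.protocol === 'rediss:',
    host: parsed.hostname,
    port: parsed.port ? Number(parsed.port) : 6379,
    username: parsed.username || undefined,
    password: parsed.password || undefined,
    // The path segment, when present, selects the database index.
    db: parsed.pathname.length > 1 ? Number(parsed.pathname.slice(1)) : 0
  }
}
```

Passing the environment as a parameter keeps the helper easy to exercise in tests without touching process.env.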

When to Use

RedisStorage is the recommended backend for:
  • Production deployments: High availability and durability
  • Distributed systems: Multiple worker processes or machines
  • High throughput: Efficient atomic operations with Lua scripts
  • Horizontal scaling: Add more workers without storage bottlenecks
  • Leader election: Built-in support for single-reaper coordination

Compatibility

Redis 7+ or Valkey 8+ required for optimal performance. Uses BLMOVE for blocking dequeue operations.
  • Redis 7.0+: Full support with blocking list operations
  • Valkey 8.0+: Fully compatible Redis fork
  • Redis 6.x: Not officially supported (BLMOVE is available only from Redis 6.2; earlier 6.x releases lack it)
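
A startup check against these version requirements might look like the sketch below. It assumes you already have the text returned by the Redis INFO command (for example from your client's info call) and reads the redis_version field from it. meetsMinimumVersion is a hypothetical helper, and how a Valkey server fills this field is not covered here, so inspect its INFO output before relying on the check.

```javascript
// Hypothetical helper: compares the redis_version field of INFO output
// against a [major, minor] minimum. Not part of @platformatic/job-queue.
function meetsMinimumVersion (infoText, minimum = [7, 0]) {
  const match = /^redis_version:(\d+)\.(\d+)/m.exec(infoText)
  if (!match) return false // field missing: treat as unsupported
  const major = Number(match[1])
  const minor = Number(match[2])
  const [minMajor, minMinor] = minimum
  return major > minMajor || (major === minMajor && minor >= minMinor)
}
```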

Features

Atomic Operations

All critical operations use Lua scripts for atomicity:
  • Enqueue: Check for duplicates and add to queue atomically
  • Complete: Set state, store result, remove from processing queue, and publish notification in one operation
  • Fail: Similar to complete, but stores error instead
  • Retry: Update state, move to queue, notify subscribers atomically
  • Cancel: Delete job and publish event atomically

Pub/Sub Notifications

RedisStorage uses Redis pub/sub for real-time notifications:
  • Job notifications: Per-job channels for enqueueAndWait() response
  • Event stream: Global events channel for monitoring and reaper
  • Efficient routing: Only subscribes to channels when needed
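
One common way to subscribe only when needed is to reference-count listeners per channel. The sketch below illustrates that idea and is not necessarily how RedisStorage does it: LazySubscriptions is a hypothetical class, and subscriber stands for any object with async subscribe(channel) and unsubscribe(channel) methods.

```javascript
// Hypothetical sketch of reference-counted channel subscriptions.
// This is an illustration, not the RedisStorage implementation.
class LazySubscriptions {
  constructor (subscriber) {
    this.subscriber = subscriber
    this.handlers = new Map() // channel -> Set of callbacks
  }

  async add (channel, handler) {
    let set = this.handlers.get(channel)
    if (!set) {
      set = new Set()
      this.handlers.set(channel, set)
      await this.subscriber.subscribe(channel) // first listener only
    }
    set.add(handler)
  }

  async remove (channel, handler) {
    const set = this.handlers.get(channel)
    if (!set || !set.delete(handler)) return
    if (set.size === 0) {
      this.handlers.delete(channel)
      await this.subscriber.unsubscribe(channel) // last listener gone
    }
  }

  // Call this from the subscriber's message callback.
  dispatch (channel, message) {
    for (const handler of this.handlers.get(channel) ?? []) handler(message)
  }
}
```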

Blocking Dequeue

RedisStorage uses Redis BLMOVE for efficient blocking dequeue:
// Worker blocks until a job is available or timeout
const job = await storage.dequeue(workerId, 30) // 30 second timeout
Benefits:
  • No polling: Workers sleep until jobs arrive
  • Fair distribution: Redis handles FIFO ordering
  • Connection pooling: Single blocking client handles all workers
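
To show how a blocking dequeue removes the need for polling, here is a minimal worker loop built on storage.dequeue(). Queue normally runs this kind of loop for you, so treat it as a sketch: runWorkerLoop, handler, and shouldStop are hypothetical names, and the loop assumes dequeue() resolves to null when the timeout elapses.

```javascript
// Sketch of a polling-free worker loop built on storage.dequeue().
// Assumption: dequeue() resolves to null when the timeout elapses.
async function runWorkerLoop (storage, workerId, handler, options = {}) {
  const { timeoutSeconds = 30, shouldStop = () => false } = options
  let processed = 0
  while (!shouldStop()) {
    // Waits inside Redis until a job arrives or the timeout expires.
    const job = await storage.dequeue(workerId, timeoutSeconds)
    if (job == null) continue // timed out: re-check shouldStop, wait again
    await handler(job)
    processed++
  }
  return processed
}
```

There is no sleep or retry interval in the loop: the only wait is the blocking call itself.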

Leader Election

Implements distributed leader election for reaper coordination:
// Try to become leader
const isLeader = await storage.acquireLeaderLock('reaper-lock', instanceId, 10000)

if (isLeader) {
  // Renew lock periodically
  await storage.renewLeaderLock('reaper-lock', instanceId, 10000)
  
  // Release on shutdown
  await storage.releaseLeaderLock('reaper-lock', instanceId)
}
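
The lease semantics behind these three calls can be modelled in memory. The sketch below is an illustration only, with assumptions stated up front: the TTL is taken to be in milliseconds, a lock can be acquired only when no unexpired lock exists, and only the current owner can renew or release. InMemoryLeaderLock is a hypothetical class, not the Redis-backed implementation.

```javascript
// In-memory model of a lease-based leader lock (hypothetical).
// Times are in milliseconds; `now` is injectable for testing.
class InMemoryLeaderLock {
  constructor () {
    this.locks = new Map() // lockName -> { owner, expiresAt }
  }

  acquire (name, owner, ttlMs, now = Date.now()) {
    const current = this.locks.get(name)
    if (current && current.expiresAt > now) return false // still held
    this.locks.set(name, { owner, expiresAt: now + ttlMs })
    return true
  }

  renew (name, owner, ttlMs, now = Date.now()) {
    const current = this.locks.get(name)
    if (!current || current.owner !== owner || current.expiresAt <= now) return false
    current.expiresAt = now + ttlMs
    return true
  }

  release (name, owner) {
    const current = this.locks.get(name)
    if (!current || current.owner !== owner) return false
    this.locks.delete(name)
    return true
  }
}
```

Because the lock expires on its own, a leader that crashes without releasing is replaced once the TTL passes, which is why the renewal interval should be shorter than the TTL.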

TTL Management

Results and errors are stored with TTL expiry:
  • Envelope format: 8-byte timestamp header + payload
  • Lazy expiration: Checked on read, cleaned up by reaper
  • Backward compatible: Falls back to reading legacy non-envelope entries
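
An envelope of this shape can be sketched with Node.js Buffers. The layout below is an assumption made for illustration: the header is treated as a big-endian expiry time in milliseconds since the epoch, which may differ from the real byte order or timestamp meaning. wrapWithTtl and unwrapIfFresh are hypothetical helpers, and the legacy fallback is left out.

```javascript
// Hypothetical envelope layout: 8-byte big-endian expiry time in
// milliseconds since the epoch, followed by the raw payload bytes.
const HEADER_BYTES = 8

function wrapWithTtl (payload, ttlMs, now = Date.now()) {
  const header = Buffer.alloc(HEADER_BYTES)
  header.writeBigUInt64BE(BigInt(now + ttlMs))
  return Buffer.concat([header, payload])
}

// Returns the payload, or null when the entry has expired (lazy expiry).
function unwrapIfFresh (envelope, now = Date.now()) {
  if (envelope.length < HEADER_BYTES) return null
  const expiresAt = Number(envelope.readBigUInt64BE(0))
  if (expiresAt <= now) return null
  return envelope.subarray(HEADER_BYTES)
}
```

Checking expiry on read means a stale entry is never returned even if the reaper has not removed it yet.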

Example

Basic Usage

import { Queue, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const logger = pino()

const queue = new Queue({
  storage: new RedisStorage({
    url: 'redis://localhost:6379',
    keyPrefix: 'myapp:jobs:',
    logger
  }),
  async process(job) {
    // Process job
    return { result: 'done' }
  }
})

await queue.start()

// Enqueue jobs
const job = await queue.enqueue({ data: 'task' })

await queue.stop()

Multiple Queues with Key Prefixes

// High-priority queue
const highPriorityQueue = new Queue({
  storage: new RedisStorage({ keyPrefix: 'high:' }),
  async process(job) { /* ... */ }
})

// Low-priority queue
const lowPriorityQueue = new Queue({
  storage: new RedisStorage({ keyPrefix: 'low:' }),
  async process(job) { /* ... */ }
})

await Promise.all([
  highPriorityQueue.start(),
  lowPriorityQueue.start()
])

Distributed Workers

// worker-1.js (Machine 1)
const queue = new Queue({
  storage: new RedisStorage({ url: 'redis://shared-redis:6379' }),
  async process(job) { return await processJob(job) }
})
await queue.start()

// worker-2.js (Machine 2)
const queue = new Queue({
  storage: new RedisStorage({ url: 'redis://shared-redis:6379' }),
  async process(job) { return await processJob(job) }
})
await queue.start()

// producer.js (Machine 3)
const queue = new Queue({
  storage: new RedisStorage({ url: 'redis://shared-redis:6379' }),
  startWorker: false // Producer-only mode
})
await queue.start()
const job = await queue.enqueue({ task: 'process me' })

Redis Key Structure

With default keyPrefix: 'jq:', RedisStorage uses these keys:
Key                      | Type    | Purpose
jq:jobs                  | Hash    | Job states (id → status:timestamp[:workerId])
jq:queue                 | List    | Main job queue (FIFO)
jq:processing:{workerId} | List    | Per-worker processing queues
jq:results               | Hash    | Job results with TTL envelope
jq:errors                | Hash    | Job errors with TTL envelope
jq:workers               | Hash    | Active worker registrations
jq:notify:{jobId}        | Pub/Sub | Per-job completion notifications
jq:events                | Pub/Sub | Global event stream
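
When inspecting a queue by hand, for example from redis-cli or a debugging script, it can help to derive the key names and decode state values in code. The helpers below are hypothetical and follow the key names and the status:timestamp[:workerId] format listed above. They assume the timestamp is numeric, and the status name used in the usage comment is illustrative only.

```javascript
// Hypothetical helpers mirroring the documented key names.
function buildKeys (keyPrefix = 'jq:') {
  return {
    jobs: `${keyPrefix}jobs`,
    queue: `${keyPrefix}queue`,
    results: `${keyPrefix}results`,
    errors: `${keyPrefix}errors`,
    workers: `${keyPrefix}workers`,
    events: `${keyPrefix}events`,
    processing: (workerId) => `${keyPrefix}processing:${workerId}`,
    notify: (jobId) => `${keyPrefix}notify:${jobId}`
  }
}

// Parse a state value of the form status:timestamp[:workerId].
function parseJobState (value) {
  const [status, timestamp, ...rest] = value.split(':')
  return {
    status,
    timestamp: Number(timestamp), // assumed to be numeric
    // A worker ID may itself contain ':' so rejoin the remainder.
    workerId: rest.length > 0 ? rest.join(':') : undefined
  }
}

// Usage (status name is illustrative):
// parseJobState('processing:1700000000000:worker-1')
```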

Performance Considerations

Connection Pooling

RedisStorage creates three Redis connections:
  1. Main client: All standard operations (enqueue, state updates, etc.)
  2. Blocking client: Dedicated connection for BLMOVE operations
  3. Subscriber: Dedicated connection for pub/sub messages
All connections are automatically managed and cleaned up on disconnect.

Lua Script Caching

Scripts are loaded once during connect() and cached by SHA:
await storage.connect() // Loads all Lua scripts
// Subsequent operations use EVALSHA for efficiency
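
The load-once, call-by-SHA pattern can be sketched as follows. This is a generic illustration, not the library's internals: loadScripts and runScript are hypothetical helpers, and client is assumed to expose ioredis-style script('LOAD', source) and evalsha(sha, numKeys, ...keysAndArgs) methods.

```javascript
// Sketch of SHA-based script caching (hypothetical helpers).
// `sources` maps a script name to its Lua source text.
async function loadScripts (client, sources) {
  const shas = new Map()
  for (const [name, source] of Object.entries(sources)) {
    // SCRIPT LOAD returns the SHA1 digest Redis uses to identify the script.
    shas.set(name, await client.script('LOAD', source))
  }
  return shas
}

// Run a previously loaded script by name using EVALSHA.
async function runScript (client, shas, name, keys = [], args = []) {
  const sha = shas.get(name)
  if (!sha) throw new Error(`Script not loaded: ${name}`)
  return client.evalsha(sha, keys.length, ...keys, ...args)
}
```

Sending the digest instead of the full script keeps each call small, at the cost of one SCRIPT LOAD per script at connect time.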

Batching

For bulk operations, use getJobStates() instead of multiple getJobState() calls:
// Efficient: single HMGET
const states = await storage.getJobStates(['job1', 'job2', 'job3'])

// Inefficient: three round-trips
const state1 = await storage.getJobState('job1')
const state2 = await storage.getJobState('job2')
const state3 = await storage.getJobState('job3')

Testing

For testing, use the clear() method to reset Redis state:
import { RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ keyPrefix: 'test:' })
await storage.connect()

// Run tests...

// Clean up
await storage.clear() // Deletes all keys with prefix 'test:'
await storage.disconnect()
Production warning: Never call clear() in production. It deletes all job data stored under the configured key prefix.
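
To reduce the risk of an accidental clear(), a test helper can refuse to run when the environment looks like production. The guard below is a hypothetical sketch: clearForTests is not part of the library, and using NODE_ENV as the signal is an assumption about your deployment setup.

```javascript
// Hypothetical guard around storage.clear() for test helpers.
async function clearForTests (storage, env = process.env) {
  if (env.NODE_ENV === 'production') {
    throw new Error('Refusing to call clear() when NODE_ENV is "production"')
  }
  await storage.clear()
}
```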