Platformatic Job Queue supports three storage backends, each optimized for different use cases:
  • MemoryStorage: In-memory storage for development and testing
  • RedisStorage: Production-ready storage using Redis or Valkey
  • FileStorage: Filesystem-based storage for single-node deployments

Storage Interface

All storage backends implement the Storage interface defined in src/storage/types.ts. This ensures consistent behavior across different backends.

Core Operations

Every storage backend must support:
  • Queue operations: enqueue, dequeue, requeue, ack
  • Job state management: getJobState, setJobState, deleteJob
  • Result storage: setResult, getResult, setError, getError
  • Worker tracking: registerWorker, unregisterWorker, getWorkers
  • Notifications: subscribeToJob, notifyJobComplete, publishEvent
  • Atomic operations: completeJob, failJob, retryJob

MemoryStorage

In-memory storage using JavaScript data structures. Ideal for development, testing, and single-process scenarios.
import { Queue, MemoryStorage } from '@platformatic/job-queue'

const storage = new MemoryStorage()
const queue = new Queue({ storage })

Characteristics

Data Persistence: None - all data is lost when the process exits
Concurrency: Single process only - does not support multiple workers across processes
Performance: Fastest option - no I/O overhead, all operations in memory

Implementation Details

Internal structures (from src/storage/memory.ts:22):
class MemoryStorage implements Storage {
  #queue: Buffer[] = []                          // Main job queue (FIFO)
  #processingQueues: Map<string, Buffer[]>       // Per-worker processing queues
  #jobs: Map<string, string>                     // Job states (id -> state string)
  #results: Map<string, StoredResult>            // Cached results with expiry
  #errors: Map<string, StoredResult>             // Cached errors with expiry
  #workers: Map<string, WorkerInfo>              // Active workers with TTL
  #eventEmitter: EventEmitter                    // Event pub/sub
  #notifyEmitter: EventEmitter                   // Job notifications
  #cleanupInterval: NodeJS.Timeout               // TTL cleanup timer
  #dequeueWaiters: DequeueWaiter[]              // Blocked dequeue calls
}

Blocking Dequeue

MemoryStorage implements blocking dequeue using promises:
async dequeue(workerId: string, timeout: number): Promise<Buffer | null> {
  // Try immediate dequeue
  const message = this.#queue.shift()
  if (message) {
    this.#addToProcessingQueue(workerId, message)
    return message
  }

  // Wait for a job with timeout
  return new Promise(resolve => {
    const timeoutId = setTimeout(() => {
      // Remove from waiters and return null
      const index = this.#dequeueWaiters.findIndex(w => w.resolve === resolve)
      if (index !== -1) this.#dequeueWaiters.splice(index, 1)
      resolve(null)
    }, timeout * 1000)

    this.#dequeueWaiters.push({ workerId, resolve, timeoutId })
  })
}
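
The snippet above shows only the waiting side. As a minimal sketch of how the enqueue side might wake a blocked caller, the hypothetical TinyBlockingQueue class below hands a new message directly to the oldest waiter and cancels its timeout. The class name, the generic message type, and the hand-off logic are assumptions for illustration, not the MemoryStorage source.

```typescript
type Waiter<T> = {
  resolve: (message: T | null) => void
  timeoutId: ReturnType<typeof setTimeout>
}

// Hypothetical stand-in for a storage backend, reduced to a queue and its waiters.
class TinyBlockingQueue<T> {
  private readonly queue: T[] = []
  private readonly waiters: Array<Waiter<T>> = []

  get pending (): number { return this.queue.length }
  get waiting (): number { return this.waiters.length }

  enqueue (message: T): void {
    // If a dequeue call is blocked, hand the message over and cancel its timeout.
    const waiter = this.waiters.shift()
    if (waiter !== undefined) {
      clearTimeout(waiter.timeoutId)
      waiter.resolve(message)
      return
    }
    this.queue.push(message)
  }

  dequeue (timeoutSeconds: number): Promise<T | null> {
    // Fast path: a message is already queued.
    if (this.queue.length > 0) {
      return Promise.resolve(this.queue.shift() as T)
    }
    // Slow path: register a waiter that resolves to null when the timeout fires.
    return new Promise<T | null>(resolve => {
      const timeoutId = setTimeout(() => {
        const index = this.waiters.findIndex(w => w.resolve === resolve)
        if (index !== -1) this.waiters.splice(index, 1)
        resolve(null)
      }, timeoutSeconds * 1000)
      this.waiters.push({ resolve, timeoutId })
    })
  }
}
```

Clearing the timer on hand-off matters: without it, a resolved waiter would leave a pending timer that keeps the process alive until the timeout elapses.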

TTL Cleanup

A background interval removes expired results, errors, and workers every second. The interval follows the connection lifecycle:
await storage.connect()  // Starts cleanup interval
await storage.disconnect()  // Stops cleanup interval and clears data
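
To make the cleanup pass concrete, here is a rough sketch of a sweep over a map of expiring entries. The Expiring shape and the sweepExpired and startSweeper helpers are hypothetical names chosen for this example; the actual MemoryStorage internals may differ.

```typescript
// Hypothetical entry shape: a value plus an absolute expiry time in milliseconds.
interface Expiring<T> {
  value: T
  expiresAt: number
}

// Delete every entry whose expiry is at or before `now`; return how many were removed.
function sweepExpired<T> (entries: Map<string, Expiring<T>>, now: number = Date.now()): number {
  let removed = 0
  entries.forEach((entry, id) => {
    if (entry.expiresAt <= now) {
      entries.delete(id) // deleting during Map iteration is safe in JavaScript
      removed++
    }
  })
  return removed
}

// Run the sweep once per second; the returned function stops it (as disconnect() would).
function startSweeper<T> (entries: Map<string, Expiring<T>>): () => void {
  const timer = setInterval(() => { sweepExpired(entries) }, 1000)
  return () => clearInterval(timer)
}
```

Passing now as a parameter keeps the sweep deterministic in tests, which is why it is not read from the clock inside the loop.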

When to Use

  • ✅ Unit tests and integration tests
  • ✅ Local development
  • ✅ Simple single-process applications
  • ❌ Production deployments
  • ❌ Workers spread across multiple processes or servers
  • ❌ Data persistence requirements

RedisStorage

Production-ready storage using Redis 7+ or Valkey 8+. Supports distributed workers, atomic operations, and real-time notifications.
import { Queue, RedisStorage } from '@platformatic/job-queue'
import pino from 'pino'

const storage = new RedisStorage({
  url: 'redis://localhost:6379',
  keyPrefix: 'myapp:',                    // Optional: prefix all keys
  logger: pino()                           // Optional: pino logger
})

const queue = new Queue({ storage })
await queue.start()

Configuration

url
string
default:"redis://localhost:6379"
Redis connection URL. Can also be set via REDIS_URL environment variable.
keyPrefix
string
default:"jq:"
Prefix for all Redis keys. Useful for namespacing multiple queues in the same Redis instance.
logger
pino.Logger
Optional Pino logger for debugging Redis operations.

Characteristics

Data Persistence: Durable - survives process restarts (depends on Redis persistence configuration)
Concurrency: Fully distributed - supports multiple workers across processes and servers
Performance: High throughput with atomic Lua scripts and blocking operations

Redis Key Structure

With the default prefix jq:, RedisStorage uses the following keys:
  • jq:queue - Main job queue (LIST)
  • jq:processing:<workerId> - Per-worker processing queue (LIST)
  • jq:jobs - Job states (HASH: id → state)
  • jq:results - Job results with TTL (HASH: id → result)
  • jq:errors - Job errors with TTL (HASH: id → error)
  • jq:workers - Active workers (HASH: workerId → timestamp)
  • jq:notify:<jobId> - Job notification channels (PUBSUB)
  • jq:events - Job state change events (PUBSUB)
  • jq:reaper:lock - Reaper leader election lock (STRING)
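
The key layout above can be expressed as a small helper, which is handy when inspecting a queue with redis-cli or when writing tests. The redisKeys function is a hypothetical illustration that only mirrors the names listed above; it is not part of the library's API.

```typescript
// Hypothetical helper: builds the key names listed above for a given prefix.
function redisKeys (prefix: string = 'jq:') {
  return {
    queue: `${prefix}queue`,
    jobs: `${prefix}jobs`,
    results: `${prefix}results`,
    errors: `${prefix}errors`,
    workers: `${prefix}workers`,
    events: `${prefix}events`,
    reaperLock: `${prefix}reaper:lock`,
    // Per-worker and per-job keys take the identifier as a suffix.
    processing: (workerId: string): string => `${prefix}processing:${workerId}`,
    notify: (jobId: string): string => `${prefix}notify:${jobId}`
  }
}
```

With keyPrefix set to myapp:, for example, the processing list for worker w1 would be myapp:processing:w1.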

Atomic Operations with Lua Scripts

RedisStorage uses Lua scripts for atomic multi-key operations. Scripts are loaded once at connection time:
await storage.connect()  // Loads and caches Lua script SHAs
Available scripts (from redis-scripts/ directory):
  • enqueue.lua - Atomically check for duplicates and enqueue
  • complete.lua - Mark job as completed, store result, remove from processing queue
  • fail.lua - Mark job as failed, store error, remove from processing queue
  • retry.lua - Update attempts and requeue failed job
  • cancel.lua - Delete job if not processing
  • renew-leader-lock.lua - Renew Reaper leader lock
  • release-leader-lock.lua - Release Reaper leader lock

Blocking Dequeue

RedisStorage uses BLMOVE for efficient blocking dequeue:
const message = await storage.dequeue(workerId, timeout)
// Internally: BLMOVE jq:queue jq:processing:<workerId> LEFT RIGHT <timeout>
This atomically moves jobs from the main queue to the worker’s processing queue.

TTL Implementation

Results and errors are stored with an 8-byte header containing the expiry timestamp (a big-endian 64-bit integer, in milliseconds since the Unix epoch), followed by the payload:
// Encoding (from src/storage/redis.ts:82)
#encodeExpiringValue(value: Buffer, ttlMs: number): Buffer {
  const buffer = Buffer.allocUnsafe(8 + value.length)
  buffer.writeBigInt64BE(BigInt(Date.now() + ttlMs))  // First 8 bytes: expiry
  value.copy(buffer, 8)                                // Remaining bytes: payload
  return buffer
}
On retrieval, the expiry is checked and expired values are deleted:
const result = await storage.getResult(id)
// Checks expiry timestamp, deletes if expired, returns payload
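
The read side of this format is not shown above. The sketch below pairs the encoder with a hypothetical decodeExpiringValue that returns the payload, or null once the entry has expired. The decoder, the explicit now parameter, and the choice to treat a too-short buffer as expired are assumptions made for illustration.

```typescript
// Same layout as the encoder above, with `now` injectable for testing.
function encodeExpiringValue (value: Buffer, ttlMs: number, now: number = Date.now()): Buffer {
  const buffer = Buffer.allocUnsafe(8 + value.length)
  buffer.writeBigInt64BE(BigInt(now + ttlMs), 0) // bytes 0-7: expiry in ms since epoch
  value.copy(buffer, 8)                          // bytes 8+: payload
  return buffer
}

// Hypothetical counterpart: null means "expired or malformed", so the caller can delete the entry.
function decodeExpiringValue (stored: Buffer, now: number = Date.now()): Buffer | null {
  if (stored.length < 8) return null
  const expiresAt = Number(stored.readBigInt64BE(0))
  if (expiresAt <= now) return null
  return stored.subarray(8) // a view over the payload bytes, no copy
}
```

Because Redis hash fields cannot carry individual TTLs, embedding the expiry in the value is one way to get per-entry expiry inside a single hash.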

Pub/Sub for Real-Time Notifications

RedisStorage maintains a dedicated subscriber client for notifications:
// Subscribing to job completion (from src/storage/redis.ts:346)
const unsubscribe = await storage.subscribeToJob(id, (status) => {
  if (status === 'completed') {
    console.log('Job completed!')
  }
})

// Internally uses Redis SUBSCRIBE to jq:notify:<jobId>

Connection Management

RedisStorage maintains three Redis connections:
class RedisStorage {
  #client: Redis                // Main client for commands
  #subscriber: Redis            // Dedicated subscriber for pub/sub
  #blockingClient: Redis        // Dedicated client for BLMOVE (blocking operations)
}
Always call disconnect() to properly close all three connections.

When to Use

  • ✅ Production deployments
  • ✅ Distributed worker systems
  • ✅ High-throughput workloads
  • ✅ Multi-server deployments
  • ✅ Need for data persistence
  • ✅ Request/response patterns with enqueueAndWait()
  • ❌ Serverless environments (connection overhead)
  • ❌ Edge computing scenarios

FileStorage

Filesystem-based storage using atomic file operations. Suitable for single-node deployments where Redis is not available.
import { Queue, FileStorage } from '@platformatic/job-queue'

const storage = new FileStorage({
  basePath: '/var/lib/myapp/queue'
})

const queue = new Queue({ storage })
await queue.start()

Configuration

basePath
string
required
Directory path for storing queue data. Must be writable by the process.

Characteristics

Data Persistence: Durable - all data persists to disk
Concurrency: Single filesystem - works across processes that share the same filesystem, but not across servers without shared storage
Performance: Generally slower than a local Redis because of disk I/O, but can be faster than a remote Redis reached over a high-latency network

Directory Structure

FileStorage creates the following directory structure:
basePath/
├── queue/              # Queued jobs (sequence-id.msg files)
├── processing/         # Per-worker processing queues
│   ├── worker-1/
│   └── worker-2/
├── jobs/               # Job state files (id.state)
├── results/            # Job results (id.result + id.ttl)
├── errors/             # Job errors (id.error + id.ttl)
├── workers/            # Active workers (workerId.worker)
├── notify/             # Notification files (id-timestamp.notify)
└── cleanup-leader.lock # Leader election lock for TTL cleanup

FIFO Ordering with Sequence Numbers

Jobs are enqueued with monotonic sequence numbers for strict FIFO ordering:
// Example queue files:
000000000001-email-123.msg
000000000002-email-456.msg
000000000003-email-789.msg
Sequence numbers are zero-padded to 12 digits so that sorting file names lexicographically yields the same order as sorting by sequence number.
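
As a small illustration of why the padding matters, the sketch below builds file names in the format shown above and sorts them as plain strings. The queueFileName and sortQueueFiles helpers are hypothetical names for this example only.

```typescript
// Hypothetical helper: formats a queue file name as <12-digit sequence>-<jobId>.msg
function queueFileName (sequence: number, jobId: string): string {
  return `${String(sequence).padStart(12, '0')}-${jobId}.msg`
}

// Plain string sort; this is FIFO order only because the sequence prefix has a fixed width.
function sortQueueFiles (names: string[]): string[] {
  return [...names].sort()
}
```

Without padding, a name starting with 10- would sort before one starting with 2-, and jobs would be dequeued out of order.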

Atomic Operations

FileStorage uses fast-write-atomic for atomic writes:
import { promise as writeFileAtomic } from 'fast-write-atomic'

// Atomic write: creates temp file, then renames atomically
await writeFileAtomic(filePath, data)
Dequeue uses atomic rename() to claim jobs:
// Try to claim job by moving it to processing directory
try {
  await rename(srcPath, dstPath)  // Only one worker succeeds
  return await readFile(dstPath)
} catch {
  // Another worker claimed it, try next job
}
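
A self-contained version of the claim step might look like the sketch below. It uses synchronous fs calls for brevity (the snippet above uses the promise-based rename), and it treats only ENOENT as "another worker claimed it" so that unrelated errors such as permission problems are not silently swallowed. The claimJob and demoClaimRace names, and the directory layout passed in, are assumptions for illustration.

```typescript
import { mkdirSync, mkdtempSync, readFileSync, renameSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

// Try to claim `fileName` for `workerId`; returns the payload, or null if it was already claimed.
function claimJob (basePath: string, workerId: string, fileName: string): Buffer | null {
  const src = join(basePath, 'queue', fileName)
  const dstDir = join(basePath, 'processing', workerId)
  mkdirSync(dstDir, { recursive: true })
  const dst = join(dstDir, fileName)
  try {
    renameSync(src, dst) // atomic within one filesystem: only one caller can move the file
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') return null // source is gone: already claimed
    throw err
  }
  return readFileSync(dst)
}

// Usage: two workers try to claim the same file in a temporary directory.
function demoClaimRace (): [Buffer | null, Buffer | null] {
  const base = mkdtempSync(join(tmpdir(), 'jq-sketch-'))
  mkdirSync(join(base, 'queue'))
  writeFileSync(join(base, 'queue', '000000000001-email-123.msg'), 'payload')
  const first = claimJob(base, 'worker-1', '000000000001-email-123.msg')
  const second = claimJob(base, 'worker-2', '000000000001-email-123.msg')
  return [first, second]
}
```

In demoClaimRace the first call moves the file and reads it; the second finds no source file and returns null, which is the signal to try the next job.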

File Watching for Notifications

FileStorage uses fs.watch() for real-time notifications:
const watcher = watch(this.#queuePath, { signal })
for await (const event of watcher) {
  if (event.eventType === 'rename' && event.filename?.endsWith('.msg')) {
    this.#notifyDequeueWaiters()  // Wake up waiting workers
  }
}

TTL Cleanup with Leader Election

Multiple FileStorage instances on the same filesystem elect a leader to perform TTL cleanup:
const acquired = await storage.acquireLeaderLock('cleanup-leader', instanceId, ttlMs)
if (acquired) {
  // This instance becomes the cleanup leader
  // Runs cleanup interval every second
}
Leader election prevents multiple processes from competing for cleanup I/O.

When to Use

  • ✅ Single-server deployments
  • ✅ No Redis infrastructure
  • ✅ Need data persistence
  • ✅ Moderate throughput requirements
  • ✅ Shared filesystem access (NFS, EFS)
  • ❌ High-throughput workloads (> 1000 jobs/sec)
  • ❌ Multi-server deployments without shared storage
  • ❌ Need for distributed locking

Performance Comparison

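| Backend | Throughput | Latency | Persistence | Distributed | Setup |
| --- | --- | --- | --- | --- | --- |
| MemoryStorage | Highest | Lowest | None | No | None |
| RedisStorage | High | Low | Durable | Yes | Redis/Valkey |
| FileStorage | Moderate | Medium | Durable | Single FS | Directory |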

Switching Between Backends

To switch backends, change only the storage instance passed to Queue. Pick one of the following:
// Development
const storage = new MemoryStorage()

// Production (Redis)
// const storage = new RedisStorage({
//   url: process.env.REDIS_URL
// })

// Production (File-based)
// const storage = new FileStorage({
//   basePath: process.env.QUEUE_DATA_PATH
// })

const queue = new Queue({ storage })
Use environment variables to switch backends without code changes:
const storage = process.env.REDIS_URL
  ? new RedisStorage({ url: process.env.REDIS_URL })
  : new FileStorage({ basePath: process.env.QUEUE_DATA_PATH || '/tmp/queue' })

Custom Storage Backends

You can implement custom storage backends by implementing the Storage interface:
import { Queue, type Storage } from '@platformatic/job-queue'

class MyCustomStorage implements Storage {
  async connect() { /* ... */ }
  async disconnect() { /* ... */ }
  async enqueue(id, message, timestamp) { /* ... */ }
  async dequeue(workerId, timeout) { /* ... */ }
  // ... implement all required methods
}

const queue = new Queue({ storage: new MyCustomStorage() })
See src/storage/types.ts for the complete interface definition.