The Queue class is the main entry point for @platformatic/job-queue. It combines producer and consumer functionality into a single interface, making it easy to both enqueue jobs and process them.
Architecture Overview
Internally, the Queue is composed of two main components:
- Producer: Handles job enqueueing and result retrieval
- Consumer: Handles job processing and execution
When you call queue.execute(), the Queue becomes both a producer and consumer. Without calling execute(), it operates in producer-only mode.
import { Queue, RedisStorage } from '@platformatic/job-queue'

const queue = new Queue<{ email: string }, { sent: boolean }>({
  storage: new RedisStorage({ url: 'redis://localhost:6379' }),
  concurrency: 5,
  maxRetries: 3
})

// Register handler (makes this a consumer)
queue.execute(async (job) => {
  console.log(`Processing ${job.id}:`, job.payload)
  return { sent: true }
})

await queue.start()
Configuration Options
The Queue accepts a QueueConfig<TPayload, TResult> object with the following options:
storage
Storage backend instance (MemoryStorage, RedisStorage, or FileStorage)
workerId
string
default:"randomUUID()"
Unique identifier for this worker instance. Used to track which worker is processing which job.
concurrency
Number of jobs to process in parallel. Each concurrent job runs in its own worker loop.
Seconds to wait when polling for jobs. Uses blocking operations where supported (e.g., BLMOVE in Redis).
maxRetries
Default maximum retry attempts for failed jobs. Can be overridden per job via EnqueueOptions.maxAttempts.
visibilityTimeout
Milliseconds before a processing job is considered stalled. The Reaper uses this to detect and recover jobs whose worker crashed.
resultTTL
Milliseconds to cache job results and errors (1 hour default). Can be overridden per job or in the afterExecution hook.
payloadSerde
Serde<TPayload>
default:"JsonSerde"
Custom serializer for job payloads. Defaults to JSON serialization.
resultSerde
Serde<TResult>
default:"JsonSerde"
Custom serializer for job results. Defaults to JSON serialization.
afterExecution
AfterExecutionHook<TPayload, TResult>
Hook called after job execution and before persisting terminal state. Can modify result TTL or transform results.
logger
pino.Logger
default:"abstractLogger"
Pino-compatible logger instance. Defaults to a no-op logger.
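As an illustration of the visibilityTimeout option described above, stall detection can be pictured as a timestamp comparison. This is a minimal sketch and not the library's Reaper: `ProcessingEntry`, `findStalled`, and the field names are hypothetical and exist only for this example.

```typescript
// Hypothetical record of a job that a worker has picked up (not a library type).
interface ProcessingEntry {
  id: string
  workerId: string
  startedAt: number // epoch milliseconds
}

// Returns the ids of entries that have been processing longer than visibilityTimeout.
function findStalled(
  entries: ProcessingEntry[],
  now: number,
  visibilityTimeout: number
): string[] {
  return entries
    .filter((entry) => now - entry.startedAt > visibilityTimeout)
    .map((entry) => entry.id)
}

const stalled = findStalled(
  [
    { id: 'job-1', workerId: 'worker-a', startedAt: 0 },
    { id: 'job-2', workerId: 'worker-b', startedAt: 25_000 }
  ],
  40_000, // current time
  30_000 // visibilityTimeout
)
console.log(stalled) // prints [ 'job-1' ]
```

Only job-1 has been processing for longer than the timeout, so it is the only candidate for recovery in this sketch.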
Lifecycle Methods
start()
Connects to storage and starts consuming jobs if a handler is registered.
Implementation details (from src/queue.ts:72):
- Connects to storage backend
- Sets internal #started flag
- If a handler was registered via execute(), starts the Consumer
- Emits 'started' event
stop()
Gracefully stops processing jobs and disconnects from storage.
Implementation details (from src/queue.ts:94):
- Stops the Consumer (waits for active jobs with visibilityTimeout limit)
- Disconnects from storage
- Sets #started to false
- Emits 'stopped' event
The stop() method waits up to visibilityTimeout milliseconds for active jobs to complete. Jobs that don’t finish in time will be aborted and requeued by the Reaper.
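The bounded wait described above can be sketched as a race between the active jobs and a timer. This is an assumption about the general technique, not the library's implementation: `waitForActiveJobs` is a hypothetical helper, and what happens to unfinished jobs is left to the caller.

```typescript
// Waits for all active jobs, but gives up after timeoutMs.
// Resolves to true if every job settled in time, false if the timer won.
async function waitForActiveJobs(
  active: Array<Promise<unknown>>,
  timeoutMs: number
): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timedOut = new Promise<boolean>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs)
  })
  // A rejected job still counts as settled, so swallow rejections here.
  const settled = Promise.all(
    active.map((job) => job.catch(() => undefined))
  ).then(() => true)
  try {
    return await Promise.race([settled, timedOut])
  } finally {
    clearTimeout(timer) // do not keep the process alive once the race is decided
  }
}

const quickJob = new Promise<void>((resolve) => setTimeout(resolve, 10))
waitForActiveJobs([quickJob], 1_000).then((finished) => {
  console.log('all jobs finished:', finished) // prints "all jobs finished: true"
})
```

Clearing the timer in the finally block matters during shutdown: otherwise a pending timer could delay process exit by up to the full timeout.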
execute(handler)
Registers a job handler function. Can be called before or after start().
queue.execute(async (job) => {
  // job.id: string
  // job.payload: TPayload
  // job.attempts: number (starts at 1)
  // job.signal: AbortSignal
  return result // TResult
})
Job object structure (from src/types.ts:88):
interface Job<TPayload> {
  id: string
  payload: TPayload
  attempts: number // Current attempt number (1-indexed)
  signal: AbortSignal // Aborted on cancellation or visibility timeout
}
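Because the job carries an AbortSignal, a long-running handler can check it between units of work and stop early. The sketch below redeclares the Job interface locally so it runs on its own; `processItems` and its payload shape are hypothetical examples, not part of the library.

```typescript
// Local copy of the Job shape shown above, so the sketch is self-contained.
interface Job<TPayload> {
  id: string
  payload: TPayload
  attempts: number
  signal: AbortSignal
}

// Hypothetical handler: processes items one at a time and
// stops as soon as the signal reports that the job was aborted.
async function processItems(
  job: Job<{ items: string[] }>
): Promise<{ processed: number }> {
  let processed = 0
  for (const item of job.payload.items) {
    if (job.signal.aborted) {
      throw new Error(`Job ${job.id} aborted after ${processed} items`)
    }
    await Promise.resolve(item) // stand-in for real asynchronous work
    processed++
  }
  return { processed }
}

const controller = new AbortController()
processItems({
  id: 'job-1',
  payload: { items: ['a', 'b'] },
  attempts: 1,
  signal: controller.signal
}).then((result) => console.log(result)) // prints { processed: 2 }
```

Checking the signal between items keeps the handler from doing further work after the job has been cancelled or has exceeded its visibility timeout.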
Producer Methods
These methods are available on all Queue instances, even without calling execute().
enqueue()
Enqueue a job for processing (fire-and-forget).
const result = await queue.enqueue('job-123', { email: 'user@example.com' }, {
  maxAttempts: 5,
  resultTTL: 300000 // 5 minutes
})

if (result.status === 'queued') {
  console.log('Job added to queue')
} else if (result.status === 'completed') {
  console.log('Job already completed:', result.result)
} else if (result.status === 'duplicate') {
  console.log('Job already exists with state:', result.existingState)
}
Return type (from src/types.ts:71):
type EnqueueResult<TResult> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult }
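Since EnqueueResult is a discriminated union on status, a switch statement lets the compiler check that every case is handled. The sketch below copies the type locally so it runs on its own. MessageState is not defined in this section, so the union used here is an assumption, and `describeEnqueue` is a hypothetical helper.

```typescript
// Assumption: MessageState is not shown in this section; these members are illustrative.
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

type EnqueueResult<TResult> =
  | { status: 'queued' }
  | { status: 'duplicate'; existingState: MessageState }
  | { status: 'completed'; result: TResult }

// Hypothetical helper: maps each variant to a log-friendly string.
function describeEnqueue<TResult>(result: EnqueueResult<TResult>): string {
  switch (result.status) {
    case 'queued':
      return 'queued'
    case 'duplicate':
      return `duplicate (${result.existingState})`
    case 'completed':
      return `completed: ${JSON.stringify(result.result)}`
    default: {
      // Fails to compile if a new variant is added and not handled above.
      const unreachable: never = result
      return unreachable
    }
  }
}

console.log(describeEnqueue({ status: 'duplicate', existingState: 'processing' }))
// prints "duplicate (processing)"
```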
enqueueAndWait()
Enqueue a job and wait for the result (request/response pattern).
// Assumes both error classes are exported by the package
import { TimeoutError, JobFailedError } from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait('req-123', { url: 'https://api.example.com' }, {
    timeout: 30000, // 30 seconds
    maxAttempts: 3,
    resultTTL: 60000 // 1 minute
  })
  console.log('Success:', result)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Request timed out')
  } else if (error instanceof JobFailedError) {
    console.log('Job failed:', error.originalError)
  }
}
See Request/Response Pattern for detailed usage.
cancel()
Cancel a queued job. Cannot cancel jobs that are currently processing.
const result = await queue.cancel('job-123')
// result.status: 'cancelled' | 'not_found' | 'processing' | 'completed'
getStatus()
Get the current status of a job.
const status = await queue.getStatus('job-123')
// {
//   id: 'job-123',
//   state: 'completed',
//   createdAt: 1234567890,
//   attempts: 2,
//   result?: { ... },
//   error?: { message: '...', code?: '...', stack?: '...' }
// }
getResult()
Retrieve the cached result of a completed job.
const result = await queue.getResult('job-123')
if (result) {
  console.log('Cached result:', result)
}
updateResultTTL()
Update the TTL for a completed or failed job’s cached payload.
const update = await queue.updateResultTTL('job-123', 7200000) // 2 hours
// update.status: 'updated' | 'not_found' | 'not_terminal' | 'missing_payload'
See Deduplication for more details.
Events
The Queue extends EventEmitter and emits the following events:
Lifecycle Events
queue.on('started', () => {
  console.log('Queue started')
})

queue.on('stopped', () => {
  console.log('Queue stopped')
})

queue.on('error', (error: Error) => {
  console.error('Queue error:', error)
})
Job Events
queue.on('enqueued', (id: string) => {
  console.log(`Job ${id} was enqueued`)
})

queue.on('completed', (id: string, result: TResult) => {
  console.log(`Job ${id} completed:`, result)
})

queue.on('failed', (id: string, error: Error) => {
  console.log(`Job ${id} failed permanently:`, error)
})

queue.on('failing', (id: string, error: Error, attempt: number) => {
  console.log(`Job ${id} failed attempt ${attempt}, will retry:`, error)
})

queue.on('requeued', (id: string) => {
  console.log(`Job ${id} was requeued (e.g., during graceful shutdown)`)
})

queue.on('cancelled', (id: string) => {
  console.log(`Job ${id} was cancelled`)
})

queue.on('stalled', (id: string) => {
  console.log(`Job ${id} was stalled and recovered`)
})
Event definitions (from src/types.ts:172):
interface QueueEvents<TResult> {
  started: []
  stopped: []
  error: [error: Error]
  enqueued: [id: string]
  completed: [id: string, result: TResult]
  failed: [id: string, error: Error]
  failing: [id: string, error: Error, attempt: number]
  requeued: [id: string]
  cancelled: [id: string]
  stalled: [id: string]
}
AfterExecution Hook
The afterExecution hook is called after job execution completes (success or failure) but before the terminal state is persisted. This allows you to:
- Dynamically adjust result/error TTL based on job outcome
- Transform or enrich results before storage
- Perform side effects (logging, metrics, notifications)
const queue = new Queue<{ url: string }, { body: string; cacheFor?: number }>({
  storage,
  resultTTL: 60000, // Default 1 minute
  afterExecution: async (context) => {
    // Context is mutable (passed by reference)
    console.log(`Job ${context.id} finished in ${context.durationMs}ms`)

    if (context.status === 'completed' && context.result?.cacheFor) {
      // Override TTL based on result
      context.ttl = context.result.cacheFor
    }

    if (context.status === 'failed' && context.error?.message.includes('temporary')) {
      // Cache temporary errors for shorter time
      context.ttl = 5000
    }
  }
})
AfterExecutionContext structure (from src/types.ts:98):
interface AfterExecutionContext<TPayload, TResult> {
  id: string
  payload: TPayload
  attempts: number
  maxAttempts: number
  createdAt: number
  status: 'completed' | 'failed'
  result?: TResult
  error?: Error
  ttl: number // Mutable: modify to change stored TTL
  workerId: string
  startedAt: number
  finishedAt: number
  durationMs: number
}
If the hook throws an error, the original TTL is restored and the error is logged. The job’s terminal state is still persisted.
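The restore-on-error behavior described above can be pictured as capturing the TTL before the hook runs and putting it back if the hook throws. This is a minimal sketch of that idea, not the library's code: `HookContext` is a reduced, hypothetical version of the context, and `resolveTTL` and `logError` are names invented for this example.

```typescript
// Reduced, hypothetical context: only the fields this sketch needs.
interface HookContext {
  id: string
  status: 'completed' | 'failed'
  ttl: number
}

type Hook = (context: HookContext) => void | Promise<void>

// Runs the hook and returns the TTL that should be persisted.
// If the hook throws, the TTL captured before the call is restored.
async function resolveTTL(
  context: HookContext,
  hook: Hook,
  logError: (error: unknown) => void
): Promise<number> {
  const originalTTL = context.ttl
  try {
    await hook(context)
  } catch (error) {
    context.ttl = originalTTL
    logError(error)
  }
  return context.ttl
}

const brokenHook: Hook = (context) => {
  context.ttl = 5000 // partial mutation before the failure
  throw new Error('hook failed')
}

resolveTTL({ id: 'job-1', status: 'completed', ttl: 60000 }, brokenHook, () => {})
  .then((ttl) => console.log(ttl)) // prints 60000
```

Restoring the captured value means a hook that fails halfway through cannot leave a partially modified TTL on the stored job.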
Producer/Consumer Separation
You can run producers and consumers as separate processes:
// producer.ts (API server)
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const producer = new Queue({ storage })

await producer.start()
await producer.enqueue('task-1', { email: 'user@example.com' })
await producer.stop()

// worker.ts (background worker)
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const worker = new Queue({
  storage,
  workerId: `worker-${process.pid}`,
  concurrency: 10
})

worker.execute(async (job) => {
  // Process job.payload here, then return the job's result
  return { sent: true }
})

await worker.start()

// Finish or requeue in-flight jobs before exiting
process.on('SIGTERM', async () => {
  await worker.stop()
  process.exit(0)
})