Overview
Platformatic Job Queue provides error handling with automatic retries, typed errors, and persistent error state. A failed job is retried automatically until it succeeds or uses up its attempts. Retries are immediate, so any backoff delay has to be implemented in the handler.
Error Types
The library exports typed errors for specific failure conditions:
import {
  TimeoutError,      // enqueueAndWait timeout
  MaxRetriesError,   // Job failed after all retries
  JobNotFoundError,  // Job doesn't exist
  JobCancelledError, // Job was cancelled
  JobFailedError,    // Job failed with error
  StorageError       // Storage backend error
} from '@platformatic/job-queue'
Error Definitions
From src/errors.ts:
TimeoutError
export class TimeoutError extends JobQueueError {
  jobId: string

  constructor (jobId: string, timeout: number) {
    super(`Job '${jobId}' timed out after ${timeout}ms`, 'TIMEOUT')
    this.name = 'TimeoutError'
    this.jobId = jobId
  }
}
Retry Configuration
Configure retry behavior at the queue level:
import { Queue, MemoryStorage } from '@platformatic/job-queue'

const queue = new Queue({
  storage: new MemoryStorage(),
  maxRetries: 3,      // Default: 3 attempts total
  resultTTL: 3600000  // Cache errors for 1 hour
})
Or override per job:
await queue.enqueue('job-1', payload, {
  maxAttempts: 5 // This job gets 5 attempts
})

await queue.enqueueAndWait('job-2', payload, {
  maxAttempts: 10,
  timeout: 30000
})
Job States
Jobs transition through different states (from src/types.ts:21):
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'
State Transitions
┌─────────┐
│ queued  │
└────┬────┘
     │
     ▼
┌────────────┐    Success     ┌───────────┐
│ processing │───────────────>│ completed │
└────┬───────┘                └───────────┘
     │
     │ Failure (attempts < max)
     ▼
┌─────────┐
│ failing │──────┐
└─────────┘      │
     ▲           │
     └───────────┘ Retry
     │ Failure (attempts >= max)
     ▼
┌────────┐
│ failed │
└────────┘
State Difference: ‘failing’ vs ‘failed’
failing - Job failed but will be retried. Attempt count < maxAttempts.
failed - Job permanently failed after exhausting all retries. Terminal state.
// Listen to differentiate
queue.on('failing', (id, error, attempt) => {
  console.log(`Job ${id} failed attempt ${attempt}, will retry:`, error.message)
})

queue.on('failed', (id, error) => {
  console.log(`Job ${id} failed permanently:`, error.message)
})
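The difference between the two states comes down to one comparison after each attempt. A minimal sketch of that decision, assuming attempts are counted from 1 and compared with the limit as in the diagram above; `stateAfterAttempt` is a hypothetical helper for illustration, not part of the library:

```typescript
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

// Hypothetical helper: the state a job enters once an attempt finishes.
function stateAfterAttempt (succeeded: boolean, attempts: number, maxAttempts: number): MessageState {
  if (succeeded) return 'completed'
  // Attempts remain: the job will be retried.
  if (attempts < maxAttempts) return 'failing'
  // No attempts left: terminal state.
  return 'failed'
}

console.log(stateAfterAttempt(false, 1, 3)) // failing
console.log(stateAfterAttempt(false, 3, 3)) // failed
console.log(stateAfterAttempt(true, 2, 3))  // completed
```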
Handling Errors in Handlers
Catch and handle errors within your job handler:
queue.execute(async (job) => {
  try {
    // Risky operation
    const result = await externalAPI.call(job.payload)
    return result
  } catch (error) {
    // Log the error
    console.error(`Job ${job.id} failed:`, error)
    // Re-throw to trigger retry
    throw error
  }
})
Custom Error Handling
Provide context-specific error handling:
class RetryableError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'RetryableError'
  }
}

class PermanentError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'PermanentError'
  }
}

queue.execute(async (job) => {
  try {
    const response = await fetch(job.payload.url)
    if (response.status === 429) {
      // Rate limited - should retry
      throw new RetryableError('Rate limited')
    }
    if (response.status === 404) {
      // Not found - retrying will not help
      throw new PermanentError('Resource not found')
    }
    if (!response.ok) {
      throw new RetryableError(`HTTP ${response.status}`)
    }
    return await response.json()
  } catch (error) {
    if (error instanceof PermanentError) {
      // Log separately so permanent failures stand out
      console.error('Permanent failure:', error)
      // Rethrowing still counts as a failed attempt: nothing here skips the
      // remaining retries, so enqueue such jobs with a low maxAttempts
      throw error
    }
    // Retryable error - will be retried automatically
    throw error
  }
})
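The handler above rethrows `PermanentError`, and a thrown error is treated like any other failed attempt. One way to stop early without relying on any library feature is to return the permanent failure as a result value, so the job completes instead of failing. The following is a sketch under that assumption: `Outcome` and `withPermanentFailures` are hypothetical names, and `PermanentError` is redeclared only to keep the block self-contained.

```typescript
class PermanentError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'PermanentError'
  }
}

// Hypothetical result shape: permanent failures are data, not exceptions.
type Outcome<T> =
  | { ok: true, value: T }
  | { ok: false, reason: string }

// Wraps a handler so a PermanentError becomes a normal return value
// (the job completes, nothing is retried), while any other error is
// rethrown (the job fails and the queue retries it).
function withPermanentFailures<P, T> (
  handler: (payload: P) => Promise<T>
): (payload: P) => Promise<Outcome<T>> {
  return async (payload) => {
    try {
      return { ok: true, value: await handler(payload) }
    } catch (error) {
      if (error instanceof PermanentError) {
        return { ok: false, reason: error.message }
      }
      throw error
    }
  }
}
```

With the queue from this page, the wrapped function could be called from the handler, for example `queue.execute(job => guarded(job.payload))`. Callers then check `ok` on the result rather than catching an error, which is the trade-off of this design.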
Error Handling with enqueueAndWait
Handle different error scenarios when waiting for results:
import {
  TimeoutError,
  JobFailedError,
  MaxRetriesError
} from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait('request-1', payload, {
    timeout: 10000
  })
  console.log('Got result:', result)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Request timed out after 10s')
    // Job might still complete later
  } else if (error instanceof JobFailedError) {
    console.log('Job failed:', error.originalError)
    // Job permanently failed
  } else if (error instanceof MaxRetriesError) {
    console.log(`Job failed after ${error.attempts} attempts`)
    console.log('Last error:', error.lastError.message)
  } else {
    console.error('Unexpected error:', error)
  }
}
Complete Request/Response Error Handling
import express from 'express'
import { randomUUID } from 'node:crypto'
import {
  Queue,
  RedisStorage,
  TimeoutError,
  JobFailedError
} from '@platformatic/job-queue'

const app = express()
app.use(express.json()) // Parse JSON bodies so req.body is populated

const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({ storage })
await queue.start()

app.post('/process', async (req, res) => {
  // Date.now() alone can collide when two requests arrive in the same millisecond
  const jobId = `job-${randomUUID()}`
  try {
    const result = await queue.enqueueAndWait(jobId, req.body, {
      timeout: 30000,
      maxAttempts: 5
    })
    res.json({ success: true, result })
  } catch (error) {
    if (error instanceof TimeoutError) {
      // Return 408 Request Timeout
      res.status(408).json({
        error: 'Request timed out',
        jobId: error.jobId,
        message: 'Job is still processing'
      })
    } else if (error instanceof JobFailedError) {
      // Return 500 Internal Server Error
      res.status(500).json({
        error: 'Job failed',
        jobId: error.jobId,
        details: error.originalError
      })
    } else {
      // Unknown error
      res.status(500).json({
        error: 'Internal server error',
        message: error instanceof Error ? error.message : String(error)
      })
    }
  }
})

app.listen(3000)
Exponential Backoff
The built-in retry path requeues a failed job without adding a delay between attempts. From src/consumer.ts:291-300:
if (currentAttempts < maxAttempts) {
  // Retry - update message with incremented attempts
  const updatedMessage: QueueMessage<TPayload> = {
    ...queueMessage,
    attempts: currentAttempts
  }
  const serializedMessage = this.#payloadSerde.serialize(updatedMessage as unknown as TPayload)
  await this.#storage.retryJob(id, serializedMessage, this.#workerId, currentAttempts)
  this.emit('failing', id, error, currentAttempts)
}
Retries happen immediately, but you can implement custom backoff in your handler:
queue.execute(async (job) => {
  // Exponential backoff based on attempt number
  if (job.attempts > 1) {
    const delayMs = Math.min(1000 * Math.pow(2, job.attempts - 1), 30000)
    await new Promise(resolve => setTimeout(resolve, delayMs))
  }
  // Process job
  return await processJob(job.payload)
})
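To see what the handler above actually waits, the same formula can be pulled into a small function and evaluated per attempt. This is a sketch that assumes `job.attempts` is 1 on the first run, as the `job.attempts > 1` check implies; `backoffDelayMs` is a hypothetical helper name.

```typescript
// Same formula as the handler above: 1s base, doubling per attempt, capped at 30s.
function backoffDelayMs (attempts: number, baseMs = 1000, capMs = 30000): number {
  if (attempts <= 1) return 0 // first attempt runs immediately
  return Math.min(baseMs * Math.pow(2, attempts - 1), capMs)
}

for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(attempt, backoffDelayMs(attempt))
}
// 1 → 0, 2 → 2000, 3 → 4000, 4 → 8000, 5 → 16000, 6 → 30000 (capped)
```

Because the delay runs inside the handler, the worker stays occupied while it sleeps, so long caps reduce throughput.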
Monitoring Errors
Track errors across your queue:
import pino from 'pino'
import { Queue, MaxRetriesError } from '@platformatic/job-queue'

const logger = pino()
// storage: any storage instance, such as the ones created in the earlier examples
const queue = new Queue({
  storage,
  logger
})

// Track failing jobs (will retry)
queue.on('failing', (id, error, attempt) => {
  logger.warn({
    jobId: id,
    attempt,
    maxAttempts: queue.maxRetries,
    error: error.message,
    stack: error.stack
  }, 'Job attempt failed')
})

// Track permanently failed jobs
queue.on('failed', (id, error) => {
  logger.error({
    jobId: id,
    error: error.message,
    attempts: error instanceof MaxRetriesError ? error.attempts : 'unknown'
  }, 'Job failed permanently')
  // Alert or send to dead letter queue (alertOps is a placeholder for your own hook)
  alertOps({ jobId: id, error })
})

// General error events
queue.on('error', (error) => {
  // pino's default error serializer is registered under the `err` key
  logger.error({ err: error }, 'Queue error')
})
Get error details from failed jobs:
const status = await queue.getStatus('job-123')
if (status?.state === 'failed') {
  console.log('Job failed with:')
  console.log('Message:', status.error?.message)
  console.log('Code:', status.error?.code)
  console.log('Stack:', status.error?.stack)
  console.log('Attempts:', status.attempts)
}
Error Serialization
Errors are stored as JSON (from src/types.ts:23-30):
interface SerializedError {
  message: string
  code?: string
  stack?: string
}

interface MessageStatus<TResult = unknown> {
  id: string
  state: MessageState
  createdAt: number
  attempts: number
  result?: TResult
  error?: SerializedError
}
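An explicit shape is needed because `message` and `stack` are non-enumerable on `Error` objects, so `JSON.stringify(new Error('x'))` yields `{}`. The sketch below shows one way to reduce a thrown value to the `SerializedError` shape; `toSerializedError` is a hypothetical helper for illustration and is not taken from the library's source.

```typescript
interface SerializedError {
  message: string
  code?: string
  stack?: string
}

// Hypothetical helper: reduces any thrown value to the SerializedError shape.
function toSerializedError (error: unknown): SerializedError {
  if (error instanceof Error) {
    const serialized: SerializedError = { message: error.message }
    // Node.js system errors and many libraries attach a string `code`.
    const code = (error as Error & { code?: unknown }).code
    if (typeof code === 'string') serialized.code = code
    if (error.stack !== undefined) serialized.stack = error.stack
    return serialized
  }
  // Non-Error values (strings, numbers, ...) keep only a message.
  return { message: String(error) }
}

const err = Object.assign(new Error('Connection refused'), { code: 'ECONNREFUSED' })
const roundTripped: SerializedError = JSON.parse(JSON.stringify(toSerializedError(err)))
console.log(roundTripped.message, roundTripped.code) // Connection refused ECONNREFUSED
```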
Dead Letter Queue Pattern
Implement a dead letter queue for permanently failed jobs:
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const mainQueue = new Queue({
  storage,
  maxRetries: 3,
  resultTTL: 24 * 60 * 60 * 1000 // Keep errors for 24h
})

const deadLetterQueue = new Queue({
  storage,
  maxRetries: 0, // Don't retry in DLQ
  resultTTL: 7 * 24 * 60 * 60 * 1000 // Keep for 7 days
})

mainQueue.execute(async (job) => {
  // Process job (processJob is your own function; avoid naming it `process`,
  // which would shadow the Node.js global used above)
  return await processJob(job.payload)
})

// Move permanently failed jobs to DLQ
mainQueue.on('failed', async (id, error) => {
  console.log(`Moving job ${id} to dead letter queue`)
  // Get the stored status of the failed job
  const status = await mainQueue.getStatus(id)
  if (status) {
    await deadLetterQueue.enqueue(`dlq-${id}`, {
      originalJobId: id,
      failureReason: error.message,
      attempts: status.attempts,
      failedAt: Date.now(),
      // The status record holds state, attempts, and error, not the original payload
      lastStatus: status
    })
  }
})

// Later: manually process DLQ jobs
deadLetterQueue.execute(async (job) => {
  console.log('Manual intervention required for:', job.payload.originalJobId)
  // Alert admins, log to monitoring, etc.
})

await mainQueue.start()
await deadLetterQueue.start()
Best Practices
Set appropriate maxRetries based on failure type. Transient failures (network issues) benefit from retries. Permanent failures (validation errors) don’t.
Use typed errors to distinguish retryable from permanent failures
Log both ‘failing’ and ‘failed’ events for debugging
Set resultTTL high enough to debug failed jobs
Implement a dead letter queue for manual intervention
Monitor error rates and patterns
Use job.signal.aborted to detect cancellation
Include error context in your custom error messages
Test error scenarios in development
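The cancellation practice can be sketched as a handler body that checks the signal between units of work. This assumes `job.signal` is a standard `AbortSignal`, as the `job.signal.aborted` item above suggests; `processInSteps` is a hypothetical helper, not a library function.

```typescript
// Hypothetical handler body: runs steps in order and stops between steps
// once the signal is aborted, so a cancelled job does no further work.
async function processInSteps (
  steps: Array<() => Promise<void>>,
  signal: AbortSignal
): Promise<number> {
  let completed = 0
  for (const step of steps) {
    if (signal.aborted) {
      throw new Error(`Cancelled after ${completed} of ${steps.length} steps`)
    }
    await step()
    completed++
  }
  return completed
}
```

Inside a handler this would be called with the job's own signal, for example `processInSteps(steps, job.signal)`. A step that is already running is not interrupted; the check only prevents the next one from starting.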
Common Patterns
Conditional Retries
class NoRetryError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'NoRetryError'
  }
}

queue.execute(async (job) => {
  try {
    return await processJob(job.payload)
  } catch (error) {
    if (isValidationError(error)) {
      // Tag validation errors as non-retryable so listeners and callers can
      // tell them apart; throwing alone does not stop the remaining retries
      throw new NoRetryError(error instanceof Error ? error.message : String(error))
    }
    // Retry other errors
    throw error
  }
})
Circuit Breaker
class CircuitBreaker {
  private failures = 0
  private lastFailure = 0
  private readonly threshold = 5
  private readonly timeout = 60000 // 1 minute

  async execute<T> (fn: () => Promise<T>): Promise<T> {
    // Check if circuit is open
    if (this.failures >= this.threshold) {
      if (Date.now() - this.lastFailure < this.timeout) {
        throw new Error('Circuit breaker open')
      }
      // Try to close circuit
      this.failures = 0
    }
    try {
      const result = await fn()
      this.failures = 0 // Reset on success
      return result
    } catch (error) {
      this.failures++
      this.lastFailure = Date.now()
      throw error
    }
  }
}

const breaker = new CircuitBreaker()

queue.execute(async (job) => {
  return await breaker.execute(async () => {
    return await externalAPI.call(job.payload)
  })
})