Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/platformatic/job-queue/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Platformatic Job Queue provides comprehensive error handling with automatic retries, typed errors, and persistent error state. Jobs that fail are automatically retried with exponential backoff until they succeed or exceed the maximum retry limit.

Error Types

The library exports typed errors for specific failure conditions:
import {
  TimeoutError,      // enqueueAndWait timeout
  MaxRetriesError,   // Job failed after all retries
  JobNotFoundError,  // Job doesn't exist
  JobCancelledError, // Job was cancelled
  JobFailedError,    // Job failed with error
  StorageError       // Storage backend error
} from '@platformatic/job-queue'

Error Definitions

From src/errors.ts:
export class TimeoutError extends JobQueueError {
  jobId: string

  constructor (jobId: string, timeout: number) {
    super(`Job '${jobId}' timed out after ${timeout}ms`, 'TIMEOUT')
    this.name = 'TimeoutError'
    this.jobId = jobId
  }
}

Retry Configuration

Configure retry behavior at the queue level:
import { Queue, MemoryStorage } from '@platformatic/job-queue'

const queue = new Queue({
  storage: new MemoryStorage(),
  maxRetries: 3,  // Default: 3 attempts total
  resultTTL: 3600000  // Cache errors for 1 hour
})
Or override per job:
await queue.enqueue('job-1', payload, {
  maxAttempts: 5  // This job gets 5 attempts
})

await queue.enqueueAndWait('job-2', payload, {
  maxAttempts: 10,
  timeout: 30000
})

Job States

Jobs transition through different states (from src/types.ts:21):
type MessageState = 'queued' | 'processing' | 'failing' | 'completed' | 'failed'

State Transitions

┌─────────┐
│  queued │
└────┬────┘


┌────────────┐     Success    ┌───────────┐
│ processing │───────────────>│ completed │
└────┬───────┘                 └───────────┘

     │ Failure (attempts < max)

┌─────────┐
│ failing │──────┐
└─────────┘      │
     ▲           │
     └───────────┘ Retry
     
     │ Failure (attempts >= max)

┌────────┐
│ failed │
└────────┘

State Difference: ‘failing’ vs ‘failed’

  • failing - Job failed but will be retried. Attempt count < maxAttempts.
  • failed - Job permanently failed after exhausting all retries. Terminal state.
// Listen to differentiate
queue.on('failing', (id, error, attempt) => {
  console.log(`Job ${id} failed attempt ${attempt}, will retry:`, error.message)
})

queue.on('failed', (id, error) => {
  console.log(`Job ${id} failed permanently:`, error.message)
})

Handling Errors in Handlers

Catch and handle errors within your job handler:
queue.execute(async (job) => {
  try {
    // Risky operation
    const result = await externalAPI.call(job.payload)
    return result
  } catch (error) {
    // Log the error
    console.error(`Job ${job.id} failed:`, error)
    
    // Re-throw to trigger retry
    throw error
  }
})

Custom Error Handling

Provide context-specific error handling:
class RetryableError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'RetryableError'
  }
}

class PermanentError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'PermanentError'
  }
}

queue.execute(async (job) => {
  try {
    const response = await fetch(job.payload.url)
    
    if (response.status === 429) {
      // Rate limited - should retry
      throw new RetryableError('Rate limited')
    }
    
    if (response.status === 404) {
      // Not found - don't retry
      throw new PermanentError('Resource not found')
    }
    
    if (!response.ok) {
      throw new RetryableError(`HTTP ${response.status}`)
    }
    
    return await response.json()
  } catch (error) {
    if (error instanceof PermanentError) {
      // Log and fail immediately
      console.error('Permanent failure:', error)
      // Set attempts to max to skip retries
      throw error
    }
    
    // Retryable error - will be retried automatically
    throw error
  }
})

Error Handling with enqueueAndWait

Handle different error scenarios when waiting for results:
import { 
  TimeoutError, 
  JobFailedError,
  MaxRetriesError 
} from '@platformatic/job-queue'

try {
  const result = await queue.enqueueAndWait('request-1', payload, {
    timeout: 10000
  })
  console.log('Got result:', result)
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Request timed out after 10s')
    // Job might still complete later
  } else if (error instanceof JobFailedError) {
    console.log('Job failed:', error.originalError)
    // Job permanently failed
  } else if (error instanceof MaxRetriesError) {
    console.log(`Job failed after ${error.attempts} attempts`)
    console.log('Last error:', error.lastError.message)
  } else {
    console.error('Unexpected error:', error)
  }
}

Complete Request/Response Error Handling

import express from 'express'
import { 
  Queue, 
  RedisStorage,
  TimeoutError,
  JobFailedError 
} from '@platformatic/job-queue'

const app = express()
const storage = new RedisStorage({ url: process.env.REDIS_URL })
const queue = new Queue({ storage })

await queue.start()

app.post('/process', async (req, res) => {
  const jobId = `job-${Date.now()}`
  
  try {
    const result = await queue.enqueueAndWait(jobId, req.body, {
      timeout: 30000,
      maxAttempts: 5
    })
    
    res.json({ success: true, result })
  } catch (error) {
    if (error instanceof TimeoutError) {
      // Return 408 Request Timeout
      res.status(408).json({
        error: 'Request timed out',
        jobId: error.jobId,
        message: 'Job is still processing'
      })
    } else if (error instanceof JobFailedError) {
      // Return 500 Internal Server Error
      res.status(500).json({
        error: 'Job failed',
        jobId: error.jobId,
        details: error.originalError
      })
    } else {
      // Unknown error
      res.status(500).json({
        error: 'Internal server error',
        message: error.message
      })
    }
  }
})

app.listen(3000)

Exponential Backoff

The retry mechanism uses exponential backoff. From src/consumer.ts:291-300:
if (currentAttempts < maxAttempts) {
  // Retry - update message with incremented attempts
  const updatedMessage: QueueMessage<TPayload> = {
    ...queueMessage,
    attempts: currentAttempts
  }
  const serializedMessage = this.#payloadSerde.serialize(updatedMessage as unknown as TPayload)

  await this.#storage.retryJob(id, serializedMessage, this.#workerId, currentAttempts)
  this.emit('failing', id, error, currentAttempts)
}
Retries happen immediately, but you can implement custom backoff in your handler:
queue.execute(async (job) => {
  // Exponential backoff based on attempt number
  if (job.attempts > 1) {
    const delayMs = Math.min(1000 * Math.pow(2, job.attempts - 1), 30000)
    await new Promise(resolve => setTimeout(resolve, delayMs))
  }
  
  // Process job
  return await processJob(job.payload)
})

Monitoring Errors

Track errors across your queue:
import pino from 'pino'

const logger = pino()

const queue = new Queue({
  storage,
  logger
})

// Track failing jobs (will retry)
queue.on('failing', (id, error, attempt) => {
  logger.warn({
    jobId: id,
    attempt,
    maxAttempts: queue.maxRetries,
    error: error.message,
    stack: error.stack
  }, 'Job attempt failed')
})

// Track permanently failed jobs
queue.on('failed', (id, error) => {
  logger.error({
    jobId: id,
    error: error.message,
    attempts: error instanceof MaxRetriesError ? error.attempts : 'unknown'
  }, 'Job failed permanently')
  
  // Alert or send to dead letter queue
  alertOps({ jobId: id, error })
})

// General error events
queue.on('error', (error) => {
  logger.error({ error }, 'Queue error')
})

Retrieving Error Information

Get error details from failed jobs:
const status = await queue.getStatus('job-123')

if (status?.state === 'failed') {
  console.log('Job failed with:')
  console.log('Message:', status.error?.message)
  console.log('Code:', status.error?.code)
  console.log('Stack:', status.error?.stack)
  console.log('Attempts:', status.attempts)
}

Error Serialization

Errors are stored as JSON (from src/types.ts:23-30):
interface SerializedError {
  message: string
  code?: string
  stack?: string
}

interface MessageStatus<TResult = unknown> {
  id: string
  state: MessageState
  createdAt: number
  attempts: number
  result?: TResult
  error?: SerializedError
}

Dead Letter Queue Pattern

Implement a dead letter queue for permanently failed jobs:
import { Queue, RedisStorage } from '@platformatic/job-queue'

const storage = new RedisStorage({ url: process.env.REDIS_URL })

const mainQueue = new Queue({
  storage,
  maxRetries: 3,
  resultTTL: 24 * 60 * 60 * 1000 // Keep errors for 24h
})

const deadLetterQueue = new Queue({
  storage,
  maxRetries: 0, // Don't retry in DLQ
  resultTTL: 7 * 24 * 60 * 60 * 1000 // Keep for 7 days
})

mainQueue.execute(async (job) => {
  // Process job
  return await process(job.payload)
})

// Move permanently failed jobs to DLQ
mainQueue.on('failed', async (id, error) => {
  console.log(`Moving job ${id} to dead letter queue`)
  
  // Get original job details
  const status = await mainQueue.getStatus(id)
  
  if (status) {
    await deadLetterQueue.enqueue(`dlq-${id}`, {
      originalJobId: id,
      failureReason: error.message,
      attempts: status.attempts,
      failedAt: Date.now(),
      originalPayload: status
    })
  }
})

// Later: manually process DLQ jobs
deadLetterQueue.execute(async (job) => {
  console.log('Manual intervention required for:', job.payload.originalJobId)
  // Alert admins, log to monitoring, etc.
})

await mainQueue.start()
await deadLetterQueue.start()

Best Practices

Set appropriate maxRetries based on failure type. Transient failures (network issues) benefit from retries. Permanent failures (validation errors) don’t.
  • Use typed errors to distinguish retry-able from permanent failures
  • Log both ‘failing’ and ‘failed’ events for debugging
  • Set resultTTL high enough to debug failed jobs
  • Implement a dead letter queue for manual intervention
  • Monitor error rates and patterns
  • Use job.signal.aborted to detect cancellation
  • Include error context in your custom error messages
  • Test error scenarios in development

Common Patterns

Conditional Retries

class NoRetryError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'NoRetryError'
  }
}

queue.execute(async (job) => {
  try {
    return await processJob(job.payload)
  } catch (error) {
    if (isValidationError(error)) {
      // Don't retry validation errors
      throw new NoRetryError(error.message)
    }
    // Retry other errors
    throw error
  }
})

Circuit Breaker

class CircuitBreaker {
  private failures = 0
  private lastFailure = 0
  private readonly threshold = 5
  private readonly timeout = 60000 // 1 minute
  
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    // Check if circuit is open
    if (this.failures >= this.threshold) {
      if (Date.now() - this.lastFailure < this.timeout) {
        throw new Error('Circuit breaker open')
      }
      // Try to close circuit
      this.failures = 0
    }
    
    try {
      const result = await fn()
      this.failures = 0 // Reset on success
      return result
    } catch (error) {
      this.failures++
      this.lastFailure = Date.now()
      throw error
    }
  }
}

const breaker = new CircuitBreaker()

queue.execute(async (job) => {
  return await breaker.execute(async () => {
    return await externalAPI.call(job.payload)
  })
})

See Also

Build docs developers (and LLMs) love