Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/platformatic/job-queue/llms.txt

Use this file to discover all available pages before exploring further.

The Queue class is the primary interface for the Platformatic Job Queue. It combines producer and consumer functionality, allowing you to both enqueue jobs and process them.

Class Signature

class Queue<TPayload, TResult = void> extends EventEmitter<QueueEvents<TResult>>

Type Parameters

  • TPayload - The type of the job payload
  • TResult - The type of the job result (defaults to void)

Constructor

new Queue<TPayload, TResult>(config: QueueConfig<TPayload, TResult>)
Creates a new Queue instance with the specified configuration.

Configuration

storage
Storage
required
The storage backend instance (e.g., RedisStorage)
workerId
string
Unique identifier for this worker. Defaults to a random UUID.
payloadSerde
Serde<TPayload>
Custom serializer/deserializer for job payloads. Defaults to JSON serialization.
resultSerde
Serde<TResult>
Custom serializer/deserializer for job results. Defaults to JSON serialization.
concurrency
number
default:"1"
Number of jobs to process in parallel. Controls how many jobs this worker will process simultaneously.
blockTimeout
number
default:"5"
Blocking dequeue timeout in seconds. How long to wait for new jobs when polling the queue.
maxRetries
number
default:"3"
Default maximum number of retry attempts for failed jobs. Can be overridden per job in enqueue() options.
visibilityTimeout
number
default:"30000"
Maximum processing time in milliseconds before a job is considered stalled. After this timeout, the Reaper will requeue the job.
resultTTL
number
default:"3600000"
Time-to-live for stored results and errors in milliseconds. Defaults to 1 hour (3600000ms).
afterExecution
AfterExecutionHook<TPayload, TResult>
Hook function called after job execution and before persisting terminal state. Useful for logging, metrics, or custom cleanup.
logger
Logger
Pino logger instance. Defaults to an abstract no-op logger.

Example

import { Queue } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue/storage/redis'

const queue = new Queue({
  storage: new RedisStorage({ url: 'redis://localhost:6379' }),
  workerId: 'worker-1',
  concurrency: 5,
  maxRetries: 3,
  visibilityTimeout: 60000,
  resultTTL: 3600000,
  logger: pino()
})

Methods

start()

Starts the queue by connecting to storage and starting the consumer if a handler is registered.
start(): Promise<void>
void
Promise<void>
Resolves when the queue has started successfully.

Example

await queue.start()
console.log('Queue started')

stop()

Stops the queue gracefully, disconnecting from storage and stopping the consumer.
stop(): Promise<void>
void
Promise<void>
Resolves when the queue has stopped completely.

Example

await queue.stop()
console.log('Queue stopped')

execute()

Registers a job handler function, turning this queue into a consumer.
execute(handler: JobHandler<TPayload, TResult>): void
handler
JobHandler<TPayload, TResult>
required
Function that processes jobs. Can be async/promise-based or callback-based.Promise-based signature:
(job: Job<TPayload>) => Promise<TResult>
Callback-based signature:
(job: Job<TPayload>, callback: (err: Error | null, result?: TResult) => void) => void

Job Object

The handler receives a Job object with the following properties:
  • id (string) - Unique job identifier
  • payload (TPayload) - The job’s payload data
  • attempts (number) - Current attempt number (1-indexed)
  • signal (AbortSignal) - Abort signal for cancellation

Example

interface EmailPayload {
  to: string
  subject: string
  body: string
}

queue.execute(async (job) => {
  console.log(`Processing job ${job.id}, attempt ${job.attempts}`)
  
  await sendEmail({
    to: job.payload.to,
    subject: job.payload.subject,
    body: job.payload.body
  })
  
  return { sent: true }
})

Callback-based Handler Example

queue.execute((job, callback) => {
  processLegacyTask(job.payload, (err, result) => {
    if (err) return callback(err)
    callback(null, result)
  })
})

enqueue()

Enqueues a job for processing (fire-and-forget).
enqueue(
  id: string,
  payload: TPayload,
  options?: EnqueueOptions
): Promise<EnqueueResult<TResult>>
id
string
required
Unique identifier for the job. Must be unique across all jobs.
payload
TPayload
required
The job data to process.
options.maxAttempts
number
Maximum retry attempts for this specific job. Overrides the queue’s maxRetries setting.
options.resultTTL
number
TTL for this job’s result in milliseconds. Overrides the queue’s resultTTL setting.
status
'queued' | 'duplicate' | 'completed'
  • queued: Job successfully enqueued
  • duplicate: Job with this ID already exists (includes existingState)
  • completed: Job was already completed (includes result)

Example

const result = await queue.enqueue('email-123', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Welcome to our service!'
})

if (result.status === 'queued') {
  console.log('Job enqueued successfully')
} else if (result.status === 'duplicate') {
  console.log('Job already exists with state:', result.existingState)
} else if (result.status === 'completed') {
  console.log('Job already completed with result:', result.result)
}

enqueueAndWait()

Enqueues a job and waits for its result.
enqueueAndWait(
  id: string,
  payload: TPayload,
  options?: EnqueueAndWaitOptions
): Promise<TResult>
id
string
required
Unique identifier for the job.
payload
TPayload
required
The job data to process.
options.maxAttempts
number
Maximum retry attempts for this job.
options.resultTTL
number
TTL for this job’s result in milliseconds.
options.timeout
number
Maximum time to wait for the result in milliseconds. If exceeded, rejects with a timeout error.
result
TResult
The job’s result value. Rejects if the job fails or times out.

Example

try {
  const result = await queue.enqueueAndWait(
    'report-456',
    { reportType: 'monthly', userId: '123' },
    { timeout: 30000 } // 30 second timeout
  )
  
  console.log('Report generated:', result)
} catch (error) {
  console.error('Job failed or timed out:', error)
}

cancel()

Cancels a pending job.
cancel(id: string): Promise<CancelResult>
id
string
required
The job ID to cancel.
status
'cancelled' | 'not_found' | 'processing' | 'completed'
  • cancelled: Job successfully cancelled
  • not_found: Job doesn’t exist
  • processing: Job is currently being processed and cannot be cancelled
  • completed: Job already completed

Example

const result = await queue.cancel('job-789')

if (result.status === 'cancelled') {
  console.log('Job cancelled successfully')
} else if (result.status === 'processing') {
  console.log('Job is already being processed')
}

getResult()

Retrieves the result of a completed job.
getResult(id: string): Promise<TResult | null>
id
string
required
The job ID to retrieve the result for.
result
TResult | null
The job’s result if available, or null if the job hasn’t completed or the result has expired.

Example

const result = await queue.getResult('job-123')

if (result !== null) {
  console.log('Job result:', result)
} else {
  console.log('No result available')
}

updateResultTTL()

Updates the TTL for a completed job’s result or error.
updateResultTTL(id: string, ttlMs: number): Promise<UpdateResultTTLResult>
id
string
required
The job ID to update.
ttlMs
number
required
New TTL in milliseconds.
status
'updated' | 'not_found' | 'not_terminal' | 'missing_payload'
  • updated: TTL successfully updated
  • not_found: Job doesn’t exist
  • not_terminal: Job hasn’t completed yet (not in a terminal state)
  • missing_payload: Job is in terminal state but payload is missing

Example

const result = await queue.updateResultTTL('job-123', 7200000) // 2 hours

if (result.status === 'updated') {
  console.log('TTL extended successfully')
}

getStatus()

Retrieves the current status of a job.
getStatus(id: string): Promise<MessageStatus<TResult> | null>
id
string
required
The job ID to check.
status
MessageStatus<TResult> | null
Status object containing:
  • id (string) - Job ID
  • state (MessageState) - Current state: ‘queued’, ‘processing’, ‘failing’, ‘completed’, or ‘failed’
  • createdAt (number) - Timestamp when job was created
  • attempts (number) - Number of attempts made
  • result (TResult, optional) - Result if completed
  • error (SerializedError, optional) - Error if failed
Returns null if the job doesn’t exist.

Example

const status = await queue.getStatus('job-123')

if (status) {
  console.log(`Job ${status.id} is ${status.state}`)
  console.log(`Attempts: ${status.attempts}`)
  
  if (status.state === 'completed' && status.result) {
    console.log('Result:', status.result)
  } else if (status.state === 'failed' && status.error) {
    console.log('Error:', status.error.message)
  }
}

Events

The Queue class emits various events during its lifecycle. Listen to events using the standard EventEmitter API.

started

Emitted when the queue has successfully started.
queue.on('started', () => {
  console.log('Queue started')
})

stopped

Emitted when the queue has successfully stopped.
queue.on('stopped', () => {
  console.log('Queue stopped')
})

enqueued

Emitted when a job is successfully enqueued.
queue.on('enqueued', (id: string) => {
  console.log(`Job ${id} enqueued`)
})

completed

Emitted when a job completes successfully.
queue.on('completed', (id: string, result: TResult) => {
  console.log(`Job ${id} completed with result:`, result)
})

failed

Emitted when a job fails permanently (all retries exhausted).
queue.on('failed', (id: string, error: Error) => {
  console.error(`Job ${id} failed:`, error.message)
})

failing

Emitted when a job fails but will be retried.
queue.on('failing', (id: string, error: Error, attempt: number) => {
  console.warn(`Job ${id} failed attempt ${attempt}:`, error.message)
})

requeued

Emitted when a job is requeued for retry.
queue.on('requeued', (id: string) => {
  console.log(`Job ${id} requeued for retry`)
})

cancelled

Emitted when a job is successfully cancelled.
queue.on('cancelled', (id: string) => {
  console.log(`Job ${id} cancelled`)
})

error

Emitted when an error occurs in the queue’s internal operations.
queue.on('error', (error: Error) => {
  console.error('Queue error:', error)
})

Complete Example

import { Queue } from '@platformatic/job-queue'
import { RedisStorage } from '@platformatic/job-queue/storage/redis'
import pino from 'pino'

interface ProcessVideoPayload {
  videoId: string
  operations: string[]
}

interface ProcessVideoResult {
  outputUrl: string
  duration: number
}

const queue = new Queue<ProcessVideoPayload, ProcessVideoResult>({
  storage: new RedisStorage({ url: 'redis://localhost:6379' }),
  workerId: 'video-processor-1',
  concurrency: 3,
  maxRetries: 5,
  visibilityTimeout: 300000, // 5 minutes
  resultTTL: 86400000, // 24 hours
  logger: pino(),
  afterExecution: async (context) => {
    // Custom metrics or logging
    console.log(`Job ${context.id} took ${context.durationMs}ms`)
  }
})

// Register event listeners
queue.on('completed', (id, result) => {
  console.log(`Video ${id} processed: ${result.outputUrl}`)
})

queue.on('failed', (id, error) => {
  console.error(`Video processing failed for ${id}:`, error.message)
})

queue.on('error', (error) => {
  console.error('Queue error:', error)
})

// Register handler
queue.execute(async (job) => {
  console.log(`Processing video ${job.payload.videoId}, attempt ${job.attempts}`)
  
  // Check for cancellation
  if (job.signal.aborted) {
    throw new Error('Job was cancelled')
  }
  
  // Process video
  const result = await processVideo(
    job.payload.videoId,
    job.payload.operations
  )
  
  return result
})

// Start the queue
await queue.start()

// Enqueue a job
await queue.enqueue('video-123', {
  videoId: 'abc-def',
  operations: ['transcode', 'thumbnail', 'compress']
})

// Or enqueue and wait for result
const result = await queue.enqueueAndWait('video-456', {
  videoId: 'ghi-jkl',
  operations: ['transcode']
}, { timeout: 60000 })

console.log('Processing complete:', result.outputUrl)

// Clean shutdown
process.on('SIGTERM', async () => {
  await queue.stop()
  process.exit(0)
})

Build docs developers (and LLMs) love