
Queue

A Redis-backed job queue with automatic retry logic, exponential backoff, and concurrent processing.

Import

import { Queue } from "semola/queue";

Class: Queue

Constructor

new Queue<T>(options: QueueOptions<T>)
• options (QueueOptions<T>, required): Queue configuration options
• options.name (string, required): Queue name used for Redis keys
• options.redis (Bun.RedisClient, required): Bun Redis client instance
• options.handler ((data: T, signal?: AbortSignal) => void | Promise<void>, required): Function that processes each job. Receives the job data and an optional abort signal for timeout handling.
• options.retries (number, default: 3): Number of retry attempts for failed jobs
• options.timeout (number, default: 30000): Job timeout in milliseconds
• options.concurrency (number, default: 1): Number of parallel workers processing jobs
• options.pollInterval (number, default: 100): Polling interval in milliseconds between queue checks
• options.onSuccess ((job: Job<T>) => void | Promise<void>): Called when a job completes successfully
• options.onRetry ((context: RetryContext<T>) => void | Promise<void>): Called when a job is retried after a failure
• options.onError ((context: ErrorContext<T>) => void | Promise<void>): Called when a job fails permanently after all retries are exhausted
• options.onParseError ((context: ParseErrorContext) => void | Promise<void>): Called when a job payload cannot be parsed; the job is moved to the dead-letter queue
Example:
const queue = new Queue<{ message: string }>({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
  retries: 3,
  timeout: 60000,
  concurrency: 5,
});

Methods

enqueue

Adds a job to the queue.
enqueue(data: T): Promise<[Error, null] | [null, string]>
• data (T, required): Job data to enqueue

Returns [Error, null] | [null, string], a result tuple containing either:
  • [null, jobId]: success, with the generated job ID
  • [error, null]: failure, with an error of type "QueueError"
Example:
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

if (error) {
  console.error("Failed to enqueue:", error.message);
} else {
  console.log("Job enqueued:", jobId);
}

stop

Gracefully stops the queue, waiting for active jobs to complete.
stop(): Promise<void>
Example:
// Graceful shutdown
process.on("SIGTERM", async () => {
  await queue.stop();
  process.exit(0);
});

Type Definitions

Job

type Job<T> = {
  id: string;
  data: T;
  attempts: number;
  maxRetries: number;
  createdAt: number;
};
• id (string): Unique job identifier (UUID)
• data (T): Job payload
• attempts (number): Number of processing attempts made
• maxRetries (number): Maximum number of retry attempts allowed
• createdAt (number): Timestamp when the job was created (epoch milliseconds)
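Because createdAt is an epoch-millisecond timestamp, callbacks such as onSuccess can derive how long a job has been in flight. A minimal sketch; jobAgeMs is a hypothetical helper, not part of semola, and the Job type is redeclared here so the snippet is self-contained:

```typescript
// Job shape as documented above, redeclared for self-containment.
type Job<T> = {
  id: string;
  data: T;
  attempts: number;
  maxRetries: number;
  createdAt: number;
};

// Hypothetical helper: a job's age in milliseconds, derived from
// its createdAt epoch-millisecond timestamp.
function jobAgeMs<T>(job: Job<T>, now: number = Date.now()): number {
  return now - job.createdAt;
}

const job: Job<{ message: string }> = {
  id: "123e4567-e89b-12d3-a456-426614174000",
  data: { message: "Hello!" },
  attempts: 1,
  maxRetries: 3,
  createdAt: 1_700_000_000_000,
};

console.log(jobAgeMs(job, 1_700_000_005_000)); // → 5000
```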

RetryContext

type RetryContext<T> = {
  job: Job<T>;
  error: string;
  nextRetryDelayMs: number;
  retriesRemaining: number;
  backoffMultiplier: number;
};
• job (Job<T>): The job being retried
• error (string): Error message from the failed attempt
• nextRetryDelayMs (number): Delay in milliseconds before the next retry (exponential backoff)
• retriesRemaining (number): Number of retry attempts remaining
• backoffMultiplier (number): Multiplier used for exponential backoff (typically 2)
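The nextRetryDelayMs value follows the exponential-backoff schedule described under Features (1s, 2s, 4s, and so on, capped at 60s). A sketch of how such a delay can be computed; the base delay, cap, and exact formula here are assumptions inferred from that schedule, not semola's internal implementation:

```typescript
// Sketch of an exponential-backoff schedule:
//   baseDelayMs * multiplier^(attempt - 1), capped at maxDelayMs.
function nextRetryDelayMs(
  attempt: number, // 1-based number of the attempt that just failed
  baseDelayMs = 1_000, // assumed 1s base (matches "1s, 2s, 4s")
  multiplier = 2, // backoffMultiplier, "typically 2"
  maxDelayMs = 60_000, // "up to 60s max"
): number {
  return Math.min(baseDelayMs * multiplier ** (attempt - 1), maxDelayMs);
}

console.log(nextRetryDelayMs(1)); // → 1000
console.log(nextRetryDelayMs(2)); // → 2000
console.log(nextRetryDelayMs(3)); // → 4000
console.log(nextRetryDelayMs(8)); // → 60000 (capped)
```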

ErrorContext

type ErrorContext<T> = {
  job: Job<T>;
  lastError: string;
  totalDurationMs: number;
  totalAttempts: number;
  errorHistory: Array<{
    attempt: number;
    error: string;
    timestamp: number;
  }>;
};
• job (Job<T>): The job that failed permanently
• lastError (string): Most recent error message
• totalDurationMs (number): Total time in milliseconds from job creation to final failure
• totalAttempts (number): Total number of attempts made
• errorHistory (Array<{ attempt: number; error: string; timestamp: number }>): Complete history of all errors encountered
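The errorHistory array is useful for structured logging inside onError. A small sketch; formatErrorHistory is a hypothetical helper, not part of semola:

```typescript
// Shape of one errorHistory entry, as documented above.
type ErrorHistoryEntry = { attempt: number; error: string; timestamp: number };

// Hypothetical helper: render errorHistory as one log line per attempt,
// with the millisecond timestamp in ISO-8601 form.
function formatErrorHistory(history: ErrorHistoryEntry[]): string {
  return history
    .map((e) => `attempt ${e.attempt} @ ${new Date(e.timestamp).toISOString()}: ${e.error}`)
    .join("\n");
}

console.log(
  formatErrorHistory([
    { attempt: 1, error: "ECONNRESET", timestamp: 0 },
    { attempt: 2, error: "ETIMEDOUT", timestamp: 2_000 },
  ]),
);
// attempt 1 @ 1970-01-01T00:00:00.000Z: ECONNRESET
// attempt 2 @ 1970-01-01T00:00:02.000Z: ETIMEDOUT
```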

ParseErrorContext

type ParseErrorContext = {
  rawJobData: string;
  parseError: string;
  timestamp: number;
};
• rawJobData (string): Raw job data that failed to parse
• parseError (string): Parse error message
• timestamp (number): Timestamp when the parse error occurred
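A parse error of this kind arises when a raw payload is not valid JSON. The sketch below shows how a ParseErrorContext-shaped value could be built from a failed JSON.parse; how semola constructs it internally is an assumption, only the shape above is documented:

```typescript
// ParseErrorContext shape as documented above, redeclared for self-containment.
type ParseErrorContext = { rawJobData: string; parseError: string; timestamp: number };

// Hypothetical helper: returns null when the payload parses cleanly,
// otherwise a ParseErrorContext describing the failure.
function tryParse(raw: string): ParseErrorContext | null {
  try {
    JSON.parse(raw);
    return null; // parsed fine, no error context needed
  } catch (err) {
    return {
      rawJobData: raw,
      parseError: err instanceof Error ? err.message : String(err),
      timestamp: Date.now(),
    };
  }
}

console.log(tryParse('{"ok":true}')); // → null
console.log(tryParse("not json")?.rawJobData); // → "not json"
```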

Usage Examples

Basic Queue

import { Queue } from "semola/queue";

const queue = new Queue<{ message: string }>({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
});

// Add a job
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

// Graceful shutdown
await queue.stop();

Email Queue with Error Handling

import { Queue } from "semola/queue";

type EmailJob = {
  to: string;
  subject: string;
  body: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  onSuccess: (job) => {
    console.log(`✓ Email sent: ${job.id}`);
  },
  onRetry: (context) => {
    console.log(
      `⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms ` +
      `(${context.retriesRemaining} retries remaining)`
    );
  },
  onError: (context) => {
    console.error(
      `✗ Email failed: ${context.job.id} after ${context.totalAttempts} attempts`
    );
    notifyFailure(context.job.data, context.lastError);
  },
  retries: 5,
  timeout: 30000,
});

await emailQueue.enqueue({
  to: "[email protected]",
  subject: "Welcome!",
  body: "Thanks for signing up.",
});

File Processing with Concurrency

import { Queue } from "semola/queue";

type ImportJob = {
  fileId: string;
  url: string;
};

const importQueue = new Queue<ImportJob>({
  name: "imports",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data, signal) => {
    // Check abort signal for timeout
    if (signal?.aborted) {
      throw new Error("Job aborted");
    }
    
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  onError: (context) => {
    logger.error(`Import failed: ${context.job.data.fileId}`, {
      errorHistory: context.errorHistory,
    });
  },
  retries: 2,
  timeout: 120000, // 2 minutes
  concurrency: 3, // Process 3 files in parallel
});

Task Queue with Full Callbacks

import { Queue } from "semola/queue";

type TaskJob = {
  taskId: string;
  userId: string;
  action: string;
};

const taskQueue = new Queue<TaskJob>({
  name: "tasks",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await processTask(data.taskId, data.userId);
  },
  onSuccess: (job) => {
    console.log(`✓ Task completed: ${job.id}`);
  },
  onRetry: (context) => {
    console.log(
      `⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms ` +
      `(${context.retriesRemaining} left)`
    );
  },
  onError: (context) => {
    console.error(
      `✗ Task failed: ${context.job.id} after ${context.totalAttempts} attempts`
    );
    notifyFailure(context.job.data, context.lastError);
  },
  onParseError: (context) => {
    console.error("Failed to parse job:", {
      rawData: context.rawJobData,
      error: context.parseError,
    });
  },
  retries: 3,
  timeout: 60000,
  concurrency: 5,
});

Features

  • Automatic retries with exponential backoff (1s, 2s, 4s, etc., up to 60s max)
  • Concurrent processing with configurable worker count
  • Job timeout with abort signal support
  • Error tracking with complete error history
  • Dead-letter queue for unparseable jobs
  • Graceful shutdown waits for active jobs to complete
  • Result pattern uses [error, data] tuples for error handling
