Learn how to use Semola’s Queue for reliable background job processing with Redis.

Overview

The Queue module provides a Redis-backed job queue with:
  • Automatic retry logic with exponential backoff
  • Configurable concurrency
  • Job timeout handling
  • Success/error/retry callbacks
  • FIFO processing order
  • Dead-letter queue for malformed jobs

Basic setup

import { Queue } from "semola/queue";

const queue = new Queue({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
});

// Add a job
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

// Graceful shutdown
await queue.stop();

Configuration options

Basic options

  • name (required) - Queue name for Redis keys
  • redis (required) - Bun Redis client instance
  • handler (required) - Function to process each job
  • retries - Number of retry attempts (default: 3)
  • timeout - Job timeout in milliseconds (default: 30000)
  • concurrency - Number of parallel workers (default: 1)
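
The defaults listed above can be pictured as a fallback applied to whatever the caller leaves out. The sketch below is illustrative only: `QueueTuning`, `DEFAULT_TUNING`, and `resolveTuning` are hypothetical names, not part of Semola's API, and only the default values themselves come from the list above.

```typescript
// Hypothetical shape covering only the three optional tuning options.
type QueueTuning = {
  retries?: number;
  timeout?: number;
  concurrency?: number;
};

// Default values as documented in the option list.
const DEFAULT_TUNING: Required<QueueTuning> = {
  retries: 3,
  timeout: 30000,
  concurrency: 1,
};

// Fill in any option the caller omitted (assumed behaviour, for illustration).
function resolveTuning(options: QueueTuning = {}): Required<QueueTuning> {
  return {
    retries: options.retries ?? DEFAULT_TUNING.retries,
    timeout: options.timeout ?? DEFAULT_TUNING.timeout,
    concurrency: options.concurrency ?? DEFAULT_TUNING.concurrency,
  };
}

const resolved = resolveTuning({ retries: 5 });
console.log(resolved); // { retries: 5, timeout: 30000, concurrency: 1 }

// The handler can run at most once initially plus once per retry.
const maxAttempts = resolved.retries + 1; // 6
```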

Callback options

  • onSuccess - Called when a job succeeds
  • onRetry - Called when a job is retried
  • onError - Called when a job fails permanently
  • onParseError - Called when job data cannot be parsed

Enqueuing jobs

Simple job

const [error, jobId] = await queue.enqueue({ message: "hello" });

if (error) {
  console.error("Failed to enqueue:", error.message);
} else {
  console.log("Job enqueued:", jobId);
}

Type-safe jobs

type EmailJob = {
  to: string;
  subject: string;
  body: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: redisClient,
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  retries: 5,
  timeout: 30000,
});

await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Welcome!",
  body: "Thanks for signing up.",
});

Error handling

Retry logic

Jobs are automatically retried with exponential backoff:
const taskQueue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data.taskId, data.userId);
  },
  retries: 3, // Will try up to 4 times (initial + 3 retries)
  onRetry: (context) => {
    console.log(`⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms`);
    console.log(`  Retries remaining: ${context.retriesRemaining}`);
    console.log(`  Error: ${context.error}`);
  },
});

Exponential backoff

Retry delays increase exponentially:
  • Attempt 1 → 1000ms delay
  • Attempt 2 → 2000ms delay
  • Attempt 3 → 4000ms delay
  • And so on…
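
The schedule above is consistent with a 1000 ms base delay that doubles on every attempt. Here is a minimal sketch of that arithmetic, assuming the pattern continues unchanged. `BASE_DELAY_MS` and `backoffDelayMs` are hypothetical names for illustration, and the library may apply a cap or jitter that this sketch ignores.

```typescript
// Hypothetical helper reproducing the listed delays (1000, 2000, 4000 ms).
// Assumes a 1000 ms base that doubles per attempt; not Semola's actual code.
const BASE_DELAY_MS = 1000;

function backoffDelayMs(attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be a positive integer");
  }
  return BASE_DELAY_MS * 2 ** (attempt - 1);
}

const delays = [1, 2, 3].map((attempt) => backoffDelayMs(attempt));
console.log(delays); // [1000, 2000, 4000]
```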

Success callback

const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onSuccess: (job) => {
    console.log(`✓ Task completed: ${job.id}`);
  },
});

Error callback

const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onError: (context) => {
    console.error(`✗ Task failed: ${context.job.id}`);
    console.error(`  Total attempts: ${context.totalAttempts}`);
    console.error(`  Last error: ${context.lastError}`);
    console.error(`  Duration: ${context.totalDurationMs}ms`);
    console.error(`  Error history:`, context.errorHistory);

    // Notify monitoring service
    notifyFailure(context.job.data, context.lastError);
  },
  retries: 3,
});

Concurrency control

Processing multiple jobs in parallel

const importQueue = new Queue({
  name: "imports",
  redis: redisClient,
  handler: async (data) => {
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  concurrency: 3, // Process 3 files in parallel
  retries: 2,
  timeout: 120000,
});
Be careful with concurrency settings. High concurrency can:
  • Overwhelm your database or external APIs
  • Increase memory usage
  • Cause rate limiting issues
Start with low concurrency and increase gradually while monitoring performance.
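
To see what a concurrency limit means in practice, the sketch below runs a fixed number of "lanes" that each pull the next item until none remain, and records the peak number of tasks in flight. It is a generic illustration, not Semola's implementation; `runWithConcurrency` and the counters are hypothetical names.

```typescript
// Illustrative worker pool: never runs more than `limit` tasks at once.
async function runWithConcurrency<T>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0;
  // Each lane claims the next unprocessed item until the list is exhausted.
  const lane = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++];
      await worker(item);
    }
  };
  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
}

// Track how many tasks overlap when the limit is 3.
let inFlight = 0;
let peak = 0;
const done = runWithConcurrency([1, 2, 3, 4, 5, 6, 7], 3, async () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulated work
  inFlight--;
});
done.then(() => console.log(`peak in-flight: ${peak}`)); // peak in-flight: 3
```

Raising the limit raises the peak by the same amount, which is why downstream databases and APIs feel the change immediately.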

Job timeouts

Handling long-running jobs

const queue = new Queue({
  name: "processing",
  redis: redisClient,
  handler: async (data, signal) => {
    // Bail out early if the job was already aborted before work starts
    if (signal?.aborted) {
      throw new Error("Job timed out");
    }

    await processLargeFile(data.fileId);
  },
  timeout: 60000, // 60 second timeout
  onError: (context) => {
    // String() keeps this check safe whether lastError is a string or an Error
    if (String(context.lastError).includes("timed out")) {
      console.error(`Job ${context.job.id} exceeded timeout`);
    }
  },
});
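
A single check at the top of a handler only catches a signal that is already aborted when the handler starts. The sketch below re-checks between units of work so that a long job stops soon after the timeout fires. It assumes the handler's second argument behaves like a standard `AbortSignal`, which the `signal?.aborted` check above suggests but does not confirm. `assertNotAborted` and `processInChunks` are hypothetical helpers.

```typescript
// Hypothetical helper: throws once the signal has been aborted.
function assertNotAborted(signal?: AbortSignal): void {
  if (signal?.aborted) {
    throw new Error("Job timed out");
  }
}

// Processes chunks one at a time, re-checking the signal before each chunk.
async function processInChunks(
  chunks: string[],
  processChunk: (chunk: string) => Promise<void>,
  signal?: AbortSignal,
): Promise<number> {
  let processed = 0;
  for (const chunk of chunks) {
    assertNotAborted(signal);
    await processChunk(chunk);
    processed++;
  }
  return processed;
}

// Simulate a timeout firing while the second chunk is being processed.
const controller = new AbortController();
const outcome = processInChunks(
  ["a", "b", "c"],
  async (chunk) => {
    if (chunk === "b") controller.abort();
  },
  controller.signal,
).then(
  (count) => `finished ${count} chunks`,
  (err: Error) => `stopped: ${err.message}`,
);
outcome.then((message) => console.log(message)); // stopped: Job timed out
```

Chunk "c" is never processed because the check before it sees the aborted signal.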

Dead-letter queue

Malformed jobs that cannot be parsed are automatically moved to a dead-letter queue:
const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onParseError: async (context) => {
    console.error("Failed to parse job:");
    console.error(`  Raw data: ${context.rawJobData}`);
    console.error(`  Error: ${context.parseError}`);
    console.error(`  Timestamp: ${context.timestamp}`);

    // Alert monitoring
    await notifyParseError(context);
  },
});
Dead-letter jobs are stored in Redis at queue:{name}:dead-letter and include:
  • Original raw job data
  • Parse error message
  • Timestamp
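
To make the three stored fields concrete, here is a sketch of how a parse failure could be turned into a dead-letter entry. It assumes jobs are serialized as JSON. The `DeadLetterEntry` type, its field names (borrowed from the `onParseError` context above), and `parseOrDeadLetter` are assumptions for illustration, and the format Semola actually stores may differ.

```typescript
// Hypothetical entry shape mirroring the three fields listed above.
type DeadLetterEntry = {
  rawJobData: string;
  parseError: string;
  timestamp: number;
};

type ParseResult<T> =
  | { ok: true; job: T }
  | { ok: false; deadLetter: DeadLetterEntry };

// Try to parse a raw payload; on failure, describe it instead of throwing.
function parseOrDeadLetter<T>(raw: string, now: number = Date.now()): ParseResult<T> {
  try {
    return { ok: true, job: JSON.parse(raw) as T };
  } catch (err) {
    return {
      ok: false,
      deadLetter: {
        rawJobData: raw,
        parseError: err instanceof Error ? err.message : String(err),
        timestamp: now,
      },
    };
  }
}

const good = parseOrDeadLetter<{ message: string }>('{"message":"hi"}');
const bad = parseOrDeadLetter<{ message: string }>("{not json");
console.log(good.ok, bad.ok); // true false
```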

Real-world examples

Email queue

1. Define the queue
type EmailJob = {
  to: string;
  subject: string;
  body: string;
  templateId?: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  retries: 5,
  timeout: 30000,
  onSuccess: (job) => {
    console.log(`Email sent to ${job.data.to}`);
  },
  onError: async (context) => {
    console.error(`Failed to send email to ${context.job.data.to}`);
    // Queue to manual review
    await queueManualReview(context.job.data);
  },
});
2. Enqueue jobs
// Welcome email
await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Welcome to our platform!",
  body: "Thanks for signing up.",
  templateId: "welcome",
});

// Password reset
await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Reset your password",
  body: "Click here to reset: ...",
});
3. Graceful shutdown
process.on("SIGTERM", async () => {
  console.log("Shutting down email queue...");
  await emailQueue.stop();
  console.log("Queue stopped");
});

Image processing queue

type ImageJob = {
  imageId: string;
  url: string;
  sizes: number[];
};

const imageQueue = new Queue<ImageJob>({
  name: "images",
  redis: redisClient,
  handler: async (data, signal) => {
    // Download image
    const image = await downloadImage(data.url);

    // Check timeout
    if (signal?.aborted) {
      throw new Error("Processing timed out");
    }

    // Generate thumbnails
    for (const size of data.sizes) {
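      // Throw rather than break so a timed-out job never uploads partial results
      if (signal?.aborted) throw new Error("Processing timed out");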
      await generateThumbnail(image, size);
    }

    // Upload to storage
    await uploadProcessedImages(data.imageId);
  },
  concurrency: 5,
  timeout: 120000,
  retries: 2,
  onSuccess: (job) => {
    console.log(`Processed image: ${job.data.imageId}`);
  },
  onError: (context) => {
    console.error(`Failed to process image: ${context.job.data.imageId}`);
  },
});

// Enqueue image processing
await imageQueue.enqueue({
  imageId: "img_123",
  url: "https://example.com/photo.jpg",
  sizes: [128, 256, 512, 1024],
});

Data import queue

const importQueue = new Queue({
  name: "imports",
  redis: redisClient,
  handler: async (data) => {
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  concurrency: 3,
  retries: 2,
  timeout: 120000,
  onError: async (context) => {
    console.error(`Import failed: ${context.job.data.fileId}`);
    // Mark import as failed in database
    await markImportFailed(context.job.data.fileId, context.lastError);
  },
});

Best practices

Keep jobs idempotent: Design your job handlers to be safely retried. If a job runs twice, it should produce the same result.
Use timeouts wisely: Set reasonable timeouts based on expected job duration. Jobs exceeding the timeout will be retried.
Monitor your queues: Use onSuccess, onRetry, and onError callbacks to track queue health and identify issues early.
Handle graceful shutdown: Always call queue.stop() during application shutdown to allow in-flight jobs to complete.
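
One common way to keep a handler idempotent is to record a key for each completed job and skip the side effect when that key is already present. The sketch below uses an in-memory `Set` as a stand-in for a durable store. All names (`processedKeys`, `WelcomeJob`, `handleWelcomeEmail`) are hypothetical, and a real system would need the check and the write to be atomic.

```typescript
// In-memory stand-in for a durable store such as a database table.
const processedKeys = new Set<string>();
// Records the side effect so the example can show it happened only once.
const sentEmails: string[] = [];

type WelcomeJob = { userId: string; to: string };

// Safe to run more than once for the same job: the second run is a no-op.
async function handleWelcomeEmail(job: WelcomeJob): Promise<"sent" | "skipped"> {
  const key = `welcome:${job.userId}`;
  if (processedKeys.has(key)) {
    return "skipped";
  }
  sentEmails.push(job.to); // the side effect that must not be repeated
  processedKeys.add(key);
  return "sent";
}

// Simulate the same job being delivered twice, as happens after a retry.
const job: WelcomeJob = { userId: "u_1", to: "user@example.com" };
const outcomes = (async () => [
  await handleWelcomeEmail(job),
  await handleWelcomeEmail(job),
])();
outcomes.then((results) => console.log(results, sentEmails.length)); // ["sent", "skipped"] 1
```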

Multiple queues

You can run multiple queues with different configurations:
const highPriorityQueue = new Queue({
  name: "high-priority",
  redis: redisClient,
  handler: processHighPriority,
  concurrency: 10,
  retries: 5,
});

const lowPriorityQueue = new Queue({
  name: "low-priority",
  redis: redisClient,
  handler: processLowPriority,
  concurrency: 2,
  retries: 2,
});
