Learn how to use Semola’s Queue for reliable background job processing with Redis.
## Overview
The Queue module provides a Redis-backed job queue with:
- Automatic retry logic with exponential backoff
- Configurable concurrency
- Job timeout handling
- Success/error/retry callbacks
- FIFO processing order
- Dead-letter queue for malformed jobs
## Basic setup

```ts
import { Queue } from "semola/queue";

const queue = new Queue({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
});

// Add a job
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

// Graceful shutdown
await queue.stop();
```
## Configuration options

### Basic options

- `name` (required) - Queue name for Redis keys
- `redis` (required) - Bun Redis client instance
- `handler` (required) - Function to process each job
- `retries` - Number of retry attempts (default: `3`)
- `timeout` - Job timeout in milliseconds (default: `30000`)
- `concurrency` - Number of parallel workers (default: `1`)

### Callback options

- `onSuccess` - Called when a job succeeds
- `onRetry` - Called when a job is retried
- `onError` - Called when a job fails permanently
- `onParseError` - Called when job data cannot be parsed
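Putting the options together in one constructor call (a sketch recombining the options above; the callback signatures follow the examples later on this page, and `processTask` is a placeholder for your own job logic):

```ts
import { Queue } from "semola/queue";

const redisClient = new Bun.RedisClient("redis://localhost:6379");

const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data); // placeholder: your job logic
  },
  retries: 3,
  timeout: 30000,
  concurrency: 2,
  onSuccess: (job) => console.log(`done: ${job.id}`),
  onRetry: (context) => console.log(`retrying: ${context.job.id}`),
  onError: (context) => console.error(`failed: ${context.job.id}`),
  onParseError: (context) => console.error(`unparseable: ${context.parseError}`),
});
```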
## Enqueuing jobs

### Simple job

```ts
const [error, jobId] = await queue.enqueue({ message: "hello" });

if (error) {
  console.error("Failed to enqueue:", error.message);
} else {
  console.log("Job enqueued:", jobId);
}
```
### Type-safe jobs

```ts
type EmailJob = {
  to: string;
  subject: string;
  body: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: redisClient,
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  retries: 5,
  timeout: 30000,
});

await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Welcome!",
  body: "Thanks for signing up.",
});
```
## Error handling

### Retry logic
Jobs are automatically retried with exponential backoff:
```ts
const taskQueue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data.taskId, data.userId);
  },
  retries: 3, // Will try up to 4 times (initial attempt + 3 retries)
  onRetry: (context) => {
    console.log(`⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms`);
    console.log(`  Retries remaining: ${context.retriesRemaining}`);
    console.log(`  Error: ${context.error}`);
  },
});
```
### Exponential backoff
Retry delays increase exponentially:
- Attempt 1 → 1000ms delay
- Attempt 2 → 2000ms delay
- Attempt 3 → 4000ms delay
- And so on…
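The schedule above corresponds to doubling a base delay on each attempt. A minimal sketch, assuming a 1000 ms base and no jitter (check your Semola version for the exact formula):

```ts
// Delay before the nth retry: base * 2^(n - 1), matching the
// 1000 → 2000 → 4000 ms progression above.
function retryDelayMs(attempt: number, baseMs = 1000): number {
  return baseMs * 2 ** (attempt - 1);
}

console.log(retryDelayMs(1)); // 1000
console.log(retryDelayMs(3)); // 4000
```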
### Success callback

```ts
const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onSuccess: (job) => {
    console.log(`✓ Task completed: ${job.id}`);
  },
});
```
### Error callback

```ts
const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onError: (context) => {
    console.error(`✗ Task failed: ${context.job.id}`);
    console.error(`  Total attempts: ${context.totalAttempts}`);
    console.error(`  Last error: ${context.lastError}`);
    console.error(`  Duration: ${context.totalDurationMs}ms`);
    console.error(`  Error history:`, context.errorHistory);

    // Notify monitoring service
    notifyFailure(context.job.data, context.lastError);
  },
  retries: 3,
});
```
## Concurrency control

### Processing multiple jobs in parallel
```ts
const importQueue = new Queue({
  name: "imports",
  redis: redisClient,
  handler: async (data) => {
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  concurrency: 3, // Process 3 files in parallel
  retries: 2,
  timeout: 120000,
});
```
Be careful with concurrency settings. High concurrency can:
- Overwhelm your database or external APIs
- Increase memory usage
- Cause rate limiting issues
Start with low concurrency and increase gradually while monitoring performance.
## Job timeouts

### Handling long-running jobs
```ts
const queue = new Queue({
  name: "processing",
  redis: redisClient,
  handler: async (data, signal) => {
    // Check signal to respect timeout
    if (signal?.aborted) {
      throw new Error("Job timed out");
    }
    await processLargeFile(data.fileId);
  },
  timeout: 60000, // 60 second timeout
  onError: (context) => {
    if (context.lastError.includes("timed out")) {
      console.error(`Job ${context.job.id} exceeded timeout`);
    }
  },
});
```
## Dead-letter queue
Malformed jobs that cannot be parsed are automatically moved to a dead-letter queue:
```ts
const queue = new Queue({
  name: "tasks",
  redis: redisClient,
  handler: async (data) => {
    await processTask(data);
  },
  onParseError: async (context) => {
    console.error("Failed to parse job:");
    console.error(`  Raw data: ${context.rawJobData}`);
    console.error(`  Error: ${context.parseError}`);
    console.error(`  Timestamp: ${context.timestamp}`);

    // Alert monitoring
    await notifyParseError(context);
  },
});
```
Dead-letter jobs are stored in Redis at `queue:{name}:dead-letter` and include:
- Original raw job data
- Parse error message
- Timestamp
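As a hedged sketch, an entry read from that key can be decoded like this (the field names mirror the `onParseError` context above; verify them against your Semola version):

```ts
// Assumed shape of a dead-letter entry, mirroring the fields listed above.
type DeadLetterEntry = {
  rawJobData: string;
  parseError: string;
  timestamp: string;
};

// Decode one raw entry as read from the queue:{name}:dead-letter key.
function decodeDeadLetter(raw: string): DeadLetterEntry {
  return JSON.parse(raw) as DeadLetterEntry;
}

const entry = decodeDeadLetter(
  JSON.stringify({
    rawJobData: "{not json",
    parseError: "Unexpected token",
    timestamp: "2024-01-01T00:00:00Z",
  }),
);
console.log(entry.parseError); // "Unexpected token"
```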
## Real-world examples

### Email queue
```ts
type EmailJob = {
  to: string;
  subject: string;
  body: string;
  templateId?: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  retries: 5,
  timeout: 30000,
  onSuccess: (job) => {
    console.log(`Email sent to ${job.data.to}`);
  },
  onError: async (context) => {
    console.error(`Failed to send email to ${context.job.data.to}`);

    // Queue for manual review
    await queueManualReview(context.job.data);
  },
});

// Welcome email
await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Welcome to our platform!",
  body: "Thanks for signing up.",
  templateId: "welcome",
});

// Password reset
await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Reset your password",
  body: "Click here to reset: ...",
});

process.on("SIGTERM", async () => {
  console.log("Shutting down email queue...");
  await emailQueue.stop();
  console.log("Queue stopped");
});
```
### Image processing queue
```ts
type ImageJob = {
  imageId: string;
  url: string;
  sizes: number[];
};

const imageQueue = new Queue<ImageJob>({
  name: "images",
  redis: redisClient,
  handler: async (data, signal) => {
    // Download image
    const image = await downloadImage(data.url);

    // Check timeout
    if (signal?.aborted) {
      throw new Error("Processing timed out");
    }

    // Generate thumbnails
    for (const size of data.sizes) {
      if (signal?.aborted) break;
      await generateThumbnail(image, size);
    }

    // Upload to storage
    await uploadProcessedImages(data.imageId);
  },
  concurrency: 5,
  timeout: 120000,
  retries: 2,
  onSuccess: (job) => {
    console.log(`Processed image: ${job.data.imageId}`);
  },
  onError: (context) => {
    console.error(`Failed to process image: ${context.job.data.imageId}`);
  },
});

// Enqueue image processing
await imageQueue.enqueue({
  imageId: "img_123",
  url: "https://example.com/photo.jpg",
  sizes: [128, 256, 512, 1024],
});
```
### Data import queue
```ts
const importQueue = new Queue({
  name: "imports",
  redis: redisClient,
  handler: async (data) => {
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  concurrency: 3,
  retries: 2,
  timeout: 120000,
  onError: async (context) => {
    console.error(`Import failed: ${context.job.data.fileId}`);

    // Mark import as failed in database
    await markImportFailed(context.job.data.fileId, context.lastError);
  },
});
```
## Best practices

**Keep jobs idempotent:** Design your job handlers to be safely retried. If a job runs twice, it should produce the same result.
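For instance, a handler can record completed job ids so a retried job becomes a no-op (an in-memory `Set` here purely for illustration; in practice, keep the marker in Redis or your database):

```ts
// Illustrative idempotent handler: the side effect runs at most once
// per job id, so a retry after a partial failure is harmless.
const completed = new Set<string>();
let emailsSent = 0; // stand-in for a real side effect

async function sendWelcomeEmail(job: { id: string; to: string }) {
  if (completed.has(job.id)) return; // already done on a previous attempt
  emailsSent++;
  completed.add(job.id);
}

await sendWelcomeEmail({ id: "job-1", to: "user@example.com" });
await sendWelcomeEmail({ id: "job-1", to: "user@example.com" }); // simulated retry
console.log(emailsSent); // 1
```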
**Use timeouts wisely:** Set reasonable timeouts based on expected job duration. Jobs exceeding the timeout will be retried.

**Monitor your queues:** Use the `onSuccess`, `onRetry`, and `onError` callbacks to track queue health and identify issues early.

**Handle graceful shutdown:** Always call `queue.stop()` during application shutdown to allow in-flight jobs to complete.
## Multiple queues
You can run multiple queues with different configurations:
```ts
const highPriorityQueue = new Queue({
  name: "high-priority",
  redis: redisClient,
  handler: processHighPriority,
  concurrency: 10,
  retries: 5,
});

const lowPriorityQueue = new Queue({
  name: "low-priority",
  redis: redisClient,
  handler: processLowPriority,
  concurrency: 2,
  retries: 2,
});
```
## Next steps