# Queue

A Redis-backed job queue with automatic retry logic, exponential backoff, and concurrent processing.

## Import

```ts
import { Queue } from "semola/queue";
```
## Class: Queue

### Constructor

```ts
new Queue<T>(options: QueueOptions<T>)
```

- `options` — Queue configuration options
- `options.name` — Queue name used for Redis keys
- `options.redis` — Bun Redis client instance
- `options.handler` — `(data: T, signal?: AbortSignal) => void | Promise<void>` (required) — Function to process each job. Receives the job data and an optional abort signal for timeout handling.
- `options.retries` — Number of retry attempts for failed jobs
- `options.timeout` — Job timeout in milliseconds (default: 30 seconds)
- `options.concurrency` — Number of parallel workers processing jobs
- Polling interval in milliseconds between queue checks
- `options.onSuccess` — `(job: Job<T>) => void | Promise<void>` — Called when a job completes successfully
- `options.onRetry` — `(context: RetryContext<T>) => void | Promise<void>` — Called when a job is retried after a failure
- `options.onError` — `(context: ErrorContext<T>) => void | Promise<void>` — Called when a job fails permanently after exhausting all retries
- `options.onParseError` — `(context: ParseErrorContext) => void | Promise<void>` — Called when a job payload cannot be parsed; the job is moved to the dead-letter queue
Example:

```ts
const queue = new Queue<{ message: string }>({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
  retries: 3,
  timeout: 60000,
  concurrency: 5,
});
```
## Methods

### enqueue

Adds a job to the queue.

```ts
enqueue(data: T): Promise<[Error, null] | [null, string]>
```

Returns a result tuple containing either:

- `[null, jobId]` — success, with the generated job ID
- `[error, null]` — failure, with an error of type `"QueueError"`
Example:

```ts
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

if (error) {
  console.error("Failed to enqueue:", error.message);
} else {
  console.log("Job enqueued:", jobId);
}
```
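When a throwing style is preferred over tuple checks, the result can be unwrapped with a small helper. A sketch — `unwrap` is illustrative, not part of the library:

```ts
// Hypothetical helper: converts the [error, data] result tuple returned
// by enqueue() into a plain return-or-throw, so callers can use try/catch.
function unwrap<T>(result: [Error, null] | [null, T]): T {
  const [error, value] = result;
  if (error !== null) throw error;
  return value as T;
}

// Usage: const jobId = unwrap(await queue.enqueue({ message: "Hello!" }));
```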
### stop

Gracefully stops the queue, waiting for active jobs to complete.

Example:

```ts
// Graceful shutdown
process.on("SIGTERM", async () => {
  await queue.stop();
  process.exit(0);
});
```
## Type Definitions

### Job

```ts
type Job<T> = {
  id: string;
  data: T;
  attempts: number;
  maxRetries: number;
  createdAt: number;
};
```

- `id` — Unique job identifier (UUID)
- `attempts` — Number of processing attempts made
- `maxRetries` — Maximum number of retry attempts allowed
- `createdAt` — Timestamp when the job was created (milliseconds)
### RetryContext

```ts
type RetryContext<T> = {
  job: Job<T>;
  error: string;
  nextRetryDelayMs: number;
  retriesRemaining: number;
  backoffMultiplier: number;
};
```

- `error` — Error message from the failed attempt
- `nextRetryDelayMs` — Delay in milliseconds before the next retry (exponential backoff)
- `retriesRemaining` — Number of retry attempts remaining
- `backoffMultiplier` — Multiplier used for exponential backoff (typically 2)
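The relationship between these fields can be sketched as a pure function. The 1s base delay, multiplier of 2, and 60s cap below are taken from the defaults described under Features; treat them as assumptions, not a pinned-down API:

```ts
// Illustrative backoff schedule: baseMs * multiplier^(attempt - 1), capped.
function retryDelayMs(
  attempt: number, // 1-based number of the attempt that just failed
  baseMs = 1000,
  backoffMultiplier = 2,
  maxMs = 60_000
): number {
  return Math.min(baseMs * backoffMultiplier ** (attempt - 1), maxMs);
}

// Produces delays of 1s, 2s, 4s, 8s, ... capped at 60s.
```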
### ErrorContext

```ts
type ErrorContext<T> = {
  job: Job<T>;
  lastError: string;
  totalDurationMs: number;
  totalAttempts: number;
  errorHistory: Array<{
    attempt: number;
    error: string;
    timestamp: number;
  }>;
};
```

- `job` — The job that failed permanently
- `lastError` — Most recent error message
- `totalDurationMs` — Total time from job creation to final failure
- `totalAttempts` — Total number of attempts made
- `errorHistory` — Complete history of all errors encountered
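Inside an `onError` callback, `errorHistory` can be flattened into a single log line. A minimal sketch — the formatter is illustrative, not part of the library:

```ts
type ErrorHistoryEntry = { attempt: number; error: string; timestamp: number };

// Illustrative: render ErrorContext.errorHistory as one compact string,
// e.g. for a single structured log line per permanent failure.
function formatErrorHistory(history: ErrorHistoryEntry[]): string {
  return history
    .map((e) => `#${e.attempt} @ ${new Date(e.timestamp).toISOString()}: ${e.error}`)
    .join(" | ");
}
```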
### ParseErrorContext

```ts
type ParseErrorContext = {
  rawJobData: string;
  parseError: string;
  timestamp: number;
};
```

- `rawJobData` — Raw job data that failed to parse
- `timestamp` — When the parse error occurred
## Usage Examples

### Basic Queue

```ts
import { Queue } from "semola/queue";

const queue = new Queue<{ message: string }>({
  name: "my-queue",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    console.log(`Processing: ${data.message}`);
  },
});

// Add a job
const [error, jobId] = await queue.enqueue({ message: "Hello!" });

// Graceful shutdown
await queue.stop();
```
### Email Queue with Error Handling

```ts
import { Queue } from "semola/queue";

type EmailJob = {
  to: string;
  subject: string;
  body: string;
};

const emailQueue = new Queue<EmailJob>({
  name: "emails",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await sendEmail(data.to, data.subject, data.body);
  },
  onSuccess: (job) => {
    console.log(`✓ Email sent: ${job.id}`);
  },
  onRetry: (context) => {
    console.log(
      `⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms ` +
        `(${context.retriesRemaining} retries remaining)`
    );
  },
  onError: (context) => {
    console.error(
      `✗ Email failed: ${context.job.id} after ${context.totalAttempts} attempts`
    );
    notifyFailure(context.job.data, context.lastError);
  },
  retries: 5,
  timeout: 30000,
});

await emailQueue.enqueue({
  to: "user@example.com",
  subject: "Welcome!",
  body: "Thanks for signing up.",
});
```
### File Processing with Concurrency

```ts
import { Queue } from "semola/queue";

type ImportJob = {
  fileId: string;
  url: string;
};

const importQueue = new Queue<ImportJob>({
  name: "imports",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data, signal) => {
    // Check the abort signal so timed-out jobs stop early
    if (signal?.aborted) {
      throw new Error("Job aborted");
    }
    const file = await downloadFile(data.url);
    await importFileData(data.fileId, file);
  },
  onError: (context) => {
    logger.error(`Import failed: ${context.job.data.fileId}`, {
      errorHistory: context.errorHistory,
    });
  },
  retries: 2,
  timeout: 120000, // 2 minutes
  concurrency: 3, // Process 3 files in parallel
});
```
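The handler above checks the signal only once, on entry. Handlers that wait between steps can also let the signal interrupt the wait itself. A sketch of such a helper — assumed for illustration, not part of the library:

```ts
// Illustrative: a delay that rejects as soon as the queue's timeout
// signal fires, instead of sleeping through it.
function abortableSleep(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(new Error("Job aborted"));
    const timer = setTimeout(resolve, ms);
    signal?.addEventListener(
      "abort",
      () => {
        clearTimeout(timer);
        reject(new Error("Job aborted"));
      },
      { once: true }
    );
  });
}
```

A handler could then `await abortableSleep(1000, signal)` between retry-sensitive steps so a timed-out job fails promptly.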
### Task Queue with Full Callbacks

```ts
import { Queue } from "semola/queue";

type TaskJob = {
  taskId: string;
  userId: string;
  action: string;
};

const taskQueue = new Queue<TaskJob>({
  name: "tasks",
  redis: new Bun.RedisClient("redis://localhost:6379"),
  handler: async (data) => {
    await processTask(data.taskId, data.userId);
  },
  onSuccess: (job) => {
    console.log(`✓ Task completed: ${job.id}`);
  },
  onRetry: (context) => {
    console.log(
      `⟳ Retrying ${context.job.id} in ${context.nextRetryDelayMs}ms ` +
        `(${context.retriesRemaining} left)`
    );
  },
  onError: (context) => {
    console.error(
      `✗ Task failed: ${context.job.id} after ${context.totalAttempts} attempts`
    );
    notifyFailure(context.job.data, context.lastError);
  },
  onParseError: (context) => {
    console.error("Failed to parse job:", {
      rawData: context.rawJobData,
      error: context.parseError,
    });
  },
  retries: 3,
  timeout: 60000,
  concurrency: 5,
});
```
## Features

- Automatic retries with exponential backoff (1s, 2s, 4s, etc., up to a 60s maximum)
- Concurrent processing with a configurable worker count
- Job timeout with abort signal support
- Error tracking with complete error history
- Dead-letter queue for unparseable jobs
- Graceful shutdown that waits for active jobs to complete
- Result pattern using `[error, data]` tuples for error handling