A processor is a function that defines how to handle jobs in a queue. Processors receive jobs from the queue and execute the necessary work, whether it’s sending emails, processing images, or any other task.
    queue.process(function(job, done) {
      const { videoUrl } = job.data;

      // Report progress
      job.progress(42);

      // Complete successfully
      done(null, { framerate: 29.5 });

      // Or signal an error
      // done(new Error('processing failed'));
    });
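Don’t mix patterns! Bull determines the processor type from the function’s .length property, which is its declared parameter count. A function that declares 2 parameters is treated as callback-style and must call done, while a function that declares 1 parameter is expected to return a Promise.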
    // ❌ THIS WON'T WORK - has done parameter but returns Promise
    queue.process(function(job, done) {
      return Promise.resolve();
    });

    // ✅ This works - no done parameter, returns Promise
    queue.process(function(job) {
      return Promise.resolve();
    });
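To make the rule above concrete, here is a minimal sketch of arity-based dispatch. It is a simplified illustration and not Bull's actual code: `processorKind` and `toPromiseProcessor` are hypothetical helper names introduced only for this example.

```javascript
// Hypothetical helpers illustrating arity-based dispatch; not Bull's code.

// Classify a processor by its declared parameter count.
function processorKind(fn) {
  return fn.length > 1 ? 'callback' : 'promise';
}

// Normalize either style into a function that always returns a Promise.
function toPromiseProcessor(fn) {
  if (processorKind(fn) === 'callback') {
    return (job) =>
      new Promise((resolve, reject) => {
        // The return value of fn is ignored; only done() settles the job.
        fn(job, (err, result) => (err ? reject(err) : resolve(result)));
      });
  }
  return (job) => {
    try {
      return Promise.resolve(fn(job));
    } catch (err) {
      return Promise.reject(err);
    }
  };
}

const callbackStyle = function (job, done) { done(null, job.data * 2); };
const promiseStyle = async function (job) { return job.data * 2; };
// Mixed: declares done but never calls it, so the wrapper would wait forever.
const mixedStyle = function (job, done) { return Promise.resolve(job.data * 2); };

console.log(processorKind(callbackStyle)); // callback
console.log(processorKind(promiseStyle));  // promise
console.log(processorKind(mixedStyle));    // callback
```

Because `mixedStyle` is classified as a callback processor, its returned Promise is ignored and `done` is never called, so under this scheme the job would never complete. Note also that `.length` does not count rest parameters or any parameter from the first default value onward, so such signatures can be classified in unexpected ways.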
Source reference: ~/workspace/source/lib/queue.js:728-738.
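    const Queue = require('bull');

    const emailQueue = new Queue('emails');

    // Process up to 10 emails concurrently.
    // sendEmail is a placeholder for your own mail-sending function.
    emailQueue.process(10, async (job) => {
      await sendEmail(job.data.to, job.data.subject, job.data.body);
      return { sent: true };
    });

    // Add 100 emails; they will be processed 10 at a time
    async function enqueueEmails() {
      for (let i = 0; i < 100; i++) {
        await emailQueue.add({
          to: `user${i}@example.com`,
          subject: 'Hello',
          body: 'Message'
        });
      }
    }

    enqueueEmails().catch(console.error);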
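What "10 at a time" means can be sketched without Redis. The block below is a hypothetical, self-contained model of a concurrency limit and is not how Bull schedules jobs internally: `runWithConcurrency` and `fakeSend` are names invented for this example, and `fakeSend` stands in for a real email call.

```javascript
// Hypothetical sketch of a concurrency limit; not Bull's internal scheduler.
async function runWithConcurrency(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0;        // index of the next item to claim
  let inFlight = 0;    // workers currently running
  let maxInFlight = 0; // highest value inFlight ever reached

  async function lane() {
    while (next < items.length) {
      const index = next++;
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      try {
        results[index] = await worker(items[index]);
      } finally {
        inFlight--;
      }
    }
  }

  // Start `limit` lanes; each claims the next item as soon as it is free.
  const lanes = Array.from({ length: Math.min(limit, items.length) }, () => lane());
  await Promise.all(lanes);
  return { results, maxInFlight };
}

// Stand-in for sendEmail: resolves after a short delay.
const fakeSend = (to) =>
  new Promise((resolve) => setTimeout(() => resolve({ to, sent: true }), 5));

const recipients = Array.from({ length: 100 }, (_, i) => `user${i}@example.com`);
const demo = runWithConcurrency(recipients, 10, fakeSend);

demo.then(({ results, maxInFlight }) => {
  console.log('processed:', results.length, 'max in flight:', maxInFlight);
});
```

Each lane picks up a new item only after its previous one finishes, so the number of items in progress never exceeds the limit, which mirrors the effect of the concurrency argument passed to `process`.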
    // processor.js
    module.exports = async function(job) {
      // Heavy CPU work won't block the main process
      const result = await performHeavyComputation(job.data);
      return result;
    };
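    const Queue = require('bull');

    const queue = new Queue('computation');

    // The first two forms both register the default handler,
    // so use only one of them on a given queue.

    // Single sandboxed processor
    queue.process('/path/to/processor.js');

    // With concurrency
    // queue.process(5, '/path/to/processor.js');

    // Named processor
    queue.process('compute', 5, '/path/to/processor.js');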
Source reference: ~/workspace/source/lib/queue.js:710-724.
If a sandboxed processor crashes, it doesn’t affect the main queue process. The job will be marked as failed and retried according to its configuration.
Blocking Code Support
CPU-intensive synchronous code won’t block the event loop or stall other jobs. Each sandboxed processor runs in its own process.
Multi-core Utilization
Sandboxed processors can fully utilize multiple CPU cores since each runs in a separate Node.js process.
Fewer Redis Connections
Multiple sandboxed processors share Redis connections more efficiently than separate queue instances.
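The process isolation behind these benefits can be illustrated with Node's built-in `child_process` module. This is a minimal sketch under stated assumptions, not Bull's sandbox implementation: `runInChild` is a hypothetical helper that runs an inline snippet in a separate Node.js process, whereas Bull loads a processor file.

```javascript
const { spawn } = require('node:child_process');

// Hypothetical helper: run a JavaScript snippet in a separate Node.js process
// and resolve with its exit code and captured stdout.
function runInChild(source) {
  return new Promise((resolve, reject) => {
    const child = spawn(process.execPath, ['-e', source], {
      stdio: ['ignore', 'pipe', 'ignore'],
    });
    let stdout = '';
    child.stdout.on('data', (chunk) => { stdout += chunk; });
    child.on('error', reject);
    child.on('close', (code) => resolve({ code, stdout: stdout.trim() }));
  });
}

async function main() {
  // A child doing synchronous CPU work; the parent's event loop stays free.
  const ok = await runInChild(
    'let sum = 0; for (let i = 0; i < 1e6; i++) sum += i; console.log(sum);'
  );
  // A child that crashes; only the child process dies.
  const crashed = await runInChild('throw new Error("boom");');
  return { ok, crashed };
}

const outcome = main();

outcome.then(({ ok, crashed }) => {
  console.log('ok exit code:', ok.code);
  console.log('crashed exit code:', crashed.code);
  console.log('parent still running');
});
```

The parent only observes the crashed child's non-zero exit code and continues running, which is the property that lets a queue mark the job as failed and retry it. Because each child is its own operating-system process, several of them can run on different CPU cores at once.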
    await queue.add({ url: 'http://example.com' }, {
      timeout: 5000 // 5 second timeout
    });

    queue.process(async (job) => {
      // This will fail if it takes > 5 seconds
      const result = await fetchData(job.data.url);
      return result;
    });
Bull does not proactively stop a job when its timeout expires. The job is marked as failed and its promise is rejected, but Bull cannot halt the processor function from outside, so the work may keep running in the background. Implement your own cancellation logic if needed.
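One way to add such cancellation logic is cooperative: the processor sets its own deadline and passes an abort signal to the work it starts. The sketch below uses the standard `AbortController` and assumes the work checks the signal between steps. `runWithDeadline` and `slowWork` are hypothetical names and are not part of Bull.

```javascript
// Hypothetical sketch of cooperative cancellation; not part of Bull's API.
function runWithDeadline(work, ms) {
  const controller = new AbortController();
  let timer;
  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => {
      controller.abort(); // tell the work to stop
      reject(new Error(`timed out after ${ms} ms`));
    }, ms);
  });
  // Whichever settles first wins; the timer is always cleaned up.
  return Promise.race([work(controller.signal), deadline])
    .finally(() => clearTimeout(timer));
}

// Work that checks the signal between steps and stops early once aborted.
async function slowWork(signal) {
  let steps = 0;
  while (steps < 50 && !signal.aborted) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    steps++;
  }
  return steps;
}

const attempt = runWithDeadline(slowWork, 50).then(
  (steps) => ({ status: 'completed', steps }),
  (err) => ({ status: 'failed', message: err.message })
);

attempt.then((result) => console.log(result.status, result.message));
```

Inside a processor, the same wrapper could surround the real work, for example by forwarding the signal to an HTTP client that accepts one. The key point is that the work itself must observe the signal; rejecting the promise alone does not stop it.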