Overview
Defines a processing function for the jobs in a given Queue. The callback is called every time a job is placed in the queue and ready to be processed.
Signatures
// Process all jobs with default concurrency (1)
process(processor: ((job, done?) => Promise<any>) | string)
// Process all jobs with specified concurrency
process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)
// Process named jobs with default concurrency
process(name: string, processor: ((job, done?) => Promise<any>) | string)
// Process named jobs with specified concurrency
process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)
Bull recognizes the desired function signature by checking the parameters’ types. Concurrency defaults to 1 if not specified.
Parameters
Optional name to process only jobs with this specific name. Use '*' to process all named jobs as a default processor.
Maximum number of jobs to process in parallel.
processor
function | string
required
The processing function or path to a processor module.As a function:
- Receives
job as the first argument
- Optional
done callback as second argument
- If
done is omitted, must return a Promise
As a string:
- Absolute path to a processor module
- Module should export the processor function
- Runs in a separate process for better CPU utilization
Processor Function
Promise-based Processor
function processor(job: Job): Promise<any>
Return a promise that resolves with the result or rejects with an error.
Callback-based Processor
function processor(job: Job, done: DoneCallback): void
Call done() when finished:
done(null, result) for success
done(error) for failure
Bull determines the processor type by checking the function’s length property. Don’t mix both approaches:// WRONG - Don't do this!
queue.process(function (job, done) {
return Promise.resolve(); // Has done but returns Promise
});
// CORRECT - Pick one approach
queue.process(function (job) {
return Promise.resolve(); // Promise-based
});
Examples
Basic Processor
queue.process(async (job) => {
// Process job
console.log('Processing job', job.id);
return { success: true };
});
Processor with Callback
queue.process((job, done) => {
processData(job.data, (err, result) => {
if (err) {
return done(err);
}
done(null, result);
});
});
Processor with Concurrency
// Process up to 5 jobs in parallel
queue.process(5, async (job) => {
return await heavyProcessing(job.data);
});
Named Processor
// Only process jobs with name 'email'
queue.process('email', async (job) => {
await sendEmail(job.data.to, job.data.subject, job.data.body);
});
// Add a named job
queue.add('email', {
to: 'user@example.com',
subject: 'Hello',
body: 'World'
});
Multiple Named Processors
const queue = new Queue('notifications');
// Each processor handles different job types
queue.process('email', 25, sendEmail);
queue.process('sms', 50, sendSMS);
queue.process('push', 100, sendPush);
Concurrency stacks up across all named processors in the same queue. In the example above, the queue can process up to 175 jobs concurrently (25 + 50 + 100). To avoid this, create separate queues for each job type.
Separate Queues for Concurrency Control
// Better approach for concurrency control
const emailQueue = new Queue('email');
emailQueue.process(25, sendEmail); // Max 25 concurrent
const smsQueue = new Queue('sms');
smsQueue.process(50, sendSMS); // Max 50 concurrent
Wildcard Processor
// Process all named jobs with a single processor
queue.process('*', async (job) => {
console.log(`Processing job type: ${job.name}`);
switch (job.name) {
case 'email':
return await sendEmail(job.data);
case 'sms':
return await sendSMS(job.data);
default:
throw new Error(`Unknown job type: ${job.name}`);
}
});
queue.add('email', { to: 'user@example.com' });
queue.add('sms', { phone: '+1234567890' });
Separate Process File
// processor.js
module.exports = function (job) {
// Heavy CPU work in separate process
const result = intensiveCalculation(job.data);
return result;
};
// main.js
const path = require('path');
queue.process(
5,
path.join(__dirname, 'processor.js')
);
Processor with Progress Updates
queue.process(async (job) => {
const steps = 10;
for (let i = 1; i <= steps; i++) {
await processStep(job.data, i);
await job.progress(i * 10); // Update progress: 10%, 20%, etc.
}
return { completed: true };
});
Error Handling
queue.process(async (job) => {
try {
const result = await riskyOperation(job.data);
return result;
} catch (error) {
// Log and re-throw for job retry
await job.log(`Error: ${error.message}`);
throw error;
}
});
Job Object
The job parameter passed to the processor contains:
job.id - Unique job identifier
job.name - Job name (for named jobs)
job.data - Job data passed to queue.add()
job.opts - Job options
job.progress() - Update job progress
job.log() - Add log entries
job.attemptsMade - Number of failed attempts
Return Value
The process() method does not return a value. It registers the processor function with the queue.
Notes
- You must define processors for all named jobs you add, or use
'*' as a catch-all processor
- Results are passed to the
'completed' event
- Errors are passed to the
'failed' event
- Separate process processors provide better CPU utilization for blocking code
- The processor is called with the job instance and processes it asynchronously