Overview

Defines a processing function for the jobs in a given Queue. The callback is called every time a job is placed in the queue and ready to be processed.

Signatures

// Process all jobs with default concurrency (1)
process(processor: ((job, done?) => Promise<any>) | string)

// Process all jobs with specified concurrency
process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)

// Process named jobs with default concurrency
process(name: string, processor: ((job, done?) => Promise<any>) | string)

// Process named jobs with specified concurrency
process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)

Bull determines which signature was used by checking the types of the arguments. Concurrency defaults to 1 if not specified.
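
The type-based dispatch can be pictured with a small sketch. `normalizeProcessArgs` is a hypothetical helper written for illustration, not Bull's internal code; it only shows how the four call shapes can be told apart by argument count and type.

```javascript
// Hypothetical helper (not part of Bull): maps the four documented call
// shapes onto one { name, concurrency, processor } object.
function normalizeProcessArgs(...args) {
  let name = null;     // null means "no job name given"
  let concurrency = 1; // documented default
  const processor = args[args.length - 1]; // the processor is always last

  if (args.length === 2) {
    // (name, processor) or (concurrency, processor)
    if (typeof args[0] === 'string') name = args[0];
    else concurrency = args[0];
  } else if (args.length === 3) {
    // (name, concurrency, processor)
    [name, concurrency] = args;
  }

  if (typeof processor !== 'function' && typeof processor !== 'string') {
    throw new TypeError('processor must be a function or a module path');
  }
  return { name, concurrency, processor };
}

const handler = async (job) => job.data;
console.log(normalizeProcessArgs(handler));
console.log(normalizeProcessArgs(5, handler));
console.log(normalizeProcessArgs('email', handler));
console.log(normalizeProcessArgs('email', 5, handler));
```

A single string argument is read as a processor module path, which is why the argument count is checked before the types.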

Parameters

name (string, optional)
Optional name to process only jobs with this specific name. Use '*' to process all named jobs as a default processor.

concurrency (number, default: 1)
Maximum number of jobs to process in parallel.

processor (function | string, required)
The processing function or path to a processor module.

As a function:
  • Receives job as the first argument
  • Optional done callback as second argument
  • If done is omitted, must return a Promise
As a string:
  • Absolute path to a processor module
  • Module should export the processor function
  • Runs in a separate process for better CPU utilization

Processor Function

Promise-based Processor

function processor(job: Job): Promise<any>
Return a promise that resolves with the result or rejects with an error.

Callback-based Processor

function processor(job: Job, done: DoneCallback): void
Call done() when finished:
  • done(null, result) for success
  • done(error) for failure
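Bull determines the processor type from the function’s length property, that is, the number of declared parameters. A function that declares done is treated as callback-based, so don’t mix both approaches:

// WRONG - Don't do this!
queue.process(function (job, done) {
  return Promise.resolve(); // Declares done but returns a Promise instead of calling it
});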

// CORRECT - Pick one approach
queue.process(function (job) {
  return Promise.resolve(); // Promise-based
});
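
Because the check relies on the function's `length` property, it helps to see what that property reports. The sketch below illustrates the idea and is not Bull's source; `wrapProcessor` is a hypothetical name. Note that parameters with default values are not counted by `length`, so a processor declared as `(job, done = noop)` would look promise-based.

```javascript
// Hypothetical wrapper (illustration only): turn either processor style
// into a function that always returns a Promise.
function wrapProcessor(fn) {
  if (fn.length > 1) {
    // Two declared parameters: treat as callback-based.
    return (job) =>
      new Promise((resolve, reject) => {
        fn(job, (err, result) => (err ? reject(err) : resolve(result)));
      });
  }
  // Zero or one declared parameter: treat as promise-based.
  return (job) => Promise.resolve().then(() => fn(job));
}

const callbackStyle = (job, done) => done(null, job.data * 2);
const promiseStyle = async (job) => job.data * 2;

console.log(callbackStyle.length); // 2
console.log(promiseStyle.length);  // 1
console.log(((job, done = () => {}) => {}).length); // 1: defaults are not counted

wrapProcessor(callbackStyle)({ data: 21 }).then(console.log); // 42
wrapProcessor(promiseStyle)({ data: 21 }).then(console.log);  // 42
```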

Examples

Basic Processor

queue.process(async (job) => {
  // Process job
  console.log('Processing job', job.id);
  
  return { success: true };
});

Processor with Callback

queue.process((job, done) => {
  processData(job.data, (err, result) => {
    if (err) {
      return done(err);
    }
    done(null, result);
  });
});

Processor with Concurrency

// Process up to 5 jobs in parallel
queue.process(5, async (job) => {
  return await heavyProcessing(job.data);
});

Named Processor

// Only process jobs with name 'email'
queue.process('email', async (job) => {
  await sendEmail(job.data.to, job.data.subject, job.data.body);
});

// Add a named job
queue.add('email', {
  to: 'user@example.com',
  subject: 'Hello',
  body: 'World'
});

Multiple Named Processors

const queue = new Queue('notifications');

// Each processor handles different job types
queue.process('email', 25, sendEmail);
queue.process('sms', 50, sendSMS);
queue.process('push', 100, sendPush);

Concurrency stacks up across all named processors in the same queue. In the example above, the queue can process up to 175 jobs concurrently (25 + 50 + 100). To avoid this, create separate queues for each job type.
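
The arithmetic is a plain sum over the registered processors. A minimal sketch, using a hypothetical `registrations` table that mirrors the example above:

```javascript
// Hypothetical table mirroring the three queue.process() calls above.
const registrations = { email: 25, sms: 50, push: 100 };

// Upper bound on jobs this queue instance may run at once.
const totalConcurrency = Object.values(registrations)
  .reduce((sum, n) => sum + n, 0);

console.log(totalConcurrency); // 175
```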

Separate Queues for Concurrency Control

// Better approach for concurrency control
const emailQueue = new Queue('email');
emailQueue.process(25, sendEmail); // Max 25 concurrent

const smsQueue = new Queue('sms');
smsQueue.process(50, sendSMS); // Max 50 concurrent

Wildcard Processor

// Process all named jobs with a single processor
queue.process('*', async (job) => {
  console.log(`Processing job type: ${job.name}`);
  
  switch (job.name) {
    case 'email':
      return await sendEmail(job.data);
    case 'sms':
      return await sendSMS(job.data);
    default:
      throw new Error(`Unknown job type: ${job.name}`);
  }
});

queue.add('email', { to: 'user@example.com' });
queue.add('sms', { phone: '+1234567890' });
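
As the number of job types grows, a lookup table can replace the switch. This is a sketch of one possible arrangement; `handlers`, `dispatch`, and the stub senders are hypothetical names, and the registration line appears only as a comment because it needs a live queue.

```javascript
// Stub senders standing in for real delivery code (hypothetical).
const sendEmail = async (data) => `email to ${data.to}`;
const sendSMS = async (data) => `sms to ${data.phone}`;

// Map job names to handlers instead of using a switch.
const handlers = { email: sendEmail, sms: sendSMS };

async function dispatch(job) {
  // Own-property check so names like "toString" are not found on the prototype.
  if (!Object.prototype.hasOwnProperty.call(handlers, job.name)) {
    throw new Error(`Unknown job type: ${job.name}`);
  }
  return handlers[job.name](job.data);
}

// With a real queue: queue.process('*', dispatch);
dispatch({ name: 'email', data: { to: 'user@example.com' } }).then(console.log);
```

Adding a job type then means adding one entry to `handlers`, while unknown names still fail the job with a clear error.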

Separate Process File

// processor.js
module.exports = function (job) {
  // Heavy CPU work in separate process
  const result = intensiveCalculation(job.data);
  return result;
};

// main.js
const path = require('path');

queue.process(
  5,
  path.join(__dirname, 'processor.js')
);

Processor with Progress Updates

queue.process(async (job) => {
  const steps = 10;
  
  for (let i = 1; i <= steps; i++) {
    await processStep(job.data, i);
    await job.progress(i * 10); // Update progress: 10%, 20%, etc.
  }
  
  return { completed: true };
});
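
When the number of steps does not divide 100 evenly, the percentage has to be computed. A minimal sketch, assuming a job whose `data.items` is an array (a hypothetical payload shape) and using a stub job so the processor can be exercised without Redis:

```javascript
// Processor that reports progress as a rounded percentage.
async function processItems(job) {
  const items = job.data.items; // assumed payload shape
  for (let i = 0; i < items.length; i++) {
    // ... work on items[i] would go here ...
    await job.progress(Math.round(((i + 1) / items.length) * 100));
  }
  return { completed: items.length };
}

// Stub job that records progress calls (stands in for a real Bull job).
const reported = [];
const stubJob = {
  data: { items: ['a', 'b', 'c'] },
  progress: async (pct) => { reported.push(pct); },
};

processItems(stubJob).then((result) => {
  console.log(reported); // [ 33, 67, 100 ]
  console.log(result);   // { completed: 3 }
});
```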

Error Handling

queue.process(async (job) => {
  try {
    const result = await riskyOperation(job.data);
    return result;
  } catch (error) {
    // Log, then re-throw so the job is marked as failed (and retried if its attempts option allows)
    await job.log(`Error: ${error.message}`);
    throw error;
  }
});
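
Re-throwing only leads to a retry when the job was added with an `attempts` option greater than 1. The sketch below exercises the same try/catch pattern against a stub job so that it runs without Redis; `riskyOperation` and the stub are hypothetical stand-ins.

```javascript
// Hypothetical operation that always fails, to trigger the catch branch.
async function riskyOperation(data) {
  throw new Error(`cannot handle ${data.id}`);
}

async function processor(job) {
  try {
    return await riskyOperation(job.data);
  } catch (error) {
    await job.log(`Error: ${error.message}`); // keep a trace on the job
    throw error;                              // let the queue mark it failed
  }
}

// Stub job that collects log lines (stands in for a real Bull job).
const logs = [];
const stubJob = { data: { id: 7 }, log: async (line) => { logs.push(line); } };

processor(stubJob).catch((err) => {
  console.log(err.message); // cannot handle 7
  console.log(logs);        // [ 'Error: cannot handle 7' ]
});

// With a real queue, retries are opted into per job, for example:
// queue.add({ id: 7 }, { attempts: 3, backoff: 5000 });
```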

Job Object

The job parameter passed to the processor contains:
  • job.id - Unique job identifier
  • job.name - Job name (for named jobs)
  • job.data - Job data passed to queue.add()
  • job.opts - Job options
  • job.progress() - Update job progress
  • job.log() - Add log entries
  • job.attemptsMade - Number of failed attempts

Return Value

void
The process() method does not return a value. It registers the processor function with the queue.

Notes

  • You must define processors for all named jobs you add, or use '*' as a catch-all processor
  • Results are passed to the 'completed' event
  • Errors are passed to the 'failed' event
  • Processors that run in a separate process make better use of the CPU for blocking code
  • The processor is called with the job instance and processes it asynchronously
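
The 'completed' and 'failed' events mentioned above can be observed with ordinary listeners. The sketch uses Node's EventEmitter as a stand-in for a queue so that it runs without Redis; `attachLogging` is a hypothetical helper, and the manual `emit` calls only simulate what a queue would do.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attach result and error listeners to a queue-like emitter.
function attachLogging(queue, sink = console.log) {
  queue.on('completed', (job, result) => {
    sink(`job ${job.id} completed: ${JSON.stringify(result)}`);
  });
  queue.on('failed', (job, err) => {
    sink(`job ${job.id} failed: ${err.message}`);
  });
}

// Stand-in for a real queue; a Bull queue emits these events itself.
const fakeQueue = new EventEmitter();
attachLogging(fakeQueue);

fakeQueue.emit('completed', { id: 1 }, { success: true });
fakeQueue.emit('failed', { id: 2 }, new Error('boom'));
```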
