
Overview

A processor is a function that defines how to handle jobs in a queue. Processors receive jobs from the queue and execute the necessary work, whether it’s sending emails, processing images, or any other task.

Process Function Signatures

Bull supports multiple processor signatures:
process(processor: ((job, done?) => Promise<any>) | string)
process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)
process(name: string, processor: ((job, done?) => Promise<any>) | string)
process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)

Promise-based Processor

The recommended approach is to return a Promise (an async function works naturally):
const Queue = require('bull');
const queue = new Queue('image-processing');

queue.process(async (job) => {
  const { imageUrl } = job.data;
  
  // Process the image
  const result = await processImage(imageUrl);
  
  // Return the result
  return result;
});

Callback-based Processor

Use the done callback for completion:
queue.process(function(job, done) {
  const { videoUrl } = job.data;
  
  // Report progress
  job.progress(42);
  
  // Complete successfully
  done(null, { framerate: 29.5 });
  
  // Or signal an error
  // done(new Error('processing failed'));
});
Don't mix patterns! Bull determines the processor type from the function's .length property, i.e. its declared parameter count: a function that declares two parameters (job, done) is treated as callback-based, while a single parameter (job) means Promise-based.
// ❌ THIS WON'T WORK - has done parameter but returns Promise
queue.process(function(job, done) {
  return Promise.resolve();
});

// ✅ This works - no done parameter, returns Promise
queue.process(function(job) {
  return Promise.resolve();
});
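One subtle consequence of the .length check: in JavaScript, default and rest parameters don't count toward a function's .length, which can silently change how Bull classifies a processor:

```javascript
// .length counts only the parameters before the first default or rest parameter
console.log((function (job, done) {}).length);        // 2 → treated as callback-based
console.log((function (job, done = null) {}).length); // 1 → treated as Promise-based
console.log((async (job) => {}).length);              // 1 → treated as Promise-based
```

So giving the done parameter a default value would make Bull expect a Promise, not a callback.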
From the source (~/workspace/source/lib/queue.js:728-738):
handler = handler.bind(this);

if (handler.length > 1) {
  this.handlers[name] = promisify(handler);
} else {
  this.handlers[name] = function() {
    try {
      return Promise.resolve(handler.apply(null, arguments));
    } catch (err) {
      return Promise.reject(err);
    }
  };
}

Concurrency

Process multiple jobs in parallel by specifying a concurrency value:
// Process up to 5 jobs simultaneously
queue.process(5, async (job) => {
  return await performWork(job.data);
});
From the source (~/workspace/source/lib/queue.js:668-691):
Queue.prototype.process = function(name, concurrency, handler) {
  switch (arguments.length) {
    case 1:
      handler = name;
      concurrency = 1;
      name = Job.DEFAULT_JOB_NAME;
      break;
    case 2:
      handler = concurrency;
      if (typeof name === 'string') {
        concurrency = 1;
      } else {
        concurrency = name;
        name = Job.DEFAULT_JOB_NAME;
      }
      break;
  }

  this.setHandler(name, handler);

  return this._initProcess().then(() => {
    return this.start(concurrency, name);
  });
};

Example: Concurrent Processing

const emailQueue = new Queue('emails');

// Process 10 emails concurrently
emailQueue.process(10, async (job) => {
  await sendEmail(job.data.to, job.data.subject, job.data.body);
  return { sent: true };
});

// Add 100 emails
for (let i = 0; i < 100; i++) {
  await emailQueue.add({
    to: `user${i}@example.com`,
    subject: 'Hello',
    body: 'Message'
  });
}
// Will process 10 at a time

Named Processors

Define multiple processors for different job types in the same queue:
const queue = new Queue('notifications');

// Processor for email notifications
queue.process('email', async (job) => {
  return await sendEmail(job.data);
});

// Processor for SMS notifications
queue.process('sms', async (job) => {
  return await sendSMS(job.data);
});

// Processor for push notifications
queue.process('push', async (job) => {
  return await sendPush(job.data);
});

// Add different job types
await queue.add('email', { to: 'user@example.com' });
await queue.add('sms', { phone: '+1234567890' });
await queue.add('push', { deviceId: 'abc123' });

Wildcard Processor

Use * to handle all named jobs with a single processor:
const queue = new Queue('tasks');

queue.process('*', async (job) => {
  console.log(`Processing ${job.name}`);
  
  switch(job.name) {
    case 'taskA':
      return handleTaskA(job.data);
    case 'taskB':
      return handleTaskB(job.data);
    default:
      return handleDefault(job.data);
  }
});

await queue.add('taskA', { data: 1 });
await queue.add('taskB', { data: 2 });
When using multiple named processors in one queue, their concurrency values are added together across the queue. If you need isolated concurrency per job type, create separate queues.
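A minimal sketch of the separate-queue approach (queue names and handler functions here are illustrative, not part of Bull):

```javascript
const Queue = require('bull');

// Each queue enforces its own, independent concurrency limit
const emailQueue = new Queue('notifications-email');
const smsQueue = new Queue('notifications-sms');

emailQueue.process(10, async (job) => sendEmail(job.data)); // up to 10 emails at once
smsQueue.process(2, async (job) => sendSMS(job.data));      // at most 2 SMS at once
```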

Sandboxed Processors

Run processors in separate processes for isolation and better CPU utilization:

Creating a Processor File

// processor.js
module.exports = async function(job) {
  // Heavy CPU work won't block the main process
  const result = await performHeavyComputation(job.data);
  return result;
};

Using the Processor

const queue = new Queue('computation');

// Single sandboxed processor
queue.process('/path/to/processor.js');

// With concurrency
queue.process(5, '/path/to/processor.js');

// Named processor
queue.process('compute', 5, '/path/to/processor.js');
From the source (~/workspace/source/lib/queue.js:710-724):
if (typeof handler === 'string') {
  const supportedFileTypes = ['.js', '.ts', '.flow', '.cjs'];
  const processorFile =
    handler +
    (supportedFileTypes.includes(path.extname(handler)) ? '' : '.js');

  if (!fs.existsSync(processorFile)) {
    throw new Error('File ' + processorFile + ' does not exist');
  }
  const isSharedChildPool = this.settings.isSharedChildPool;
  this.childPool =
    this.childPool || require('./process/child-pool')(isSharedChildPool);

  const sandbox = require('./process/sandbox');
  this.handlers[name] = sandbox(handler, this.childPool).bind(this);
}

Advantages of Sandboxed Processors

Crash isolation: If a sandboxed processor crashes, it doesn't affect the main queue process. The job will be marked as failed and retried according to its configuration.
Non-blocking: CPU-intensive synchronous code won't block the event loop or stall other jobs. Each sandboxed processor runs in its own process.
Multi-core utilization: Sandboxed processors can fully utilize multiple CPU cores since each runs in a separate Node.js process.
Connection sharing: Multiple sandboxed processors share Redis connections more efficiently than separate queue instances.
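As the quoted source suggests with isSharedChildPool, the pool of child processes can be shared rather than per-queue. A sketch of enabling it via queue settings (verify this option against your Bull version):

```javascript
const Queue = require('bull');

// Queues created with isSharedChildPool reuse a single pool of child
// processes for their sandboxed processors instead of each queue
// spawning its own.
const queueA = new Queue('task-a', { settings: { isSharedChildPool: true } });
const queueB = new Queue('task-b', { settings: { isSharedChildPool: true } });

queueA.process(__dirname + '/processor.js');
queueB.process(__dirname + '/processor.js');
```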

Example: Sandboxed Processor

// video-processor.js
const ffmpeg = require('fluent-ffmpeg');

module.exports = async function(job) {
  const { inputPath, outputPath } = job.data;
  
  return new Promise((resolve, reject) => {
    ffmpeg(inputPath)
      .output(outputPath)
      .on('progress', (progress) => {
        job.progress(progress.percent);
      })
      .on('end', () => resolve({ path: outputPath }))
      .on('error', reject)
      .run();
  });
};
// main.js
const queue = new Queue('video-processing');

// Process 3 videos concurrently in separate processes
queue.process(3, __dirname + '/video-processor.js');

await queue.add({
  inputPath: '/videos/input.mp4',
  outputPath: '/videos/output.mp4'
});

Processor Patterns

Progress Reporting

queue.process(async (job) => {
  const steps = 100;
  
  for (let i = 0; i <= steps; i++) {
    await performStep(i);
    await job.progress(i);
  }
  
  return { completed: steps };
});
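On the consuming side, these updates surface as 'progress' events on the queue, so other parts of the application can observe them:

```javascript
// Fires each time a processor calls job.progress(value)
queue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% done`);
});
```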

Error Handling

queue.process(async (job) => {
  try {
    const result = await riskyOperation(job.data);
    return result;
  } catch (error) {
    // Log the error
    await job.log(`Error: ${error.message}`);
    
    // Rethrow to mark job as failed
    throw error;
  }
});
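A rethrown error marks the job as failed; whether it is then retried depends on the job's attempts and backoff options, for example:

```javascript
// Retry a failed job up to 3 times with exponential backoff
await queue.add(
  { url: 'https://example.com' },
  {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }, // waits 1s, 2s, 4s ...
  }
);
```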

Timeout Handling

Jobs can have timeouts specified in options:
await queue.add({ url: 'http://example.com' }, {
  timeout: 5000  // 5 second timeout
});

queue.process(async (job) => {
  // This will fail if it takes > 5 seconds
  const result = await fetchData(job.data.url);
  return result;
});
Jobs are not proactively stopped on timeout. Bull marks the job as failed and rejects its promise, but it cannot stop the processor function from the outside, so the underlying work keeps running unless you implement your own cancellation logic.
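One way to sketch such cooperative cancellation is to race the work against a deadline and expose a flag the work can poll between steps (withDeadline is a hypothetical helper, not part of Bull):

```javascript
// Hypothetical helper: reject if `work` exceeds `ms`, and expose a
// flag the work can poll to stop early on its own.
function withDeadline(ms, work) {
  const state = { expired: false };
  let timer;
  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => {
      state.expired = true;
      reject(new Error('deadline exceeded'));
    }, ms);
  });
  // Clear the timer either way so it can't fire after the race settles
  return Promise.race([work(state), deadline]).finally(() => clearTimeout(timer));
}

// Fast work resolves normally...
withDeadline(1000, async () => 'ok').then(console.log); // 'ok'

// ...while work that never finishes is rejected at the deadline; real
// work would also check state.expired between steps to bail out early.
withDeadline(10, () => new Promise(() => {})).catch((err) =>
  console.error(err.message) // 'deadline exceeded'
);
```

Inside a processor, the loop over job data would check state.expired between items and return early once the deadline has passed.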

Next Steps

Events

Listen to job and queue events

Job Lifecycle

Understand the complete job lifecycle
