Concurrency control lets a worker process several jobs at the same time, which can substantially improve throughput for I/O-bound operations. Bull lets you set concurrency per process() call, and you can scale further by running several worker processes against the same queue.

Basic Concurrency

Specify the concurrency parameter in the process() method:
const Queue = require('bull');
const imageQueue = new Queue('image-processing');

// Process 5 jobs concurrently
imageQueue.process(5, async (job) => {
  return await processImage(job.data.imageUrl);
});
When no concurrency is specified, it defaults to 1 (sequential processing).
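
To get a feel for why concurrency helps I/O-bound work, the sketch below models it without Bull or Redis: each job only "waits" for a fixed duration, and a job starts as soon as one of the available slots is free. `simulateMakespan` is a hypothetical helper written for this illustration, not part of Bull, and it ignores queue overhead and CPU contention.

```javascript
// Hypothetical model, not Bull code: estimates the total time needed to drain
// a list of jobs when at most `concurrency` of them may be in flight at once.
function simulateMakespan(durationsMs, concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  // slotFreeAt[i] is the time at which slot i finishes its current job.
  const slotFreeAt = new Array(concurrency).fill(0);
  for (const duration of durationsMs) {
    // Jobs are taken in order; each goes to the slot that frees up first.
    let earliest = 0;
    for (let i = 1; i < slotFreeAt.length; i++) {
      if (slotFreeAt[i] < slotFreeAt[earliest]) earliest = i;
    }
    slotFreeAt[earliest] += duration;
  }
  return Math.max(...slotFreeAt);
}

const jobs = new Array(10).fill(100); // ten jobs, 100 ms of waiting each
console.log(simulateMakespan(jobs, 1)); // 1000
console.log(simulateMakespan(jobs, 5)); // 200
```

In this idealized model, raising concurrency from 1 to 5 cuts the total time by a factor of five. Real gains depend on what the jobs actually wait for.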

Process Method Signatures

Bull recognizes different process() call patterns:
// Process one job at a time
process(processor: Function)

// Process with concurrency
process(concurrency: number, processor: Function)

// Process named jobs
process(name: string, processor: Function)

// Process named jobs with concurrency
process(name: string, concurrency: number, processor: Function)

// Process using external file
process(concurrency: number, processorFile: string)
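
One way to read the patterns above is that the argument types decide which form is meant: an optional leading name, an optional concurrency, then the processor. The sketch below mimics that reading. `normalizeProcessArgs` is a hypothetical helper for illustration and is not Bull's internal implementation.

```javascript
// Hypothetical helper, not Bull's internal code: reduces the call patterns
// listed above to a single { name, concurrency, processor } shape.
function normalizeProcessArgs(...args) {
  let name = null;     // null stands for "no job name given"
  let concurrency = 1; // the documented default
  if (typeof args[0] === 'string' && args.length > 1) {
    name = args.shift(); // a leading string followed by more arguments is a job name
  }
  if (typeof args[0] === 'number') {
    concurrency = args.shift();
  }
  const processor = args[0]; // a function, or a path to a processor file
  if (typeof processor !== 'function' && typeof processor !== 'string') {
    throw new TypeError('processor must be a function or a file path');
  }
  return { name, concurrency, processor };
}

const handler = async (job) => job.data;
console.log(normalizeProcessArgs(handler));
console.log(normalizeProcessArgs(5, handler));
console.log(normalizeProcessArgs('sendEmail', 25, handler));
console.log(normalizeProcessArgs(5, '/path/to/processor.js'));
```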

Concurrency Examples

Use high concurrency for I/O-bound work such as API calls, database queries, or file operations:
const apiQueue = new Queue('api-requests');

// Process 20 API calls concurrently
apiQueue.process(20, async (job) => {
  const response = await fetch(job.data.url);
  return await response.json();
});

// Add multiple jobs
for (const url of urls) {
  await apiQueue.add({ url });
}

Named Processors with Concurrency

When using named processors, concurrency stacks up:
const loadBalancerQueue = new Queue('loadbalancer');

// WARNING: Total concurrency = 100 + 25 + 0 = 125!
loadBalancerQueue.process('requestProfile', 100, requestProfile);
loadBalancerQueue.process('sendEmail', 25, sendEmail);
loadBalancerQueue.process('sendInvitation', 0, sendInvite);
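When one queue has several named processors, their concurrency values add up: the queue can have up to the sum of all values in flight at once (125 in the example above). That total is shared across the named processors, so any one of them may end up running far more jobs than the value passed for it.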
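
The arithmetic can be checked before deploying with a small helper. `totalConcurrency` is a hypothetical function for illustration, and the object simply restates the values from the example above.

```javascript
// Hypothetical helper: sums the concurrency values registered on one queue.
function totalConcurrency(namedConcurrency) {
  return Object.values(namedConcurrency).reduce((sum, n) => sum + n, 0);
}

// Values copied from the loadbalancer example.
const loadBalancerHandlers = {
  requestProfile: 100,
  sendEmail: 25,
  sendInvitation: 0,
};

console.log(totalConcurrency(loadBalancerHandlers)); // 125
```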

Better Approach: Separate Queues

// Recommended: Create separate queues for better control
const profileQueue = new Queue('profile');
profileQueue.process('requestProfile', 100, requestProfile);
// Max concurrency for this queue: 100

const emailQueue = new Queue('email');
emailQueue.process('sendEmail', 25, sendEmail);
// Max concurrency for this queue: 25

Using Wildcard Processors

Handle all named jobs with a single processor:
const differentJobsQueue = new Queue('differentJobsQueue');

// Process any named job with concurrency of 5
differentJobsQueue.process('*', 5, processFunction);

// Add different job types
await differentJobsQueue.add('jobA', { type: 'A' });
await differentJobsQueue.add('jobB', { type: 'B' });

Multiple Workers Pattern

Scale horizontally by running multiple worker processes:
const Queue = require('bull');
const cluster = require('cluster');

const numWorkers = 8;
const queue = new Queue('test concurrent queue');

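if (cluster.isMaster) { // use cluster.isPrimary on Node.js 16+, where isMaster is deprecated
  // Master process: spawn workers
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', function (worker) {
    // Add jobs for workers to process. 'online' fires once per worker,
    // so this enqueues 500 jobs for each of the 8 workers (4000 in total).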
    for (let i = 0; i < 500; i++) {
      queue.add({ foo: 'bar' });
    }
  });

  cluster.on('exit', function (worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Worker process: process jobs
  queue.process(function (job, jobDone) {
    console.log('Job done by worker', cluster.worker.id, job.id);
    jobDone();
  });
}
This pattern creates 8 worker processes, each processing jobs from the same queue. Total effective concurrency: 8 (one job per worker at a time).

Concurrency with External Processors

Use sandboxed processors with concurrency:
// processor.js
module.exports = async function (job) {
  // Processor logic
  return await heavyComputation(job.data);
};
// main.js
const queue = new Queue('heavy-tasks');

// Process 5 jobs concurrently in separate processes
queue.process(5, '/path/to/processor.js');

// Named processor with concurrency
queue.process('compute', 5, '/path/to/processor.js');
Combining concurrency with sandboxed processors gives you the best of both worlds: parallel processing AND process isolation.

Determining Optimal Concurrency

1. Identify task type

I/O Bound: API calls, database queries, file operations
  • Start with higher concurrency (10-50)
CPU Bound: Video encoding, image processing, complex calculations
  • Start with concurrency = CPU cores (2-8)
  • Use sandboxed processors: Node.js runs JavaScript on a single thread, so in-process concurrency does not parallelize CPU work

2. Monitor resource usage

queue.on('active', async (job) => {
  // getActiveCount() returns a Promise, so it must be awaited
  const activeCount = await queue.getActiveCount();
  console.log(`Active jobs: ${activeCount}`);
  // Monitor: CPU usage, memory, response times
});

3. Adjust and test

Gradually increase concurrency while monitoring:
  • Error rates
  • Processing time
  • System resources
  • External API rate limits
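
The starting points from step 1 can be captured in a small helper so that the initial value is at least derived from the machine it runs on. This is only a sketch of the guidance above: `suggestStartingConcurrency` and its `'io'` / `'cpu'` labels are hypothetical, and the result is a first guess to be tuned through steps 2 and 3.

```javascript
const os = require('os');

// Hypothetical starting-point heuristic based on the guidance above.
function suggestStartingConcurrency(taskType, cpuCores = os.cpus().length) {
  if (taskType === 'cpu') {
    // CPU-bound: one job per core, and never less than one.
    return Math.max(1, cpuCores);
  }
  if (taskType === 'io') {
    // I/O-bound: low end of the 10-50 range; raise it while monitoring.
    return 10;
  }
  throw new RangeError(`Unknown task type: ${taskType}`);
}

console.log(suggestStartingConcurrency('io'));
console.log(suggestStartingConcurrency('cpu'));
```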

Best Practices

Set Appropriate Concurrency

// Alternatives shown for comparison; register only one of these per queue
// Too low - underutilized resources
queue.process(1, processor);  // Sequential processing

// Optimal for I/O - good throughput
queue.process(20, processor);

// Too high - may overwhelm system or external APIs
queue.process(1000, processor);

Combine with Rate Limiting

const queue = new Queue('api-with-limits', {
  limiter: {
    max: 100,        // 100 requests
    duration: 1000   // Per second
  }
});

// Process 20 concurrent jobs, but respect rate limit
queue.process(20, async (job) => {
  return await apiCall(job.data);
});
Concurrency controls how many jobs process simultaneously, while rate limiting controls how many jobs process per time period. Use both for fine-grained control.
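
A rough way to check that a concurrency value and a rate limit fit together is Little's law: the average number of jobs in flight is about the job rate multiplied by the average time per job. The sketch below applies it to the limiter settings shown above. `requiredConcurrency` is a hypothetical helper, and the 200 ms average job time is an assumed figure for illustration.

```javascript
// Hypothetical helper: estimates how many concurrent slots are needed to
// sustain a limiter's maximum rate, given the average job duration.
function requiredConcurrency(limiter, avgJobMs) {
  // (jobs per window * ms per job) / ms per window = average jobs in flight
  return Math.ceil((limiter.max * avgJobMs) / limiter.duration);
}

const limiter = { max: 100, duration: 1000 }; // 100 jobs per second

console.log(requiredConcurrency(limiter, 200)); // 20
console.log(requiredConcurrency(limiter, 50));  // 5
```

Under these assumptions, a concurrency of 20 is just enough to reach the limit with 200 ms jobs. With 50 ms jobs, about 5 slots would already saturate it, and the rate limiter becomes the bottleneck rather than concurrency.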

Handle Concurrency Errors

queue.process(10, async (job) => {
  try {
    return await processJob(job);
  } catch (error) {
    // Log which concurrent job failed
    console.error(`Job ${job.id} failed in concurrent batch:`, error);
    throw error;
  }
});

Monitoring Concurrent Jobs

const queue = new Queue('monitored');

queue.on('active', async () => {
  const activeCount = await queue.getActiveCount();
  const waitingCount = await queue.getWaitingCount();
  
  console.log(`Active: ${activeCount}, Waiting: ${waitingCount}`);
});

queue.on('completed', async (job) => {
  const activeCount = await queue.getActiveCount();
  console.log(`Job ${job.id} done, ${activeCount} still active`);
});

queue.process(5, processor);

Common Patterns

Dynamic Concurrency

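const Queue = require('bull');

let queue = new Queue('dynamic');
let currentConcurrency = 5;
queue.process(currentConcurrency, processor);

async function adjustConcurrency(newConcurrency) {
  if (newConcurrency === currentConcurrency) return;

  // close() waits for active jobs, then shuts down the Redis connections.
  // A closed queue cannot be reused, so create a fresh instance.
  await queue.close();

  // Restart with new concurrency
  queue = new Queue('dynamic');
  currentConcurrency = newConcurrency;
  queue.process(currentConcurrency, processor);
}

// Adjust based on load (systemLoad is a placeholder for your own metric;
// run this inside an async function)
if (systemLoad < 0.5) {
  await adjustConcurrency(10);
} else {
  await adjustConcurrency(3);
}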
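
The `systemLoad` value above is left undefined. One possible source, sketched here under the assumption that a CPU load average is a useful signal for your workload, is the 1-minute load average divided by the core count. `normalizedLoad` and `pickConcurrency` are hypothetical helpers, and the thresholds simply mirror the numbers used above.

```javascript
const os = require('os');

// 1-minute load average divided by core count; roughly 1.0 means "fully busy".
// os.loadavg() returns [0, 0, 0] on Windows, so this is only meaningful on
// Unix-like systems.
function normalizedLoad() {
  return os.loadavg()[0] / Math.max(1, os.cpus().length);
}

// Mirrors the thresholds used above: 10 when load is below 0.5, otherwise 3.
function pickConcurrency(systemLoad, { threshold = 0.5, high = 10, low = 3 } = {}) {
  return systemLoad < threshold ? high : low;
}

console.log(pickConcurrency(normalizedLoad()));
```

Because changing concurrency means closing and re-creating the queue, it is worth calling this on a slow timer and only acting when the result differs from the current value.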

Graceful Shutdown

const queue = new Queue('graceful');

queue.process(10, async (job) => {
  return await processJob(job);
});

process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...');
  
  // Wait for active jobs to complete
  await queue.whenCurrentJobsFinished();
  
  // Close the queue and its Redis connections (close() also waits for active jobs by default)
  await queue.close();
  
  process.exit(0);
});
With high concurrency, ensure your graceful shutdown waits for all active jobs to complete, otherwise jobs may fail mid-processing.
Start with conservative concurrency values and increase them gradually based on monitoring and testing. It is easier to scale up than to debug failures caused by too much concurrency.
