Concurrency control allows you to process multiple jobs simultaneously, dramatically improving throughput for I/O-bound operations. Bull provides flexible concurrency options at both the queue and processor level.
Basic Concurrency
Specify the concurrency parameter in the process() method:
const Queue = require('bull');
const imageQueue = new Queue('image-processing');
// Process 5 jobs concurrently
imageQueue.process(5, async (job) => {
return await processImage(job.data.imageUrl);
});
When no concurrency is specified, it defaults to 1 (sequential processing).
Process Method Signatures
Bull recognizes different process() call patterns:
// Process one job at a time
process(processor: Function)
// Process with concurrency
process(concurrency: number, processor: Function)
// Process named jobs
process(name: string, processor: Function)
// Process named jobs with concurrency
process(name: string, concurrency: number, processor: Function)
// Process using external file
process(concurrency: number, processorFile: string)
Concurrency Examples
I/O Bound Tasks
Use high concurrency for API calls, database queries, or file operations:
const apiQueue = new Queue('api-requests');
// Process 20 API calls concurrently
apiQueue.process(20, async (job) => {
const response = await fetch(job.data.url);
return await response.json();
});
// Add multiple jobs
for (const url of urls) {
await apiQueue.add({ url });
}
CPU Bound Tasks
Use lower concurrency for CPU-intensive operations:
const videoQueue = new Queue('video-encoding');
// Process 2 videos at a time (CPU intensive)
videoQueue.process(2, async (job) => {
return await encodeVideo(job.data.inputPath, job.data.outputPath);
});
For CPU-bound tasks, set concurrency close to the number of CPU cores to avoid overwhelming the system.
Mixed Workloads
Use different concurrency for different job types:
const queue = new Queue('mixed-tasks');
// Light tasks - high concurrency
queue.process('email', 10, async (job) => {
await sendEmail(job.data);
});
// Heavy tasks - low concurrency
queue.process('report', 2, async (job) => {
await generateReport(job.data);
});
// Add named jobs
await queue.add('email', { to: '[email protected]' });
await queue.add('report', { type: 'monthly' });
Named Processors with Concurrency
When using named processors, concurrency stacks up:
const loadBalancerQueue = new Queue('loadbalancer');
// WARNING: Total concurrency = 100 + 25 + 0 = 125!
loadBalancerQueue.process('requestProfile', 100, requestProfile);
loadBalancerQueue.process('sendEmail', 25, sendEmail);
loadBalancerQueue.process('sendInvitation', 0, sendInvite);
With multiple named processors in one queue, the concurrency for each processor stacks up. The queue can process up to the sum of all concurrency values simultaneously.
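Since the ceiling is just the sum of the per-processor values, it can be sanity-checked up front. A hypothetical helper (not part of Bull's API) that totals the registrations from the example above:

```javascript
// Hypothetical helper: compute the total concurrency a queue can reach
// when several named processors are registered on it.
function totalConcurrency(processors) {
  return processors.reduce((sum, p) => sum + p.concurrency, 0);
}

const registered = [
  { name: 'requestProfile', concurrency: 100 },
  { name: 'sendEmail', concurrency: 25 },
  { name: 'sendInvitation', concurrency: 0 },
];

console.log(totalConcurrency(registered)); // 125
```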
Better Approach: Separate Queues
// Recommended: Create separate queues for better control
const profileQueue = new Queue('profile');
profileQueue.process('requestProfile', 100, requestProfile);
// Max concurrency for this queue: 100
const emailQueue = new Queue('email');
emailQueue.process('sendEmail', 25, sendEmail);
// Max concurrency for this queue: 25
Using Wildcard Processors
Process all named jobs from one processor:
const differentJobsQueue = new Queue('differentJobsQueue');
// Process any named job with concurrency of 5
differentJobsQueue.process('*', 5, processFunction);
// Add different job types
await differentJobsQueue.add('jobA', { type: 'A' });
await differentJobsQueue.add('jobB', { type: 'B' });
Multiple Workers Pattern
Scale horizontally by running multiple worker processes:
const Queue = require('bull');
const cluster = require('cluster');
const numWorkers = 8;
const queue = new Queue('test concurrent queue');
if (cluster.isMaster) {
// Master process: spawn workers
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('online', function (worker) {
// Add jobs for workers to process
for (let i = 0; i < 500; i++) {
queue.add({ foo: 'bar' });
}
});
cluster.on('exit', function (worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Worker process: process jobs
queue.process(function (job, jobDone) {
console.log('Job done by worker', cluster.worker.id, job.id);
jobDone();
});
}
This pattern creates 8 worker processes, each processing jobs from the same queue. Total effective concurrency: 8 (one job per worker at a time).
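The arithmetic generalizes: if each worker process also passed a concurrency value to process(), the total would be the product. A quick sketch of that calculation:

```javascript
// Effective concurrency when each of N worker processes calls
// queue.process(perWorkerConcurrency, ...) against the same queue.
function effectiveConcurrency(numWorkers, perWorkerConcurrency = 1) {
  return numWorkers * perWorkerConcurrency;
}

console.log(effectiveConcurrency(8));    // 8, as in the cluster example above
console.log(effectiveConcurrency(8, 5)); // 40
```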
Concurrency with External Processors
Use sandboxed processors with concurrency:
// processor.js
module.exports = async function (job) {
// Processor logic
return await heavyComputation(job.data);
};
// main.js
const queue = new Queue('heavy-tasks');
// Process 5 jobs concurrently in separate processes
queue.process(5, '/path/to/processor.js');
// Named processor with concurrency
queue.process('compute', 5, '/path/to/processor.js');
Combining concurrency with sandboxed processors gives you the best of both worlds: parallel processing AND process isolation.
Determining Optimal Concurrency
Identify task type
I/O Bound: API calls, database queries, file operations
- Start with higher concurrency (10-50)
CPU Bound: Video encoding, image processing, complex calculations
- Start with concurrency = CPU cores (2-8)
Monitor resource usage
queue.on('active', async (job) => {
const activeCount = await queue.getActiveCount(); // getActiveCount() returns a promise
console.log(`Active jobs: ${activeCount}`);
// Monitor: CPU usage, memory, response times
});
Adjust and test
Gradually increase concurrency while monitoring:
- Error rates
- Processing time
- System resources
- External API rate limits
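That adjust-and-test loop can be reduced to a pure decision function. A sketch with hypothetical thresholds; the 5% error rate, 90% CPU ceiling, and bounds are illustrative, not Bull defaults:

```javascript
// Hypothetical tuning rule: back off quickly when errors or CPU climb,
// ramp up slowly while the system looks healthy.
function nextConcurrency(current, { errorRate, cpuLoad }, max = 50, min = 1) {
  if (errorRate > 0.05 || cpuLoad > 0.9) {
    return Math.max(min, Math.floor(current / 2)); // back off fast
  }
  return Math.min(max, current + 1); // ramp up slowly
}

console.log(nextConcurrency(20, { errorRate: 0.1, cpuLoad: 0.4 })); // 10
console.log(nextConcurrency(20, { errorRate: 0.0, cpuLoad: 0.4 })); // 21
```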
Best Practices
Set Appropriate Concurrency
// Too low - underutilized resources
queue.process(1, processor); // Sequential processing
// Optimal for I/O - good throughput
queue.process(20, processor);
// Too high - may overwhelm system or external APIs
queue.process(1000, processor);
Combine with Rate Limiting
const queue = new Queue('api-with-limits', {
limiter: {
max: 100, // 100 requests
duration: 1000 // Per second
}
});
// Process 20 concurrent jobs, but respect rate limit
queue.process(20, async (job) => {
return await apiCall(job.data);
});
Concurrency controls how many jobs process simultaneously, while rate limiting controls how many jobs process per time period. Use both for fine-grained control.
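The interaction between the two limits can be estimated: with concurrency c and an average job duration of d seconds, concurrency alone allows roughly c/d jobs per second, and the effective rate is the smaller of that and the limiter's max/duration. A back-of-the-envelope sketch:

```javascript
// Rough throughput ceiling in jobs/second given both controls.
function throughputCeiling({ concurrency, avgJobSeconds, limiterMax, limiterMs }) {
  const concurrencyRate = concurrency / avgJobSeconds;
  const limiterRate = limiterMax / (limiterMs / 1000);
  return Math.min(concurrencyRate, limiterRate);
}

// 20 concurrent jobs at ~0.5s each allow 40/s; the limiter permits
// 100/s, so concurrency is the binding constraint here.
console.log(throughputCeiling({
  concurrency: 20, avgJobSeconds: 0.5, limiterMax: 100, limiterMs: 1000,
})); // 40
```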
Handle Concurrency Errors
queue.process(10, async (job) => {
try {
return await processJob(job);
} catch (error) {
// Log which concurrent job failed
console.error(`Job ${job.id} failed in concurrent batch:`, error);
throw error;
}
});
Monitoring Concurrent Jobs
const queue = new Queue('monitored');
queue.on('active', async () => {
const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
console.log(`Active: ${activeCount}, Waiting: ${waitingCount}`);
});
queue.on('completed', async (job) => {
const activeCount = await queue.getActiveCount();
console.log(`Job ${job.id} done, ${activeCount} still active`);
});
queue.process(5, processor);
Common Patterns
Dynamic Concurrency
let queue = new Queue('dynamic');
let currentConcurrency = 5;
queue.process(currentConcurrency, processor);
async function adjustConcurrency(newConcurrency) {
// A closed Bull queue cannot be reused, so close it
// and register the processor on a fresh instance
await queue.close();
queue = new Queue('dynamic');
currentConcurrency = newConcurrency;
queue.process(currentConcurrency, processor);
}
// Adjust based on load
if (systemLoad < 0.5) {
await adjustConcurrency(10);
} else {
await adjustConcurrency(3);
}
Graceful Shutdown
const queue = new Queue('graceful');
queue.process(10, async (job) => {
return await processJob(job);
});
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...');
// Wait for active jobs to complete
await queue.whenCurrentJobsFinished();
// Close queue
await queue.close();
process.exit(0);
});
With high concurrency, ensure your graceful shutdown waits for all active jobs to complete, otherwise jobs may fail mid-processing.
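If a hung job could keep whenCurrentJobsFinished() pending forever, the wait can be raced against a deadline. A generic helper sketch; the 30-second budget in the usage comment is an assumption, not a Bull default:

```javascript
// Resolve with the promise's value, or reject once the deadline passes.
function withTimeout(promise, ms) {
  let timer;
  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('shutdown timed out')), ms);
  });
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// e.g. await withTimeout(queue.whenCurrentJobsFinished(), 30000);
```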
Start with conservative concurrency values and increase gradually based on monitoring and testing. It’s easier to scale up than to troubleshoot issues from over-concurrency.