Enable Debug Logging
Bull uses Node.js's built-in `util.debuglog` for debug output. Enable it with the `NODE_DEBUG` environment variable:
NODE_DEBUG=bull node your-app.js
BULL 12345: myqueue queue ready
BULL 12345: Processing job 1
BULL 12345: Extending lock for job 1
BULL 12345: Lock extended successfully for job 1
BULL 12345: Job 1 completed
BULL 12345: Moving job 2 to active
Debug Specific Queues
The debug output shows the process ID and queue name, making it easy to filter:
# All Bull debug output
NODE_DEBUG=bull node app.js
# Filter for specific queue in logs
NODE_DEBUG=bull node app.js 2>&1 | grep "myqueue"
Common Issues and Solutions
1. Jobs Not Processing
Symptoms:
- Jobs stay in the `waiting` state
- Workers appear idle
- No jobs moving to `active`
Queue is Paused
// A paused queue is a common reason jobs sit in `waiting` forever.
const isPaused = await queue.isPaused();
if (isPaused) {
console.log('Queue is paused!');
await queue.resume(); // un-pause so workers pick jobs up again
}
No Processor Defined
// Make sure you've called queue.process()
// Without a registered processor, nothing ever consumes the queue.
queue.process(async (job) => {
return processJob(job.data);
});
Redis Connection Issues
// Surface queue-level errors (often connection problems).
queue.on('error', (error) => {
console.error('Queue error:', error);
});
// Also listen on the underlying Redis client directly.
queue.client.on('error', (error) => {
console.error('Redis client error:', error);
});
// Check connection
try {
await queue.client.ping(); // PING round-trip verifies the link is alive
console.log('Redis connection OK');
} catch (err) {
console.error('Redis connection failed:', err);
}
Wrong Job Name
// Processor for named jobs
queue.process('send-email', async (job) => { ... });
// But job added with different name
// Named processors only receive jobs added under the exact same name.
queue.add('sendEmail', data); // Won't match!
// Solution: Use same name or wildcard processor
queue.process('*', async (job) => {
// Processes all job types
});
2. Lock Extension Failures
Symptoms:
- `lock-extension-failed` events
- Jobs becoming stalled
- Logs show lock renewal errors
// Log context whenever a worker fails to renew a job's Redis lock.
queue.on('lock-extension-failed', (job, err) => {
console.error(`Lock extension failed for job ${job.id}`);
console.error('Error:', err.message);
console.error('Job data:', job.data);
console.error('Attempts made:', job.attemptsMade);
});
Redis Connection Latency
// Increase lock duration to account for latency
// lockRenewTime is how often the lock is refreshed; keeping it at half
// the lockDuration leaves headroom for slow Redis round-trips.
const queue = new Queue('myqueue', {
settings: {
lockDuration: 60000, // Increase from default 30000
lockRenewTime: 30000
}
});
CPU-Intensive Processing
// Use sandboxed processor to avoid blocking
// A file path makes Bull run the processor in a child process, so the
// main event loop stays free to renew locks.
queue.process('/path/to/processor.js');
// Or break work into smaller chunks
queue.process(async (job) => {
for (const item of job.data.items) {
await processItem(item); // await allows event loop to run
}
});
Network Issues
// Monitor Redis connection
// Repeated reconnects here usually explain intermittent lock failures.
queue.client.on('reconnecting', () => {
console.warn('Redis reconnecting...');
});
queue.client.on('end', () => {
console.error('Redis connection ended');
});
3. Jobs Stuck in Active State
Symptoms:
- Jobs remain in the `active` state indefinitely
- `getActive()` returns jobs that should be complete
// Check active jobs
const activeJobs = await queue.getActive();
console.log('Active jobs:', activeJobs.length);
// Print each job's age (time since it started processing).
activeJobs.forEach(job => {
console.log(`Job ${job.id}:`, {
name: job.name,
processedOn: new Date(job.processedOn),
age: Date.now() - job.processedOn,
data: job.data
});
});
Force Move Stalled Jobs
// Trigger stalled job check manually
// Moves active jobs whose locks have expired back to `wait` for retry.
await queue.moveUnlockedJobsToWait();
Check for Unhandled Promise Rejections
// A swallowed error leaves the job in `active`; the processor must
// throw (or return a rejected promise) for Bull to mark it failed.
queue.process(async (job) => {
try {
return await processJob(job.data);
} catch (err) {
// Must throw or return rejected promise
throw err;
}
});
// Add global handler
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled rejection:', reason);
});
Ensure Processor Returns/Throws
// BAD: Doesn't return or throw
queue.process(async (job) => {
// The promise from processJob is floating: the job "completes"
// immediately and failures are never reported.
processJob(job.data); // Missing return!
});
// GOOD: Returns promise
queue.process(async (job) => {
return processJob(job.data);
});
4. Memory Leaks
Symptoms:
- Memory usage grows over time
- Process eventually crashes
// Monitor memory usage
// Logs RSS/heap figures every 30s so growth trends are visible.
setInterval(() => {
const used = process.memoryUsage();
console.log('Memory usage:', {
rss: `${Math.round(used.rss / 1024 / 1024)}MB`,
heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)}MB`,
heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)}MB`,
external: `${Math.round(used.external / 1024 / 1024)}MB`
});
}, 30000);
Not Removing Completed Jobs
// Jobs accumulate in Redis
queue.add(data); // Default: keeps all completed jobs
// Solution: Enable auto-removal
queue.add(data, {
removeOnComplete: true,
removeOnFail: true
});
// Or set default for all jobs
// Per-job options still override these defaults.
const queue = new Queue('myqueue', {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: 50 // Keep last 50 failed jobs
}
});
Event Listener Leaks
// BAD: Adds new listener on each job
queue.process(async (job) => {
queue.on('completed', () => { ... }); // Memory leak!
});
// GOOD: Add listener once
// Register event handlers once at startup, outside the processor.
queue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`);
});
queue.process(async (job) => {
return processJob(job.data);
});
Large Job Data
// Monitor job data size
queue.on('completed', (job) => {
const dataSize = JSON.stringify(job.data).length;
if (dataSize > 100000) { // 100KB
console.warn(`Large job data: ${dataSize} bytes`);
}
});
// Store large data externally
// Keep only a reference in Redis; fetch the payload in the processor.
const dataId = await storage.save(largeData);
queue.add({ dataId }, { removeOnComplete: true });
5. High Redis CPU Usage
Debug:
# Monitor Redis CPU
redis-cli INFO | grep cpu
# Monitor slow queries
redis-cli SLOWLOG GET 10
Reduce Stalled Job Checks
// Each stalled-job check runs Lua scripts against Redis; spacing them
// out reduces Redis CPU load at the cost of slower stall detection.
const queue = new Queue('myqueue', {
settings: {
stalledInterval: 60000 // Check less frequently (default: 30000)
}
});
Reduce Guard Interval
// The guard timer re-checks delayed jobs; a larger interval means
// fewer Redis polls but coarser delayed-job scheduling precision.
const queue = new Queue('myqueue', {
settings: {
guardInterval: 30000 // For delayed jobs (default: 5000)
}
});
Limit Concurrent Stalled Checks
With multiple workers:
// Worker 1
// Only one worker needs to run the stalled-job check for a shared
// queue; disabling it on the others avoids redundant Redis work.
const queue1 = new Queue('shared', {
settings: { stalledInterval: 30000 }
});
// Worker 2 - disable stalled check
const queue2 = new Queue('shared', {
settings: { stalledInterval: 0 } // Disabled
});
6. Job Timeouts
Debug:
queue.on('failed', (job, err) => {
if (err.message.includes('timeout')) {
console.error(`Job ${job.id} timed out after ${job.opts.timeout}ms`);
console.error('Job data:', job.data);
// processedOn marks when the worker started this attempt.
console.error('Processed for:', Date.now() - job.processedOn, 'ms');
}
});
// Increase timeout
queue.add(data, {
timeout: 60000 // 60 seconds
});
// Add timeout handling in processor.
// NOTE: the previous version threw from inside a setInterval callback.
// An exception thrown in a timer's async callback becomes an unhandled
// rejection — it never propagates to Bull, so the job would not fail.
// Race the real work against a timeout promise instead: the processor
// itself rejects on timeout, which correctly marks the job as failed.
queue.process(async (job) => {
  const timeout = job.opts.timeout || 30000;
  let timer;
  // Rejects once the timeout budget is exhausted.
  const timedOut = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Job ${job.id} timed out after ${timeout}ms`)),
      timeout
    );
  });
  try {
    // Whichever settles first wins; a timeout rejection fails the job.
    return await Promise.race([processJob(job.data), timedOut]);
  } finally {
    // Always release the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
});
Debugging Tools
Inspect Queue State
/**
 * Log a point-in-time snapshot of queue health: job counts, number of
 * connected workers, pause state, and a warning for each active job
 * that has been processing for over a minute (a likely stuck job).
 * @param {Queue} queue - Bull queue instance to inspect.
 */
async function debugQueue(queue) {
  // Fetch all three pieces of state in parallel.
  const [counts, workers, isPaused] = await Promise.all([
    queue.getJobCounts(),
    queue.getWorkers(),
    queue.isPaused()
  ]);

  console.log('Queue Debug Info:');
  console.log(' Job Counts:', counts);
  console.log(' Active Workers:', workers.length);
  console.log(' Paused:', isPaused);

  // Nothing to flag unless something is actively processing.
  if (counts.active > 0) {
    const activeJobs = await queue.getActive();
    const now = Date.now();
    for (const job of activeJobs) {
      const age = now - job.processedOn;
      if (age > 60000) { // Older than 1 minute
        console.warn(` Stuck job ${job.id}: ${age}ms old`);
      }
    }
  }
}
// Poll the queue snapshot once a minute.
setInterval(() => debugQueue(queue), 60000);
Monitor Events
/**
 * Attach a timestamped catch-all console logger to every notable Bull
 * queue event so the queue's lifecycle can be traced end to end.
 * @param {Queue} queue - Bull queue instance to monitor.
 */
function setupQueueMonitoring(queue) {
  const monitoredEvents = [
    'error', 'waiting', 'active', 'stalled', 'progress',
    'completed', 'failed', 'paused', 'resumed', 'cleaned',
    'drained', 'removed', 'lock-extension-failed'
  ];
  for (const event of monitoredEvents) {
    // Forward every event payload verbatim, prefixed with a timestamp.
    queue.on(event, (...args) => {
      console.log(`[${new Date().toISOString()}] ${event}:`, args);
    });
  }
}
// Register the catch-all event logger once at startup.
setupQueueMonitoring(queue);
Check Job Details
/**
 * Print everything known about one job: identity, state, data, options,
 * progress, attempt count, timestamps, then — depending on terminal
 * state — the failure details or return value, and finally its logs.
 * @param {Queue} queue - Bull queue to look the job up in.
 * @param {string|number} jobId - ID of the job to inspect.
 */
async function inspectJob(queue, jobId) {
  const job = await queue.getJob(jobId);
  if (!job) {
    console.log('Job not found');
    return;
  }

  const state = await job.getState();
  const logs = await job.getLogs();

  // Missing timestamps render as null rather than "Invalid Date".
  const timestamps = {
    created: new Date(job.timestamp),
    processedOn: job.processedOn ? new Date(job.processedOn) : null,
    finishedOn: job.finishedOn ? new Date(job.finishedOn) : null
  };

  console.log('Job Details:');
  console.log(' ID:', job.id);
  console.log(' Name:', job.name);
  console.log(' State:', state);
  console.log(' Data:', job.data);
  console.log(' Options:', job.opts);
  console.log(' Progress:', job.progress);
  console.log(' Attempts:', `${job.attemptsMade}/${job.opts.attempts}`);
  console.log(' Timestamps:', timestamps);

  if (state === 'failed') {
    console.log(' Failed Reason:', job.failedReason);
    console.log(' Stack Trace:', job.stacktrace);
  } else if (state === 'completed') {
    console.log(' Return Value:', job.returnvalue);
  }
  console.log(' Logs:', logs);
}
// Usage
// (top-level await requires an ES module or an async wrapper)
await inspectJob(queue, '12345');
Production Debugging Setup
// Production setup: one queue with error/stall/lock/performance
// logging wired to a structured logger, plus an HTTP health check.
const Queue = require('bull');
const logger = require('./logger'); // Your logger (e.g., winston)
const queue = new Queue('production', {
redis: process.env.REDIS_URL,
settings: {
// lockDuration > stalledInterval leaves headroom before a job is
// considered stalled.
lockDuration: 45000,
stalledInterval: 30000
}
});
// Error tracking
queue.on('error', (error) => {
logger.error('Queue error', { error: error.message, stack: error.stack });
});
queue.on('failed', (job, err) => {
logger.error('Job failed', {
jobId: job.id,
jobName: job.name,
error: err.message,
attemptsMade: job.attemptsMade,
data: job.data
});
});
queue.on('stalled', (job) => {
logger.warn('Job stalled', {
jobId: job.id,
jobName: job.name,
attemptsMade: job.attemptsMade
});
});
queue.on('lock-extension-failed', (job, err) => {
logger.error('Lock extension failed', {
jobId: job.id,
error: err.message
});
});
// Redis connection monitoring
queue.client.on('error', (err) => {
logger.error('Redis client error', { error: err.message });
});
queue.client.on('reconnecting', () => {
logger.warn('Redis reconnecting');
});
// Performance monitoring
queue.on('completed', (job, result) => {
// Duration is measured from when this attempt started processing.
const duration = Date.now() - job.processedOn;
if (duration > 30000) { // Longer than 30 seconds
logger.warn('Slow job detected', {
jobId: job.id,
duration,
jobName: job.name
});
}
});
// Health check endpoint
const express = require('express');
const app = express();
app.get('/health', async (req, res) => {
try {
// Gather counts, worker list, and pause state in parallel.
const [counts, workers, isPaused] = await Promise.all([
queue.getJobCounts(),
queue.getWorkers(),
queue.isPaused()
]);
res.json({
status: 'healthy',
queue: {
counts,
workers: workers.length,
paused: isPaused
}
});
} catch (err) {
// Any Redis/queue failure surfaces as HTTP 500 for the probe.
res.status(500).json({
status: 'unhealthy',
error: err.message
});
}
});
app.listen(3000);