Overview
Bull queues emit events throughout the job lifecycle. Events can be either local (only for this queue instance) or global (across all queue instances via Redis pub/sub).
Local vs Global Events
Local Events
Local events fire only on the queue instance where they’re registered. They pass the full Job instance to listeners.
queue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result: ${result}`);
});
Global Events
Global events fire across all queue instances connected to the same Redis server. They pass the job ID instead of the full job instance.
queue.on('global:completed', (jobId, result) => {
console.log(`Job ${jobId} completed with result: ${result}`);
});
Prefix any event with 'global:' to listen globally across all workers and servers.
Accessing Job in Global Events
Since global events only provide the job ID, use Queue.getJob() to retrieve the full job:
queue.on('global:completed', async (jobId, result) => {
const job = await queue.getJob(jobId);
if (job) {
console.log(`Job ${job.name} completed`);
// Clean up if needed
await job.remove();
}
});
If removeOnComplete is enabled, the job may no longer exist when the global event fires. Use Job.remove() in the listener if you need both access and cleanup.
Queue Events
error
Fired when an error occurs in the queue.
queue.on('error', (error) => {
console.error('Queue error:', error);
});
Parameters:
error (Error): The error object
waiting
Fired when a job is waiting to be processed.
queue.on('waiting', (jobId) => {
console.log(`Job ${jobId} is waiting`);
});
Parameters:
jobId (string): ID of the waiting job
active
Fired when a job starts processing.
queue.on('active', (job, jobPromise) => {
console.log(`Job ${job.id} started`);
// Can cancel the job if needed
// jobPromise.cancel();
});
Parameters:
job (Job): The job instance
jobPromise (Promise): The promise representing job processing. Call cancel() to abort.
Global variant:
queue.on('global:active', (jobId) => {
console.log(`Job ${jobId} started`);
});
stalled
Fired when a job is marked as stalled (unlocked but still in active state).
queue.on('stalled', (job) => {
console.log(`Job ${job.id} has stalled`);
});
Parameters:
job (Job): The stalled job
Use case: Debugging workers that crash or pause the event loop.
Global variant:
queue.on('global:stalled', (jobId) => {
console.log(`Job ${jobId} has stalled`);
});
lock-extension-failed
Fired when a job fails to extend its lock.
queue.on('lock-extension-failed', (job, err) => {
console.error(`Job ${job.id} failed to extend lock:`, err);
});
Parameters:
job (Job): The job that failed to extend lock
err (Error): The error
Use case: Debugging Redis connection issues and jobs getting restarted.
progress
Fired when a job’s progress is updated.
queue.on('progress', (job, progress) => {
console.log(`Job ${job.id} is ${progress}% complete`);
});
Parameters:
job (Job): The job instance
progress (number | object): Progress value
Global variant:
queue.on('global:progress', (jobId, progress) => {
console.log(`Job ${jobId} is ${progress}% complete`);
});
completed
Fired when a job successfully completes.
queue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`);
console.log(`Result:`, result);
});
Parameters:
job (Job): The completed job
result (any): The value returned by the processor
Global variant:
queue.on('global:completed', async (jobId, result) => {
console.log(`Job ${jobId} completed with:`, result);
// Retrieve full job if needed
const job = await queue.getJob(jobId);
if (job) {
await job.remove();
}
});
failed
Fired when a job fails.
queue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
console.log(`Attempts made: ${job.attemptsMade}`);
});
Parameters:
job (Job): The failed job
err (Error): The error that caused the failure
Global variant:
queue.on('global:failed', async (jobId, err) => {
console.error(`Job ${jobId} failed:`, err);
});
paused
Fired when the queue is paused.
queue.on('paused', () => {
console.log('Queue has been paused');
});
Global variant:
queue.on('global:paused', () => {
console.log('Queue paused globally');
});
resumed
Fired when the queue is resumed.
queue.on('resumed', () => {
console.log('Queue has been resumed');
});
Global variant:
queue.on('global:resumed', () => {
console.log('Queue resumed globally');
});
cleaned
Fired when old jobs are cleaned from the queue.
queue.on('cleaned', (jobs, type) => {
console.log(`Cleaned ${jobs.length} ${type} jobs`);
});
Parameters:
jobs (Job[]): Array of cleaned jobs
type (string): Type of jobs cleaned (‘completed’, ‘failed’, etc.)
Global variant:
queue.on('global:cleaned', (jobIds, type) => {
console.log(`Cleaned ${jobIds.length} ${type} jobs`);
});
drained
Fired when the queue has processed all waiting jobs (even if delayed jobs remain).
queue.on('drained', () => {
console.log('Queue is drained - all waiting jobs processed');
});
Global variant:
queue.on('global:drained', () => {
console.log('Queue drained globally');
});
removed
Fired when a job is successfully removed.
queue.on('removed', (job) => {
console.log(`Job ${job.id} was removed`);
});
Parameters:
job (Job): The removed job
Global variant:
queue.on('global:removed', (jobId) => {
console.log(`Job ${jobId} was removed`);
});
Common Patterns
Job Monitoring Dashboard
const stats = {
completed: 0,
failed: 0,
active: 0
};
queue.on('global:completed', () => {
stats.completed++;
});
queue.on('global:failed', () => {
stats.failed++;
});
queue.on('global:active', () => {
stats.active++;
});
app.get('/stats', (req, res) => {
res.json(stats);
});
Alert on High Failure Rate
let recentFails = 0;
let recentCompleted = 0;
queue.on('completed', () => {
recentCompleted++;
});
queue.on('failed', () => {
recentFails++;
});
setInterval(() => {
const total = recentFails + recentCompleted;
if (total > 0) {
const failRate = recentFails / total;
if (failRate > 0.1) {
console.error(`High failure rate: ${(failRate * 100).toFixed(1)}%`);
// Send alert
}
}
recentFails = 0;
recentCompleted = 0;
}, 60000); // Check every minute
Progress Tracking
const activeJobs = new Map();
queue.on('active', (job) => {
activeJobs.set(job.id, { startTime: Date.now(), progress: 0 });
});
queue.on('progress', (job, progress) => {
const jobInfo = activeJobs.get(job.id);
if (jobInfo) {
jobInfo.progress = progress;
}
});
queue.on('completed', (job) => {
activeJobs.delete(job.id);
});
queue.on('failed', (job) => {
activeJobs.delete(job.id);
});
// API endpoint to check progress
app.get('/jobs/:id/progress', (req, res) => {
const info = activeJobs.get(req.params.id);
res.json(info || { progress: 0 });
});
Automatic Job Retry with Delay
queue.on('failed', async (job, err) => {
if (job.attemptsMade < job.opts.attempts) {
console.log(`Job ${job.id} will be retried`);
} else {
console.error(`Job ${job.id} permanently failed:`, err.message);
// Move to dead letter queue
await deadLetterQueue.add(job.data, {
jobId: job.id,
originalError: err.message
});
}
});
Log All Events
const events = [
'error', 'waiting', 'active', 'stalled', 'progress',
'completed', 'failed', 'paused', 'resumed', 'cleaned',
'drained', 'removed'
];
events.forEach(event => {
queue.on(event, (...args) => {
console.log(`[${event}]`, ...args);
});
});
Global Event Aggregation
// Monitor all workers across all servers
const globalStats = {
totalProcessed: 0,
totalFailed: 0
};
queue.on('global:completed', (jobId, result) => {
globalStats.totalProcessed++;
console.log(`Total processed across all workers: ${globalStats.totalProcessed}`);
});
queue.on('global:failed', (jobId, err) => {
globalStats.totalFailed++;
console.log(`Total failed across all workers: ${globalStats.totalFailed}`);
});
Cleanup Completed Jobs in Global Event
queue.on('global:completed', async (jobId, result) => {
// Only remove if not using removeOnComplete
try {
const job = await queue.getJob(jobId);
if (job) {
// Log result to database
await logJobResult(jobId, result);
// Then remove
await job.remove();
}
} catch (err) {
console.error(`Failed to cleanup job ${jobId}:`, err);
}
});
Circuit Breaker Pattern
let consecutiveFailures = 0;
const FAILURE_THRESHOLD = 5;
queue.on('failed', async (job, err) => {
consecutiveFailures++;
if (consecutiveFailures >= FAILURE_THRESHOLD) {
console.error('Circuit breaker triggered - pausing queue');
await queue.pause();
// Resume after cooldown
setTimeout(async () => {
console.log('Circuit breaker reset - resuming queue');
consecutiveFailures = 0;
await queue.resume();
}, 60000); // 1 minute cooldown
}
});
queue.on('completed', () => {
consecutiveFailures = 0; // Reset on success
});
Event Reference
| Event | Local Params | Global Params | Description |
|---|
error | (error) | N/A | Queue error occurred |
waiting | (jobId) | N/A | Job is waiting |
active | (job, jobPromise) | (jobId) | Job started processing |
stalled | (job) | (jobId) | Job is stalled |
lock-extension-failed | (job, err) | N/A | Failed to extend lock |
progress | (job, progress) | (jobId, progress) | Job progress updated |
completed | (job, result) | (jobId, result) | Job completed |
failed | (job, err) | (jobId, err) | Job failed |
paused | () | () | Queue paused |
resumed | () | () | Queue resumed |
cleaned | (jobs, type) | (jobIds, type) | Jobs cleaned |
drained | () | () | Queue drained |
removed | (job) | (jobId) | Job removed |