
Overview

Bull queues emit events throughout the job lifecycle. Events can be either local (only for this queue instance) or global (across all queue instances via Redis pub/sub).

Local vs Global Events

Local Events

Local events fire only on the queue instance where they’re registered. Most pass the full Job instance to listeners; waiting passes only the job ID.
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result: ${result}`);
});

Global Events

Global events fire across all queue instances connected to the same Redis server. They pass the job ID instead of the full job instance.
queue.on('global:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed with result: ${result}`);
});
Prefix an event name with 'global:' to listen across all workers and servers. Events listed as N/A under Global Params in the Event Reference have no global variant.
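Because local listeners receive a Job and global listeners receive a job ID, a handler shared between the two has to normalize its first argument. The following is a minimal sketch rather than anything Bull provides: jobIdOf is a hypothetical helper, and a plain Node.js EventEmitter stands in for the queue so the snippet runs without Redis.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper (not part of Bull): accept either a Job-like object
// (local events) or a bare job ID (global events) and return the ID.
function jobIdOf(jobOrId) {
  return jobOrId !== null && typeof jobOrId === 'object' ? jobOrId.id : jobOrId;
}

// Stand-in for a Bull queue so the sketch runs without Redis.
const demoQueue = new EventEmitter();
const seen = [];

// One handler registered for both the local and the global event.
const recordCompletion = (jobOrId) => seen.push(String(jobIdOf(jobOrId)));
demoQueue.on('completed', recordCompletion);
demoQueue.on('global:completed', recordCompletion);

// Simulate what each kind of listener would receive.
demoQueue.emit('completed', { id: '1', data: {} }, 'ok');
demoQueue.emit('global:completed', '2', 'ok');

console.log(seen); // [ '1', '2' ]
```

With a real queue, registering the same handler on both variants in the process that also runs the job would report that job twice, so in practice one variant per process is usually enough.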

Accessing Job in Global Events

Since global events only provide the job ID, use Queue.getJob() to retrieve the full job:
queue.on('global:completed', async (jobId, result) => {
  const job = await queue.getJob(jobId);
  
  if (job) {
    console.log(`Job ${job.name} completed`);
    // Clean up if needed
    await job.remove();
  }
});
If removeOnComplete is enabled, the job may already have been removed when the global event fires, in which case getJob() returns null. If you need both access to the job and cleanup, leave removeOnComplete off and call Job.remove() in the listener.

Queue Events

error

Fired when an error occurs in the queue.
queue.on('error', (error) => {
  console.error('Queue error:', error);
});
Parameters:
  • error (Error): The error object

waiting

Fired when a job is waiting to be processed.
queue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting`);
});
Parameters:
  • jobId (string): ID of the waiting job

active

Fired when a job starts processing.
queue.on('active', (job, jobPromise) => {
  console.log(`Job ${job.id} started`);
  
  // Can cancel the job if needed
  // jobPromise.cancel();
});
Parameters:
  • job (Job): The job instance
  • jobPromise (Promise): The promise representing job processing. Call cancel() to abort.
Global variant:
queue.on('global:active', (jobId) => {
  console.log(`Job ${jobId} started`);
});

stalled

Fired when a job is marked as stalled (unlocked but still in active state).
queue.on('stalled', (job) => {
  console.log(`Job ${job.id} has stalled`);
});
Parameters:
  • job (Job): The stalled job
Use case: Debugging workers that crash or pause the event loop.
Global variant:
queue.on('global:stalled', (jobId) => {
  console.log(`Job ${jobId} has stalled`);
});
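As one way to act on the debugging use case above, the sketch below counts how often each job stalls and flags repeat offenders. The names trackStalls, maxStalls, and onRepeatOffender are hypothetical, not Bull APIs. It listens to the global variant so that stalls reported by any worker are counted, and an EventEmitter stands in for the queue.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: count stalls per job ID and report jobs that
// stall more than `maxStalls` times.
function trackStalls(queue, maxStalls, onRepeatOffender) {
  const stallCounts = new Map();

  queue.on('global:stalled', (jobId) => {
    const count = (stallCounts.get(jobId) || 0) + 1;
    stallCounts.set(jobId, count);
    if (count > maxStalls) {
      onRepeatOffender(jobId, count);
    }
  });

  // Forget finished jobs so the map does not grow without bound.
  const forget = (jobId) => stallCounts.delete(jobId);
  queue.on('global:completed', forget);
  queue.on('global:failed', forget);

  return stallCounts;
}

// Stand-in for a Bull queue so the sketch runs without Redis.
const demoQueue = new EventEmitter();
const flagged = [];
const stallCounts = trackStalls(demoQueue, 1, (jobId, count) => {
  flagged.push({ jobId, count });
});

demoQueue.emit('global:stalled', '42'); // first stall: counted only
demoQueue.emit('global:stalled', '42'); // second stall: exceeds maxStalls
demoQueue.emit('global:stalled', '7');
demoQueue.emit('global:completed', '7', 'done'); // '7' is forgotten

console.log(flagged); // [ { jobId: '42', count: 2 } ]
```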

lock-extension-failed

Fired when a job fails to extend its lock.
queue.on('lock-extension-failed', (job, err) => {
  console.error(`Job ${job.id} failed to extend lock:`, err);
});
Parameters:
  • job (Job): The job that failed to extend lock
  • err (Error): The error
Use case: Debugging Redis connection issues and jobs getting restarted.

progress

Fired when a job’s progress is updated.
queue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});
Parameters:
  • job (Job): The job instance
  • progress (number | object): Progress value
Global variant:
queue.on('global:progress', (jobId, progress) => {
  console.log(`Job ${jobId} is ${progress}% complete`);
});
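Since progress can be a number or an object, a listener that interpolates it as a percentage only works for the numeric case. The sketch below shows one way to handle both; describeProgress is a hypothetical helper, and the object shape ({ step, total }) is only an example of what a processor might report.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: render a progress value that may be a number
// (treated as a percentage) or an arbitrary object (shown as JSON).
function describeProgress(progress) {
  if (typeof progress === 'number') {
    return `${progress}%`;
  }
  return JSON.stringify(progress);
}

// Stand-in for a Bull queue so the sketch runs without Redis.
const demoQueue = new EventEmitter();
const lines = [];

demoQueue.on('progress', (job, progress) => {
  lines.push(`Job ${job.id}: ${describeProgress(progress)}`);
});

// Simulate a numeric update followed by an object update.
demoQueue.emit('progress', { id: '1' }, 50);
demoQueue.emit('progress', { id: '1' }, { step: 2, total: 5 });

console.log(lines);
```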

completed

Fired when a job successfully completes.
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
  console.log(`Result:`, result);
});
Parameters:
  • job (Job): The completed job
  • result (any): The value returned by the processor
Global variant:
queue.on('global:completed', async (jobId, result) => {
  console.log(`Job ${jobId} completed with:`, result);
  
  // Retrieve full job if needed
  const job = await queue.getJob(jobId);
  if (job) {
    await job.remove();
  }
});

failed

Fired when a job fails.
queue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
  console.log(`Attempts made: ${job.attemptsMade}`);
});
Parameters:
  • job (Job): The failed job
  • err (Error): The error that caused the failure
Global variant:
queue.on('global:failed', async (jobId, err) => {
  console.error(`Job ${jobId} failed:`, err);
});

paused

Fired when the queue is paused.
queue.on('paused', () => {
  console.log('Queue has been paused');
});
Global variant:
queue.on('global:paused', () => {
  console.log('Queue paused globally');
});

resumed

Fired when the queue is resumed.
queue.on('resumed', () => {
  console.log('Queue has been resumed');
});
Global variant:
queue.on('global:resumed', () => {
  console.log('Queue resumed globally');
});

cleaned

Fired when old jobs are cleaned from the queue.
queue.on('cleaned', (jobs, type) => {
  console.log(`Cleaned ${jobs.length} ${type} jobs`);
});
Parameters:
  • jobs (Job[]): Array of cleaned jobs
  • type (string): Type of jobs cleaned ('completed', 'failed', etc.)
Global variant:
queue.on('global:cleaned', (jobIds, type) => {
  console.log(`Cleaned ${jobIds.length} ${type} jobs`);
});

drained

Fired when the queue has processed all waiting jobs (even if delayed jobs remain).
queue.on('drained', () => {
  console.log('Queue is drained - all waiting jobs processed');
});
Global variant:
queue.on('global:drained', () => {
  console.log('Queue drained globally');
});

removed

Fired when a job is successfully removed.
queue.on('removed', (job) => {
  console.log(`Job ${job.id} was removed`);
});
Parameters:
  • job (Job): The removed job
Global variant:
queue.on('global:removed', (jobId) => {
  console.log(`Job ${jobId} was removed`);
});

Common Patterns

Job Monitoring Dashboard

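// Counters aggregated across all workers via global events
const stats = {
  completed: 0,
  failed: 0,
  active: 0
};

queue.on('global:active', () => {
  stats.active++;
});

queue.on('global:completed', () => {
  stats.completed++;
  stats.active = Math.max(0, stats.active - 1); // job is no longer active
});

queue.on('global:failed', () => {
  stats.failed++;
  stats.active = Math.max(0, stats.active - 1); // job is no longer active
});

// `app` is assumed to be an existing Express application
app.get('/stats', (req, res) => {
  res.json(stats);
});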

Alert on High Failure Rate

let recentFails = 0;
let recentCompleted = 0;

queue.on('completed', () => {
  recentCompleted++;
});

queue.on('failed', () => {
  recentFails++;
});

setInterval(() => {
  const total = recentFails + recentCompleted;
  
  if (total > 0) {
    const failRate = recentFails / total;
    
    if (failRate > 0.1) {
      console.error(`High failure rate: ${(failRate * 100).toFixed(1)}%`);
      // Send alert
    }
  }
  
  recentFails = 0;
  recentCompleted = 0;
}, 60000); // Check every minute

Progress Tracking

const activeJobs = new Map();

// Key by String(job.id) so lookups match the string route parameter below
queue.on('active', (job) => {
  activeJobs.set(String(job.id), { startTime: Date.now(), progress: 0 });
});

queue.on('progress', (job, progress) => {
  const jobInfo = activeJobs.get(String(job.id));
  if (jobInfo) {
    jobInfo.progress = progress;
  }
});

queue.on('completed', (job) => {
  activeJobs.delete(String(job.id));
});

queue.on('failed', (job) => {
  activeJobs.delete(String(job.id));
});

// API endpoint to check progress (`app` is assumed to be an Express application)
app.get('/jobs/:id/progress', (req, res) => {
  const info = activeJobs.get(req.params.id);
  res.json(info || { progress: 0 });
});

Automatic Job Retry with Delay

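// Bull retries a job itself when it was added with `attempts` (and optionally `backoff`).
// This listener only logs retries and dead-letters jobs that have used all attempts.
// `deadLetterQueue` is assumed to be a separate Bull queue created elsewhere.
queue.on('failed', async (job, err) => {
  if (job.attemptsMade < job.opts.attempts) {
    console.log(`Job ${job.id} will be retried`);
  } else {
    console.error(`Job ${job.id} permanently failed:`, err.message);

    // Move to dead letter queue; the error travels in the job data
    // because `originalError` is not a Bull job option
    await deadLetterQueue.add(
      { originalData: job.data, originalError: err.message },
      { jobId: job.id }
    );
  }
});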
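The listener above only observes retries; the retry and its delay are configured when the job is added, through the attempts and backoff job options. The sketch below pairs such an options object with a small check for whether a failure was the last attempt. isFinalFailure and retryOptions are hypothetical names, the specific values are illustrative, and the fallback to a single attempt when attempts is unset is an assumption about Bull's default.

```javascript
// Job options for automatic retries with a growing delay.
// `attempts` and `backoff` are Bull job options; the values are examples.
const retryOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 }, // delay in milliseconds
};

// Hypothetical helper: true when a failed job has no attempts left.
// Assumption: a job added without `attempts` gets a single attempt.
function isFinalFailure(job) {
  const maxAttempts = (job.opts && job.opts.attempts) || 1;
  return job.attemptsMade >= maxAttempts;
}

// Job-like objects standing in for what a 'failed' listener receives.
const firstFailure = { id: '1', attemptsMade: 1, opts: retryOptions };
const lastFailure = { id: '1', attemptsMade: 3, opts: retryOptions };

console.log(isFinalFailure(firstFailure)); // false
console.log(isFinalFailure(lastFailure)); // true

// With a real queue the options are passed when adding the job, e.g.:
// await queue.add(data, retryOptions);
```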

Log All Events

const events = [
  'error', 'waiting', 'active', 'stalled', 'lock-extension-failed',
  'progress', 'completed', 'failed', 'paused', 'resumed', 'cleaned',
  'drained', 'removed'
];

events.forEach(event => {
  queue.on(event, (...args) => {
    console.log(`[${event}]`, ...args);
  });
});

Global Event Aggregation

// Monitor all workers across all servers
const globalStats = {
  totalProcessed: 0,
  totalFailed: 0
};

queue.on('global:completed', (jobId, result) => {
  globalStats.totalProcessed++;
  console.log(`Total processed across all workers: ${globalStats.totalProcessed}`);
});

queue.on('global:failed', (jobId, err) => {
  globalStats.totalFailed++;
  console.log(`Total failed across all workers: ${globalStats.totalFailed}`);
});

Cleanup Completed Jobs in Global Event

queue.on('global:completed', async (jobId, result) => {
  // Only remove if not using removeOnComplete
  try {
    const job = await queue.getJob(jobId);
    if (job) {
      // Log result to database
      await logJobResult(jobId, result);
      
      // Then remove
      await job.remove();
    }
  } catch (err) {
    console.error(`Failed to cleanup job ${jobId}:`, err);
  }
});

Circuit Breaker Pattern

let consecutiveFailures = 0;
let circuitOpen = false; // prevents stacking pauses and timers
const FAILURE_THRESHOLD = 5;

queue.on('failed', async (job, err) => {
  consecutiveFailures++;
  
  if (consecutiveFailures >= FAILURE_THRESHOLD && !circuitOpen) {
    circuitOpen = true;
    console.error('Circuit breaker triggered - pausing queue');
    await queue.pause();
    
    // Resume after cooldown
    setTimeout(async () => {
      console.log('Circuit breaker reset - resuming queue');
      consecutiveFailures = 0;
      circuitOpen = false;
      await queue.resume();
    }, 60000); // 1 minute cooldown
  }
});

queue.on('completed', () => {
  consecutiveFailures = 0; // Reset on success
});

Event Reference

Event                  Local Params       Global Params      Description
error                  (error)            N/A                Queue error occurred
waiting                (jobId)            N/A                Job is waiting
active                 (job, jobPromise)  (jobId)            Job started processing
stalled                (job)              (jobId)            Job is stalled
lock-extension-failed  (job, err)         N/A                Failed to extend lock
progress               (job, progress)    (jobId, progress)  Job progress updated
completed              (job, result)      (jobId, result)    Job completed
failed                 (job, err)         (jobId, err)       Job failed
paused                 ()                 ()                 Queue paused
resumed                ()                 ()                 Queue resumed
cleaned                (jobs, type)       (jobIds, type)     Jobs cleaned
drained                ()                 ()                 Queue drained
removed                (job)              (jobId)            Job removed
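
The Global Params column can also be encoded as a guard, so that a typo or an event without a global variant fails loudly instead of registering a listener that never fires. This is a sketch under the assumption that the table above is complete; GLOBAL_EVENTS and onGlobal are hypothetical names, and an EventEmitter stands in for the queue.

```javascript
const { EventEmitter } = require('node:events');

// Events with a global variant, taken from the reference table above.
const GLOBAL_EVENTS = new Set([
  'active', 'stalled', 'progress', 'completed', 'failed',
  'paused', 'resumed', 'cleaned', 'drained', 'removed',
]);

// Hypothetical helper: register a global listener, rejecting event names
// that the table lists as N/A or does not list at all.
function onGlobal(queue, event, listener) {
  if (!GLOBAL_EVENTS.has(event)) {
    throw new Error(`No global variant documented for "${event}"`);
  }
  queue.on(`global:${event}`, listener);
  return queue;
}

// Stand-in for a Bull queue so the sketch runs without Redis.
const demoQueue = new EventEmitter();
const removedIds = [];

onGlobal(demoQueue, 'removed', (jobId) => removedIds.push(jobId));
demoQueue.emit('global:removed', '99');

let rejected = false;
try {
  onGlobal(demoQueue, 'error', () => {});
} catch (err) {
  rejected = true; // 'error' has no global variant in the table
}

console.log(removedIds, rejected); // [ '99' ] true
```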
