Skip to main content

Overview

Bull queues emit events throughout the job lifecycle, allowing you to monitor progress, handle completions, react to failures, and track queue state changes.

Local vs Global Events

Local Events

Local events fire only on the specific queue instance where a listener is registered:
const queue = new Queue('myqueue');

// This listener only fires for jobs processed by THIS worker
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

Global Events

Global events fire across all queue instances connected to the same Redis database. Prefix event names with global::
// This listener fires for jobs completed by ANY worker
queue.on('global:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed with result:`, result);
});
Global events pass the job ID instead of the full job instance. Use Queue#getJob(jobId) to retrieve the job object if needed.
From the REFERENCE.md (~/workspace/source/REFERENCE.md:1118-1156):
// Local events pass the job instance...
queue.on('progress', function (job, progress) {
  console.log(`Job ${job.id} is ${progress * 100}% ready!`);
});

queue.on('completed', function (job, result) {
  console.log(`Job ${job.id} completed! Result: ${result}`);
  job.remove();
});

// ...whereas global events only pass the job ID:
queue.on('global:progress', function (jobId, progress) {
  console.log(`Job ${jobId} is ${progress * 100}% ready!`);
});

queue.on('global:completed', function (jobId, result) {
  console.log(`Job ${jobId} completed! Result: ${result}`);
  queue.getJob(jobId).then(function (job) {
    job.remove();
  });
});

Common Events

completed

Fires when a job successfully finishes:
queue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed!`);
  console.log('Result:', result);
});

// Global version
queue.on('global:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed globally!`);
});

failed

Fires when a job fails after all retry attempts:
queue.on('failed', (job, err) => {
  console.log(`Job ${job.id} failed with error:`, err.message);
  console.log(`Attempts made: ${job.attemptsMade}`);
});

// Global version
queue.on('global:failed', (jobId, err) => {
  console.error(`Job ${jobId} failed:`, err);
});

progress

Fires when a job reports progress:
queue.process(async (job) => {
  for (let i = 0; i <= 100; i += 10) {
    await job.progress(i);
    await performWork();
  }
});

queue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} is ${progress}% complete`);
});

// Global version
queue.on('global:progress', (jobId, progress) => {
  console.log(`Job ${jobId} progress: ${progress}%`);
});

active

Fires when a job starts processing:
queue.on('active', (job, jobPromise) => {
  console.log(`Job ${job.id} is now active`);
  
  // You can cancel the job using jobPromise.cancel() if needed
});

// Global version
queue.on('global:active', (jobId) => {
  console.log(`Job ${jobId} started globally`);
});
From the source (~/workspace/source/lib/queue.js:1216-1217):
// Local event with jobPromise so that we can cancel job.
utils.emitSafe(this, 'active', job, jobPromise, 'waiting');

stalled

Fires when a job is detected as stalled (process crashed or event loop blocked):
queue.on('stalled', (job) => {
  console.log(`Job ${job.id} has stalled and will be reprocessed`);
  // Alert your monitoring system!
});

// Global version
queue.on('global:stalled', (jobId) => {
  console.warn(`Job ${jobId} stalled globally`);
});
Always monitor the stalled event in production. Frequent stalled jobs indicate problems with your processor (crashes or blocking code) and can lead to jobs being processed multiple times.

Queue State Events

waiting

Fires when a job is added to the queue:
queue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} is waiting to be processed`);
});

paused

Fires when the queue is paused:
queue.on('paused', () => {
  console.log('Queue has been paused');
});

queue.on('global:paused', () => {
  console.log('Queue paused globally');
});

await queue.pause();

resumed

Fires when the queue is resumed:
queue.on('resumed', () => {
  console.log('Queue has been resumed');
});

queue.on('global:resumed', () => {
  console.log('Queue resumed globally');
});

await queue.resume();

drained

Fires when the queue has processed all waiting jobs:
queue.on('drained', () => {
  console.log('Queue is now empty');
});

queue.on('global:drained', () => {
  console.log('Queue drained globally');
});
The drained event fires even if there are delayed jobs still pending. It only indicates that the waiting queue is empty.

Additional Events

error

Fires when an error occurs in the queue:
queue.on('error', (error) => {
  console.error('Queue error:', error);
});

removed

Fires when a job is successfully removed:
queue.on('removed', (job) => {
  console.log(`Job ${job.id} was removed`);
});

cleaned

Fires after old jobs are cleaned from the queue:
queue.on('cleaned', (jobs, type) => {
  console.log(`Cleaned ${jobs.length} ${type} jobs`);
});

// Trigger cleanup
await queue.clean(5000, 'completed'); // Remove completed jobs older than 5 seconds

lock-extension-failed

Fires when a job fails to extend its lock (usually due to Redis connection issues):
queue.on('lock-extension-failed', (job, err) => {
  console.error(`Job ${job.id} failed to extend lock:`, err);
  // This may indicate Redis connection problems
});

Event Handling Patterns

Complete Job Monitoring

const queue = new Queue('tasks');

queue.on('waiting', (jobId) => {
  console.log(`[WAITING] Job ${jobId}`);
});

queue.on('active', (job) => {
  console.log(`[ACTIVE] Job ${job.id} started`);
});

queue.on('progress', (job, progress) => {
  console.log(`[PROGRESS] Job ${job.id}: ${progress}%`);
});

queue.on('completed', (job, result) => {
  console.log(`[COMPLETED] Job ${job.id}`, result);
});

queue.on('failed', (job, err) => {
  console.log(`[FAILED] Job ${job.id}:`, err.message);
});

queue.on('stalled', (job) => {
  console.log(`[STALLED] Job ${job.id}`);
});

Centralized Event Logging

const monitorQueue = (queue, queueName) => {
  const events = [
    'error',
    'waiting',
    'active', 
    'stalled',
    'progress',
    'completed',
    'failed',
    'paused',
    'resumed',
    'cleaned',
    'drained',
    'removed'
  ];
  
  events.forEach(event => {
    queue.on(event, (...args) => {
      logger.info(`[${queueName}] ${event}`, args);
    });
  });
};

monitorQueue(emailQueue, 'emails');
monitorQueue(videoQueue, 'videos');

Error Recovery

queue.on('failed', async (job, err) => {
  // Send to error tracking service
  await errorTracker.report(err, {
    jobId: job.id,
    jobData: job.data,
    attempts: job.attemptsMade
  });
  
  // Notify team
  if (job.attemptsMade >= job.opts.attempts) {
    await slack.notify(`Job ${job.id} permanently failed: ${err.message}`);
  }
});

queue.on('stalled', async (job) => {
  await slack.notify(`Job ${job.id} stalled - check worker health`);
});

Metrics Collection

const metrics = {
  completed: 0,
  failed: 0,
  active: 0,
  waiting: 0
};

queue.on('global:completed', () => {
  metrics.completed++;
  metrics.active--;
});

queue.on('global:failed', () => {
  metrics.failed++;
  metrics.active--;
});

queue.on('global:active', () => {
  metrics.active++;
  metrics.waiting--;
});

queue.on('global:waiting', () => {
  metrics.waiting++;
});

// Report metrics periodically
setInterval(() => {
  console.log('Queue metrics:', metrics);
}, 60000);

Graceful Shutdown

const gracefulShutdown = async () => {
  console.log('Shutting down gracefully...');
  
  await queue.pause(true); // Pause locally
  
  queue.on('drained', async () => {
    console.log('All active jobs completed');
    await queue.close();
    process.exit(0);
  });
  
  // Force exit after 30 seconds
  setTimeout(() => {
    console.log('Forcing shutdown');
    process.exit(1);
  }, 30000);
};

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);

Implementation Details

From the source (~/workspace/source/lib/queue.js:404-521), Bull uses Redis pub/sub for global events:
Queue.prototype._setupQueueEventListeners = function() {
  const pmessageHandler = (pattern, channel, message) => {
    const keyAndToken = channel.split('@');
    const key = keyAndToken[0];
    const token = keyAndToken[1];
    switch (key) {
      case activeKey:
        utils.emitSafe(this, 'global:active', message, 'waiting');
        break;
      case waitingKey:
        if (this.token === token) {
          utils.emitSafe(this, 'waiting', message, null);
        }
        token && utils.emitSafe(this, 'global:waiting', message, null);
        break;
      // ...
    }
  };
  
  this.eclient.on('pmessage', pmessageHandler);
  this.eclient.on('message', messageHandler);
};

Next Steps

Job Lifecycle

See how events relate to job states

Processors

Learn how to process jobs that trigger events

Build docs developers (and LLMs) love