Jobs in Bull transition through a series of states from creation to completion. Understanding this lifecycle is crucial for building reliable queue-based systems.
From the source (~/workspace/source/lib/queue.js:46-54):
/** Delayed jobs are jobs that cannot be executed until a certain time in ms has passed since they were added to the queue. The mechanism is simple, a delayedTimestamp variable holds the next known timestamp that is on the delayed set (or MAX_TIMEOUT_MS if none). When the current job has finalized the variable is checked, if no delayed job has to be executed yet a setTimeout is set so that a delayed job is processed after timing out.*/
The queue maintains an internal timer that checks for due delayed jobs:From the source (~/workspace/source/lib/queue.js:991-1040):
Queue.prototype.updateDelayTimer = function() { if (this.closing) { return Promise.resolve(); } return scripts .updateDelaySet(this, Date.now()) .then(nextTimestamp => { this.delayedTimestamp = nextTimestamp ? nextTimestamp / 4096 : Number.MAX_VALUE; // Clear any existing update delay timer if (this.delayTimer) { clearTimeout(this.delayTimer); } // Delay for the next update of delay set const delay = _.min([ this.delayedTimestamp - Date.now(), this.settings.guardInterval ]); // Schedule next processing of the delayed jobs if (delay <= 0) { // Next set of jobs are due right now, process them also this.updateDelayTimer(); } else { // Update the delay set when the next job is due // or the next guard time this.delayTimer = setTimeout(() => this.updateDelayTimer(), delay); } return null; });};
From the source (~/workspace/source/lib/queue.js:1042-1086):
/** * Process jobs that have been added to the active list but are not being * processed properly. This can happen due to a process crash in the middle * of processing a job, leaving it in 'active' but without a job lock. */Queue.prototype.moveUnlockedJobsToWait = function() { if (this.closing) { return Promise.resolve(); } return scripts .moveUnlockedJobsToWait(this) .then(([failed, stalled]) => { const handleFailedJobs = failed.map(jobId => { return this.getJobFromId(jobId).then(job => { utils.emitSafe( this, 'failed', job, new Error('job stalled more than allowable limit'), 'active' ); return null; }); }); const handleStalledJobs = stalled.map(jobId => { return this.getJobFromId(jobId).then(job => { if (job !== null) { utils.emitSafe(this, 'stalled', job); } return null; }); }); return Promise.all(handleFailedJobs.concat(handleStalledJobs)); });};
const queue = new Queue('myqueue', { settings: { stalledInterval: 30000, // Check every 30 seconds maxStalledCount: 1 // Max 1 recovery attempt }});queue.on('stalled', (job) => { console.log(`Job ${job.id} stalled and will be reprocessed`);});
Always monitor stalled jobs in production! Frequent stalls indicate processor problems and can cause duplicate processing. Jobs that exceed maxStalledCount are permanently failed.