A job becomes “stalled” when it loses its lock while being processed. Understanding stalled jobs is crucial for building reliable queue systems.
What is a Stalled Job?
Bull uses an “at least once” processing strategy. When a worker processes a job, it acquires a lock to prevent other workers from processing the same job. If this lock expires before the job completes, the job is considered stalled and may be reprocessed.
Stalled jobs may be processed more than once. Always listen for the stalled event and log it to your error monitoring system.
How Job Locking Works
When a worker starts processing a job:
- A lock is created for lockDuration milliseconds
- The lock is automatically renewed every lockRenewTime milliseconds
- If lockDuration elapses before renewal, the job is marked as stalled
- Another worker can pick up the stalled job and restart it
const queue = new Queue('myqueue', {
settings: {
lockDuration: 30000, // Lock expires after 30 seconds
lockRenewTime: 15000 // Renew lock every 15 seconds
}
});
Common Causes of Stalling
1. Process Crash
The most common legitimate cause: the Node.js process running your worker crashes unexpectedly.
queue.process(async (job) => {
// Process crashes here
process.exit(1);
// Job becomes stalled and will be retried
});
2. CPU-Intensive Processing
Blocking the event loop prevents Bull from renewing locks.
// BAD: Blocks event loop
queue.process(async (job) => {
// This blocks for 60 seconds
const result = heavySyncComputation(job.data);
return result;
});
Solution 1: Break into smaller chunks
// GOOD: Allow event loop to breathe
queue.process(async (job) => {
const data = job.data.items;
const results = [];
for (let i = 0; i < data.length; i++) {
results.push(await processItem(data[i]));
// await allows event loop to run, including lock renewal
}
return results;
});
Solution 2: Increase lock duration
const queue = new Queue('cpu-intensive', {
settings: {
lockDuration: 120000, // 2 minutes for heavy processing
lockRenewTime: 60000 // Renew every minute
}
});
Solution 3: Use sandboxed processor
// Runs in separate process, won't block main event loop
queue.process('/path/to/processor.js');
3. Infinite Loops or Hangs
Jobs that never complete will eventually stall.
// BAD: Might hang forever
queue.process(async (job) => {
while (true) {
if (await checkCondition()) break;
// If condition never met, job hangs
}
});
Solution: Use timeouts
queue.add(data, {
timeout: 60000 // Fail after 60 seconds
});
queue.process(async (job) => {
// Will be marked as failed if takes longer than 60 seconds
return await longRunningTask(job.data);
});
4. Redis Connection Issues
Network problems can prevent lock renewal.
queue.on('lock-extension-failed', (job, err) => {
console.error(`Failed to extend lock for job ${job.id}:`, err);
// Log to monitoring system - indicates Redis connectivity issues
});
Stalled Job Configuration
lockDuration
Time in milliseconds before a job lock expires.
const queue = new Queue('myqueue', {
settings: {
lockDuration: 45000 // 45 seconds
}
});
When to increase:
- CPU-intensive jobs that may block event loop
- Jobs that take longer than 30 seconds
- High network latency to Redis
When to decrease:
- Time-sensitive jobs where faster recovery is important
- Jobs where occasional double processing is tolerable
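One way to pick a value is to derive it from observed job runtimes. This is a sketch: `suggestLockDuration` is a hypothetical helper, and the 2x headroom multiplier is a rule of thumb, not a Bull recommendation.

```javascript
// Hypothetical helper: derive lock settings from the worst-case job
// runtime you've observed, with headroom so slow-but-healthy jobs
// don't stall. Returns values suitable for the queue's settings object.
function suggestLockDuration(maxObservedJobMs, headroomFactor = 2) {
  const duration = Math.ceil(maxObservedJobMs * headroomFactor);
  return {
    lockDuration: duration,
    lockRenewTime: Math.floor(duration / 2) // renew at 50%, Bull's default ratio
  };
}

// e.g. jobs observed up to 20s -> lockDuration 40000, lockRenewTime 20000
const settings = suggestLockDuration(20000);
```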
lockRenewTime
Interval in milliseconds for renewing job locks (defaults to lockDuration / 2). Must be less than lockDuration.
const queue = new Queue('myqueue', {
settings: {
lockDuration: 60000,
lockRenewTime: 30000 // Renew at 50% of lock duration
}
});
Never set lockRenewTime larger than lockDuration.
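A small guard can catch this misconfiguration before the queue is created. This is a sketch; `validateLockSettings` is a hypothetical helper, not part of Bull's API.

```javascript
// Hypothetical helper: throws if the settings could let a lock expire
// before its next renewal. Not part of Bull - call it yourself before
// passing settings to new Queue(...).
function validateLockSettings({ lockDuration, lockRenewTime }) {
  if (lockRenewTime >= lockDuration) {
    throw new Error(
      `lockRenewTime (${lockRenewTime}) must be less than lockDuration (${lockDuration})`
    );
  }
  return { lockDuration, lockRenewTime };
}

// Usage: fails fast at startup instead of stalling jobs in production
const settings = validateLockSettings({ lockDuration: 60000, lockRenewTime: 30000 });
```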
stalledInterval
How often to check for stalled jobs (in milliseconds). Set to 0 to disable the check.
const queue = new Queue('myqueue', {
settings: {
stalledInterval: 10000 // Check every 10 seconds
}
});
Tradeoffs:
- Lower value: Faster recovery but higher Redis CPU usage
- Higher value: Lower overhead but slower recovery
Each worker checks independently, so actual check frequency is higher than this value.
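A rough sketch of that effect, assuming the workers' timers are uncoordinated and evenly spread (`effectiveCheckIntervalMs` is an illustrative helper, not a Bull API):

```javascript
// With N independent workers each scanning every stalledInterval ms,
// the average gap between stalled-job checks across the whole fleet is
// roughly stalledInterval / N (assuming evenly spread, uncoordinated timers).
function effectiveCheckIntervalMs(stalledInterval, workerCount) {
  return stalledInterval / workerCount;
}

// e.g. 4 workers with stalledInterval of 30000 ms -> a check roughly every 7500 ms
const avgGap = effectiveCheckIntervalMs(30000, 4);
```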
maxStalledCount
Maximum times a job can be recovered from the stalled state before permanent failure.
const queue = new Queue('myqueue', {
settings: {
maxStalledCount: 2 // Allow 2 stalled recoveries
}
});
After maxStalledCount is exceeded, the job fails permanently:
queue.on('failed', (job, err) => {
if (err.message.includes('stalled more than allowable limit')) {
console.error(`Job ${job.id} stalled too many times`);
}
});
Monitoring Stalled Jobs
Listen for Stalled Events
queue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled and will be reprocessed`);
// Log to monitoring service
logger.warn('Job stalled', {
jobId: job.id,
jobName: job.name,
attemptsMade: job.attemptsMade,
data: job.data
});
});
Monitor Lock Extension Failures
queue.on('lock-extension-failed', (job, err) => {
console.error(`Lock extension failed for job ${job.id}:`, err);
// Alert on Redis connectivity issues
alerting.send({
severity: 'high',
message: 'Redis connection issues detected',
error: err.message
});
});
Track Stalled Job Metrics
let stalledCount = 0;
queue.on('stalled', () => {
stalledCount++;
metrics.increment('queue.stalled');
});
setInterval(() => {
if (stalledCount > 10) {
console.error(`High stalled job rate: ${stalledCount} in last minute`);
}
stalledCount = 0;
}, 60000);
Best Practices
1. Always Monitor Stalled Events
queue.on('stalled', (job) => {
// CRITICAL: Log this to your error monitoring
errorReporting.captureException(new Error('Job stalled'), {
extra: {
jobId: job.id,
jobName: job.name,
attemptsMade: job.attemptsMade
}
});
});
2. Design for Idempotency
Since stalled jobs may be double processed, make sure your jobs are idempotent:
queue.process(async (job) => {
const { orderId } = job.data;
// Check if already processed
const existingOrder = await db.orders.findOne({ id: orderId });
if (existingOrder.status === 'processed') {
console.log(`Order ${orderId} already processed, skipping`);
return existingOrder;
}
// Process order
return await processOrder(orderId);
});
3. Use Appropriate Lock Durations
// Short-running jobs
const fastQueue = new Queue('fast', {
settings: {
lockDuration: 10000, // 10 seconds
lockRenewTime: 5000
}
});
// Long-running jobs
const slowQueue = new Queue('slow', {
settings: {
lockDuration: 300000, // 5 minutes
lockRenewTime: 150000
}
});
4. Break Up CPU-Intensive Work
queue.process(async (job) => {
const items = job.data.items;
const batchSize = 100;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await processBatch(batch);
// Update progress and allow event loop to run
await job.progress((i + batchSize) / items.length * 100);
}
});
5. Use Sandboxed Processors for Heavy Work
// processor.js
module.exports = async (job) => {
// Heavy CPU work runs in separate process
// Won't block main queue's event loop
return heavyComputation(job.data);
};
// main.js
queue.process('/path/to/processor.js');
6. Set Appropriate maxStalledCount
// Critical jobs - be cautious with retries
const paymentQueue = new Queue('payments', {
settings: {
maxStalledCount: 1 // Only one stalled recovery
}
});
// Idempotent jobs - can tolerate more retries
const notificationQueue = new Queue('notifications', {
settings: {
maxStalledCount: 3 // More tolerant of stalling
}
});
Debugging Stalled Jobs
Enable debug logging:
NODE_DEBUG=bull node your-app.js
This will show detailed information about lock acquisition and renewal:
BULL 12345: Extending lock for job 1
BULL 12345: Lock extended successfully
BULL 12345: Failed to extend lock for job 2
BULL 12345: Job 2 marked as stalled
See Debugging for more details.
Example: Production Configuration
const Queue = require('bull');
const logger = require('./logger');
const queue = new Queue('production', {
settings: {
lockDuration: 45000, // 45 seconds
lockRenewTime: 22500, // 50% of lockDuration
stalledInterval: 30000, // Check every 30 seconds
maxStalledCount: 2 // Allow 2 recoveries
}
});
// Monitor stalled jobs
queue.on('stalled', (job) => {
logger.error('Job stalled', {
jobId: job.id,
name: job.name,
attemptsMade: job.attemptsMade,
timestamp: new Date().toISOString()
});
});
// Monitor lock extension failures
queue.on('lock-extension-failed', (job, err) => {
logger.error('Lock extension failed', {
jobId: job.id,
error: err.message,
timestamp: new Date().toISOString()
});
});
// Idempotent processor
queue.process(async (job) => {
const { taskId } = job.data;
// Check if already completed
const existing = await redis.get(`task:${taskId}:completed`);
if (existing) {
logger.info(`Task ${taskId} already completed`);
return JSON.parse(existing);
}
// Process in chunks to avoid blocking
const result = await processInChunks(job.data);
// Mark as completed
await redis.set(`task:${taskId}:completed`, JSON.stringify(result), 'EX', 3600);
return result;
});