Bull uses locks to ensure that a job is only processed by one worker at a time. These methods allow you to manually manage job locks for advanced use cases.
These are low-level methods. Most users will not need to use them directly. Bull automatically manages locks during normal job processing.
lockKey()
Returns a unique key representing a lock for this job.
The Redis key used for locking this job
Getting Lock Key
const job = await queue.getJob(jobId);
const lockKey = job.lockKey();
console.log('Job lock key:', lockKey);
// Output: bull:myqueue:123:lock
Use Cases
// Check if job is locked using Redis directly
const redis = require('ioredis');
const client = new redis();
const job = await queue.getJob(jobId);
const lockKey = job.lockKey();
const lockValue = await client.get(lockKey);
if (lockValue) {
console.log('Job is currently locked (being processed)');
} else {
console.log('Job is not locked');
}
takeLock()
takeLock(): Promise<number | false>
Takes a lock for this job so that no other queue worker can process it at the same time.
Returns a lock identifier (number) on success, or false if lock could not be acquired
Acquiring a Lock
const job = await queue.getJob(jobId);
const lock = await job.takeLock();
if (lock) {
console.log('Lock acquired:', lock);
// Do some work...
await performWork(job.data);
// Release the lock when done
await job.releaseLock();
} else {
console.log('Could not acquire lock - job is being processed');
}
Manual Job Processing
async function manuallyProcessJob(jobId) {
const job = await queue.getJob(jobId);
if (!job) {
throw new Error('Job not found');
}
// Try to acquire lock
const lock = await job.takeLock();
if (!lock) {
throw new Error('Job is already being processed');
}
try {
// Process the job
const result = await customProcessing(job.data);
// Move to completed
await job.moveToCompleted(result, false);
} catch (error) {
// Move to failed
await job.moveToFailed({ message: error.message }, false);
throw error;
} finally {
// Always release the lock
await job.releaseLock();
}
}
releaseLock()
releaseLock(): Promise<void>
Releases the lock on the job. Only locks owned by the queue instance can be released.
Releasing a Lock
const job = await queue.getJob(jobId);
const lock = await job.takeLock();
if (lock) {
try {
// Perform work
await doWork(job.data);
} finally {
// Always release in finally block
await job.releaseLock();
}
}
Error Handling
async function safeProcessJob(jobId) {
const job = await queue.getJob(jobId);
const lock = await job.takeLock();
if (!lock) {
console.log('Job is locked by another worker');
return;
}
try {
await processJob(job);
} catch (error) {
console.error('Error processing job:', error);
throw error;
} finally {
// Ensure lock is always released
try {
await job.releaseLock();
} catch (releaseError) {
console.error('Error releasing lock:', releaseError);
}
}
}
extendLock()
extendLock(duration: number): Promise<number>
Extends the lock for this job. The duration parameter specifies the lock duration in milliseconds.
Lock duration in milliseconds
Returns 1 on success and 0 on failure
Extending Lock Duration
queue.process(async (job) => {
// For very long-running jobs, you may need to extend the lock
for (let i = 0; i < 100; i++) {
await processChunk(job.data, i);
// Extend lock every 10 iterations
if (i % 10 === 0) {
const extended = await job.extendLock(30000); // 30 seconds
if (extended === 1) {
console.log('Lock extended successfully');
} else {
console.log('Failed to extend lock');
}
}
}
return { processed: 100 };
});
Long-Running Jobs
queue.process(async (job) => {
const startTime = Date.now();
const lockDuration = 30000; // 30 seconds
// Set up periodic lock extension
const extendInterval = setInterval(async () => {
const result = await job.extendLock(lockDuration);
if (result === 0) {
console.warn('Failed to extend lock for job', job.id);
clearInterval(extendInterval);
}
}, lockDuration / 2); // Extend halfway through duration
try {
// Long-running operation
await veryLongProcess(job.data);
return { success: true, duration: Date.now() - startTime };
} finally {
clearInterval(extendInterval);
}
});
Lock Configuration
Lock behavior is configured when creating the queue:
const queue = new Queue('myqueue', {
settings: {
lockDuration: 30000, // Lock expires after 30 seconds
lockRenewTime: 15000, // Auto-renew lock every 15 seconds
}
});
Lock Settings
Time in milliseconds to acquire the job lock. Set this higher for CPU-intensive jobs.
lockRenewTime
number
default:"lockDuration / 2"
Interval in milliseconds on which to renew the job lock. Should never be larger than lockDuration.
Understanding Lock Behavior
const queue = new Queue('processing', {
settings: {
lockDuration: 60000, // 60 second lock
lockRenewTime: 30000, // Renew every 30 seconds
}
});
queue.process(async (job) => {
// Bull automatically:
// 1. Takes a lock when job starts
// 2. Renews the lock every 30 seconds
// 3. Releases the lock when job completes
await longRunningTask(job.data);
return { success: true };
});
Stalled Jobs and Locks
If a worker crashes or locks aren’t renewed, jobs become “stalled”:
const queue = new Queue('myqueue', {
settings: {
stalledInterval: 30000, // Check for stalled jobs every 30 seconds
maxStalledCount: 1, // Max times a job can be stalled before permanent failure
}
});
queue.on('stalled', (job) => {
console.log(`Job ${job.id} stalled - lock expired without completion`);
});
queue.on('lock-extension-failed', (job, err) => {
console.error(`Failed to extend lock for job ${job.id}:`, err);
});
Best Practices
- Let Bull manage locks automatically - Only use manual locking for advanced scenarios
- Always release locks in finally blocks - Prevent deadlocks
- Configure appropriate lock duration - Based on expected job processing time
- Monitor lock extension failures - May indicate Redis connectivity issues
- Handle stalled jobs - Implement proper error handling and monitoring
// Good: Let Bull handle it automatically
queue.process(async (job) => {
return await processJob(job.data);
});
// Advanced: Manual lock management
queue.process(async (job) => {
const externalLock = await acquireExternalLock(job.data.resourceId);
try {
return await processWithExternalResource(job.data);
} finally {
await releaseExternalLock(externalLock);
}
});