Queue.clean()
Removes jobs of a specific type that were created outside of a grace period.
Signature
clean(grace: number, status?: string, limit?: number): Promise<number[]>
Parameters
Grace period in milliseconds. Jobs older than this will be removed.
status
string
default:"'completed'"
Job status to clean. Valid values: 'completed', 'failed', 'delayed', 'active', 'waiting'.
Maximum number of jobs to clean in this operation.
Returns
Array of removed job IDs.
Examples
Clean Completed Jobs
// Remove completed jobs older than 5 seconds
const removed = await queue.clean(5000);
console.log(`Cleaned ${removed.length} completed jobs`);
Clean Failed Jobs
// Remove failed jobs older than 10 seconds
const removed = await queue.clean(10000, 'failed');
console.log(`Cleaned ${removed.length} failed jobs`);
Clean with Limit
// Clean max 100 jobs at a time
const removed = await queue.clean(60000, 'completed', 100);
console.log(`Cleaned ${removed.length} jobs (max 100)`);
Listen to Clean Events
queue.on('cleaned', (jobs, type) => {
console.log(`Cleaned ${jobs.length} ${type} jobs`);
});
await queue.clean(5000, 'completed');
Scheduled Cleanup
// Clean old jobs every hour
setInterval(async () => {
// Remove completed jobs older than 24 hours
await queue.clean(24 * 60 * 60 * 1000, 'completed');
// Remove failed jobs older than 7 days
await queue.clean(7 * 24 * 60 * 60 * 1000, 'failed');
console.log('Cleanup completed');
}, 60 * 60 * 1000);
Clean Multiple Statuses
async function cleanOldJobs(graceMs) {
const statuses = ['completed', 'failed'];
for (const status of statuses) {
const removed = await queue.clean(graceMs, status);
console.log(`Cleaned ${removed.length} ${status} jobs`);
}
}
await cleanOldJobs(24 * 60 * 60 * 1000); // 24 hours
Queue.removeJobs()
Removes all jobs whose jobId matches the given pattern.
Signature
removeJobs(pattern: string): Promise<void>
Parameters
Returns
A promise that resolves when the matching jobs have been removed.
Pattern Syntax
? - Matches exactly one character
* - Matches any number of characters
[abc] - Matches one character from the set
[^abc] - Matches one character not in the set
[a-z] - Matches one character in the range
Examples
Remove Specific Job Patterns
// Remove jobs starting with "foo"
await queue.removeJobs('foo*');
// Remove jobs with single character + "oo"
await queue.removeJobs('?oo*');
// Remove jobs ending with numbers
await queue.removeJobs('*[0-9]');
Remove Jobs by User
// Assuming jobIds include user IDs
await queue.removeJobs('user-123-*');
console.log('Removed all jobs for user 123');
Remove Test Jobs
// Clean up test jobs
await queue.removeJobs('test-*');
await queue.removeJobs('*-test');
This method does not affect Repeatable Job configurations. To remove repeatable jobs, use removeRepeatable() or removeRepeatableByKey().
Pattern matching uses the Redis KEYS command which can be slow on large queues. Use with caution in production.
Queue.empty()
Drains the queue by deleting all waiting and delayed jobs.
Signature
Returns
A promise that resolves when the queue is emptied.
Examples
Empty the Queue
await queue.empty();
console.log('Queue emptied');
Reset Queue Before Tests
beforeEach(async () => {
await queue.empty();
});
Conditional Empty
async function resetQueue() {
const waiting = await queue.getWaitingCount();
if (waiting > 0) {
console.log(`Emptying ${waiting} waiting jobs`);
await queue.empty();
}
}
Important Notes
- Only removes waiting and delayed jobs
- Does NOT remove active, completed, or failed jobs
- Does NOT remove Repeatable Job configurations
- Repeatable jobs will continue to be created on schedule
To remove other job statuses, use clean(). To remove everything including repeatable configurations, use obliterate().
Queue.obliterate()
Completely removes a queue with all its data.
Signature
obliterate(opts?: { force: boolean }): Promise<void>
Parameters
If true, obliterates the queue even if there are active jobs. If false, will fail if active jobs exist.
Returns
A promise that resolves when the queue is obliterated.
Examples
Safe Obliterate
try {
await queue.obliterate();
console.log('Queue obliterated');
} catch (error) {
console.error('Cannot obliterate: active jobs exist');
}
Force Obliterate
// Remove everything, even with active jobs
await queue.obliterate({ force: true });
console.log('Queue forcefully obliterated');
Clean Shutdown and Remove
async function removeQueue() {
// Pause to stop new jobs
await queue.pause();
// Wait for active jobs
await queue.whenCurrentJobsFinished();
// Now safe to obliterate
await queue.obliterate();
console.log('Queue completely removed');
}
Test Cleanup
after(async () => {
await queue.obliterate({ force: true });
});
Important Notes
- Removes ALL jobs (waiting, active, delayed, completed, failed)
- Removes ALL Repeatable Job configurations
- Removes all queue metadata
- Cannot be undone
- Not atomic - performed iteratively
The queue is paused during obliteration. If another script unpauses the queue during this process, the call will fail and return the items it managed to remove.
Queue.close()
Closes the underlying Redis connections for graceful shutdown.
Signature
close(doNotWaitJobs?: boolean): Promise<void>
Parameters
If true, closes immediately without waiting for active jobs. If false, waits for active jobs to complete.
Returns
A promise that resolves when the queue is closed.
Examples
Basic Close
await queue.close();
console.log('Queue closed');
Close After Processing N Jobs
const _ = require('lodash');
const after100 = _.after(100, async function () {
await queue.close();
console.log('Processed 100 jobs, queue closed');
});
queue.on('completed', after100);
Close from Within Job Handler (Callback)
queue.process((job, done) => {
if (shouldShutdown) {
queue.close(); // Call close first
done(); // Then complete the job
} else {
processJob(job, done);
}
});
Close from Within Job Handler (Promise)
queue.process(async (job) => {
if (shouldShutdown) {
queue.close(); // Close doesn't need to be awaited here
}
return await processJob(job);
});
Important: When closing from within a job handler, don’t wait for close() to complete before finishing the job. The queue won’t close until after the job completes.// WRONG - This won't work
queue.process(async (job) => {
await processJob(job);
await queue.close(); // Will hang
return result;
});
// CORRECT
queue.process(async (job) => {
await processJob(job);
queue.close(); // Don't await
return result;
});
Graceful Application Shutdown
const queues = [queue1, queue2, queue3];
process.on('SIGTERM', async () => {
console.log('SIGTERM received, closing queues...');
// Close all queues
await Promise.all(queues.map(q => q.close()));
console.log('All queues closed');
process.exit(0);
});
Close with Timeout
async function closeWithTimeout(timeoutMs = 30000) {
const timeout = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Close timeout')), timeoutMs);
});
try {
await Promise.race([
queue.close(),
timeout
]);
console.log('Queue closed gracefully');
} catch (error) {
console.log('Close timed out, forcing close');
await queue.close(true); // Force close
}
}
// Close immediately without waiting for active jobs
await queue.close(true);
console.log('Queue closed immediately');
Comparison
| Method | Waiting | Delayed | Active | Completed | Failed | Repeatable |
|---|
empty() | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ |
clean() | ✅* | ✅* | ✅* | ✅* | ✅* | ❌ |
obliterate() | ✅ | ✅ | ✅** | ✅ | ✅ | ✅ |
close() | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
*Only if status matches and outside grace period
**Only with force: true
Common Patterns
Automated Cleanup Job
// Create a cleanup queue that runs periodically
const cleanupQueue = new Queue('cleanup');
cleanupQueue.process(async () => {
const day = 24 * 60 * 60 * 1000;
await queue.clean(day, 'completed');
await queue.clean(7 * day, 'failed');
return { cleaned: true };
});
// Schedule cleanup daily
await cleanupQueue.add({}, {
repeat: { cron: '0 2 * * *' } // 2 AM daily
});
Progressive Cleanup
async function progressiveClean() {
const batchSize = 1000;
let total = 0;
while (true) {
const removed = await queue.clean(24 * 60 * 60 * 1000, 'completed', batchSize);
if (removed.length === 0) break;
total += removed.length;
console.log(`Cleaned ${total} jobs so far...`);
// Pause between batches to avoid Redis load
await new Promise(resolve => setTimeout(resolve, 1000));
}
console.log(`Total cleaned: ${total}`);
}
Safe Queue Removal
async function safeRemoveQueue(queue) {
console.log('Pausing queue...');
await queue.pause();
console.log('Waiting for jobs to finish...');
await queue.whenCurrentJobsFinished();
console.log('Obliterating queue...');
await queue.obliterate();
console.log('Closing connections...');
await queue.close();
console.log('Queue removed successfully');
}