These methods control job execution flow, allowing you to retry failed jobs, promote delayed jobs, or wait for job completion.
retry()
Re-runs a job that has failed. Returns a promise that resolves when the job is scheduled for retry.
Basic Retry
const job = await queue.getJob(jobId);
const state = await job.getState();
if (state === 'failed') {
console.log('Retrying failed job...');
await job.retry();
}
Automatic Retry on Failure
queue.on('failed', async (job, err) => {
console.log(`Job ${job.id} failed:`, err.message);
// Retry if certain conditions are met
if (err.message.includes('ECONNRESET')) {
console.log('Network error detected, retrying...');
await job.retry();
}
});
Conditional Retry
async function retryIfNeeded(jobId) {
const job = await queue.getJob(jobId);
if (!job) return;
const state = await job.getState();
if (state === 'failed') {
// Check if we haven't exceeded max attempts
if (job.attemptsMade < 5) {
await job.retry();
console.log(`Retrying job ${jobId} (attempt ${job.attemptsMade + 1})`);
} else {
console.log(`Job ${jobId} exceeded max retries`);
}
}
}
The retry() method is different from automatic retries configured via the attempts option. This method manually schedules a retry regardless of the configured attempts.
discard()
Ensures this job is never run again, even if attemptsMade is less than job.opts.attempts.
Preventing Further Retries
queue.process(async (job) => {
try {
// Validate critical data
if (!job.data.userId) {
await job.log('Missing userId - discarding job');
await job.discard();
throw new Error('Missing required field: userId');
}
return await processJob(job.data);
} catch (error) {
if (error.message.includes('Invalid data')) {
// Don't retry jobs with invalid data
await job.discard();
}
throw error;
}
});
Discard on Validation Failure
queue.on('failed', async (job, err) => {
// Discard jobs that fail validation
if (err.message.startsWith('Validation Error:')) {
console.log(`Discarding job ${job.id} - validation failed`);
await job.discard();
}
// Other errors will retry normally according to job.opts.attempts
});
Use Cases for discard()
- Invalid or malformed data that will never be valid
- Jobs that depend on resources that no longer exist
- Jobs that have become obsolete or unnecessary
- Failed validation that cannot be fixed by retrying
queue.process(async (job) => {
// Check if user still exists
const user = await getUserById(job.data.userId);
if (!user) {
await job.log('User no longer exists');
await job.discard();
throw new Error('User not found');
}
// Check if action is still needed
if (user.emailVerified && job.data.action === 'send-verification') {
await job.log('Email already verified - job obsolete');
await job.discard();
return { skipped: true };
}
return await sendEmail(user.email, job.data);
});
Promotes a job that is currently “delayed” to the “waiting” state and executed as soon as possible.
// Add a delayed job
const job = await queue.add('email', data, {
delay: 60000 // 1 minute delay
});
console.log('Job scheduled for 1 minute from now');
// Later, promote it to run immediately
await job.promote();
console.log('Job promoted to run immediately');
// Add a scheduled job
const job = await queue.add('notification', {
userId: 123,
priority: 'low'
}, {
delay: 3600000 // 1 hour delay
});
// If priority changes, promote the job
async function updatePriority(jobId, newPriority) {
const job = await queue.getJob(jobId);
if (newPriority === 'high') {
const state = await job.getState();
if (state === 'delayed') {
await job.promote();
console.log('Job promoted due to high priority');
}
}
}
await updatePriority(job.id, 'high');
async function promoteAllDelayedJobs() {
const delayedJobs = await queue.getDelayed();
console.log(`Promoting ${delayedJobs.length} delayed jobs...`);
for (const job of delayedJobs) {
await job.promote();
}
console.log('All delayed jobs promoted');
}
finished()
Returns a promise that resolves when the job completes successfully, or rejects when the job fails.
The return value from the job processor
Waiting for Job Completion
const job = await queue.add('process-video', {
videoId: 'abc123',
url: 'https://example.com/video.mp4'
});
try {
const result = await job.finished();
console.log('Video processing completed:', result);
// { videoId: 'abc123', thumbnail: 'https://...', duration: 120 }
} catch (error) {
console.error('Video processing failed:', error.message);
}
Sequential Job Processing
async function processSequence(items) {
const results = [];
for (const item of items) {
const job = await queue.add('process-item', item);
// Wait for each job to complete before adding the next
const result = await job.finished();
results.push(result);
console.log(`Completed ${results.length}/${items.length}`);
}
return results;
}
Job Dependencies
// Create a job and wait for it to complete before creating dependent jobs
const uploadJob = await queue.add('upload', { file: 'video.mp4' });
try {
const uploadResult = await uploadJob.finished();
// Now create dependent jobs
await queue.add('generate-thumbnail', {
videoUrl: uploadResult.url
});
await queue.add('transcode', {
videoUrl: uploadResult.url,
formats: ['720p', '1080p']
});
console.log('Upload complete, processing jobs queued');
} catch (error) {
console.error('Upload failed:', error);
}
API Response with Job Result
app.post('/api/process', async (req, res) => {
const job = await queue.add('heavy-processing', req.body);
// Wait for job to complete before responding
try {
const result = await job.finished();
res.json({ success: true, result });
} catch (error) {
res.status(500).json({
success: false,
error: error.message
});
}
});
Timeout with finished()
const job = await queue.add('long-task', data);
try {
// Race between job completion and timeout
const result = await Promise.race([
job.finished(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Job timeout')), 30000)
)
]);
console.log('Job completed:', result);
} catch (error) {
if (error.message === 'Job timeout') {
console.log('Job is taking too long, continuing...');
// Job continues processing in background
} else {
console.error('Job failed:', error);
}
}