These methods control job execution flow: retry failed jobs, prevent further retries with discard(), promote delayed jobs, or wait for a job to finish.

retry()

retry(): Promise
Re-runs a job that has failed. Returns a promise that resolves when the job is scheduled for retry.

Basic Retry

const job = await queue.getJob(jobId);
const state = await job.getState();

if (state === 'failed') {
  console.log('Retrying failed job...');
  await job.retry();
}

Automatic Retry on Failure

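queue.on('failed', async (job, err) => {
  console.log(`Job ${job.id} failed:`, err.message);

  // Retry only transient network errors
  if (!err.message.includes('ECONNRESET')) return;

  // If automatic attempts remain, the job may already be rescheduled and
  // no longer in the 'failed' state, so check before retrying manually
  const state = await job.getState();
  if (state === 'failed') {
    console.log('Network error detected, retrying...');
    await job.retry();
  }
});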

Conditional Retry

async function retryIfNeeded(jobId) {
  const job = await queue.getJob(jobId);
  
  if (!job) return;
  
  const state = await job.getState();
  
  if (state === 'failed') {
    // Check if we haven't exceeded max attempts
    if (job.attemptsMade < 5) {
      await job.retry();
      console.log(`Retrying job ${jobId} (attempt ${job.attemptsMade + 1})`);
    } else {
      console.log(`Job ${jobId} exceeded max retries`);
    }
  }
}

Note: retry() is separate from the automatic retries configured through the attempts option. It manually re-queues a failed job regardless of how many configured attempts have already been used.
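
As a minimal sketch of guarding a manual retry, the helper below checks the job state first and treats a rejected retry() call as "not retried" instead of letting the error propagate. The name safeRetry is hypothetical and not part of the library; it relies only on the getState() and retry() methods shown above, so it is exercised here with a stand-in job object.

```javascript
// Hypothetical helper (not a library API): retry a job only if it is
// currently in the 'failed' state. Resolves to true if a retry was scheduled.
async function safeRetry(job) {
  if (!job) return false;

  const state = await job.getState();
  if (state !== 'failed') return false;

  try {
    await job.retry();
    return true;
  } catch (err) {
    // The job may have changed state between the check and the retry
    // (for example, another process retried or removed it).
    console.warn(`Could not retry job ${job.id}: ${err.message}`);
    return false;
  }
}

// Stand-in object for illustration; a real job would come from queue.getJob().
const fakeJob = {
  id: '42',
  state: 'failed',
  async getState() { return this.state; },
  async retry() { this.state = 'waiting'; },
};

safeRetry(fakeJob).then((retried) => {
  console.log('retried:', retried);
});
```

Returning a boolean keeps the caller's logic simple: a bulk "retry all failed jobs" loop can count successes without wrapping every call in its own try/catch.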

discard()

discard(): Promise
Ensures this job is never run again, even if attemptsMade is less than job.opts.attempts.

Preventing Further Retries

queue.process(async (job) => {
  try {
    // Validate critical data
    if (!job.data.userId) {
      await job.log('Missing userId - discarding job');
      await job.discard();
      throw new Error('Missing required field: userId');
    }
    
    return await processJob(job.data);
    
  } catch (error) {
    if (error.message.includes('Invalid data')) {
      // Don't retry jobs with invalid data
      await job.discard();
    }
    throw error;
  }
});

Discard on Validation Failure

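queue.on('failed', async (job, err) => {
  // Discard jobs that fail validation
  // Caveat: this event fires after the failure has been recorded, so the
  // retry decision may already have been made; discarding inside the
  // processor (as in the previous example) is the more reliable place
  if (err.message.startsWith('Validation Error:')) {
    console.log(`Discarding job ${job.id} - validation failed`);
    await job.discard();
  }
  // Other errors will retry normally according to job.opts.attempts
});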

Use Cases for discard()

  • Invalid or malformed data that will never be valid
  • Jobs that depend on resources that no longer exist
  • Jobs that have become obsolete or unnecessary
  • Failed validation that cannot be fixed by retrying
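
For example, a processor can discard jobs whose user no longer exists or whose work is already done:

queue.process(async (job) => {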
  // Check if user still exists
  const user = await getUserById(job.data.userId);
  
  if (!user) {
    await job.log('User no longer exists');
    await job.discard();
    throw new Error('User not found');
  }
  
  // Check if action is still needed
  if (user.emailVerified && job.data.action === 'send-verification') {
    await job.log('Email already verified - job obsolete');
    await job.discard();
    return { skipped: true };
  }
  
  return await sendEmail(user.email, job.data);
});
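
One way to keep the discard decision in a single place is to mark permanent failures with a dedicated error type and discard inside a wrapper around the processor. This is a sketch under assumptions: PermanentError and discardOnPermanentError are hypothetical names, not library APIs; only job.discard() comes from the text above.

```javascript
// Hypothetical error type marking failures that retrying cannot fix.
class PermanentError extends Error {
  constructor(message) {
    super(message);
    this.name = 'PermanentError';
  }
}

// Hypothetical wrapper: runs the processor and calls job.discard() before
// rethrowing when the failure is a PermanentError.
function discardOnPermanentError(processor) {
  return async (job) => {
    try {
      return await processor(job);
    } catch (err) {
      if (err instanceof PermanentError) {
        await job.discard();
      }
      throw err; // the job still fails; discard only prevents further retries
    }
  };
}

// Example processor: validation problems are permanent, other errors may be retried.
const handler = discardOnPermanentError(async (job) => {
  if (!job.data.userId) {
    throw new PermanentError('Missing required field: userId');
  }
  return { ok: true };
});

// With a real queue this would be registered as: queue.process(handler);
```

Using an error class instead of matching on error.message avoids silent breakage when someone rewords a message.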

promote()

promote(): Promise
Promotes a job that is currently “delayed” to the “waiting” state so that it is executed as soon as possible.

Promoting Delayed Jobs

// Add a delayed job
const job = await queue.add('email', data, {
  delay: 60000 // 1 minute delay
});

console.log('Job scheduled for 1 minute from now');

// Later, promote it to run immediately
await job.promote();
console.log('Job promoted to run immediately');

Conditional Promotion

// Add a scheduled job
const job = await queue.add('notification', {
  userId: 123,
  priority: 'low'
}, {
  delay: 3600000 // 1 hour delay
});

// If priority changes, promote the job
async function updatePriority(jobId, newPriority) {
  const job = await queue.getJob(jobId);

  // The job may have been removed since it was scheduled
  if (!job) return;

  if (newPriority === 'high') {
    const state = await job.getState();
    
    if (state === 'delayed') {
      await job.promote();
      console.log('Job promoted due to high priority');
    }
  }
}

await updatePriority(job.id, 'high');

Promoting All Delayed Jobs

async function promoteAllDelayedJobs() {
  const delayedJobs = await queue.getDelayed();
  
  console.log(`Promoting ${delayedJobs.length} delayed jobs...`);
  
  for (const job of delayedJobs) {
    await job.promote();
  }
  
  console.log('All delayed jobs promoted');
}
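
Promoting every delayed job at once can flood the workers. A more selective variant, sketched below, promotes only the jobs that match a predicate and skips any job that can no longer be promoted. promoteWhere is a hypothetical helper; it uses only queue.getDelayed() and job.promote() as shown above, and it assumes promote() rejects when a job is no longer delayed.

```javascript
// Hypothetical helper: promote only the delayed jobs matching `predicate`.
// Resolves to the number of jobs that were actually promoted.
async function promoteWhere(queue, predicate) {
  const delayedJobs = await queue.getDelayed();
  let promoted = 0;

  for (const job of delayedJobs) {
    if (!predicate(job)) continue;
    try {
      await job.promote();
      promoted += 1;
    } catch (err) {
      // The job may have left the delayed state since it was listed.
      console.warn(`Skipping job ${job.id}: ${err.message}`);
    }
  }

  return promoted;
}

// Usage with a real queue (commented out because it needs a live connection):
// const count = await promoteWhere(queue, (job) => job.data.priority === 'high');
```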

finished()

finished(): Promise<any>
Returns a promise that resolves when the job completes successfully, or rejects when the job fails.
Returns: any. The return value from the job processor.

Waiting for Job Completion

const job = await queue.add('process-video', {
  videoId: 'abc123',
  url: 'https://example.com/video.mp4'
});

try {
  const result = await job.finished();
  console.log('Video processing completed:', result);
  // { videoId: 'abc123', thumbnail: 'https://...', duration: 120 }
  
} catch (error) {
  console.error('Video processing failed:', error.message);
}

Sequential Job Processing

async function processSequence(items) {
  const results = [];
  
  for (const item of items) {
    const job = await queue.add('process-item', item);
    
    // Wait for each job to complete before adding the next
    const result = await job.finished();
    results.push(result);
    
    console.log(`Completed ${results.length}/${items.length}`);
  }
  
  return results;
}
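
When the items do not depend on each other, waiting for each job before adding the next serializes the work unnecessarily. The sketch below adds all jobs first and then waits for them together with Promise.allSettled, so one failure does not hide the other results. processBatch is a hypothetical name; the queue is passed in, and only queue.add() and job.finished() are assumed.

```javascript
// Hypothetical helper: enqueue every item, then wait for all jobs together.
// Resolves to one entry per item, in input order, whether it succeeded or not.
async function processBatch(queue, items) {
  const jobs = await Promise.all(
    items.map((item) => queue.add('process-item', item))
  );

  // allSettled never rejects, so every outcome is reported.
  const settled = await Promise.allSettled(jobs.map((job) => job.finished()));

  return settled.map((outcome, index) => {
    if (outcome.status === 'fulfilled') {
      return { item: items[index], ok: true, result: outcome.value };
    }
    const reason = outcome.reason;
    return {
      item: items[index],
      ok: false,
      error: reason instanceof Error ? reason.message : String(reason),
    };
  });
}

// Usage with a real queue (commented out because it needs a live connection):
// const outcomes = await processBatch(queue, [{ id: 1 }, { id: 2 }]);
// const failures = outcomes.filter((o) => !o.ok);
```

How many of these jobs actually run in parallel still depends on the worker concurrency; this only removes the wait on the producer side.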

Job Dependencies

// Create a job and wait for it to complete before creating dependent jobs
const uploadJob = await queue.add('upload', { file: 'video.mp4' });

try {
  const uploadResult = await uploadJob.finished();
  
  // Now create dependent jobs
  await queue.add('generate-thumbnail', {
    videoUrl: uploadResult.url
  });
  
  await queue.add('transcode', {
    videoUrl: uploadResult.url,
    formats: ['720p', '1080p']
  });
  
  console.log('Upload complete, processing jobs queued');
  
} catch (error) {
  console.error('Upload failed:', error);
}

API Response with Job Result

app.post('/api/process', async (req, res) => {
  try {
    // Adding the job can also fail, so keep it inside the try block
    const job = await queue.add('heavy-processing', req.body);

    // Wait for job to complete before responding
    const result = await job.finished();
    res.json({ success: true, result });

  } catch (error) {
    res.status(500).json({
      success: false,
      error: error.message
    });
  }
});
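
Holding an HTTP request open until a job finishes ties the response time to the queue's backlog. A common alternative is to respond with the job ID right away and let the client poll for status. The sketch below covers only the status-mapping part; describeJob is a hypothetical helper, and it assumes the job object exposes returnvalue and failedReason properties alongside getState().

```javascript
// Hypothetical helper: turn a job into a small status payload for an API.
// Assumes job.returnvalue and job.failedReason are populated by the queue.
async function describeJob(job) {
  if (!job) return { status: 'not-found' };

  const state = await job.getState();

  if (state === 'completed') {
    return { status: state, result: job.returnvalue };
  }
  if (state === 'failed') {
    return { status: state, error: job.failedReason };
  }
  // waiting, active, delayed, and so on: nothing more to report yet
  return { status: state };
}

// Possible wiring (assumes `app` and `queue` exist as in the example above):
// app.get('/api/process/:id', async (req, res) => {
//   res.json(await describeJob(await queue.getJob(req.params.id)));
// });
```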

Timeout with finished()

const job = await queue.add('long-task', data);

// Keep a handle on the timer so it can be cleared once the race settles
let timer;

try {
  // Race between job completion and timeout
  const result = await Promise.race([
    job.finished(),
    new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error('Job timeout')), 30000);
    })
  ]);

  console.log('Job completed:', result);

} catch (error) {
  if (error.message === 'Job timeout') {
    console.log('Job is taking too long, continuing...');
    // Job continues processing in background
  } else {
    console.error('Job failed:', error);
  }
} finally {
  // Without this, a pending timer would linger after the job has finished
  clearTimeout(timer);
}
