First, require Bull and create a new queue instance. Each queue is identified by a unique name.
const Queue = require('bull');const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
You can create multiple queues for different job types:
const Queue = require('bull');const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '127.0.0.1', password: 'foobared' } });const imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');
If you don’t specify Redis connection details, Bull connects to 127.0.0.1:6379 by default.
2
Define a job processor
Create a processor function that handles jobs from the queue. The processor receives a job object and a done callback.
videoQueue.process(function (job, done) { // job.data contains the custom data passed when the job was created // job.id contains id of this job. // transcode video asynchronously and report progress job.progress(42); // call done when finished done(); // or give an error if error done(new Error('error transcoding')); // or pass it a result done(null, { framerate: 29.5 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly throw new Error('some unexpected error');});
The processor function is called every time a job is ready to be processed. Jobs are processed one at a time by default.
3
Add jobs to the queue
Add jobs to the queue with custom data. Jobs are processed in the order they’re added (FIFO by default).
Instead of using the done callback, you can return a promise from your processor:
videoQueue.process(function (job) { // Simply return a promise return fetchVideo(job.data.url).then(transcodeVideo); // Handles promise rejection return Promise.reject(new Error('error transcoding')); // Passes the value the promise is resolved with to the "completed" event return Promise.resolve({ framerate: 29.5 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly throw new Error('some unexpected error'); // same as return Promise.reject(new Error('some unexpected error'));});
Using promises is the recommended approach as it integrates better with modern async/await syntax and error handling.
You can report job progress using job.progress(). This is useful for long-running jobs:
imageQueue.process(function (job, done) { // Report progress as a percentage job.progress(42); // Continue processing... // call done when finished done(); // or give an error if error done(new Error('error transcoding')); // or pass it a result done(null, { width: 1280, height: 720 /* etc... */ }); // If the job throws an unhandled exception it is also handled correctly throw new Error('some unexpected error');});