Photo by William Daigneault on Unsplash
Node.js background job processing with bull - separate producer and consumers with dashboard
There are many reasons to need a background job processing mechanism in your application, most often because a job takes longer than we can reasonably handle within an HTTP request-response cycle.
For example, generating a report and publishing the results can take so long that doing it within an HTTP request-response cycle is nearly impossible. Running a batch processing job every time a certain file is uploaded to Dropbox is another example where background job processing comes in handy.
In a series of tutorials I'm showing you how to use a fantastic tool named bull to handle different scenarios of background processing. Each example can be used as a standalone tutorial, so anyone who starts reading here doesn't need to refer to my other tutorials to understand what's going on. This also means that you might find a few things repeated across these tutorials, which you can easily spot and skip.
In this example we set up bull and Redis, and we'll see how easily we can have separate processes that produce and consume the jobs, and add a dashboard to see what's happening in our queue.
There are multiple tools to monitor our queue, and we use bull-board in this tutorial. bull-board supports multiple popular web frameworks, and we stick to the most popular one, express, in our examples.
Let's first create two folders, server and worker, to hold the code for the server and the worker services.
We start by preparing the server part. First things first, we create our package.json:
npm init -y
Next, we install nodemon so that the server restarts and we see the results as soon as we modify any files:
npm install nodemon
To get nodemon up and running there is one more tiny step, which is to update our package.json file by adding a start script like this:
"start": "nodemon server.js",
This is what the scripts section of our package.json looks like so far:
...
"scripts": {
"start": "nodemon server.js"
},
...
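Now we're ready to add the part where our server schedules some jobs to be processed. We start by installing the bull package, together with express and the bull-board packages that server.js imports:
npm install bull express @bull-board/api @bull-board/express --save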
Bull uses Redis as its backbone to manage the jobs, so here we simply start a Redis container like this:
docker run -p 6379:6379 -d redis
Let's create a file named server.js inside the server folder to both produce jobs and serve the UI to monitor our queue. We start our server's code by importing the necessary dependencies:
const express = require('express');
const Queue = require('bull');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
Let's set some variables to later use in our code:
const port = process.env.PORT || 9090;
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds
const queueName = 'routine_jobs';
Now we create a new queue:
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });
So far it's been pretty self-explanatory. Just notice that we can also provide a password when creating the queue if our Redis instance requires it:
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost, password: 'something_super_secure' } });
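Hard-coding the password is fine for a demo, but the connection settings can also be collected in one place and read from the environment. The helper below is only a minimal sketch and is not part of bull: the `buildRedisOptions` name and the `REDIS_PASSWORD` variable are assumptions made for illustration. It also converts the port to a number, since environment variables are always strings.

```javascript
// Hypothetical helper (not part of bull): builds the `redis` options object
// passed to `new Queue(name, { redis })` from environment variables.
// REDIS_PASSWORD is an assumed variable name; adjust it to your setup.
function buildRedisOptions(env = process.env) {
  const options = {
    host: env.REDIS_HOST || '127.0.0.1',
    // Environment variables are strings, so convert the port explicitly
    port: Number(env.REDIS_PORT) || 6379,
  };
  // Only include the password when one is configured
  if (env.REDIS_PASSWORD) {
    options.password = env.REDIS_PASSWORD;
  }
  return options;
}

// Usage (assumes bull is installed and Queue is imported as above):
// const routineJobsQueue = new Queue(queueName, { redis: buildRedisOptions() });
```

Since the server and the workers must connect to the same Redis instance, a helper like this could be shared by both to keep their settings in sync.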
Now let's simulate producing jobs by adding an arbitrary job to the queue at equal intervals:
let count = 0;
setInterval(async () => {
  const job = {
    jobId: count,
    value: count,
    jobType: 'routine'
  };
  await routineJobsQueue.add(job);
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);
It's worth mentioning that all the properties of the job data are arbitrary.
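Because a new job is added every second, finished jobs keep accumulating in Redis unless they are cleaned up. The `add` method accepts an options object as its second argument, and the sketch below uses bull's `attempts`, `backoff` and `removeOnComplete` options. The `scheduleRoutineJob` wrapper, the `defaultJobOptions` name and the chosen values are assumptions made for illustration; the wrapper takes the queue as a parameter so it can be tried without a running Redis.

```javascript
// Assumed defaults for illustration; tune them to your workload
const defaultJobOptions = {
  attempts: 3,                                   // try the job at most 3 times in total
  backoff: { type: 'exponential', delay: 1000 }, // wait longer before each retry
  removeOnComplete: 100,                         // keep only the latest 100 completed jobs
};

// Hypothetical wrapper: `queue` is anything exposing bull's `add(data, opts)`
async function scheduleRoutineJob(queue, count, options = defaultJobOptions) {
  const data = { jobId: count, value: count, jobType: 'routine' };
  return queue.add(data, options);
}

// Usage inside the interval callback:
// await scheduleRoutineJob(routineJobsQueue, count);
```

Keeping a limited number of completed jobs, rather than removing them all, means the dashboard still has recent jobs to display.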
Finally we add the monitoring dashboard for our queue:
// ------------ enable bull-ui -----------
const serverAdapter = new ExpressAdapter();
createBullBoard({
  queues: [
    new BullAdapter(routineJobsQueue),
  ],
  serverAdapter: serverAdapter
});
const app = express();
serverAdapter.setBasePath('/admin/queues'); // An arbitrary path to serve the dashboard
app.use('/admin/queues', serverAdapter.getRouter());
// The rest of your server's setup....
app.listen(port, () => {
  console.log(`Listening on port: ${port}`);
});
Ultimately, this is what our server.js looks like:
const express = require('express');
const Queue = require('bull');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const port = process.env.PORT || 9090;
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds
// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });
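// The jobs are processed in separate worker processes, so we listen to the global event;
// it receives the id assigned by bull and the result serialized as a string, not the job instance
routineJobsQueue.on('global:completed', function (jobId, result) {
  console.log(`job ${jobId} completed with result: ${result}`);
});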
// Generate a routine job every second
let count = 0;
setInterval(async () => {
  await routineJobsQueue.add({
    jobId: count,
    value: count,
    jobType: 'routine'
  });
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);
// ------------ enable bull-ui -----------
const serverAdapter = new ExpressAdapter();
createBullBoard({
  queues: [
    new BullAdapter(routineJobsQueue),
  ],
  serverAdapter: serverAdapter
});
const app = express();
serverAdapter.setBasePath('/admin/queues'); // An arbitrary path to serve the dashboard
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(port, () => {
  console.log(`Listening on port: ${port}`);
});
If you have read the rest of my articles in this series, you can skip to the last part and see the results, as there is no change in the worker code.
Now let's write the workers' code, where we process these jobs. Notice that having the workers in separate processes not only helps you avoid issues caused by CPU-intensive jobs that can impact bull's behaviour, it also helps you automatically increase or decrease the number of worker processes depending on the load.
In the worker folder, we again create the package.json, install bull and nodemon, and update the scripts in our package.json to run the index.js file like this:
...
"scripts": {
"start": "nodemon index.js"
},
...
Now again we start by creating the queue. Notice that it won't cause any issues if the queue has already been created by the server or even by another worker; this is actually how we get a handle to our queue to start fetching jobs from it.
const Queue = require('bull');
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
// The queue name is exactly the same as what we set in the server. Notice that you must connect to the same redis instance too.
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });
Now that we've created the queue in our worker too, it's time to add the function to process the jobs:
routineJobsQueue.process(async function (job) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  return ({ t2: jobData.value * 2, t3: jobData.value * 3 });
});
The process method supports callbacks too. To use one, the processor function takes two parameters, function (job, done), where done is a function with two parameters: error and result. So, this code could also be written like this:
routineJobsQueue.process(function (job, done) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});
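Either style can also report a failure: throwing (or rejecting) in the promise-based processor, or calling done with an error as its first argument, marks the job as failed. The following is a minimal sketch, assuming we want to reject jobs whose value is not a number; the `computeResult` and `processRoutineJob` names and the validation rule are illustrative and not part of the original code. Keeping the computation in a plain function also makes it easy to try without Redis.

```javascript
// Hypothetical pure helper: validates the payload and computes the dummy result
function computeResult(jobData) {
  if (typeof jobData.value !== 'number') {
    throw new Error(`job ${jobData.jobId} has a non-numeric value`);
  }
  return { t2: jobData.value * 2, t3: jobData.value * 3 };
}

// Processor in the promise style: an error thrown inside an async function
// rejects the returned promise, which bull records as a failed job
async function processRoutineJob(job) {
  return computeResult(job.data);
}

// Usage (assumes the queue created above):
// routineJobsQueue.process(processRoutineJob);
```

Failed jobs then show up under their own status in the dashboard, which makes problems with the job payload easier to spot.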
Finally, this is what our index.js looks like:
const Queue = require('bull');
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue('routine_jobs', { redis: { port: redisPort, host: redisHost } });
routineJobsQueue.process(async function (job) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // Let's set a dummy result
  return ({ t2: jobData.value * 2, t3: jobData.value * 3 });
});
Time to test the code and see the end result. Open a shell, go to the worker folder and run the program:
cd worker && npm start
Then, in another shell, we go to the server folder and run the program:
cd server && npm start
This is what the dashboard looks like: localhost:9090/admin/queues/queue/routine_j..
Soon I'll update this post and show you how you can simply host the application with utopiops, so stay tuned as you might get some exciting coupon codes too ;) !
Finally, you can find the complete source code here on GitHub.