There are many reasons to need a background job processing mechanism in your application which is often because the job takes longer than that we can handle it in the context of a http request-response communication.
For example, generating a report and publishing the results that takes a lot of time which makes it nearly impossible to use http request-response to do the job or running a batch processing job every time a certain file is uploaded to Dropbox again is an example of where using background job processing comes handy.
In a series of tutorials I'll show you how to use a fantastic tool named bull
to handle different scenarios of background processing. Each of the examples can be used as a standalone tutorial so in case anyone starts reading them doesn't need to refer to my other tutorials to understand what's going on. This also means that you might find few things repeated in these tutorials which obviously you can easily detect and just skip.
In this example we set up bull and Redis
and we'll see how easily we can just start producing and consuming jobs.
We start with creating our package.json
:
npm init -y
Then let's create a file named index.js
which will contain our whole code.
Now that we have something to run, let's install nodemon to make it easier to see the results as soon as we modify any files:
npm install nodemon
To get nodemon up and running there is one more tiny step, which is to update our package.json
file by adding a start
script like this:
"start": "nodemon index.js"
This is how the package.json
scripts looks like so far:
...
"scripts": {
"start": "nodemon server.js"
},
...
Before starting to write the code we add one last dependency which is the bull
package itself:
npm install bull --save
Bull uses Redis
as its backbone to manage the jobs, so here we simply start a Redis container like this:
docker run -p 6379:6379 -d redis
So far we have all the packages, dependencies and the file to write our code. Now, we start by importing the package bull
in our index.js
file:
const Queue = require('bull');
Let's set some variables to later use in our code:
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds;
const queueName = 'routine_jobs';
Now we create a new queue:
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });
So far it's been pretty self explanatory, just notice that we can also provide a password when creating the queue if our Redis requires it:
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost, password: 'something_super_secure' } });
Now that we have created the queue, we can specify how the jobs should be processed, using the process
method
routineJobsQueue.process(function (job, done) {
const jobData = job.data; // get the data from the job
console.log(`processing job ${jobData.jobId}`);
done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});
Here we simply log the jobId
and set the result by calling the method done
. The first parameter passed to done
is the error that might have happened during processing the job and the second parameter is an arbitrary result object. In case, you're wondering who'd use this result, actually bull
provides an option to register an eventListener that acts whenever a job is done successfully.
This is our eventListener that simply logs a message:
routineJobsQueue.on('completed', function (job, result) {
const jobData = job.data;
console.log(`job ${jobData.jobId} completed with result: ${JSON.stringify(result)}`)
})
Now let's simulate producing jobs by adding an arbitrary job to the queue at equal intervals:
let count = 0;
setInterval(async () => {
const job = {
jobId: count,
value: count,
jobType: 'routine'
};
await routineJobsQueue.add(job);
console.log(`scheduled job: ${count}`);
count++;
}, intervalInMilli);
Just worth mentioning all the properties of the job are arbitrary, and this is a sample output for this code:
const Queue = require('bull');
const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds
const queueName = 'routine_jobs';
// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });
routineJobsQueue.process(function (job, done) {
const jobData = job.data;
console.log(`processing job ${jobData.jobId}`);
done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});
routineJobsQueue.on('completed', function (job, result) {
const jobData = job.data;
console.log(`job ${jobData.jobId} completed with result: ${JSON.stringify(result)}`)
})
// Generate a routine job every second
let count = 0;
setInterval(async () => {
const job = {
jobId: count,
value: count,
jobType: 'routine'
};
await routineJobsQueue.add(job);
console.log(`scheduled job: ${count}`);
count++;
}, intervalInMilli);
Soon I'll update this post and will show you how you can simply host the application with utopiops so stay tuned as you might get some exciting coupon codes too ;) !
Finally, you can find the complete source code here on Github .