Node.js background job processing with bull - basics

There are many reasons why an application needs a background job processing mechanism, most often because a job takes too long to handle within the context of an HTTP request-response cycle.

For example, generating a report and publishing the results can take so long that doing it inside an HTTP request-response is practically impossible. Running a batch processing job every time a certain file is uploaded to Dropbox is another case where background job processing comes in handy.

In a series of tutorials I'll show you how to use a fantastic tool named bull to handle different background processing scenarios. Each example can be used as a standalone tutorial, so anyone who starts reading one doesn't need to refer to my other tutorials to understand what's going on. This also means you might find a few things repeated across these tutorials, which you can easily spot and skip.

In this example we set up bull and Redis, and see how easily we can start producing and consuming jobs.

We start by creating our package.json:

npm init -y

Then let's create a file named index.js, which will contain all of our code.

Now that we have something to run, let's install nodemon to make it easier to see the results as soon as we modify any files:

npm install nodemon

To get nodemon up and running there is one more tiny step, which is to update our package.json file by adding a start script like this:

"start": "nodemon index.js"

This is what the scripts section of package.json looks like so far:

...
"scripts": {
    "start": "nodemon server.js"
  },
...
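
With that in place we can start the watcher; index.js is still empty, so nodemon will simply exit cleanly and wait for changes:

npm start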

Before starting to write the code we add one last dependency which is the bull package itself:

npm install bull --save

Bull uses Redis as its backbone to manage the jobs, so here we simply start a Redis container like this:

docker run -p 6379:6379 -d redis
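
If you want to double-check that Redis is reachable, and you happen to have redis-cli installed locally, a quick ping should answer with PONG:

redis-cli -h 127.0.0.1 -p 6379 ping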

So far we have all the packages, dependencies, and the file to hold our code. Now we start by importing bull in our index.js file:

const Queue = require('bull');

Let's define a few variables to use later in our code:

const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds
const queueName = 'routine_jobs';

Now we create a new queue:

const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });

So far it's been pretty self-explanatory; just note that we can also provide a password when creating the queue if our Redis instance requires it:

const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost, password: 'something_super_secure' } });
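
As a side note, bull also accepts a Redis connection string instead of an options object, which can be handy for quick experiments (just a sketch; adjust the URL to your own setup):

const routineJobsQueue = new Queue(queueName, `redis://${redisHost}:${redisPort}`);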

Now that we have created the queue, we can specify how the jobs should be processed using the process method:

routineJobsQueue.process(function (job, done) {
  const jobData = job.data; // get the data from the job
  console.log(`processing job ${jobData.jobId}`);  
  done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});
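
As a side note, bull also accepts a processor that returns a promise instead of calling done; a rough equivalent of the callback above would look something like this (an illustrative sketch, not used in the rest of this example):

routineJobsQueue.process(async function (job) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);
  // whatever the promise resolves to becomes the job result
  return { t2: jobData.value * 2, t3: jobData.value * 3 };
});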

In the callback version we simply log the jobId and set the result by calling done. The first parameter passed to done is any error that occurred while processing the job, and the second is an arbitrary result object. In case you're wondering who uses this result: bull lets us register an event listener that runs whenever a job completes successfully.

This is our eventListener that simply logs a message:

routineJobsQueue.on('completed', function (job, result) {
  const jobData = job.data;
  console.log(`job ${jobData.jobId} completed with result: ${JSON.stringify(result)}`)
})
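
bull emits a few other lifecycle events too; for instance, if the processor throws or calls done with an error, a failed listener along these lines would be notified (again just a sketch, not part of the final example):

routineJobsQueue.on('failed', function (job, err) {
  const jobData = job.data;
  console.log(`job ${jobData.jobId} failed: ${err.message}`)
})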

Now let's simulate producing jobs by adding an arbitrary job to the queue at equal intervals:

let count = 0;
setInterval(async () => {
  const job = {
    jobId: count,
    value: count,
    jobType: 'routine'
  };
  await routineJobsQueue.add(job);
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);
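
Worth noting that add also accepts an options object as a second argument; for example, if we wanted automatic retries or a delayed job we could pass something like this (a sketch, not used in this example):

await routineJobsQueue.add(job, { attempts: 3, delay: 5000 });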

All of the job's properties are arbitrary, by the way. Here is a sample of the output the example produces:

[Screenshot: sample console output showing jobs being scheduled, processed, and completed]

For reference, this is the complete index.js:

const Queue = require('bull');

const redisHost = process.env.REDIS_HOST || '127.0.0.1';
const redisPort = process.env.REDIS_PORT || 6379;
const intervalInMilli = 1000; // 1000 milliseconds
const queueName = 'routine_jobs';

// A queue for the jobs scheduled based on a routine without any external requests
const routineJobsQueue = new Queue(queueName, { redis: { port: redisPort, host: redisHost } });

routineJobsQueue.process(function (job, done) {
  const jobData = job.data;
  console.log(`processing job ${jobData.jobId}`);  
  done(null, { t2: jobData.value * 2, t3: jobData.value * 3 });
});

routineJobsQueue.on('completed', function (job, result) {
  const jobData = job.data;
  console.log(`job ${jobData.jobId} completed with result: ${JSON.stringify(result)}`)
})

// Generate a routine job every second
let count = 0;
setInterval(async () => {
  const job = {
    jobId: count,
    value: count,
    jobType: 'routine'
  };
  await routineJobsQueue.add(job);
  console.log(`scheduled job: ${count}`);
  count++;
}, intervalInMilli);

Soon I'll update this post to show you how you can easily host the application with utopiops, so stay tuned; you might get some exciting coupon codes too ;)!

Finally, you can find the complete source code here on GitHub.