node.jsjob-queuebull.js

Job processing microservices using bull


I would like to process scheduled jobs using node.js bull. Basically I have two processors that handle 2 types of jobs. There is one configurator that configures the jobs which will be added to the bull queue using cron.

The scheduler will be in one microservice and the each of the processor will be a separate microservice. So I will be having 3 micro services.

My question is am I using the correct pattern with bull?

index.js

const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

processor-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");

fetcher.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

alert-processor.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

There will be three microservices -

  1. node index.js
  2. node fetcher-configurator.js
  3. node processor-configurator.js

I see inconsistent behavior from bull. Sometimes I am getting the error Missing process handler for job type


Solution

  • Quoting myself with a hope this will be helpful for someone else

    This is because both workers use the same queue. Worker tries to get next job from queue, receives a job with wrong type (eg "fetcher" instead of "processor") and fails because it knows how to handle "processor" and doesn't know what to do with "fetcher". Bull doesn't allow you to take only compatible jobs from queue, both workers should be able to process all types of jobs. The simplest solution would be to use two different queues, one for processors and one for fetchers. Then you can remove names from jobs and processors, it won't be needed anymore since name is defined by the queue.

    https://github.com/OptimalBits/bull/issues/1481