bull.js

How can server 1 know reliably when server 2 has completed a specific job? (bull.js)


Server 1 is calling this function:

let job = await transactions.add({ foo: 'bar' });
let status = await job.finished();
console.log(status);

Server 2 is consuming the job as follows:

transactions.process(QueueOptions.maxConcurrent, async (job, done) => {
    done(null, {result: job.data});
});

However, server 1 is not getting a notification when that job completes. The only time job.finished() resolves, is if server 1 queues the job, and then server 2 is started up. In this case, the job.finished() promise works. Otherwise it is always silent.

How can server 1 know reliably when server 2 has completed the job? (A job takes 10 seconds or less to complete)


Solution

  • You have at least two options to achieve notifying the results of one job completion from a processor in one server to another server.

    Option 1

    Use global events and listen for the "completed" event. For example:

    // Server 1
    transactions.on("global:completed", (jobId, result) => {
      // Job completed
    }); 
    

    The pro for this option is that is straightforward. However, in Bull 3.x, events are not guaranteed to always reach its destination, for example in the case of network partitions or Redis disconnection. You can use this method if you can accept that a very small number of results are not notified completely.

    Option 2

    Use the returning job completions pattern described here: https://github.com/OptimalBits/bull/blob/develop/PATTERNS.md#returning-job-completions

    It works by defining a new Queue, lets call it results, where you add the results of the processors right before finalising, like this:

    // Server 2
    transactions.process(QueueOptions.maxConcurrent, async (job) => {
      const result = {result: job.data}
      await results.add(job.data);
      return result
    });
    

    On the server (or servers) that need to listen for results, just create a processor that handles the result:

    // Server 1
    results.process( (job) => {
      // Do something with the result from the transaction queue...
    });
    

    The pro for this solution is robustness. Basically you will never lose any results from the transactions queue, at the expense of some more code complexity.