node.jsredisqueuebull-queue

Bull queue: When a job fails, how to stop queue from processing remaining jobs?


I am using bull queue to process some jobs. In the current scenario each job is some kind of an operation. So whenever an operation(job) among a list of operations in the queue fails, queue has to stop processing the remaining jobs(operations).

What have I tried so far?

So i tried to pause the queue when a particular job fails. Next the queue is resumed when it drains. Now when it is resumed the queue does not start from the failed job instead picks up the next job.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

Current Output:

Current result

Expected Output: if the Type-1 two completes successfully in 3rd attempt.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

Expected Output: if the Type-1 two failed in 3rd attempt as well.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

All i want is the queue has to stop processing new jobs until current job is completed without any failure. In case of any failure, the failed job has to run some x number of time. At the x+1 attempt it has to clear(delete all jobs) the queue. So how to achieve this linear behavior in queue.


Solution

  • In bull Its not possible to repeat the same job immediately after its failure before picking up the next job in the queue.

    Solution:

    1. Create new job and set its priority to a value less than the current job type.
    2. Release the failed job (resolve() or done())
    3. This new job will be picked up immediately by the bull for processing.

    Sample code: In the below code Job-3 will fail and create new job and so on until "purpose of the job" succeeds at some point of time.

    var Queue = require('bull');
    
    let redisOptions = {
      redis: { port: 6379, host: '127.0.0.1' }
    }
    var myQueue = new Queue('Linear-Queue', redisOptions);
    
    myQueue.process('Type-1', function (job, done) {
      console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
      downloadFile(job, async function (error) {
        if (error) {
          await repeatSameJob(job, done);
        } else {
          done();
        }
      });
    });
    
    async function repeatSameJob(job, done) {
      let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
      console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
      done(true);
    }
    
    function downloadFile(job, done) {
      setTimeout(async () => {
        done(job.data.error)
      }, job.data.time);
    }
    
    myQueue.on('completed', function (job, result) {
      console.log("Completed: Job-" + job.id);
    });
    
    myQueue.on('failed', async function (job, error) {
      console.log("Failed: Job-" + job.id);
    });
    
    let options = {
      removeOnComplete: true, // removes job from queue on success
      removeOnFail: true // removes job from queue on failure
    }
    
    for (let i = 1; i <= 5; i++) {
      let error = false;
      if (i == 3) { error = true; }
    
      setTimeout(i => {
        let jobData = {
          time: i * 2000,
          error: error,
          description: `Job-${i}`
        }
        myQueue.add('Type-1', jobData, options);
      }, i * 2000, i);
    }
    

    Output:

    Output