node.jsqueuebull

Bull queue - ability to mark job as "failed" (depending on result) and then optionally retry manually


I have a queue system that uses Bull (https://optimalbits.github.io/bull/), which receives API requests, and then dispatches them to an ERP system consecutively once each request completes. (e.g. To avoid crashing the ERP system when a user generates 20 API requests at once.)

However, sometimes the ERP system fails to process the API commands for a variety of reasons. Currently this is treated as "completed" in the Bull queue, with a custom status of failed.

However, we'd like to be able to be notified of such failures, and be able to manually retry the failed API command.

In the documentation (https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueprocess) it seems to indicate the done callback can be called with an error instance - but doesn't give an example...

The callback is called everytime a job is placed in the queue. It is passed an instance of the job as first argument.

If the callback signature contains the second optional done argument, the callback will be passed a done callback to be called after the job has been completed. The done callback can be called with an Error instance, to signal that the job did not complete successfully, or with a result as second argument (e.g.: done(null, result);) when the job is successful. Errors will be passed as a second argument to the "failed" event; results, as a second argument to the "completed" event.

If, however, the callback signature does not contain the done argument, a promise must be returned to signal job completion. If the promise is rejected, the error will be passed as a second argument to the "failed" event. If it is resolved, its value will be the "completed" event's second argument.

[emphasis added] Here's a sample of the code I'm using where the queue is being processed:

queue.process(async (job, done) => {
    switch (job.data.type) {
      case 'API_Type':
        const workOrder = await createWorkOrder(job.data)
        socketService.emiter('API_Type', workOrder, job.data.socketId)
        workOrder.status === 'success' ? done(null, workOrder) : placeJobInFailedStatus
        break
      //...
      default:
        done()
    }
  })

Where it says in pseudo code "placeJobInFailedStatus" - how can I instead make it "fail"/"stall"/"pause" just that job in the Bull queue, while continuing with others? (instead of marking it completed)

And is there any way to manually retry a "failed" queue entry, or does it need to be re-added fresh to the queue? (I don't want to just try it again in a few seconds - it may need some manual user input to adjust something first.) I'm not seeing a way to manually retry in the documentation. (There is https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#jobretry, but I'm not following as to how it's invoked. And there is https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#jobpromote, but I'm also not seeing how to put a task into a "delayed" status.)

Before I try designing a separate method to handle such, I'm wondering if some of it is supported natively, as it seems like it should be a common need for a queue system.


Solution

  • I figured out how to do this (2 days after I posted the question, but forgot to update it earlier since there were no responses ever added).

    Essentially, you need to pass the ID, get the job, and then use job.retry()

    Here's an example:

    const webApiRetry = async (key) => {
      // Display date for retry
      var display_format_options = { year: 'numeric', month: 'short', day: 'numeric' };
      let date_object = new Date(Date.now())
      let date_display = date_object.toLocaleDateString("en-US", display_format_options) // provide in specified format
      let time_display = date_object.toLocaleTimeString("en-US", {hour: 'numeric', minute:'2-digit', second: '2-digit', hour12: false, timeZone: "America/Chicago", timeZoneName: 'short'})
      let datetime_display = date_display + ' | ' + time_display.slice(0, -4)
      console.log("webApiRetry ID: ", key, "Date: ", datetime_display)
      // Fetch job by ID
      let job = await webApiQueue.getJob(key)
      // Update job to include retried datetime
      await job.update({...job.data, "retried": datetime_display})
      // Retry job
      await job.retry()
    }