node.js, redis, priority-queue, job-queue, bull.js

Remove duplicated outputs from BullJS job queue


I am running a Node.js/Express API server on Heroku (my web server dyno) that queues up jobs in Redis; the jobs are managed by BullJS on a worker server (a separate worker dyno).

These jobs can take a while (up to 2 hours). However, I have noticed that sometimes a job with id x will start, and then a job with id x+1 will start 5 minutes later with the exact same inputs. These are duplicate tasks (they're not both coming in from user actions in the app), and I'd like to either prevent them from happening or clean them up afterwards.

I have considered deleting the job when it fails (something like the approach below), but I'm worried this is a bad idea because Redis seems to reuse jobIds over time, and besides, I don't want jobs to fail silently so that the work never gets done.

workQueue.on('global:failed', async function (jobId, error) {
  console.log("Failed: " + JSON.stringify(jobId), error);
  await workQueue.removeJobs(jobId);
  await deleteDBEntryWithJobId(jobId); // A function that looks up the entry in the DB by jobId and deletes it
});

Here is some more info about the codebase in case it's helpful. I'd take any design tips or suggestions to make this process more robust. I'm less concerned with the duplicated jobs themselves and more concerned with duplicated data in the DB, which gets shown to a user.

// Data model for output from the job
{
  jobId: string // This is the BullJS jobId
  name: string
  data: any[] // A lot of data that's pulled from various APIs during the job
}

// API endpoint that takes input and kicks off job
let workQueue = new Queue('work', REDIS_URL);
app.post('/createSegment', async (req: Request, res: Response) => {
  corsHandler(req, res, async () => {
    const jobOptions: JobOptions = { removeOnComplete: true, removeOnFail: true };
    let job = await workQueue.add({ ...req.body, jobType: NEW_SEGMENT_JOB }, jobOptions);
    res.json({ jobId: job.id, data: job.data });
    return;
  })
});

// The worker code
let REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";

let workers = process.env.WEB_CONCURRENCY || 4;
let maxJobsPerWorker = 50;

function start() {
  let workQueue = new Queue('work', REDIS_URL);

  workQueue.process(maxJobsPerWorker, async (job) => {
    const { jobType } = job.data;
    if (jobType === NEW_SEGMENT_JOB) {
      const segmentId = await createNewSegment(job.data, job.id); // Creates the initial record in the DB (a relatively simple / safe operation, which is why it gets duplicated)
      await analyzeSegment(segmentId);
    }
  })
}

throng({ workers, start });

Solution

  • I suspect that the job might be failing and BullJS is re-attempting it. To be on the safe side, set the attempts option explicitly to zero (I'm not sure what its default value is).

    
    const jobOptions: JobOptions = { removeOnComplete: true, removeOnFail: true, attempts: 0 };
    
    

    As per https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueadd, removeOnFail only removes the job once it fails after all attempts. So the current removeOnComplete and removeOnFail options don't stop the default number of attempts from being retried first.
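
    One way to confirm that hypothesis is to log the retry state Bull already keeps on each job. The sketch below is a variant of the worker code from the question (it assumes REDIS_URL, maxJobsPerWorker, NEW_SEGMENT_JOB, createNewSegment and analyzeSegment are defined as above; the log lines are only illustrative): job.attemptsMade is greater than zero whenever Bull is re-running a job that has already failed.

    const Queue = require('bull'); // same Bull library used in the worker above

    let workQueue = new Queue('work', REDIS_URL);

    workQueue.process(maxJobsPerWorker, async (job) => {
      // attemptsMade > 0 means this run is a Bull retry of a job that already failed at least once
      console.log(`Job ${job.id} starting, attemptsMade=${job.attemptsMade}, attempts option=${job.opts.attempts}`);
      const { jobType } = job.data;
      if (jobType === NEW_SEGMENT_JOB) {
        const segmentId = await createNewSegment(job.data, job.id);
        await analyzeSegment(segmentId);
      }
    });

    workQueue.on('global:failed', async (jobId, error) => {
      // The job may already be gone because of removeOnFail, so guard the lookup
      const job = await workQueue.getJob(jobId);
      console.log(`Job ${jobId} failed: ${error}; attemptsMade=${job ? job.attemptsMade : 'already removed'}`);
    });

    If the duplicated runs show attemptsMade greater than zero, that confirms the retry explanation and setting attempts explicitly (as above) addresses it; if they always show zero with distinct job ids, the duplicates are being added as separate jobs and would have to be deduplicated where the job is enqueued.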