node.jstypescriptdelayfastifybullmq

Issues with Delayed Job Processing in BullMQ


I am using BullMQ to add jobs to a queue, which are set to be processed with a 15-second delay. So, I am doing something like this:


// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
    maxRetriesPerRequest: null,
})

// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true },
    connection: redis,
})

// QUEUE HANDLER
const queueHandler = async (job: Job) => {
    const { id } = job.data
    console.log(`Processing job ${id} after delay:`, new Date())
}

// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })

// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
    // SOME CODE HERE
})

// ON FAIL EVENT
sendWorker.on('failed', async (job: any, error: Error) => {
    // SOME CODE HERE
})

In another file I am doing something like this:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000}).catch(console.error)

I am using the "delay" option to set a delay when adding jobs to a queue, as described here. However, it’s not working as expected. The delay is only applied to the first job, while the subsequent jobs are processed consecutively without delay. The console is showing the following output:

Adding job to queue: { id: '1' } at: 2024-09-02T15:08:11.421Z

Adding job to queue: { id: '2' } at: 2024-09-02T15:08:11.426Z

Adding job to queue: { id: '3' } at: 2024-09-02T15:08:11.426Z

Processing job 1 after delay: 2024-09-02T15:08:26.488Z // Note: 15 seconds have passed

Processing job 2 after delay: 2024-09-02T15:08:27.129Z // Expected: 15-second delay before processing

Processing job 3 after delay: 2024-09-02T15:08:27.425Z // Expected: 15-second delay before processing

What did I try until now:

1- Set the "delay" option on Queue default job options:

const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true, delay: 15000 },
    connection: redis,
})

Result: No changes observed

2- Add jobs to queue using a "for each" instead of "for of":

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
some_objects.forEach((object) => {
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000})
}).catch(console.error)

Result: No changes observed

3- Add jobs one-by-one:

const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })

Result: No changes observed

4- Use different job names when adding to queue:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(`object_${object.id}`, {id: object.id}, {delay: 15000}).catch(console.error)

Result: No changes observed

5- Check Queue on redis:KEYS * redis-cli command shows this

  1. "bull:<queue_name>:3"
  2. "bull:<queue_name>:1"
  3. "bull:<queue_name>:id"
  4. "bull:<queue_name>:events"
  5. "bull:<queue_name>:meta"
  6. "bull:<queue_name>:delayed"
  7. "bull:<queue_name>:2"

Next, I execute the command ZRANGE bull:<queue_name>:delayed 0 -1 to list all members of the sorted set bull:<queue_name>:delayed and I get:

  1. "1"
  2. "2"
  3. "3"

Results: The jobs are correctly stored in the bull:<queue_name>:delayed set until the specified delay time elapses, at which point they should be moved to the main queue for processing. However, the delayed jobs are not being processed as expected.

Only one worker is active with a concurrency of 1 (see worker definition).


Solution

  • So, recently I was digging into the BullMQ documentation and I discovered this. It turns out that the functionality I wanted wasn't achievable with the delay configuration, but rather with rate limiting. In my worker, I simply added the limiter configuration, and I achieved the behavior I was looking for: a 15-second delay between processing jobs.

    const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
        concurrency: 1,
        connection: redis,
        limiter: {
            max: 1,
            duration: 15000,
        },
    })
    

    As I understand it now, the delay option adds all jobs with a delay to a set, and once the delay time has passed, the worker processes the entire set continuously. With the limiter option, however, the worker waits the defined duration between processing each job.