I am using BullMQ to add jobs to a queue, which are set to be processed with a 15-second delay. So, I am doing something like this:
import { Queue, Worker, Job } from 'bullmq'
import Redis from 'ioredis'

// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
  maxRetriesPerRequest: null,
})

// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
  defaultJobOptions: { removeOnComplete: true },
  connection: redis,
})

// QUEUE HANDLER
const queueHandler = async (job: Job) => {
  const { id } = job.data
  console.log(`Processing job ${id} after delay:`, new Date())
}

// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })

// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
  // SOME CODE HERE
})

// ON FAIL EVENT
sendWorker.on('failed', async (job: Job | undefined, error: Error) => {
  // SOME CODE HERE
})
In another file I am doing something like this:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]

for (const object of some_objects) {
  console.log('Adding job to queue:', object, ' at: ', new Date())
  myQueue.add(<JOB_NAME>, { id: object.id }, { delay: 15000 }).catch(console.error)
}
I am using the "delay" option to set a delay when adding jobs to a queue, as described here. However, it’s not working as expected. The delay is only applied to the first job, while the subsequent jobs are processed consecutively without delay. The console is showing the following output:
Adding job to queue: { id: '1' } at: 2024-09-02T15:08:11.421Z
Adding job to queue: { id: '2' } at: 2024-09-02T15:08:11.426Z
Adding job to queue: { id: '3' } at: 2024-09-02T15:08:11.426Z
Processing job 1 after delay: 2024-09-02T15:08:26.488Z // Note: 15 seconds have passed
Processing job 2 after delay: 2024-09-02T15:08:27.129Z // Expected: 15-second delay before processing
Processing job 3 after delay: 2024-09-02T15:08:27.425Z // Expected: 15-second delay before processing
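A quick diagnostic sketch of what is going on (this assumes the loop above runs inside an async function; it is not part of the original code): every job is added within a few milliseconds of the others and carries the same 15000 ms delay, so all three become "ready" at essentially the same instant.

// Diagnostic sketch (assumes an async context): log when each job should become ready.
for (const object of some_objects) {
  const job = await myQueue.add(<JOB_NAME>, { id: object.id }, { delay: 15000 })
  // timestamp = when the job was added; delay = 15000 ms for every job,
  // so all three "ready" times land within a few milliseconds of each other.
  console.log(`Job ${job.id} ready at`, new Date(job.timestamp + (job.opts.delay ?? 0)))
}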
What I have tried so far:
1- Set the "delay" option on Queue default job options:
const myQueue = new Queue(<QUEUE_NAME>, {
defaultJobOptions: { removeOnComplete: true, delay: 15000 },
connection: redis,
})
Result: No changes observed
2- Add jobs to the queue using forEach instead of for...of:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]

some_objects.forEach((object) => {
  console.log('Adding job to queue:', object, ' at: ', new Date())
  myQueue.add(<JOB_NAME>, { id: object.id }, { delay: 15000 }).catch(console.error)
})
Result: No changes observed
3- Add jobs one-by-one:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })
Result: No changes observed
4- Use different job names when adding to the queue:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]

for (const object of some_objects) {
  console.log('Adding job to queue:', object, ' at: ', new Date())
  myQueue.add(`object_${object.id}`, { id: object.id }, { delay: 15000 }).catch(console.error)
}
Result: No changes observed
5- Inspect the queue in Redis:
Running KEYS * in redis-cli shows this:
- "bull:<queue_name>:3"
- "bull:<queue_name>:1"
- "bull:<queue_name>:id"
- "bull:<queue_name>:events"
- "bull:<queue_name>:meta"
- "bull:<queue_name>:delayed"
- "bull:<queue_name>:2"
Next, I run ZRANGE bull:<queue_name>:delayed 0 -1 to list all members of the sorted set bull:<queue_name>:delayed, and I get:
- "1"
- "2"
- "3"
Result: The jobs are correctly stored in the bull:<queue_name>:delayed sorted set, and once the specified delay elapses they are moved to the main queue for processing. However, they are still processed back to back rather than 15 seconds apart, which is not what I expected.
Only one worker is active with a concurrency of 1 (see worker definition).
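The same state can also be checked from code with BullMQ's queue getters instead of redis-cli; a minimal sketch, assuming the myQueue instance defined above and an async context:

// Sketch: inspect queue state via BullMQ getters instead of redis-cli.
const counts = await myQueue.getJobCounts('delayed', 'waiting', 'active', 'completed', 'failed')
console.log('Job counts:', counts) // e.g. { delayed: 3, waiting: 0, ... }

// List the jobs currently sitting in the delayed set.
const delayedJobs = await myQueue.getDelayed()
for (const job of delayedJobs) {
  console.log(`Delayed job ${job.id}:`, job.name, job.data)
}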
So, recently I was digging into the BullMQ documentation and I discovered this. It turns out that the behavior I wanted isn't achieved with the delay option, but rather with rate limiting. In my worker, I simply added the limiter configuration, and I got exactly what I was looking for: a 15-second gap between processed jobs.
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
  concurrency: 1,
  connection: redis,
  limiter: {
    max: 1,
    duration: 15000,
  },
})
As I understand it now, the delay option only schedules each job relative to the moment it is added: all of my jobs were added at practically the same time with the same 15-second delay, so they all became ready at the same moment and the worker then processed the whole set continuously. With the limiter option, the worker processes at most max jobs per duration, so with max: 1 and duration: 15000 it waits 15 seconds between jobs, which is the behavior I wanted.
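For contrast, here is a minimal sketch (assuming the same myQueue and placeholders as above) of what delay alone would require to get similar spacing: because each job's delay counts from the moment that job is added, not from when the previous job finishes, the delays have to be staggered at add time.

// Sketch only: staggered delays at add time, since each job's delay is
// relative to when that job is added, not to the previous job's completion.
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]

some_objects.forEach((object, index) => {
  // Job 1 runs after ~15 s, job 2 after ~30 s, job 3 after ~45 s.
  myQueue
    .add(<JOB_NAME>, { id: object.id }, { delay: 15000 * (index + 1) })
    .catch(console.error)
})

In my case the limiter is the better fit, since the spacing lives in the worker configuration and keeps applying no matter where or when jobs are added.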