redisnestjsbull

How to execute predefined number of jobs at one time in Nestjs bull


I have 65 same named jobs to execute in one queue with 3 instances of consumers (A, B, C). I want to do at one time each consumer execute 10 jobs. After the 10 execution completed, if there are available jobs greater than 10 in the queue that consumer again execute 10 jobs. If not execute available jobs.

jobs 1 to 65

Consumer A execute 1 to 10 Consumer B execute 11 to 20 Consumer C execute 21 to 30

Lets take B, A, C finished the execution in order. then

B - 31,32,33,.40 A - 41,42,43,.50 C - 51,52,53,.60

if C finish the execution first, C execute the remaining 5 jobs. Please can I know are there any ways to achieve this.

producer

@Injectable()
export class SampleQueueProducerService {
  constructor(@InjectQueue('sample-queue') private sampleQueue: Queue) {}

  async sendDataToJob(message: string) {
    await this.sampleQueue.add('job', { message });
  }
}

consumer

@Processor('sample-queue')
export class SampleQueueConsumerService {
  @Process({ name: 'job' })
  async sampleJob(job: Job<any>) {
    console.log(job.data);
  }
}

all 3 consumers are same as above.


Solution

  • You can get x jobs using this functionality (bull reference)

    However you don't have this exactly supported with NestJS

    What you can do is use @OnGlobalQueueWaiting() or @OnQueueWaiting() to get the waiting jobs, storing them in an array, once it reaches 10, you can send them to be processed.

    Something like:

    @OnGlobalQueueWaiting()
    async onGlobalWaiting(jobId: number, result: any) {
      const job = await this.myQueue.getJob(jobId);
      this.jobArray.push(job)
      if (this.jobArray.length >= 10) {
           await this.processJobs(this.jobArray);
           this.jobArray = [];
       }
    }
    
    async processJobs(jobs: Job[]){'
       jobs.forEach(job => do something)
    }