nestjs sentry bullmq

NestJS Bull: log errors to Sentry


I recently added Bull to my project to offload work such as synchronizing documents to third-party services. Everything works well, except that errors thrown while processing jobs don't end up in Sentry. They're only recorded on the jobs themselves, and because we run the application in multiple configurations, I have to constantly monitor all of these instances for job processing errors.

I know I can add an error handler to a processor, but I already have quite a few processors, so I'd prefer a more global solution.

Is there any way to make sure these errors are also sent to Sentry?


Solution

  • I wasn't able to find a way to do this globally, but I was able to create a base processor class that implements the OnQueueFailed event listener and reports failures to Sentry. All my processors inherit from it, and it seems to work well.

    Base Class:

    import { OnQueueFailed } from '@nestjs/bull';
    import { Logger } from '@nestjs/common';
    import * as Sentry from '@sentry/node';
    import { Job } from 'bull';
    
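    export abstract class BaseProcessor {
      // Each concrete processor supplies its own named logger.
      protected abstract logger: Logger;
    
      // Runs whenever a job on the subclass's queue fails.
      @OnQueueFailed()
      onError(job: Job, error: Error) {
        // Report the failure to Sentry, then log it locally as well.
        Sentry.captureException(error);
        this.logger.error(
          `Failed job ${job.id} of type ${job.name}: ${error.message}`,
          error.stack,
        );
      }
    }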
    

    Processor:

    import { InjectQueue, Process, Processor } from '@nestjs/bull';
    import { Logger } from '@nestjs/common';
    import { Job, Queue } from 'bull';
    import { BaseProcessor } from 'src/common/BaseProcessor';
    import { BULL_QUEUES } from 'src/common/queues';
    
    @Processor(BULL_QUEUES.SOME_QUEUE_NAME)
    export class SomeProcessor extends BaseProcessor {
      protected readonly logger = new Logger(SomeProcessor.name);
    
      constructor(
        // dependencies
      ) {
        super();
      }
    
      @Process()
      async processTask(job: Job) {
        // processor code here
      }
    }
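
A possible refinement of the base class above, offered as a rough sketch rather than part of the original answer: attach some job details to the Sentry event so failures from different queues are easier to tell apart. The helper name `describeFailedJob` and the `FailedJobLike` and `FailureReport` shapes are hypothetical; they only mirror the `id`, `name`, `attemptsMade`, `opts.attempts` and `queue.name` fields of a Bull job. The final-attempt check assumes Bull has already counted the failed attempt in `attemptsMade` by the time the failed listener runs, which is worth verifying against your Bull version.

```typescript
// Hypothetical, minimal shape of the Bull job fields this sketch reads.
interface FailedJobLike {
  id: string | number;
  name: string;
  attemptsMade: number;
  opts: { attempts?: number };
  queue?: { name: string };
}

// Hypothetical result type: tags for Sentry plus a retry hint.
interface FailureReport {
  tags: Record<string, string>;
  isFinalAttempt: boolean;
}

// Collects the job details worth attaching to an error report.
function describeFailedJob(job: FailedJobLike): FailureReport {
  // A job with no explicit attempts option is treated as single-attempt.
  const maxAttempts = job.opts.attempts ?? 1;
  return {
    tags: {
      queue: job.queue?.name ?? 'unknown',
      jobName: job.name,
      jobId: String(job.id),
    },
    // Assumption: attemptsMade already includes the attempt that just failed.
    isFinalAttempt: job.attemptsMade >= maxAttempts,
  };
}

// Possible use inside BaseProcessor.onError (not executed in this sketch):
//   const report = describeFailedJob(job);
//   Sentry.withScope((scope) => {
//     scope.setTags(report.tags);
//     Sentry.captureException(error);
//   });
```

If retries are configured, `isFinalAttempt` could be used to skip reporting for attempts that will be retried, at the cost of not seeing transient failures in Sentry.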