typescriptmicroservicesnestjsnestjs-config

Dynamic Kafka topic name in a NestJS microservice


In NestJS I am using Kafka as the message broker, and I set the topic name like this:

// Handler bound to a Kafka topic whose name is hard-coded as a string literal
// in the decorator argument.
@MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
  // my code goes here
}

Is there any way to read the Kafka topic name from the config service module instead of hard-coding it?


Solution

  • I handle this by creating a custom decorator that only records, as metadata on the method, which config key holds the topic name.

    /** Metadata key under which KafkaTopic stores its argument on the decorated method. */
    export const KAFKA_TOPIC_METADATA = '__kafka-topic-candidate';

    /**
     * Marks a method as a Kafka handler whose topic name is resolved later.
     *
     * The decorator only stores `variable` as metadata on the method function
     * under KAFKA_TOPIC_METADATA; it does not subscribe to any topic by itself.
     *
     * @param variable Value stored as metadata; intended to be the name of the
     *   AppConfig property that holds the topic name.
     * @returns A method decorator that hands the descriptor back unchanged.
     * @throws TypeError if applied to a member that has no function value
     *   (for example an accessor or a plain property).
     */
    export function KafkaTopic(
      variable: string | keyof AppConfig,
    ): MethodDecorator {
      return (
        target: object,
        key: string | symbol,
        descriptor: PropertyDescriptor,
      ) => {
        // The metadata is attached to the method function itself, so there must
        // be one; fail with a readable message instead of an opaque TypeError.
        if (!descriptor || typeof descriptor.value !== 'function') {
          throw new TypeError(
            `@KafkaTopic can only decorate methods, but was applied to "${String(key)}"`,
          );
        }
        Reflect.defineMetadata(KAFKA_TOPIC_METADATA, variable, descriptor.value);
        return descriptor;
      };
    }
    

    Then, at startup, a service scans the given classes for methods carrying that metadata and applies MessagePattern to each of them, using the topic name looked up in appConfig:

    /** Metadata key that marks a method as a candidate Kafka handler. */
    export const KAFKA_TOPIC_METADATA = '__kafka-topic-candidate';

    /**
     * Applies MessagePattern at runtime to methods that carry
     * KAFKA_TOPIC_METADATA, using topic names read from AppConfig.
     *
     * It has to run before the microservice is connected.
     */
    @Injectable()
    export class KafkaDecoratorProcessorService {
      constructor(
        private readonly LOG: Logger,
        private readonly appConfig: AppConfig,
      ) {}

      /**
       * Scans the own prototype members of each class and decorates every
       * marked method with MessagePattern(topic).
       *
       * @param types Classes (constructors) whose prototypes should be scanned.
       * @throws Error if a marked method refers to a config key that has no
       *   topic value (null, undefined or empty string).
       */
      processKafkaDecorators(types: any[]): void {
        for (const type of types) {
          const prototype = type.prototype;

          for (const prop of Object.getOwnPropertyNames(prototype)) {
            // Inspect the descriptor rather than reading the property: reading
            // would invoke accessors on the bare prototype, and a non-object
            // result would make Reflect.getMetadata throw.
            const descriptor = Reflect.getOwnPropertyDescriptor(prototype, prop);
            if (!descriptor || typeof descriptor.value !== 'function') {
              continue;
            }

            const configKey = Reflect.getMetadata(
              KAFKA_TOPIC_METADATA,
              descriptor.value,
            );
            if (!configKey) {
              continue;
            }

            const topic = this.appConfig[configKey];
            // Fail fast: binding a handler to a missing topic name would
            // otherwise go unnoticed until messages fail to arrive.
            if (topic == null || topic === '') {
              throw new Error(
                `No Kafka topic configured under "${String(configKey)}" for ${type.name}#${prop}`,
              );
            }

            this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
            // Apply MessagePattern programmatically, as if it had been written
            // as a decorator on the method.
            Reflect.decorate(
              [MessagePattern(topic)],
              prototype,
              prop,
              descriptor,
            );
          }
        }
      }
    }
    

    This is how to run processKafkaDecorators in the main.ts file:

    // Create the application first so the processor service can be resolved from it.
    const app = await NestFactory.create(AppModule);
      // Resolve the processor and let it apply MessagePattern to the marked
      // handlers of AppController.
      app
        .get(KafkaDecoratorProcessorService)
        .processKafkaDecorators([AppController]);
    
      // Connect the Kafka microservice only after the decorators were processed.
      app.connectMicroservice({
        transport: Transport.KAFKA,
        ...
       })
    

    Note that you have to run it before connecting the microservice. Then use the decorator like this:

    // 'KAFKA_TOPIC_BOOK_UPDATE' is the name of the AppConfig property that holds
    // the topic, not the topic name itself.
    @KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
      async processMessage(
        @Payload() { value: payload }: { value: BookUpdateModel },
      ) {
        ...
      }
    

    Source