node.jsapache-kafkanestjsnestjs-gateways

fail to consume kafka events with nestjs


I wanted to consume Kafka messages and thus I've invoked the emit() message first which successfully stored a message in Kafka.

I can see that by running the batch file: bin/windows/kafka-console-consumer.bat --topic signals --from-beginning --bootstrap-server localhost:9092

and the new message is logged. {"botId":"TKS","emailToken":"38fhsf29h","pair":"xy","size":100}

This is my nodejs/nestjs service file. What am I missing in my script - since the same script writes successfully to Kafka? I've tried @MessagePattern and @EventPattern to consume the message. Any ideas?

import { Injectable, OnModuleInit } from '@nestjs/common';
import { TVSignal } from './tvsignal';
import { Client, ClientKafka, EventPattern, MessagePattern, Payload, Transport } from "@nestjs/microservices";
import { Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class AppService implements OnModuleInit {

  constructor(private eventEmitter: EventEmitter2) {}

  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'trading-signal-receiver',
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'signals',
        allowAutoTopicCreation: true
      },
      producer: {
      }
    }
  })
  kafkaClient: ClientKafka;


  async onModuleInit() {
    this.kafkaClient.subscribeToResponseOf('signals');
    await this.kafkaClient.connect();
    Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))

  }
  
  // DOES NOT WORK
  @EventPattern('signals')
  async handleEntityCreated(payload: any) {
    Logger.log("RECEIVED NEW: "+ JSON.stringify(payload));
  }

  getTradingSignals(): string {
    return ""
  }

  // DOES NOT WORK
  @MessagePattern('signals') // Our topic name
  getMessage(@Payload() message) {
    Logger.log("RECEIVED KAFKA MSG" + message.value);
    return 'Hello World';
  }


    // WORKS
    storeSignal(signal: TVSignal){
    Logger.log("STORED: " + JSON.stringify(signal))  
    Logger.log("consumer assignments: " + JSON.stringify(this.kafkaClient.getConsumerAssignments()))
    // send message to OnEvent
    this.eventEmitter.emit('signal.saved', signal);
    // store obj in kafka
    return this.kafkaClient.emit('signals', signal); // args - topic, message
    
  }

  

}

Solution

  • There reason why I could not consume them, was because I missed out on calling app.startAllMicroservices() (KAFKA is one of them).

    import { NestFactory } from '@nestjs/core';
    import { AppModule } from './app.module';
    import { microserviceConfig} from "./msKafkaConfig";
    
    async function bootstrap() {
      const app = await NestFactory.create(AppModule);
      app.enableCors();   
      app.connectMicroservice(microserviceConfig);
      await app.startAllMicroservices();
      
      await app.listen(3000);
    
    
    
    }
    bootstrap();