I want to consume Kafka messages, so I first called emit(),
which successfully stored a message in Kafka.
I can confirm this by running the console consumer: bin/windows/kafka-console-consumer.bat --topic signals --from-beginning --bootstrap-server localhost:9092
It prints the new message: {"botId":"TKS","emailToken":"38fhsf29h","pair":"xy","size":100}
This is my Node.js/NestJS service file. What am I missing, given that the same service writes to Kafka successfully?
I've tried both @MessagePattern and @EventPattern to consume the message, but neither handler is ever called.
Any ideas?
import { Injectable, OnModuleInit } from '@nestjs/common';
import { TVSignal } from './tvsignal';
import { Client, ClientKafka, EventPattern, MessagePattern, Payload, Transport } from "@nestjs/microservices";
import { Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
/**
 * Application service that publishes trading signals to the Kafka topic
 * `signals` and declares handlers for messages arriving on that topic.
 *
 * NOTE(review): the NestJS documentation states that `@EventPattern` and
 * `@MessagePattern` handlers are only picked up on controller classes.
 * Confirm how this class is registered; if it is only a provider, the two
 * handlers below presumably need to move to a controller.
 */
@Injectable()
export class AppService implements OnModuleInit {
  constructor(private readonly eventEmitter: EventEmitter2) {}

  /** Kafka client configured inline through the `@Client` decorator. */
  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'trading-signal-receiver',
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'signals',
        allowAutoTopicCreation: true,
      },
      producer: {},
    },
  })
  kafkaClient: ClientKafka;

  /**
   * Registers the reply subscription for `signals`, connects the Kafka
   * client and logs the consumer assignments reported by the client.
   */
  async onModuleInit(): Promise<void> {
    // Registered before connecting, as in the original code.
    // NOTE(review): this class only calls `emit()`, which does not wait for
    // a reply; presumably this subscription is only needed for `send()`.
    this.kafkaClient.subscribeToResponseOf('signals');
    await this.kafkaClient.connect();
    Logger.log(
      'consumer assignments: ' +
        JSON.stringify(this.kafkaClient.getConsumerAssignments()),
    );
  }

  /**
   * Closes the Kafka client so the connection opened in `onModuleInit` is
   * released when the module is torn down. Nest looks this hook up by
   * method name, so no extra interface import is required.
   */
  async onModuleDestroy(): Promise<void> {
    await this.kafkaClient.close();
  }

  /**
   * Event handler for the `signals` topic; logs the received payload.
   *
   * @param payload Whatever the transport delivers; it is only serialized
   *   for logging, so no shape is assumed.
   */
  @EventPattern('signals')
  async handleEntityCreated(payload: unknown): Promise<void> {
    Logger.log('RECEIVED NEW: ' + JSON.stringify(payload));
  }

  /** Placeholder; currently always returns an empty string. */
  getTradingSignals(): string {
    return '';
  }

  /**
   * Message handler for the `signals` topic; logs the message value and
   * replies with a fixed string.
   *
   * @param message Incoming message; only its `value` field is read.
   * @returns The constant reply `'Hello World'`.
   */
  @MessagePattern('signals') // Our topic name
  getMessage(@Payload() message: { value?: unknown } | null): string {
    // Serialize the value instead of concatenating it directly: an object
    // value would otherwise be logged as "[object Object]".
    Logger.log('RECEIVED KAFKA MSG' + JSON.stringify(message?.value));
    return 'Hello World';
  }

  /**
   * Logs the signal, announces it in-process as `signal.saved`, and
   * publishes it to the Kafka topic `signals`.
   *
   * @param signal The trading signal to publish.
   * @returns Whatever `kafkaClient.emit` returns for the publish call.
   */
  storeSignal(signal: TVSignal) {
    Logger.log('STORED: ' + JSON.stringify(signal));
    Logger.log(
      'consumer assignments: ' +
        JSON.stringify(this.kafkaClient.getConsumerAssignments()),
    );
    // Notify in-process listeners.
    this.eventEmitter.emit('signal.saved', signal);
    // Publish to Kafka: arguments are (topic, message).
    return this.kafkaClient.emit('signals', signal);
  }
}
The reason I could not consume the messages was that I had not called app.startAllMicroservices()
(the Kafka transport is one of those microservices).
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { microserviceConfig} from "./msKafkaConfig";
/**
 * Creates the Nest application, enables CORS, attaches the microservice
 * described by `microserviceConfig`, starts all attached microservices,
 * and finally starts the HTTP listener.
 */
async function bootstrap(): Promise<void> {
  const httpPort = 3000;

  const app = await NestFactory.create(AppModule);
  app.enableCors();

  // The microservice is attached first and then started explicitly,
  // before the HTTP listener comes up.
  app.connectMicroservice(microserviceConfig);
  await app.startAllMicroservices();

  await app.listen(httpPort);
}

// Deliberately not awaited: this is the entry point of the process.
void bootstrap();