I'm coming back to NestJS after little while and I'm trying to set up a simple Kafka consumer. Here is a basic example of my controller using the EventPattern
decorator to subscribe to a Kafka topic.
import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
@Controller()
export class MyController {
@EventPattern('my-topic')
async handleMessage(@Payload() data: Record<string, unknown>) {
console.log(data);
}
}
This setup used to work for me in the past, but I'm now running into some issues. On startup, KafkaJS raises the following error:
KafkaJSNonRetriableError: Invalid topic undefined
It looks to me like the topic name is not being picked up properly. I'm a bit at a loss why this would be the case. For context, here is how I set up the Kafka client in main.ts
:
const app = await NestFactory.create(AppModule);
...
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: [process.env['KAFKA_BROKER']],
clientId: process.env['KAFKA_CLIENT_ID'],
},
consumer: {
groupId: process.env['KAFKA_CONSUMER_GROUP_ID'],
allowAutoTopicCreation: true,
},
},
});
...
await app.startAllMicroservices();
Any ideas would be much appreciated!
I solved this problem by updating kafkajs
to version 2.2.4