I'm proofing out an integration test with NestJS/KafkaJS.
I have everything implemented except the function on the event listener (consumer) for the topic I'm emitting to is not being called.
I read somewhere you can't consume a message until the consumer event GROUP_JOIN has completed, not sure if this is right, and/or how I could get my e2e test to wait for this to happen?
Here's the setup of the e2e test -
describe('InventoryController(e2e)', () => {
let app: INestApplication
let client: ClientKafka
beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [InventoryModule, KafkaClientModule],
}).compile()
app = moduleFixture.createNestApplication()
await app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'test_clientid',
brokers: process.env.KAFKA_BROKERS.split(' '), // []
ssl: true,
sasl: {
mechanism: 'plain',
username: process.env.KAFKA_CLUSTER_APIKEY,
password: process.env.KAFKA_CLUSTER_SECRET,
},
},
consumer: {
groupId: 'test_consumerids',
},
},
})
await app.startAllMicroservices()
await app.init()
client = moduleFixture.get<ClientKafka>('test_name')
await client.connect()
await app.listen(process.env.port || 3000)
})
afterAll(async () => {
await app.close()
await client.close()
})
it('/ (GET)', async () => {
return request(app.getHttpServer()).get('/inventory/kafka-inventory-test')
})
it('Emits a message to a topic', async () => {
await client.emit('inventory-test', { foo: 'bar' })
})
The client is emitting the message fine,
In my controller I have the event handler for the topic 'inventory-test'
@EventPattern('inventory-test')
async consumeInventoryTest(
// eslint-disable-next-line
@Payload() inventoryMessage: any,
@Ctx() context: KafkaContext,
): Promise<void> {
console.log('inventory-test consumer')
}
I have also logged the microservice with the app.getMicroservices() method and can see under the messageHandlers object it has 'inventory-test' which returns a function
server: ServerKafka {
messageHandlers: Map(1) { 'inventory-test' => [Function] }
Also the message handler is working when I run the app locally.
I've been searching Google a lot and the docs for both kafkajs and nest, there isn't a lot of info out there
I actually finally solved this, you need to await a new promise after your client emits a message for the handler to have time to read it.