nestjs, integration-testing, kafka-consumer-api, e2e-testing, kafkajs

How to set up integration/e2e testing of a KafkaJS consumer (MessagePattern/EventPattern handler) in NestJS microservices


I'm proving out an integration test with NestJS and KafkaJS.

I have everything implemented, but the event handler (consumer) for the topic I'm emitting to is never called.

I read somewhere that a consumer can't receive messages until its GROUP_JOIN event has fired. I'm not sure whether that's right, and if it is, how can I get my e2e test to wait for it?

Here's the setup of the e2e test:

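import { INestApplication } from '@nestjs/common'
import { ClientKafka, Transport } from '@nestjs/microservices'
import { Test, TestingModule } from '@nestjs/testing'
import * as request from 'supertest'
// InventoryModule and KafkaClientModule come from the app's own source (paths not shown)

describe('InventoryController(e2e)', () => {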
  let app: INestApplication
  let client: ClientKafka
  beforeAll(async () => {
    const moduleFixture: TestingModule = await Test.createTestingModule({
      imports: [InventoryModule, KafkaClientModule],
    }).compile()
    app = moduleFixture.createNestApplication()
    await app.connectMicroservice({
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'test_clientid',
          brokers: process.env.KAFKA_BROKERS.split(' '), // []
          ssl: true,
          sasl: {
            mechanism: 'plain',
            username: process.env.KAFKA_CLUSTER_APIKEY,
            password: process.env.KAFKA_CLUSTER_SECRET,
          },
        },
        consumer: {
          groupId: 'test_consumerids',
        },
      },
    })
    await app.startAllMicroservices()
    await app.init()
    client = moduleFixture.get<ClientKafka>('test_name')
    await client.connect()
    await app.listen(process.env.port || 3000)
  })

  afterAll(async () => {
    await app.close()
    await client.close()
  })

  it('/ (GET)', async () => {
    return request(app.getHttpServer()).get('/inventory/kafka-inventory-test')
  })

  it('Emits a message to a topic', async () => {
    await client.emit('inventory-test', { foo: 'bar' })
  })
})

The client emits the message fine.

In my controller I have the event handler for the topic 'inventory-test':

  @EventPattern('inventory-test')
  async consumeInventoryTest(
    // eslint-disable-next-line
    @Payload() inventoryMessage: any,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    console.log('inventory-test consumer')
  }

I have also logged the microservice via app.getMicroservices() and can see that its messageHandlers map contains 'inventory-test', which maps to a function:

        server: ServerKafka {
          messageHandlers: Map(1) { 'inventory-test' => [Function] }

Also, the message handler works when I run the app locally.

I've searched Google and the docs for both KafkaJS and Nest extensively, but there isn't much information out there.


Solution

  • I finally solved this: after your client emits a message, you need to await a new promise so the handler has time to read it.
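
A minimal sketch of what "await a new promise" could look like, assuming a Jest-style test runner. The helper names `delay` and `waitFor` are hypothetical and not part of NestJS or KafkaJS, and the 5-second pause in the usage comment is an arbitrary guess rather than a value from the original post. `delay` is the fixed pause described above; `waitFor` is an alternative that polls a flag set by your handler, so the test can finish as soon as the message arrives.

```typescript
// Hypothetical helper: resolves after `ms` milliseconds, giving the
// consumer time to pick up and handle the emitted message.
const delay = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical helper: polls `condition` every `intervalMs` until it
// returns true, and rejects if `timeoutMs` elapses first.
async function waitFor(
  condition: () => boolean,
  timeoutMs = 10000,
  intervalMs = 100,
): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(`Condition not met within ${timeoutMs} ms`)
    }
    await delay(intervalMs)
  }
}

// Usage inside the e2e test from the question (assumed, not verified):
//
//   it('Emits a message to a topic', async () => {
//     client.emit('inventory-test', { foo: 'bar' })
//     await delay(5000) // fixed pause so the handler can run
//   }, 15000)           // raise the test timeout above the pause
```

In NestJS, `client.emit` returns an RxJS Observable rather than a Promise, so awaiting it on its own does not pause the test, which may be why the explicit wait is needed. If you use Jest, its default per-test timeout is 5 seconds, so a pause of that length needs a larger timeout passed as the third argument to `it`.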