Tags: java, spring-boot, integration-testing, spring-boot-test, spring-kafka-test

Spring KafkaTestUtils: wait until a message in a topic has been consumed


I ran into an interesting situation:

In my integration test I send data to 2 topics, and the application should consume them in a fixed order: first userTopic, then userOrderTopic.

My test sends both messages almost simultaneously. Sometimes the application consumes them in the opposite order (userOrderTopic first, then userTopic), which breaks my test.

I found some tricks to avoid this, such as Thread.sleep, but I believe that is a bad solution.

I am new to Kafka, and therefore to KafkaTestUtils. Is there a method that checks whether a message in a topic has been consumed, or that waits until it has been consumed?

Something like this:

// wait until the message in userTopic has been consumed, so that the message to userOrderTopic can be sent afterwards
KafkaTestUtils.waitUntilBeConsumed(serverBootstrap, "user-consumer", topic, 0, messageToUserTopic)

or

KafkaTestUtils.getNowConsumedMessage(serverBootstrap, "user-consumer", topic, 0)

P.S. In my integration test I don't have access to the producer/consumer.

Or maybe spring.kafka / spring.kafka.test has other tools to do this properly?


Solution

  • KafkaTestUtils is only for consuming from test consumers, not real application consumers.

    There is no easy solution for this race condition; you need to wait for the first record to be consumed before sending the second.
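
One way to "wait for the first record to be consumed" without a fixed Thread.sleep is to poll for some observable sign that the application has processed it, with a timeout. The sketch below is only an illustration using the plain JDK; the class name ConsumedWaiter and the method waitUntil are hypothetical and not part of spring-kafka-test. What the condition checks is up to the test: for example, a side effect of the user listener (a row in the database, a call on a mocked bean), or the consumer group's committed offset for userTopic.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper (not a spring-kafka-test API): polls a condition
// until it becomes true or the timeout elapses.
final class ConsumedWaiter {

    private ConsumedWaiter() {
    }

    /**
     * Returns true as soon as the condition holds, or false if the timeout
     * elapses (or the waiting thread is interrupted) before that happens.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test this would sit between the two sends: send to userTopic, call something like waitUntil(() -> userWasStored(userId), Duration.ofSeconds(10), Duration.ofMillis(100)) and fail the test if it returns false, then send to userOrderTopic. Here userWasStored is a placeholder for whatever check fits the application. Unlike a fixed sleep, the test continues as soon as the condition holds and fails with a clear timeout otherwise.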