Tags: spring-boot, apache-kafka, kafka-consumer-api, spring-kafka, spring-kafka-test

Number of partitions during EmbeddedKafka tests with multiple topics


I am relatively new to Apache Kafka. When I run my test with only one topic, it passes. But when I add more topics, the number of assigned partitions increases and the test always fails. I have the following test class:

@SpringBootTest
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 6, topics = {"topicA", "topicB", "topicC"})
@TestPropertySource(properties = {
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class KafkaEventConsumerIntegrationTest {...}

During my tests I get the following error:

java.lang.IllegalStateException: Expected 6 but got 18 partitions

    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:85)
    at com.mbti.cddm.service.integrationtests.KafkaEventConsumerIntegrationTest.setUp(KafkaEventConsumerIntegrationTest.java:74)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)

The error occurs here:

    @BeforeEach
    void setUp() {
        for (MessageListenerContainer messageListenerContainer : endpointRegistry.getAllListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
        }
    }

How can I fix this issue?

I tried to write some integration tests for my Kafka listener class with test containers.


Solution

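  • ContainerTestUtils.waitForAssignment() compares its second argument with the total number of partitions assigned to the container, summed across every topic the container subscribes to. With three topics of 6 partitions each, a container that listens to all of them is assigned 3 × 6 = 18 partitions, so passing getPartitionsPerTopic() (6) fails with "Expected 6 but got 18 partitions".

    If a single container consumes from all three topics, use

    ContainerTestUtils.waitForAssignment(messageListenerContainer,
        embeddedKafkaBroker.getPartitionsPerTopic() * embeddedKafkaBroker.getTopics().size());

    Alternatively, if your containers subscribe to different subsets of the topics, you can get the topics a particular container subscribes to from its getContainerProperties() method and multiply that count by the partitions per topic.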
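
    The per-container variant could be sketched roughly as follows. This is a plain-Java illustration of the arithmetic only, not Spring Kafka API: the class ExpectedAssignments and the method expectedPartitions are hypothetical names. In the real test, the topics array would come from messageListenerContainer.getContainerProperties().getTopics() and the partition count from embeddedKafkaBroker.getPartitionsPerTopic(). It assumes every subscribed topic was created by the embedded broker with the same partition count, and that the listener subscribes by topic name (getTopics() can be null when a topic pattern or explicit partitions are used instead).

```java
// Hypothetical helper: computes the partition count to pass to
// ContainerTestUtils.waitForAssignment() for one listener container.
final class ExpectedAssignments {

    private ExpectedAssignments() {
    }

    /**
     * @param partitionsPerTopic e.g. embeddedKafkaBroker.getPartitionsPerTopic()
     * @param topics             e.g. container.getContainerProperties().getTopics()
     * @return total partitions the container should be assigned
     */
    static int expectedPartitions(int partitionsPerTopic, String[] topics) {
        if (topics == null) {
            // Assumption: listeners using a topic pattern or explicit
            // partitions need a different calculation.
            throw new IllegalArgumentException("container does not subscribe by topic name");
        }
        return partitionsPerTopic * topics.length;
    }

    public static void main(String[] args) {
        // A container listening to all three topics from the question.
        String[] allTopics = {"topicA", "topicB", "topicC"};
        System.out.println(expectedPartitions(6, allTopics)); // prints 18

        // A container listening to a single topic.
        String[] oneTopic = {"topicA"};
        System.out.println(expectedPartitions(6, oneTopic)); // prints 6
    }
}
```

    In setUp(), the result of such a calculation would replace the bare getPartitionsPerTopic() argument, so each container waits for exactly the partitions of the topics it listens to.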