I am relatively new to Apache Kafka. My test passes when I use only one topic, but when I add more topics, the total number of partitions increases and the test always fails. I have the following test class:
@SpringBootTest
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 6, topics = {"topicA", "topicB", "topicC"})
@TestPropertySource(properties = {
"spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}"})
class KafkaEventConsumerIntegrationTest {...}
During my tests I get the following error:
java.lang.IllegalStateException: Expected 6 but got 18 partitions
at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:85)
at com.mbti.cddm.service.integrationtests.KafkaEventConsumerIntegrationTest.setUp(KafkaEventConsumerIntegrationTest.java:74)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
The error occurs here:
@BeforeEach
void setUp() {
for (MessageListenerContainer messageListenerContainer : endpointRegistry.getAllListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
}
}
How can I fix this issue?
I tried to write some integration tests for my Kafka listener class with test containers.
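That utility method counts the total number of partitions assigned to a container, across all of the topics it consumes from. With 3 topics of 6 partitions each, a container subscribed to all of them is assigned 18 partitions, so waiting for 6 fails.
If a single container consumes from all three topics, use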
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic() * embeddedKafkaBroker.getTopics().size());
Alternatively, you can get the number of topics a particular container subscribes to via its getContainerProperties() method (for example, getContainerProperties().getTopics().length) and multiply by that instead.
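As a rough sketch of the per-container variant, the expected count is just the number of subscribed topics times the partitions per topic. The ExpectedPartitions class and its forTopics method below are hypothetical helpers written for illustration, not part of spring-kafka. The sketch assumes every topic has the same number of partitions and that the listener subscribes by explicit topic names (getTopics() returns null when a topic pattern or explicit partitions are configured).

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of spring-kafka.
public final class ExpectedPartitions {

    private ExpectedPartitions() {
    }

    /**
     * Computes how many partitions a container should be assigned, assuming
     * every topic was created with the same number of partitions.
     *
     * @param topics             topic names the container subscribes to
     * @param partitionsPerTopic partitions created for each topic
     */
    public static int forTopics(String[] topics, int partitionsPerTopic) {
        if (topics == null) {
            // Assumption: callers subscribe by topic name, not by pattern.
            throw new IllegalArgumentException("container has no explicit topic list");
        }
        // Count distinct names so a topic listed twice is not double counted.
        long distinctTopics = Arrays.stream(topics).distinct().count();
        return Math.toIntExact(distinctTopics * partitionsPerTopic);
    }

    public static void main(String[] args) {
        // Three topics with 6 partitions each, as in the question.
        System.out.println(forTopics(new String[] {"topicA", "topicB", "topicC"}, 6)); // prints 18
        // A container listening to a single topic.
        System.out.println(forTopics(new String[] {"topicA"}, 6)); // prints 6
    }
}
```

In the setUp loop this could be used as ContainerTestUtils.waitForAssignment(messageListenerContainer, ExpectedPartitions.forTopics(messageListenerContainer.getContainerProperties().getTopics(), embeddedKafkaBroker.getPartitionsPerTopic())), so that containers listening to different numbers of topics each wait for their own expected count.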