I want to test the reading of events from a kafka topic in a Spring boot te
@Slf4j
@SpringBootTest
@ActiveProfiles("kafka-test")
class EmbeddedKafkaIntegrationTest {

    private static final String TOPIC = "my-topic";

    private final BlockingQueue<ConsumerRecord<String, MyEvent>> consumptionQueue = new LinkedBlockingDeque<>();

    @EventListener(condition = "event.listenerId.startsWith('test-listener')")
    public void idleEventHandler(ListenerContainerIdleEvent event) {
        log.info(event.toString());
    }

    @KafkaListener(id = "test-listener", idIsGroup = false, topics = "${beb.topic.prefix}" + TOPIC,
            autoStartup = "true", containerFactory = "kafkaListenerContainerFactory")
    private void listen(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException {
        log.info("Consume key={}, value={}", consumerRecord.key(), consumerRecord.value());
        consumptionQueue.put(consumerRecord);
    }
}
For some reason the @EventListener method is never invoked, while the @KafkaListener works fine. If I put the @EventListener method in one of the application component classes, it works fine there too. It only refuses to work within EmbeddedKafkaIntegrationTest.
I want to use the event listener as part of my test conditions.
You probably don't show the whole picture of your configuration, but that @EventListener has to be moved to a @Configuration class. For example, you can add a @TestConfiguration in addition to the rest of your Spring Boot configuration involved in this test suite.
I also believe that the @KafkaListener you show is not really a part of that EmbeddedKafkaIntegrationTest, since, as Martin pointed out, it is not supposed to work from a test class by itself: the test class is not a bean picked up by scanning.
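As a rough illustration of that suggestion, here is a minimal sketch of a nested @TestConfiguration that owns the @EventListener and hands the idle events to the test through a queue. The names IdleEventConfiguration, awaitIdle and listenerBecomesIdle are made up for this example, and the 30-second timeout is arbitrary. It also assumes that idle events are enabled for the container (for instance via spring.kafka.listener.idle-event-interval in the kafka-test profile), since ListenerContainerIdleEvent is only published when an idle interval is configured, and that a listener container with id test-listener is registered from a real bean.

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("kafka-test")
class EmbeddedKafkaIntegrationTest {

    // Hypothetical helper configuration: unlike the test class itself, it is
    // registered as a bean, so its @EventListener method is discovered.
    @TestConfiguration
    static class IdleEventConfiguration {

        private final BlockingQueue<ListenerContainerIdleEvent> idleEvents = new LinkedBlockingQueue<>();

        // Same SpEL condition as in the question; only events from the test listener are kept.
        @EventListener(condition = "event.listenerId.startsWith('test-listener')")
        public void onIdle(ListenerContainerIdleEvent event) {
            idleEvents.add(event);
        }

        // Blocks until an idle event arrives; returns null if the timeout elapses first.
        ListenerContainerIdleEvent awaitIdle(long timeout, TimeUnit unit) throws InterruptedException {
            return idleEvents.poll(timeout, unit);
        }
    }

    @Autowired
    private IdleEventConfiguration idleEventConfiguration;

    @Test
    void listenerBecomesIdle() throws InterruptedException {
        // Assumption: the container publishes an idle event within 30 seconds.
        assertNotNull(idleEventConfiguration.awaitIdle(30, TimeUnit.SECONDS));
    }
}
```

A static nested @TestConfiguration is applied in addition to the application's primary configuration, so the rest of the context stays as it is. If the class is moved to its own file, it would need to be registered explicitly, for example with @Import on the test class.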
See more info in Spring Boot docs: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.testing