I have created a unit test to test the Kafka listener as shown below.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092","port=909"})
class ConsumerTest {
@Autowired
KafkaTemplate producer;
@Test
public void consumeEvents1Test() throws InterruptedException {
producer.send("events1", "Sample message");
Thread.sleep(1000);
}
}
The Consumer is created a shown below.
@Component
public class Consumer {
Logger LOG = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(id= "${topic1}" ,
topics = "${topic1}",
groupId = "${consumer.group1}", concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
public void consumeEvents1(String message, @Headers Map<String, String> header, Acknowledgment acknowledgment) {
LOG.info("Message - {}", message);
LOG.info(header.get(KafkaHeaders.GROUP_ID) + header.get(KafkaHeaders.RECEIVED_TOPIC)+String.valueOf(header.get(KafkaHeaders.OFFSET)));
acknowledgment.acknowledge();
}
}
The Consumer Factory and the Container Factory are created as shown below.
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootStrapServers);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setAutoStartup(autoStart);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
The dependencies from the POM are,
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
</dependencies>
But, when I invoked the test case, the message is posted to the embedded Kafka, but the actual listener is not invoked. Not sure what is wrong with the test setup.
The application.properties are given below,
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
consumer.group1=events-group1
topic1=events1
kafka.listener.autostart=true
You are creating your own ConsumerFactory
so
spring.kafka.consumer.auto-offset-reset=earliest
is not being applied. The record is published before the consumer starts so you have a race condition.
You need
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
Or you should use Boot's auto configured consumer factory.