spring-bootapache-kafkaspring-kafkaspring-kafka-test

Embedded Kafka Test - The Kafka listener is not invoked


I have created a unit test to test the Kafka listener as shown below.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092","port=909"})
class ConsumerTest {

    @Autowired
    KafkaTemplate producer;

    @Test
    public void consumeEvents1Test() throws InterruptedException {
        producer.send("events1", "Sample message");
        Thread.sleep(1000);
    }
}

The Consumer is created a shown below.

@Component
public class Consumer {
    Logger LOG = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(id= "${topic1}" ,
            topics = "${topic1}",
            groupId = "${consumer.group1}", concurrency = "1", containerFactory = "kafkaListenerContainerFactory")
    public void consumeEvents1(String message, @Headers Map<String, String> header, Acknowledgment acknowledgment) {
        LOG.info("Message - {}", message);
        LOG.info(header.get(KafkaHeaders.GROUP_ID) + header.get(KafkaHeaders.RECEIVED_TOPIC)+String.valueOf(header.get(KafkaHeaders.OFFSET)));
        acknowledgment.acknowledge();


    }
}

The Consumer Factory and the Container Factory are created as shown below.

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootStrapServers);
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setAutoStartup(autoStart);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
   


    return factory;
}

The dependencies from the POM are,

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            
        </dependency>
        
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
        </dependency>


    </dependencies>

But, when I invoked the test case, the message is posted to the embedded Kafka, but the actual listener is not invoked. Not sure what is wrong with the test setup.

The application.properties are given below,

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
consumer.group1=events-group1
topic1=events1
kafka.listener.autostart=true

Solution

  • You are creating your own ConsumerFactory so

    spring.kafka.consumer.auto-offset-reset=earliest
    

    is not being applied. The record is published before the consumer starts so you have a race condition.

    You need

    props.put(
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            "earliest");
    

    Or you should use Boot's auto configured consumer factory.