javaspring-bootapache-kafka

ConsumerRecords is always empty in Kafka, Java, but Future<RecordMetadata> isDone method result true


I try to send message to exact Kafka topic and then receive message from it. I have 3 configuration classes for Consumer, Producer and Topic:

public class KafkaTopicConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        return new KafkaAdmin(configs);
    }
}
public class KafkaConsumerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public Consumer<String, String> consumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                globalConfiguration.kafka().bootstrapAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                globalConfiguration.kafka().groupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        return new KafkaConsumer<>(props);
    }
}
public class KafkaProducerConfiguration {

    @Autowired
    GlobalConfiguration globalConfiguration;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(configProps);
    }
}

GlobalConfiguration class stores all my properties. For Kafka:

bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"

Then I send message in this way

    private void sendMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
        Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
        kafkaProducer.flush();
        boolean isSent = sendResponse.isDone();
    } 

I check if message is sent with sendResponse.isDone() and it returns true. But then I try to receive message:

protected String receiveMessage(final String topic, final String message) {
        kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> consumerRecords =
                consumer.poll(Duration.ofMillis(10000));
        consumerRecords.isEmpty();
    }

And ConsumerRecords always empty. What problem it can be?


Solution

  • If you want to consume previously sent records, rather than poll records sent within 10 seconds of you starting the consumer, you need to add

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                OffsetResetStrategy.EARLIEST.name().toLowerCase());
    

    Note - this only works on a brand new consumer group id / no existing committed offsets