java · spring-boot · apache-kafka · spring-kafka · kafka-consumer-api

How does a Kafka listener redeliver messages when consuming with auto commit set to false?


Suppose I start my consumer with auto commit set to false and it begins listening for messages.

  1. My listener polled 100 messages but processed only the first 50, so offsets up to 50 were committed and 51 to 100 remain uncommitted.
  2. I then processed the next batch starting from 101 and committed up to 200.
  3. Will my consumer ever receive messages 51 to 100 again, since I never committed them?

How will my Kafka consumer behave in this use case?
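The key point is that a Kafka offset commit stores a single number per partition: the offset of the next record the group should read. It does not record which individual records were processed, so committing up to 200 also covers 51 to 100. A minimal plain-Java sketch of that semantics (no Kafka classes; the `committed` map is a stand-in for the broker-side offset store):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSemantics {
    // Broker-side view: one committed offset per partition, meaning
    // "the next offset this consumer group should read from this partition".
    static Map<String, Long> committed = new HashMap<>();

    static void commitSync(String partition, long nextOffset) {
        // A commit overwrites the stored position; it does not track
        // which individual records were processed.
        committed.put(partition, nextOffset);
    }

    static long resumePosition(String partition) {
        // After a restart or rebalance, consumption resumes from here.
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        commitSync("topic-0", 51);   // processed records up to 50
        commitSync("topic-0", 201);  // processed 101..200, skipped 51..100
        // Records 51..100 are NOT redelivered: the stored position is 201.
        System.out.println(resumePosition("topic-0"));
    }
}
```

Because the second commit overwrote the first, the gap from 51 to 100 is invisible to the broker.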

I've tried the approach below: polling messages in batches and committing each batch based on the result of my processing.

    @KafkaListener(id = "${listenerID}", topics = "${consumer.topic}", containerFactory = "listenerContainerFactory", autoStartup = "${isListenerEnabled}")
    public void messageListener(List<ConsumerRecord<String, String>> list, Consumer<String, String> consumer) {
        try {
            // processMessage validates the batch of records and returns,
            // per partition, the offset to commit for the processed records.
            Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = processMessage(list, consumer);
            consumer.commitSync(offsetAndMetadataMap);
        } catch (Exception e) {
            // Don't swallow failures silently; at minimum, log them.
            log.error("Batch processing failed; offsets not committed", e);
        }
    }
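To build the commit map for a batch, the offset to commit per partition is the last processed record's offset plus one, since a commit names the *next* record to read. A self-contained sketch of that logic (the `Rec` record is a stand-in for `ConsumerRecord`; with kafka-clients you would key the map by `TopicPartition` and wrap the value in `OffsetAndMetadata`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitMapBuilder {
    // Minimal stand-in for ConsumerRecord so the sketch compiles on its own.
    record Rec(String topic, int partition, long offset) {}

    // For each partition seen in the batch, commit the highest
    // processed offset + 1 (the next record to be consumed).
    static Map<String, Long> buildCommitMap(List<Rec> batch) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Rec r : batch) {
            String tp = r.topic() + "-" + r.partition();
            toCommit.merge(tp, r.offset() + 1, Math::max);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        var batch = List.of(new Rec("t", 0, 100), new Rec("t", 0, 101), new Rec("t", 1, 7));
        System.out.println(buildCommitMap(batch)); // one entry per partition
    }
}
```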



    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setAutoStartup(false);
        factory.setConcurrency(consumerProperties.getConsumerThreads());
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 2)));

        if (!consumerProperties.isAutoCommit()) {
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        }
        return factory;
    }


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStarpServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

Solution

  • Kafka has no per-record offset commit; it commits a single offset per partition, and that commit marks every earlier record in the partition as consumed. So once you committed up to 200, records 51 to 100 will not be redelivered: the later commit overwrote the partition position. We handled this with our own implementation that tracks which records were processed and commits accordingly.
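One way such an "own implementation" can work (this is an assumed sketch, not the answerer's actual code) is to track processed offsets per partition and only ever commit the highest *contiguous* offset, so an unprocessed gap like 51 to 100 holds the commit back until it is filled:

```java
import java.util.TreeSet;

// Tracks processed offsets for ONE partition and exposes the highest
// offset that is safe to commit (everything below it was processed).
public class ContiguousOffsetTracker {
    private final TreeSet<Long> processed = new TreeSet<>();
    private long committable; // next offset safe to commit

    public ContiguousOffsetTracker(long startOffset) {
        this.committable = startOffset;
    }

    public void markProcessed(long offset) {
        processed.add(offset);
        // Advance the commit point only across a contiguous run;
        // a gap of unprocessed offsets stops the advance.
        while (processed.contains(committable)) {
            processed.remove(committable);
            committable++;
        }
    }

    public long commitOffset() {
        return committable;
    }
}
```

With this tracker, processing 1..50 and then 101..200 still yields a commit offset of 51, so the gap is redelivered after a restart; the tradeoff is that records 101..200 are then seen twice, which is Kafka's usual at-least-once behavior.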