Suppose I start my consumer with auto-commit set to false and it begins listening for messages.
How will my Kafka consumer behave in this use case?
I've tried the approach below: polling messages in batches and committing each batch of records based on my processing result.
@KafkaListener(id = "${listenerID}", topics = "${consumer.topic}", containerFactory = "listenerContainerFactory", autoStartup = "${isListenerEnabled}")
public void messageListener(List<ConsumerRecord<String, String>> list,
                            Acknowledgment acknowledgment, // only populated when the container uses a MANUAL ack mode
                            Consumer<String, String> consumer) {
    try {
        // processMessage validates the batch and returns, per partition, the offset to commit for the processed records.
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = processMessage(list, acknowledgment, consumer);
        consumer.commitSync(offsetAndMetadataMap);
    }
    catch (Exception e) {
        // NOTE: an empty catch swallows failures, so the DefaultErrorHandler never sees them; log or rethrow here.
    }
}
public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory()
{
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setAutoStartup(false);
    factory.setConcurrency(consumerProperties.getConsumerThreads());
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000, 2)));
    if (!consumerProperties.isAutoCommit()) {
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    }
    return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStarpServer);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    return new DefaultKafkaConsumerFactory<>(properties);
}
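Kafka has no offset commit for an individual record; offsets can only be committed per partition, and committing an offset marks every earlier record in that partition as consumed. We handled this with our own implementation that works out the offset to commit for each partition.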
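One way such a per-partition calculation could look is sketched below. This is not the actual processMessage logic, only an illustration under stated assumptions: the `Processed` class is a hypothetical stand-in for a ConsumerRecord paired with its processing result, the `"topic-partition"` string key stands in for a TopicPartition, and the `Long` value stands in for the offset wrapped in an OffsetAndMetadata. The sketch commits, for each partition, the offset just past the last record of the contiguous successful prefix, so a failed record and everything after it in that partition is redelivered.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CommitOffsets {

    /** Hypothetical stand-in for a ConsumerRecord plus its processing result. */
    static final class Processed {
        final String topic;
        final int partition;
        final long offset;
        final boolean ok;

        Processed(String topic, int partition, long offset, boolean ok) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.ok = ok;
        }
    }

    /**
     * Returns, per partition, the next offset to read (last good offset + 1).
     * Assumes records of one partition appear in ascending offset order,
     * which is how a poll delivers them.
     */
    static Map<String, Long> offsetsToCommit(List<Processed> batch) {
        Map<String, Long> next = new LinkedHashMap<>();
        Set<String> blocked = new HashSet<>(); // partitions that already hit a failure
        for (Processed p : batch) {
            String key = p.topic + "-" + p.partition;
            if (blocked.contains(key)) {
                continue; // never commit past the first failed record
            }
            if (p.ok) {
                next.put(key, p.offset + 1);
            } else {
                blocked.add(key);
            }
        }
        return next;
    }

    public static void main(String[] args) {
        List<Processed> batch = List.of(
                new Processed("orders", 0, 10, true),
                new Processed("orders", 0, 11, true),
                new Processed("orders", 0, 12, false), // failure: stop here for orders-0
                new Processed("orders", 0, 13, true),
                new Processed("orders", 1, 5, true),
                new Processed("orders", 1, 6, true));
        System.out.println(offsetsToCommit(batch)); // prints {orders-0=12, orders-1=7}
    }
}
```

A partition whose first record fails gets no entry at all, so nothing is committed for it and the whole partition slice of the batch is fetched again.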