We're using the Kafka consumer client 0.10.2.0 with the following configuration:
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
```
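The timeout values above can be sanity-checked against each other. This is a minimal sketch, assuming 0.10.1+ consumer behaviour (KIP-62): heartbeats are sent from a background thread, and a separate limit, max.poll.interval.ms (not set above, so its 300000 ms default would apply), bounds the time between poll() calls. TimeoutCheck is a hypothetical helper, not part of the Kafka API; it compares the heartbeat interval with the "no higher than 1/3 of session.timeout.ms" guideline from the docs, the request.timeout.ms > session.timeout.ms rule that the 0.10.x consumer constructor enforces, and a worst-case batch processing time against max.poll.interval.ms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Kafka API. Assumes 0.10.1+ semantics:
// heartbeats come from a background thread, and max.poll.interval.ms
// (default 300000 ms) bounds the time between poll() calls.
final class TimeoutCheck {
    private TimeoutCheck() {}

    static List<String> check(long heartbeatMs, long sessionTimeoutMs, long requestTimeoutMs,
                              int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        List<String> problems = new ArrayList<>();
        // Docs: heartbeat.interval.ms should typically be no higher than 1/3 of session.timeout.ms.
        if (heartbeatMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms is above 1/3 of session.timeout.ms");
        }
        // The 0.10.x consumer refuses to start unless request.timeout.ms > session.timeout.ms.
        if (requestTimeoutMs <= sessionTimeoutMs) {
            problems.add("request.timeout.ms is not greater than session.timeout.ms");
        }
        // Worst case: every record of one poll() batch takes perRecordMs before the next poll().
        long batchMs = maxPollRecords * perRecordMs;
        if (batchMs >= maxPollIntervalMs) {
            problems.add("a full batch (" + batchMs + " ms) can reach max.poll.interval.ms ("
                    + maxPollIntervalMs + " ms), which triggers a rebalance");
        }
        return problems;
    }
}
```

With the values above and a full 100-record batch taking about a minute (perRecordMs = 600), check(10000, 30000, 40000, 100, 600, 300000) returns an empty list; if a single record can take a minute (perRecordMs = 60000), the last check fires.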
As you can see, we're using auto-commit. The consumer API version we're using has a dedicated thread for auto-commit, so an auto-commit happens every second, which means we also have a heartbeat every second.
From time to time, our application's processing can take more than 40 seconds (the request timeout interval).
What I wanted to ask is:
1 - If processing takes, for example, a minute, will there be a rebalance even though the auto-commit heartbeat happens every second?
2 - What's weirder is that when execution takes a long time, we seem to receive the same message more than once. Is that normal? If the consumer has committed an offset, why does the rebalance cause the same offset to be consumed again?
Thanks, Orel
You can use KafkaConsumer.pause() / KafkaConsumer.resume() to prevent consumer rebalancing during long processing pauses (see the JavaDocs). Take a look at this question.
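A minimal sketch of the pause/resume pattern, assuming processing is handed to a worker thread while the polling thread keeps calling poll(). To keep it runnable without a broker, MiniConsumer is a hypothetical interface mirroring the KafkaConsumer methods involved (in 0.10.2 these are poll(long), assignment(), pause(Collection<TopicPartition>) and resume(Collection<TopicPartition>)), and FakeConsumer is a hypothetical in-memory fake; with the real client the same calls would be made on KafkaConsumer with TopicPartition sets.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the KafkaConsumer methods the pattern uses.
// Partitions are plain Strings here instead of TopicPartition.
interface MiniConsumer {
    List<String> poll(long timeoutMs);
    Set<String> assignment();
    void pause(Set<String> partitions);
    void resume(Set<String> partitions);
}

final class PauseResumeLoop {
    private PauseResumeLoop() {}

    // Processes each batch on a worker thread while this thread keeps polling.
    // Stops after maxIdlePolls consecutive empty polls with no work in flight.
    static int run(MiniConsumer consumer, long workMs, int maxIdlePolls) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<Integer> inFlight = null;
        int processed = 0;
        int idlePolls = 0;
        try {
            while (idlePolls < maxIdlePolls) {
                // Regular poll() calls keep the consumer within max.poll.interval.ms (0.10.1+);
                // on 0.10.0 and earlier they are also what sends heartbeats.
                List<String> batch = consumer.poll(10);
                if (inFlight != null && inFlight.isDone()) {
                    processed += inFlight.get();
                    inFlight = null;
                    consumer.resume(consumer.assignment()); // start fetching again
                }
                if (batch.isEmpty()) {
                    if (inFlight == null) idlePolls++;
                } else if (inFlight == null) {
                    idlePolls = 0;
                    consumer.pause(consumer.assignment()); // no new records until the worker is done
                    inFlight = worker.submit(() -> {
                        Thread.sleep(workMs); // stand-in for slow processing
                        return batch.size();
                    });
                } else {
                    // Paused partitions return nothing; this sketch does not handle
                    // a rebalance that hands over new, unpaused partitions.
                    throw new IllegalStateException("records arrived while a batch was in flight");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing failed", e.getCause());
        } finally {
            worker.shutdownNow();
        }
        return processed;
    }
}

// Hypothetical in-memory fake: returns queued batches, and nothing while paused.
final class FakeConsumer implements MiniConsumer {
    private final Deque<List<String>> batches = new ArrayDeque<>();
    private final Set<String> paused = new HashSet<>();
    int pollsWhilePaused = 0;

    FakeConsumer(String[]... batches) {
        for (String[] b : batches) this.batches.add(Arrays.asList(b));
    }

    @Override public List<String> poll(long timeoutMs) {
        if (!paused.isEmpty()) {
            pollsWhilePaused++;
            try { Thread.sleep(timeoutMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            return Collections.emptyList();
        }
        List<String> next = batches.poll();
        return next != null ? next : Collections.<String>emptyList();
    }
    @Override public Set<String> assignment() { return Collections.singleton("topic-0"); }
    @Override public void pause(Set<String> partitions) { paused.addAll(partitions); }
    @Override public void resume(Set<String> partitions) { paused.removeAll(partitions); }
}
```

For example, PauseResumeLoop.run(new FakeConsumer(new String[] {"a", "b"}, new String[] {"c"}), 50, 3) processes 3 records while the fake counts the polls made during processing. One caveat with enable.auto.commit=true: the poll() calls made while a batch is in flight can auto-commit that batch's offsets before it has been processed, so a crash mid-batch may skip records; committing manually after the worker finishes avoids that.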
Re 2: Are you sure these offsets are actually committed?
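Why uncommitted offsets come back can be shown with a toy model of one partition. This is not Kafka code; it assumes, as in the 0.10.x Java consumer, that auto-commit is triggered inside poll() and commits the position reached by the previous poll(), and that a partition's new owner starts from the last committed offset. RedeliveryModel and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition, not Kafka code. Assumes auto-commit runs inside
// poll() and commits the position reached by the previous poll() (with
// auto.commit.interval.ms already elapsed), and that a partition's new owner
// starts from the last committed offset.
final class RedeliveryModel {
    private RedeliveryModel() {}

    // Offsets [from, to) as a list.
    private static List<Long> range(long from, long to) {
        List<Long> out = new ArrayList<>();
        for (long o = from; o < to; o++) {
            out.add(o);
        }
        return out;
    }

    // Returns {offsets processed by consumer A, offsets then read by consumer B}.
    // A polls one batch and processes it; if A is evicted before its next poll()
    // (e.g. processing exceeded max.poll.interval.ms), that batch is never committed.
    static List<List<Long>> simulate(long logEnd, int maxPollRecords, boolean evictedBeforeNextPoll) {
        long committed = 0;
        List<Long> a = range(committed, Math.min(logEnd, committed + maxPollRecords));
        long position = committed + a.size();
        if (!evictedBeforeNextPoll) {
            committed = position; // A's next poll() auto-commits the consumed position
        }
        List<Long> b = range(committed, logEnd); // B takes over the partition
        List<List<Long>> result = new ArrayList<>();
        result.add(a);
        result.add(b);
        return result;
    }
}
```

For a five-record partition with batches of three, simulate(5, 3, true) has B re-read offsets 0 to 4, so offsets 0 to 2 are processed twice; with false, B starts at offset 3.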