Tags: go, apache-kafka, kafka-consumer-api, sarama

Possible Reasons for Reconsuming Kafka Messages


Yesterday I found from the logs that Kafka was reconsuming some messages after the Kafka group coordinator initiated a group rebalance. These messages had already been consumed two days earlier (confirmed from the logs).

Two other rebalances were reported in the log, but they did not reconsume any messages. So why did the first rebalance cause messages to be reconsumed? What was the problem?

I am using the Golang Kafka client (sarama). Here is the code:

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest // start from the oldest available offset when no committed offset is found

We handle each message before marking it, so it seems we are using the at-least-once delivery strategy for Kafka. We have three brokers on one machine, and only one consumer thread (goroutine) on another machine.

Is there any explanation for this phenomenon? I think the messages must have been committed, because they were consumed two days ago; otherwise, why would Kafka keep the offsets for more than two days without them being committed?

Consumer code sample:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        realHandler(message)             // process the consumed data here
        session.MarkMessage(message, "") // then mark the offset
    }
    return nil
}
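For context, the handler above is only part of the consumption path. Here is a rough sketch (not my exact code) of how such a handler is typically wired into a sarama consumer group; the broker address, group ID, topic name and Kafka version below are placeholders.

package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
)

// Consumer implements sarama.ConsumerGroupHandler; ConsumeClaim is the method shown above.
type Consumer struct{}

// Setup and Cleanup are no-op stubs required by the ConsumerGroupHandler interface.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0 // placeholder broker version
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
    if err != nil {
        log.Fatalln(err)
    }
    defer group.Close()

    handler := &Consumer{}
    ctx := context.Background()
    for {
        // Consume returns whenever a rebalance happens, so it is called in a
        // loop to rejoin the group and keep consuming.
        if err := group.Consume(ctx, []string{"example-topic"}, handler); err != nil {
            log.Fatalln(err)
        }
    }
}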

Added:

  1. Rebalancing happened after the app restarted. There were two other restarts which didn't cause any reconsumption.

  2. Kafka configs:

    log.retention.check.interval.ms=300000
    log.retention.hours=168
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true
    auto.create.topics.enable=false
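
Note that the broker-side offset retention setting, offsets.retention.minutes, is not in this list, so the broker default applies (24 hours on this deployment, which turns out to matter; see the Solution below). If we wanted committed offsets to be kept as long as the log itself, it would be set explicitly, for example:

    offsets.retention.minutes=10080    # 7 days, matching log.retention.hours=168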


Solution

  • By reading the source code of both the Golang sarama client and the Kafka server, I finally found the reason, as described below:

    1. The consumer group offset retention time is 24 hours, which is Kafka's default setting (offsets.retention.minutes), while the log retention is 7 days, explicitly set by us.

    2. My server app runs in a test environment that few people visit, which means few messages are produced, so the consumer group has few messages to consume; as a result, the consumer may not commit any offsets for a long time.

    3. When the consumer offset has not been updated for more than 24 hours, the Kafka broker/coordinator removes the committed offset for that partition because of the offset retention config. The next time sarama asks the broker where the committed offset is, the client of course gets nothing back. Since we use sarama.OffsetOldest as the initial value, the sarama client then consumes from the earliest message still kept by the broker, which results in the messages being reconsumed. This is likely to happen because the log retention is 7 days, so those old messages are still available (a possible mitigation is sketched below).
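
One possible mitigation (a sketch under my assumptions, not something we have verified in production): sarama lets the consumer request a longer offset retention when it commits, and the initial offset can also be switched to the newest message so an expired offset does not replay the whole retained log. Whether the per-commit retention is honored depends on the broker and protocol version, so treat this as an option to test rather than a guaranteed fix.

// Sketch: consumer-side offset settings (requires the standard "time" package).
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest

// Ask the broker to keep committed offsets for 7 days (matching
// log.retention.hours=168), so offsets survive quiet periods longer
// than the 24-hour default.
config.Consumer.Offsets.Retention = 7 * 24 * time.Hour

// Alternatively, start from the newest message when no committed offset is
// found, instead of replaying everything the broker still retains:
// config.Consumer.Offsets.Initial = sarama.OffsetNewest

The other option is the broker-side one mentioned in the config note above: raise offsets.retention.minutes so that committed offsets are retained at least as long as the log itself.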