apache-kafka

Why is the kafka consumer consuming the same message hundreds of times?


I see from the logs that the exact same message is consumed by the consumer 665 times. Why does this happen?

I also see this in the logs:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies 
that the poll loop is spending too much time message processing. You can address this either by increasing the session 
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Consumer properties

group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20

PS: Is it possible to consume only a specific number of messages, like 10 or 50 or 100, out of the 1000 that are in the queue? I was looking at the fetch.max.bytes config, but it seems to limit the fetch size in bytes rather than the number of messages.

Thanks


Solution

  • The answer lies in understanding how max.poll.interval.ms and consumer group rebalancing interact:

    In your case, your consumer receives a batch of messages via poll() but is not able to complete the processing within max.poll.interval.ms. The broker therefore assumes it is hung, and a rebalance of the partitions happens, due to which this consumer loses ownership of all its partitions. It is marked dead and is no longer part of the consumer group.

    Then, when your consumer completes the processing and calls poll() again, two things happen:

    1. The commit fails because the consumer no longer owns the partitions.
    2. The broker identifies that the consumer is up again, so a rebalance is triggered; the consumer rejoins the consumer group, starts owning partitions, and requests messages from the broker. Since the earlier message was never marked as committed (see #1 above, the failed commit) and is still pending processing, the broker delivers the same message to the consumer again.

    The consumer again takes a long time to process, and since it cannot finish within max.poll.interval.ms, steps 1 and 2 keep repeating in a loop.

    To fix the problem, increase max.poll.interval.ms to a value large enough for the time your consumer needs for processing, and/or reduce max.poll.records so that each poll() returns a smaller batch. Then your consumer will not be marked dead and will not receive duplicate messages. However, the real fix is to review your processing logic and try to reduce the processing time. A sketch of the adjusted configuration and poll loop is shown below.
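
    For illustration, here is a minimal sketch of what that could look like with the plain Java client. The topic name (someTopic), the processRecord() helper, and the chosen timeout and batch-size values are placeholder assumptions, not taken from your setup; size max.poll.interval.ms to your measured per-batch processing time.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SlowProcessingConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");
            props.put("group.id", "someGroupId");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("session.timeout.ms", "30000");
            // Allow enough time to process one whole batch between poll() calls.
            // 300000 ms (5 minutes) is only an example value.
            props.put("max.poll.interval.ms", "300000");
            // Smaller batches mean less work per poll(), so each loop iteration
            // finishes well within max.poll.interval.ms.
            props.put("max.poll.records", "10");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("someTopic")); // placeholder topic name

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        processRecord(record); // your processing logic; keep it fast
                    }
                    try {
                        // Commit only after the whole batch has been processed.
                        consumer.commitSync();
                    } catch (CommitFailedException e) {
                        // This is the exception behind the log message above: the group
                        // rebalanced while processing was still running, so the commit is
                        // rejected and the same records are redelivered after rejoining.
                    }
                }
            }
        }

        private static void processRecord(ConsumerRecord<String, String> record) {
            // placeholder for the actual (slow) processing step
        }
    }

    With manual commits like this, redelivery after a failed commit is expected at-least-once behaviour; the configuration change only keeps the consumer from being kicked out of the group mid-batch.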