I recently started learning Kafka. The project I'm working on uses sarama, and I read messages with a ConsumerGroup.
I need to read a message again after some time if foo returns false. How can this be done?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        if ok := foo(message); ok {
            session.MarkMessage(message, "")
        } else {
            // ???
        }
    }
    return nil
}
You can reset a consumer group's offset to an earlier one by calling ResetOffset in your consumer group handler's Setup() callback:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    // topic, partition and offset are placeholders for your own values.
    sess.ResetOffset(topic, partition, offset, "")
    return nil
}
You can also do the same from the command line (the consumer group must be inactive while its offsets are reset):
kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offset 100 \
    --execute