I'm using sarama-cluster
lib to create a kafka group consumer, in a backend service. This example code from godoc works:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
since it's a dead loop, I put it in a goroutine to avoid blocking other activities, then it can no longer consume any message:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(service is running, so this goroutine is not terminated. It just fails to consume)
Any idea how to solve this?
sarama-cluster
is in unmaintained for quite some time already with the following notice:
Please note that since https://github.com/Shopify/sarama/pull/1099 was merged and released (>= v1.19.0) this library is officially deprecated. The native implementation supports a variety of use cases that are not available through this library.
I suggesty you to use github.com/Shopify/sarama instead. It has all the functionalities of sarama-cluster
and it's actively maintained.
You can follow a simple consumer group example from their repository.