I'm using the v2 of the lib https://github.com/confluentinc/confluent-kafka-go to consume from my topic from multiple partitions.
when I start my consumer, looks fine, but for business reasons, I need to stop the consumer for a while during a specific period of time, but seems that my Pause method is not working.
I suspect it's because of the PartitionAny, but not sure about that.
This is my adapter:
const (
saslMechanismSha512 = "SCRAM-SHA-512"
sessionTimeoutInMs = 180000 // => 3 minutes
)
type KafkaAdapter struct {
Consumer *kafka.Consumer
}
func NewKafkaAdapter(ctx context.Context, consumer *kafka.Consumer, topic string) (*KafkaAdapter, error) {
err := consumer.Subscribe(topic, nil)
if err != nil {
return nil, fmt.Errorf("error subscribing to topic %s: %v", topic, err)
}
return &KafkaAdapter{
Consumer: consumer,
}, nil
}
func (k *KafkaAdapter) Consume(ctx context.Context) (*port.Message, error) {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
message, err := k.Consumer.ReadMessage(-1) // -1 keeps undefined timeout while seeking for new messages
if err != nil {
return nil, err
}
headers := getMessageHeaders(message.Headers)
streamName := getStreamName(headers)
return &port.Message{
Value: message.Value,
Key: message.Key,
Headers: headers,
Stream: streamName,
Timestamp: message.Timestamp,
Offset: int64(message.TopicPartition.Offset),
}, nil
}
}
func (k *KafkaAdapter) CommitMessage(ctx context.Context) error {
_, err := k.Consumer.Commit()
return err
}
func (k *KafkaAdapter) Unsubscribe(ctx context.Context) {
k.Consumer.Unsubscribe()
}
func SetupKafkaConsumer(ctx context.Context, topic item.Topic) (*kafka.Consumer, error) {
consumerConfig := &kafka.ConfigMap{
"bootstrap.servers": strings.Join(topic.Endpoints, ","),
"group.id": topic.Name,
"session.timeout.ms": sessionTimeoutInMs,
"enable.auto.commit": false,
}
if topic.User != "" && topic.Password != "" {
consumerConfig.SetKey("sasl.username", topic.User)
consumerConfig.SetKey("sasl.password", topic.Password)
consumerConfig.SetKey("security.protocol", "SASL_SSL")
consumerConfig.SetKey("sasl.mechanism", saslMechanismSha512)
consumerConfig.SetKey("sasl.mechanisms", saslMechanismSha512)
}
consumer, err := kafka.NewConsumer(consumerConfig)
if err != nil {
log.Fatalf("error creating Kafka consumer: %v", err)
return nil, err
}
return consumer, nil
}
func getMessageHeaders(messageHeaders []kafka.Header) []port.MessageHeader {
var headers []port.MessageHeader
for _, kafkaHeader := range messageHeaders {
header := port.MessageHeader{
Key: string(kafkaHeader.Key),
Value: kafkaHeader.Value,
}
headers = append(headers, header)
}
return headers
}
func getStreamName(headers []port.MessageHeader) string {
var streamName string
for _, header := range headers {
if header.Key == "sn" {
streamName = string(header.Value)
break
}
}
return streamName
}
And this is my main.go file:
const (
saslMechanismSha512 = "SCRAM-SHA-512"
)
var (
topicExample = "topic-example"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
if err != nil {
log.Fatalf("Error getting configurations: %v", err)
}
topicConfiguration := item.ReadTopic(appConfig, topicExample, false) // false = will get reader configuration
consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
if err != nil {
log.Fatalf("error creating Kafka consumer: %v", err)
}
defer consumer.Close()
topicPartition := kafka.TopicPartition{
Topic: &topicExample,
Offset: kafka.OffsetStored,
Partition: kafka.PartitionAny,
}
err = consumer.Assign([]kafka.TopicPartition{topicPartition})
if err != nil {
panic(fmt.Sprintf("error assigning topic/partitions: %v", err))
}
kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, topicExample)
if err != nil {
log.Fatalf("error creating Kafka adapter: %v", err)
}
repo, err := bootstrap.NewRepositories(appConfig)
if err != nil {
log.Fatalf("error creating a repository: %v", err)
}
dataManager := models.NewGormDataManager(repo.Db)
messageService := service.NewMessageService(kafkaReader, dataManager)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
go func() {
<-signalChannel
cancel()
}()
c := cron.New()
c.AddFunc("22 18 * * *", func() {
// I need to pause during a specific time
consumer.Pause([]kafka.TopicPartition{topicPartition})
})
c.AddFunc("28 18 * * *", func() {
// And then, resume it when needed
consumer.Resume([]kafka.TopicPartition{topicPartition})
})
c.Start()
messageService.StartConsuming(ctx, topicExample)
<-ctx.Done()
}
I'm using consumer.Pause([]kafka.TopicPartition{topicPartition})
, but with no effect.
And I'm algo not sure if my consumer is connecting to all partitions or to only one.
I figured out a way to make it work.
I changed the Assign
method by the method Subscribe
, this way I dont have to get worried about kafka partition balancing.
And also, instead of using method .Pause
and .Resume
, the Subscribe
and Unsubscribe
methods worked fine for me.
Something like this:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
if err != nil {
log.Fatalf("Error getting configurations: %v", err)
}
topicConfiguration := item.ReadTopic(appConfig, someTopic, false) // false = will get reader configuration
consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
if err != nil {
log.Fatalf("error creating Kafka consumer: %v", err)
}
defer consumer.Close()
kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, someTopic)
if err != nil {
log.Fatalf("error creating Kafka adapter: %v", err)
}
repo, err := bootstrap.NewRepositories(appConfig)
if err != nil {
log.Fatalf("error creating a repository for my-topic flow: %v", err)
}
dataManager := models.NewGormDataManager(repo.Db)
messageService := service.NewMessageService(kafkaReader, dataManager)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
go func() {
<-signalChannel
cancel()
}()
c := cron.New()
c.AddFunc("59 23 * * *", func() {
messageService.Subscribe(ctx, someTopic)
})
c.AddFunc("30 2 * * *", func() {
messageService.Unsubscribe(ctx)
})
c.Start()
messageService.StartConsuming(ctx, someTopic)
<-ctx.Done()
}