go sarama aws-msk amazon-msk

Sarama unable to produce message for Amazon MSK version 2.3.1


I use the sarama Go library to push messages to Amazon MSK. My code worked fine with MSK version 2.2.1, but since the MSK version was changed to 2.3.1 I can no longer push messages to the topic.

Error:

Partition -1

Offset -1

Request was for a topic or partition that does not exist on this broker.

Code:

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

func initKafkaEventClient() (err error) {
    config := sarama.NewConfig()
    config.Net.TLS.Enable = false
    config.Producer.Return.Successes = true // required by the SyncProducer
    config.Version = sarama.V0_10_0_0

    brokers := strings.Split(kafkaEventHost, ",") // split the host string into brokers

    kafkaEventClient, err = sarama.NewClient(brokers, config)
    if err != nil {
        log.Println("initKafkaClient: failed to create new kafka client", err)
        return err
    }
    return nil
}

func PushMessageToKafka(message string) {
    client, err := getKafkaEventClient()
    if err != nil {
        fmt.Println("PushMessageToKafka: failed to get client", err)
        return
    }

    // Build the producer from the client returned above.
    producer, err := sarama.NewSyncProducerFromClient(client)
    if err != nil {
        fmt.Println("PushMessageToKafka: failed to get producer", err)
        return
    }
    defer producer.Close() // releases the producer; the shared client stays open

    var msg sarama.ProducerMessage
    msg.Topic = "some_topic"
    msg.Value = sarama.StringEncoder(message)
    p, o, err := producer.SendMessage(&msg)

    fmt.Println("Partition", p)
    fmt.Println("Offset", o)

    if err != nil {
        fmt.Println("PushMessageToKafka: failed to push message to be displayed", err)
    }
}

I also tried setting the sarama version to the maximum (config.Version = sarama.MaxVersion), but it still does not work with Amazon MSK 2.3.1.

Please provide some solution.


Solution

  • I found the solution after a lot of debugging. It was not a version problem; the problem was in the code that returns the client:

    func getKafkaEventClient() (sarama.Client, error) {
    
        if !setupDone {
            return nil, errors.New("Invalid setup")
        }
    
        if kafkaEventClient != nil {
            return kafkaEventClient, nil
        }
    
        err := initKafkaEventClient()
        if err != nil {
            return nil, err
        }
    
        return kafkaEventClient, nil
    }
    

    Here, if kafkaEventClient != nil, the function returns the previously created client, which is wrong. Whenever the broker/host list changes, a new client has to be created; only that client will be able to find the topic we want to push our message to. If we reuse the old client and push the message to a topic that exists on a different broker/host, we get the error I mentioned above.

    Error:

    Partition -1

    Offset -1

    Request was for a topic or partition that does not exist on this broker.

    I hope this helps anyone facing the same issue.
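
The fix described in this answer (reuse the cached client only while the broker list is unchanged, otherwise build a new one) could be sketched roughly as follows. This is a minimal illustration, not the original code: eventClient, clientFactory, clientCache, brokerKey and fakeClient are hypothetical names introduced here, and the factory stands in for a constructor such as sarama.NewClient so that the example runs without a Kafka cluster.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// eventClient is a hypothetical stand-in for the part of a Kafka client
// this sketch needs; sarama.Client also has a Close() error method.
type eventClient interface {
	Close() error
}

// clientFactory stands in for a constructor such as sarama.NewClient.
type clientFactory func(brokers []string) (eventClient, error)

// clientCache returns the cached client only while the broker list is unchanged.
type clientCache struct {
	mu      sync.Mutex
	key     string // normalized broker list the cached client was built for
	client  eventClient
	factory clientFactory
}

// brokerKey splits a comma-separated host string and returns a normalized
// key (trimmed, sorted) together with the broker slice.
func brokerKey(hosts string) (string, []string) {
	var brokers []string
	for _, b := range strings.Split(hosts, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	sort.Strings(brokers)
	return strings.Join(brokers, ","), brokers
}

// get reuses the cached client for the same brokers and rebuilds it otherwise.
func (c *clientCache) get(hosts string) (eventClient, error) {
	key, brokers := brokerKey(hosts)
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.client != nil && c.key == key {
		return c.client, nil
	}
	fresh, err := c.factory(brokers)
	if err != nil {
		return nil, err
	}
	if c.client != nil {
		_ = c.client.Close() // the old client points at the old brokers
	}
	c.client, c.key = fresh, key
	return fresh, nil
}

// fakeClient is a test double used only for this demonstration.
type fakeClient struct {
	brokers []string
	closed  bool
}

func (f *fakeClient) Close() error { f.closed = true; return nil }

func main() {
	built := 0
	cache := &clientCache{factory: func(brokers []string) (eventClient, error) {
		built++
		return &fakeClient{brokers: brokers}, nil
	}}
	a, _ := cache.get("b-1.old:9092,b-2.old:9092")
	b, _ := cache.get("b-2.old:9092, b-1.old:9092") // same brokers: reused
	c, _ := cache.get("b-1.new:9092")               // new brokers: rebuilt
	fmt.Println(a == b, a == c, built)              // prints "true false 2"
}
```

With sarama, the factory would presumably wrap sarama.NewClient(brokers, config), and getKafkaEventClient would call the cache with the current kafkaEventHost instead of returning any non-nil client.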