Tags: go, apache-kafka, kafka-consumer-api, sarama

Sarama Kafka consumergroup function return


I am very new to Go and am attempting to make some adjustments to an open source library that consumes messages from Kafka using the Sarama library. The original code can be found here.

The original package implements a PartitionConsumer, which works just fine if one doesn't need read consistency across multiple consumers consuming the same topic. That does not work for my use case, however.

I have done some work within the same application to implement a consumer group using Sarama's NewConsumerGroup function, based on some examples I have found online.

Below is the code I currently have running:

package main

import (
    "context"
    // "flag"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "encoding/json"
    "log"
    "strings"

    "github.com/Shopify/sarama"
    // "github.com/Shopify/sarama/mocks"
)

// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
type KafkaInput struct {
    // config holds the Kafka settings (host, group, topic, useJSON) read by this input.
    config    *KafkaConfig
    // consumers []sarama.PartitionConsumer
    // messages is the channel that Read receives consumed messages from.
    messages  chan *sarama.ConsumerMessage
}

// Package-level settings. In the code shown only oldest and verbose are read;
// version is shadowed by a local variable of the same name inside
// NewKafkaInput, and brokers, group, topics and assignor are not referenced.
var (
    brokers  = ""
    version  = ""
    group    = ""
    topics   = ""
    assignor = ""
    oldest   = true  // when true, consumption starts from sarama.OffsetOldest
    verbose  = false // when true, Sarama's internal logger is sent to stdout
)

// Consumer represents a Sarama consumer group consumer. It is passed to
// ConsumerGroup.Consume as the handler (see Setup, Cleanup and ConsumeClaim).
type Consumer struct {
    // ready is closed by Setup; NewKafkaInput waits on it before reporting
    // that the consumer is up and running.
    ready chan bool
}

// NewKafkaInput creates instance of kafka consumer client.
//
// It builds a Sarama configuration, creates a consumer group from
// config.host (comma-separated brokers) and config.group, and consumes
// config.topic in a background goroutine. The address parameter is not used
// in this body.
//
// NOTE(review): as written, this function does not return until the process
// receives SIGINT/SIGTERM (or ctx is cancelled): the select below blocks, and
// only afterwards is the consumer group closed and the *KafkaInput built.
// Nothing in this function or in Consumer.ConsumeClaim ever sends on the
// returned input's messages channel.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
    /**
     * Construct a new Sarama configuration.
     * The Kafka cluster version has to be defined before the consumer/producer is initialized.
     */
    c := sarama.NewConfig()
    // Configuration options go here

    log.Println("Starting a new Sarama consumer")

    if verbose {
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    }

    // This local version shadows the package-level version variable.
    version, err := sarama.ParseKafkaVersion("2.1.1")
    if err != nil {
        log.Panicf("Error parsing Kafka version: %v", err)
    }

    c.Version = version

    if oldest {
        c.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    /**
     * Setup a new Sarama consumer group
     */
    consumer := Consumer{ready: make(chan bool)}

    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)

    if err != nil {
        log.Panicf("Error creating consumer group client: %v", err)
    }

    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        // Consume is called again each time it returns without error, until
        // ctx is cancelled.
        for {
            if err := client.Consume(ctx, []string{config.topic}, &consumer); err != nil {
                log.Panicf("Error from consumer: %v", err)
            }

            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }

            // Setup closes ready on every session, so a fresh channel is
            // needed before the next Consume call.
            consumer.ready = make(chan bool)
        }

    }()

    <-consumer.ready // Await till the consumer has been set up
    log.Println("Sarama consumer up and running!...")

    // Block here until a termination signal arrives or ctx is cancelled.
    // Everything below this select runs only during shutdown.
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = client.Close(); err != nil {
        log.Panicf("Error closing client: %v", err)
    }

    // The input is constructed after the consumer group has been closed; its
    // messages channel is freshly allocated and has no sender.
    i := &KafkaInput{
        config: config,
        // consumers: make([]sarama.PartitionConsumer, len(partitions)),
        // messages:  make(chan *sarama.ConsumerMessage, 256),
        messages: make(chan *sarama.ConsumerMessage, 256),
    }


    return i
}

// ConsumeClaim drains the claim's Messages() channel, logging each message and
// marking it as consumed on the session. It returns nil once the channel is
// closed.
//
// Keep this loop in the calling goroutine: Sarama already invokes
// ConsumeClaim in its own goroutine, see
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    incoming := claim.Messages()
    for {
        msg, open := <-incoming
        if !open {
            // Channel closed: the claim is finished.
            return nil
        }

        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
}

// ErrorHandler logs every error delivered on the partition consumer's Errors()
// channel and returns once that channel is closed.
func (i *KafkaInput) ErrorHandler(consumer sarama.PartitionConsumer) {
    failures := consumer.Errors()
    for {
        failure, open := <-failures
        if !open {
            return
        }
        log.Println("Failed to read access log entry:", failure)
    }
}

// Read blocks until a message is available on i.messages and copies its
// payload into data.
//
// When config.useJSON is false the raw message value is copied. Otherwise the
// value is decoded as a KafkaMessage and the result of its Dump method is
// copied instead.
//
// The returned count is the number of bytes actually written to data, so it
// never exceeds len(data); a payload larger than data is truncated. A JSON
// decoding or Dump failure is logged and returned with a count of 0.
func (i *KafkaInput) Read(data []byte) (int, error) {
    message := <-i.messages

    if !i.config.useJSON {
        // copy reports how many bytes fit; returning len(message.Value) would
        // overstate the count whenever data is the shorter slice.
        return copy(data, message.Value), nil
    }

    var kafkaMessage KafkaMessage
    if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    buf, err := kafkaMessage.Dump()
    if err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    return copy(data, buf), nil
}

// String describes the input as "Kafka Input: <host>/<topic>", using the host
// and topic from its configuration.
func (i *KafkaInput) String() string {
    cfg := i.config
    target := cfg.host + "/" + cfg.topic
    return "Kafka Input: " + target
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
// It closes consumer.ready, which releases any goroutine blocked receiving
// from that channel. It always returns nil.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
// This implementation has nothing to release and always returns nil.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

The KafkaConfig carries the group ID and topic for the consumer. When I run this program, the consumer starts up, reads from the proper topic using the correct group, and logs each message to the console using the ConsumeClaim method defined here:

// ConsumeClaim drains the claim's Messages() channel, logging each message and
// marking it as consumed on the session. It returns nil once the channel is
// closed.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    incoming := claim.Messages()
    for {
        msg, open := <-incoming
        if !open {
            // Channel closed: the claim is finished.
            return nil
        }

        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
        session.MarkMessage(msg, "")
    }
}

What I believe I need however is for the NewKafkaInput function to return *KafkaInput with the messages from the claim added to the struct (forgive me if I am using the wrong terminology here, this is my first Go rodeo).

... 
i := &KafkaInput{
        config: config,
        // consumers: make([]sarama.PartitionConsumer, len(partitions)),
        // messages:  make(chan *sarama.ConsumerMessage, 256),
        messages: make(chan *sarama.ConsumerMessage, 256),
    }


    return i
}

In the original example that is done here:

func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {

    ...

    go func(consumer sarama.PartitionConsumer) {
                defer consumer.Close()

                for message := range consumer.Messages() {
                    i.messages <- message
                }
            }(consumer)

    ...

}

I have spent days toying around with moving functions in and out of the NewKafkaInput function, attempting to add messages to the KafkaInput struct outside the function, and everything in between. I just can't get it to work. The NewKafkaInput function needs to return a *KafkaInput whose messages channel receives the consumed messages, so that this function can complete:

// Read blocks until a message is available on i.messages and copies its
// payload into data.
//
// When config.useJSON is false the raw message value is copied. Otherwise the
// value is decoded as a KafkaMessage and the result of its Dump method is
// copied instead.
//
// The returned count is the number of bytes actually written to data, so it
// never exceeds len(data); a payload larger than data is truncated. A JSON
// decoding or Dump failure is logged and returned with a count of 0.
func (i *KafkaInput) Read(data []byte) (int, error) {
    message := <-i.messages

    if !i.config.useJSON {
        // copy reports how many bytes fit; returning len(message.Value) would
        // overstate the count whenever data is the shorter slice.
        return copy(data, message.Value), nil
    }

    var kafkaMessage KafkaMessage
    if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    buf, err := kafkaMessage.Dump()
    if err != nil {
        log.Println("Failed to decode access log entry:", err)
        return 0, err
    }

    return copy(data, buf), nil
}

It's entirely possible I have made a complete mess of this thing as well, but any help and input is appreciated.

Thanks


Solution

  • Here is the solution to my problem. I had goroutines blocking the main function(s) and they needed to be broken out. If the code below doesn't make any sense, here is a link to the program I was modifying: https://github.com/buger/goreplay. If I can get a response from the owner I plan on cleaning up the code and submitting a pull request, or possibly publishing a fork.

    package main
    
    import (
        "context"
        "encoding/json"
        "strings"
    
        "os"
    
        "log"
    
        "github.com/Shopify/sarama"
    )
    
    // KafkaInput is used for receiving Kafka messages and
    // transforming them into HTTP payloads.
    type KafkaInput struct {
        // The embedded ConsumerGroup supplies the Consume method called by loop.
        sarama.ConsumerGroup
        // config holds the Kafka settings (host, group, topic, useJSON).
        config   *KafkaConfig
        // consumer carries the channel that Push sends consumed messages to.
        consumer Consumer
        // messages is the channel Read receives from; NewKafkaInput points it
        // at the same channel as consumer.messages.
        messages chan *sarama.ConsumerMessage
    }
    
    // Consumer represents a Sarama consumer group consumer
    type Consumer struct {
        // ready is allocated by NewKafkaInput; nothing in the code shown
        // closes or receives from it.
        ready    chan bool
        // messages is the buffered channel that KafkaInput.Push sends to.
        messages chan *sarama.ConsumerMessage
    }
    
    // Package-level settings. In the code shown only oldest and verbose are
    // read; version and group are shadowed by locals of the same name inside
    // NewKafkaInput, and brokers, topics and assignor are not referenced.
    var (
        brokers  = ""
        version  = ""
        group    = ""
        topics   = ""
        assignor = ""
        oldest   = true  // when true, consumption starts from sarama.OffsetOldest
        verbose  = false // when true, Sarama's internal logger is sent to stdout
    )
    
    // NewKafkaInput creates instance of kafka consumer client.
    //
    // It builds a Sarama configuration, joins the consumer group named by
    // config.group on the comma-separated brokers in config.host, and starts a
    // background goroutine that consumes config.topic. Consumed messages become
    // available through Read on the returned input. The address parameter is
    // not used.
    //
    // The function panics (via log.Panicf) if the Kafka version cannot be
    // parsed or the consumer group cannot be created.
    func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
        /**
         * Construct a new Sarama configuration.
         * The Kafka cluster version has to be defined before the consumer/producer is initialized.
         */
        c := sarama.NewConfig()
        // Configuration options go here

        log.Printf("KafkaConfig: %s", config.host)
        log.Printf("KafkaConfig: %s", config.group)
        log.Printf("KafkaConfig: %s", config.topic)

        log.Println("Starting a new Sarama consumer")

        if verbose {
            sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
        }

        version, err := sarama.ParseKafkaVersion("2.1.1")
        if err != nil {
            log.Panicf("Error parsing Kafka version: %v", err)
        }

        c.Version = version

        if oldest {
            c.Consumer.Offsets.Initial = sarama.OffsetOldest
        }

        /**
         * Setup a new Sarama consumer group
         */
        group, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)
        if err != nil {
            // Without this check a failed connection leaves group nil and the
            // consume loop would fail on its first call.
            log.Panicf("Error creating consumer group client: %v", err)
        }

        consumer := Consumer{
            ready:    make(chan bool),
            messages: make(chan *sarama.ConsumerMessage, 256),
        }

        // Read and Push must share one channel, so messages is wired straight
        // to consumer.messages instead of allocating a second buffer.
        i := &KafkaInput{
            ConsumerGroup: group,
            config:        config,
            messages:      consumer.messages,
            consumer:      consumer,
        }

        // Start consuming only once the input is fully initialised.
        go i.loop([]string{config.topic})
        return i
    }
    
    // ConsumeClaim receives every message from the claim's Messages() channel,
    // marks it on the session, and forwards it through Push. It returns nil
    // when the channel is closed.
    func (i *KafkaInput) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
        incoming := c.Messages()
        for {
            msg, open := <-incoming
            if !open {
                return nil
            }
            s.MarkMessage(msg, "")
            i.Push(msg)
        }
    }
    
    // loop repeatedly calls Consume for the given topics with i as the handler.
    // Consume is invoked again each time it returns without error. The first
    // error is logged and ends the loop.
    func (i *KafkaInput) loop(topic []string) {
        ctx := context.Background()
        for {
            // Use the topics the caller passed in rather than re-reading the
            // configuration, so the parameter is honoured.
            if err := i.Consume(ctx, topic, i); err != nil {
                log.Printf("Error from consumer: %v", err)
                return
            }
        }
    }
    
    // Push logs the message payload and sends the message on
    // i.consumer.messages. The send blocks while that channel's buffer is full.
    // If the channel is nil the message is dropped without logging.
    func (i *KafkaInput) Push(m *sarama.ConsumerMessage) {
        if i.consumer.messages == nil {
            return
        }
        // Log the payload bytes: formatting the *ConsumerMessage itself with %s
        // is a verb/argument mismatch and prints an unreadable struct dump.
        log.Printf("MSGPUSH: %s", m.Value)
        i.consumer.messages <- m
    }
    
    // Read blocks until a message is available on i.messages, logs its value,
    // and copies its payload into data.
    //
    // When config.useJSON is false the raw message value is copied. Otherwise
    // the value is decoded as a KafkaMessage and the result of its Dump method
    // is copied instead.
    //
    // The returned count is the number of bytes actually written to data, so it
    // never exceeds len(data); a payload larger than data is truncated. A JSON
    // decoding or Dump failure is logged and returned with a count of 0.
    func (i *KafkaInput) Read(data []byte) (int, error) {
        message := <-i.messages
        log.Printf("Msg: %s", string(message.Value))

        if !i.config.useJSON {
            // copy reports how many bytes fit; returning len(message.Value)
            // would overstate the count whenever data is the shorter slice.
            return copy(data, message.Value), nil
        }

        var kafkaMessage KafkaMessage
        if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
            log.Println("Failed to decode access log entry:", err)
            return 0, err
        }

        buf, err := kafkaMessage.Dump()
        if err != nil {
            log.Println("Failed to decode access log entry:", err)
            return 0, err
        }

        return copy(data, buf), nil
    }
    
    // String describes the input as "Kafka Input: <host>/<topic>", using the
    // host and topic from its configuration.
    func (i *KafkaInput) String() string {
        cfg := i.config
        target := cfg.host + "/" + cfg.topic
        return "Kafka Input: " + target
    }
    
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    // This implementation performs no per-session initialisation and always
    // returns nil.
    func (i *KafkaInput) Setup(s sarama.ConsumerGroupSession) error {
        return nil
    }
    
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
    // This implementation has nothing to release and always returns nil.
    func (i *KafkaInput) Cleanup(s sarama.ConsumerGroupSession) error {
        return nil
    }