Tags: go, apache-kafka, goroutine, waitgroup

Goroutine Kafka Consumers


I currently have a program that creates a worker group of size 1 and then calls StartWorker on each worker:

package main

import (
    "db_write_consumer/db"
    "db_write_consumer/worker"
    "os"
    "os/signal"
    "syscall"
)

// main connects to MySQL, creates the consumer group, and runs each worker
// until a SIGINT or SIGTERM is delivered on sigchan.
func main() {
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
    if err != nil {
        // Without a database handle the workers cannot write anything, so
        // stop here instead of starting them with an unusable client.
        os.Stderr.WriteString("connecting to MySQL: " + err.Error() + "\n")
        os.Exit(1)
    }

    workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
    for _, w := range workers {
        // StartWorker is called synchronously: it only returns after a
        // signal has been received, so the workers run one after another.
        worker.StartWorker(w, []string{"test-topic"}, sigchan, mySQLClient)
    }
}

where CreateGroup is written as:

// CreateGroup builds numWorkers consumers by calling NewWorker once per
// worker with the same bootstrapServers and groupId, and returns them in
// creation order. The result is an empty, non-nil slice when numWorkers is
// zero or negative.
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
    group := []*kafka.Consumer{}
    for remaining := numWorkers; remaining > 0; remaining-- {
        group = append(group, NewWorker(bootstrapServers, groupId))
    }
    return group
}

and StartWorker is written as:

// StartWorker subscribes c to topics and then consumes messages until a
// signal arrives on sigchan. Each message value is decoded as a pb.Person,
// written through WriteStuff, and its offset is stored with StoreMessage.
// The consumer is closed before StartWorker returns.
//
// The function has no error result, so failures are reported on stderr: a
// failed subscription ends the worker immediately, and a message that cannot
// be decoded is skipped instead of being written as an empty record.
//
// A value sent on sigchan is received by exactly one reader, so a single
// signal stops only one StartWorker call sharing that channel.
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
    // Close the consumer on every exit path, including a failed subscription.
    defer func() {
        fmt.Printf("Closing consumer\n")
        if err := c.Close(); err != nil {
            fmt.Fprintf(os.Stderr, "%% Error closing consumer: %v\n", err)
        }
    }()

    if err := c.SubscribeTopics(topics, nil); err != nil {
        fmt.Fprintf(os.Stderr, "%% Error subscribing to %v: %v\n", topics, err)
        return
    }
    fmt.Println(c)

    for {
        // Non-blocking check for a shutdown signal before each poll.
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            return
        default:
        }

        // NOTE(review): ReadMessage takes a time.Duration, so 100 here is
        // 100ns rather than 100ms — confirm the intended poll timeout.
        ev, err := c.ReadMessage(100)
        if err != nil {
            // A poll timeout just means nothing was available; any other
            // error is reported instead of being silently dropped.
            if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
                fmt.Fprintf(os.Stderr, "%% Error reading message: %v\n", err)
            }
            continue
        }
        if ev == nil {
            continue
        }

        msg := &pb.Person{}
        if err := proto.Unmarshal(ev.Value, msg); err != nil {
            // Do not write a zero-valued Person for an undecodable payload.
            fmt.Fprintf(os.Stderr, "%% Error decoding message %s: %v\n", ev.TopicPartition, err)
            continue
        }
        WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
        if ev.Headers != nil {
            fmt.Printf("%% Headers: %v\n", ev.Headers)
        }
        if _, err := c.StoreMessage(ev); err != nil {
            fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
                ev.TopicPartition, err)
        }
    }
}

This works fine for a worker group of size 1, but every attempt to make it work for a larger group has failed. All I've learned so far is that I'll want to pass a context created with context.WithCancel(context.Background()) from main down into the worker functions, but I'm lost on how to set up a WaitGroup and goroutines to actually do this work.


Solution

  • I understand that your question is how to manage the lifetime of the workers using a context (instead of sigchan). The easiest way is to use signal.NotifyContext, which gives you a context that is cancelled when one of the listed signals arrives. So main would become:

    // main connects to MySQL, creates the consumer group, and runs every
    // worker in its own goroutine until SIGINT or SIGTERM cancels the shared
    // context; it then waits for all workers to return.
    func main() {
        // Connect before installing the signal context so that the os.Exit
        // below cannot skip the deferred stop().
        mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
        if err != nil {
            os.Stderr.WriteString("connecting to MySQL: " + err.Error() + "\n")
            os.Exit(1)
        }

        ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer stop()

        workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
        var wg sync.WaitGroup
        for _, w := range workers {
            // Per-iteration copy so each goroutine captures its own consumer
            // (required before Go 1.22, harmless afterwards).
            w_ := w
            wg.Add(1)
            go func() {
                defer wg.Done()
                worker.StartWorker(ctx, w_, []string{"test-topic"}, mySQLClient)
            }()
        }
        // Block until every worker goroutine has returned.
        wg.Wait()
    }
    

    Note also the use of a sync.WaitGroup to keep main from exiting before all the workers have finished. StartWorker would then look like:

    func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
        _ = c.SubscribeTopics(topics, nil)
        fmt.Println(c)
        for {
            select {
            case <-ctx.Done:
                return
            default:
            ...