I currently have a program that creates a worker group of size 1 and then calls StartWorker for each worker:
package main
import (
"db_write_consumer/db"
"db_write_consumer/worker"
"os"
"os/signal"
"syscall"
)
// main sets up signal delivery, a MySQL client and a group of Kafka consumers,
// then runs the worker loop for every consumer in the group.
func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// NOTE(review): the second return value (presumably an error) is discarded.
	dbClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")

	group := worker.CreateGroup("localhost:9092", "testgroup", 1)

	// StartWorker is called synchronously here, so the next consumer is only
	// started after the call for the previous one has returned.
	for i := range group {
		worker.StartWorker(group[i], []string{"test-topic"}, signals, dbClient)
	}
}
where CreateGroup is written as:
// CreateGroup builds numWorkers consumers with NewWorker, all using the same
// bootstrap servers and group id, and returns them in creation order.
// A numWorkers of zero or less yields an empty, non-nil slice.
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
	group := []*kafka.Consumer{}
	for remaining := numWorkers; remaining > 0; remaining-- {
		group = append(group, NewWorker(bootstrapServers, groupId))
	}
	return group
}
and StartWorker is written as:
// StartWorker subscribes c to topics and then consumes messages until a value
// arrives on sigchan. Every message is decoded into a pb.Person, handed to
// WriteStuff together with mySQLClient, and finally passed to c.StoreMessage.
// The consumer is closed before StartWorker returns, including when the
// subscription fails.
//
// NOTE: a value sent on sigchan is received by exactly one goroutine, so if
// several workers share the same channel a single signal stops only one of
// them.
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
	// Close on every exit path, not only after a signal.
	defer func() {
		fmt.Printf("Closing consumer\n")
		if err := c.Close(); err != nil {
			fmt.Fprintf(os.Stderr, "%% Error closing consumer: %v\n", err)
		}
	}()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		// Polling a consumer that never subscribed would just spin forever.
		fmt.Fprintf(os.Stderr, "%% Error subscribing to %v: %v\n", topics, err)
		return
	}
	fmt.Println(c)

	for {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
			// The error is intentionally ignored: a nil message is treated as
			// "nothing to process yet" and the loop simply polls again.
			// NOTE(review): if ReadMessage takes a time.Duration, the literal
			// 100 means 100ns rather than 100ms - confirm the intended timeout.
			ev, _ := c.ReadMessage(100)
			if ev == nil {
				continue
			}

			msg := &pb.Person{}
			if err := proto.Unmarshal(ev.Value, msg); err != nil {
				// Do not write a record that could not be decoded.
				fmt.Fprintf(os.Stderr, "%% Error decoding message %s: %v\n",
					ev.TopicPartition, err)
			} else {
				WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
			}

			if ev.Headers != nil {
				fmt.Printf("%% Headers: %v\n", ev.Headers)
			}

			if _, err := c.StoreMessage(ev); err != nil {
				fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
					ev.TopicPartition, err)
			}
		}
	}
}
This works fine for a worker group of size 1, but every attempt to make it work for a larger worker group fails. All I've learned so far is that I'll want context.WithCancel(context.Background()) passed down into the worker funcs from main, but I'm lost as to how to set up a WaitGroup or goroutines to actually do this work.
I understand that your question is how to manage the lifetime of the workers using a context (instead of sigchan). The easiest way is to use signal.NotifyContext - this gives you a context which gets cancelled when one of the signals is received. So main would become
// main starts one goroutine per Kafka consumer and waits for all of them to
// return. The context handed to every worker is cancelled when the process
// receives SIGINT or SIGTERM.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
	if err != nil {
		// Starting workers without a usable database client is pointless;
		// fail loudly at startup instead of inside a worker goroutine.
		panic(err)
	}

	workers := worker.CreateGroup("localhost:9092", "testgroup", 1)

	var wg sync.WaitGroup
	for _, w := range workers {
		w := w // per-iteration copy for the closure (required before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker.StartWorker(ctx, w, []string{"test-topic"}, mySQLClient)
		}()
	}

	// Keep main alive until every worker has returned.
	wg.Wait()
}
Note also the use of a WaitGroup to keep main from exiting before all the workers have finished. StartWorker would then look like
func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
for {
select {
case <-ctx.Done:
return
default:
...