To better illustrate my question, here is an example of the code with comments:
(Note: I'm using Go 1.20.5)
// start consuming a queue
// deliveries is an unbuffered, receive-only Go channel of type <-chan Message
deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
if err != nil {
	return err
}
// infinite loop to consume the messages
for msg := range deliveries {
	// and for every msg I execute a function
	result := myFunc()
}
The idea here is to consume the messages with a pool of n workers, where a message is fetched only when one of the workers is free.
To be clearer, the example below is not a valid solution:
// for the worker pool in this situation I would use the
// tunny worker pool Go package
workerPool := newWorkerPoolWithNWorkers()
for msg := range deliveries {
	go func() {
		result := workerPool(myFunc)
	}()
}
This is not valid because, as I see it, this code fetches every message at once and lets the workerPool process them n at a time. The question is: how do I get a new message only when a worker becomes free, in an infinite loop?
So let's say we have a queue with 100 messages. The desired solution fetches 3 messages at first, and each time one of the fetched messages has been processed, the code fetches one more message, in an infinite for loop.
I was trying to do something like this:
wg := new(sync.WaitGroup)
counter := 0
for msg := range deliveries {
	wg.Wait()
	go func() {
		counter++
		if counter == n { // n could be any integer used to limit the pool size
			// this way a new message would block at wg.Wait() if all n goroutines are busy
			wg.Add(1)
		}
		result := myFunc()
		counter--
		wg.Done() // one of the n "workers" is free, so we can ask for one more message
	}()
}
But it seems too complicated, and I don't think it works.
If someone could help me, I'd be very grateful!
I think you are overthinking this a bit. To consume messages from a channel using a worker pool, you can:
for i := 0; i < nWorkers; i++ {
	go func() {
		for msg := range deliveries {
			myFunc(msg)
		}
	}()
}
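In other words, you create n goroutines all receiving from the same channel. The runtime decides which goroutine receives each message. Each worker takes the next message only after it has finished the previous one, so at most n messages are being processed at any time. When the deliveries channel is closed, all workers terminate.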
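For completeness, here is a minimal runnable sketch of the same idea that also waits for the workers to finish. The Message type, the process function (standing in for myFunc), and the consume helper are all hypothetical names made up for this example, and the producer goroutine only simulates the subscriber's deliveries channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for the subscriber's message type.
type Message struct {
	ID int
}

// process is a hypothetical stand-in for myFunc.
func process(msg Message) int {
	return msg.ID * 2
}

// consume starts nWorkers goroutines that all receive from deliveries.
// It blocks until the channel is closed and every worker has returned,
// then reports how many messages were processed.
func consume(deliveries <-chan Message, nWorkers int) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)
	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A worker receives the next message only after it has
			// finished the previous one.
			for msg := range deliveries {
				_ = process(msg)
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return processed
}

func main() {
	// Simulated queue: an unbuffered channel fed with 100 messages.
	deliveries := make(chan Message)
	go func() {
		defer close(deliveries)
		for i := 0; i < 100; i++ {
			deliveries <- Message{ID: i}
		}
	}()

	fmt.Println("processed:", consume(deliveries, 3)) // prints "processed: 100"
}
```

The sync.WaitGroup here is only used to wait for the workers to exit. The concurrency limit itself comes from the number of goroutines ranging over the channel, so no counter is needed.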