I need to run tasks in parallel, one per element of an input array, wait for all of them to finish, and then process their responses.
My code uses a wait group to wait for all the goroutines to finish and then reads the responses from a channel. But when I read the responses, I only get half of them from the channel.
Is this the right way to get responses from goroutines? If so, what am I missing here? If not, what is the right way to achieve this?
A simplified version of what I am doing:
package main

import (
	"fmt"
	"sync"
	"time"
)

func odd(i int) (int, error) {
	time.Sleep(1 * time.Second)
	if i%2 == 0 {
		return i, fmt.Errorf("even number")
	} else {
		return i, nil
	}
}

func main() {
	type R struct {
		val int
		err error
	}
	wg := sync.WaitGroup{}
	respChan := make(chan R, 10)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			val, err := odd(i)
			r := R{val: val, err: err}
			respChan <- r
			fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
		}(i)
	}
	wg.Wait()
	fmt.Println("Done Waiting")
	fmt.Println("Response Channel Length: ", len(respChan))
	for i := 0; i < len(respChan); i++ {
		r := <-respChan
		if r.err != nil {
			fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
		} else {
			fmt.Printf("[%d] : %d\n", i, r.val)
		}
	}
	fmt.Println("Finished")
}
Output :
Queued Response: 5 , size: 1
Queued Response: 0 , size: 2
Queued Response: 2 , size: 3
Queued Response: 9 , size: 7
Queued Response: 3 , size: 5
Queued Response: 4 , size: 6
Queued Response: 1 , size: 4
Queued Response: 7 , size: 8
Queued Response: 6 , size: 9
Queued Response: 8 , size: 10
Done Waiting
Response Channel Length: 10
[0] : 5
[1] : 0 , even number
[2] : 2 , even number
[3] : 1
[4] : 3
Finished
Playground link : https://go.dev/play/p/x9a3zL3MspR
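The read loop in the question stops after five receives because its condition, `i < len(respChan)`, is evaluated again on every iteration, and each receive makes the channel one element shorter. `i` counts up from 0 while the length counts down from 10, so they meet at 5. Here is a minimal sketch of just that effect. The helper names `fill`, `drainShrinking` and `drainFixed` are made up for illustration and are not part of the original code:

```go
package main

import "fmt"

// fill returns a buffered channel pre-loaded with the values 0..n-1.
func fill(n int) chan int {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i
	}
	return ch
}

// drainShrinking mirrors the loop in the question: len(ch) is evaluated
// again on every iteration, and each receive makes it one smaller.
func drainShrinking(ch chan int) int {
	received := 0
	for i := 0; i < len(ch); i++ {
		<-ch
		received++
	}
	return received
}

// drainFixed reads the length once, before the loop starts.
func drainFixed(ch chan int) int {
	received := 0
	n := len(ch)
	for i := 0; i < n; i++ {
		<-ch
		received++
	}
	return received
}

func main() {
	fmt.Println("shrinking bound:", drainShrinking(fill(10))) // prints 5
	fmt.Println("fixed bound:", drainFixed(fill(10)))         // prints 10
}
```

Capturing the length once would be enough to fix the original loop, but relying on `len` of a channel is fragile in general, which is why the options below avoid it.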
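Option 1: Collect the results in a slice and eliminate the channel. Each goroutine writes only to its own element, so no extra locking is needed, and the results are processed in input order regardless of which goroutine finishes first.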
// Body of main; odd is the function from the question.
type R struct {
	val int
	err error
}
wg := sync.WaitGroup{}
resp := make([]R, 10)
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		val, err := odd(i)
		// Each goroutine writes to a distinct index.
		resp[i] = R{val: val, err: err}
	}(i)
}
// After Wait returns, every element of resp has been written.
wg.Wait()
fmt.Println("Done Waiting")
for i, r := range resp {
	if r.err != nil {
		fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
	} else {
		fmt.Printf("[%d] : %d\n", i, r.val)
	}
}
fmt.Println("Finished")
https://go.dev/play/p/o_mHuU0myyS
Option 2: Drop the wait group. Receive exactly N results from the channel, where N is the number of goroutines. The results arrive in the order the goroutines finish, not in input order.
// Body of main; odd is the function from the question.
type R struct {
	val int
	err error
}
const N = 10
respChan := make(chan R)
for i := 0; i < N; i++ {
	go func(i int) {
		val, err := odd(i)
		r := R{val: val, err: err}
		respChan <- r
		// respChan is unbuffered, so len(respChan) is always 0 here.
		fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
	}(i)
}
// Receive a fixed number of results instead of consulting len(respChan).
for i := 0; i < N; i++ {
	r := <-respChan
	if r.err != nil {
		fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
	} else {
		fmt.Printf("[%d] : %d\n", i, r.val)
	}
}
fmt.Println("Finished")
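If you would rather keep both the wait group and the channel, one more possibility is to close the channel once every sender is done and read it with `range`, which ends when the channel is closed and drained. The following is only a sketch under a few assumptions: the names `result` and `collect` are mine rather than from the question, and `odd` is reproduced without the sleep to keep the example short:

```go
package main

import (
	"fmt"
	"sync"
)

// result plays the role of the question's R type.
type result struct {
	val int
	err error
}

// odd stands in for the question's function, minus the sleep.
func odd(i int) (int, error) {
	if i%2 == 0 {
		return i, fmt.Errorf("even number")
	}
	return i, nil
}

// collect runs odd for 0..n-1 in parallel and returns every result,
// in the order the goroutines delivered them.
func collect(n int) []result {
	respChan := make(chan result)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			val, err := odd(i)
			respChan <- result{val: val, err: err}
		}(i)
	}
	// Close the channel once all senders have finished,
	// so that the range loop below terminates.
	go func() {
		wg.Wait()
		close(respChan)
	}()
	var results []result
	for r := range respChan {
		results = append(results, r)
	}
	return results
}

func main() {
	for i, r := range collect(10) {
		if r.err != nil {
			fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err)
		} else {
			fmt.Printf("[%d] : %d\n", i, r.val)
		}
	}
	fmt.Println("Finished")
}
```

The wait runs in its own goroutine because the channel is unbuffered: calling `wg.Wait()` in `main` before receiving would leave every sender blocked on its send, so `Wait` would never return.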