I'm playing with some code for learning purposes and I'm getting a race condition when running it with the -race
flag, and I want to understand why. The code starts a fixed set of goroutines that act as workers consuming tasks from a channel. There is no fixed number of tasks: as long as the channel receives tasks, the workers must keep working.
I'm getting a race condition when calling the WaitGroup
functions. From what I understand (looking at the data race report), the race happens when the first wg.Add
call is executed by one of the spawned goroutines while the main goroutine calls wg.Wait
at the same time. Is that correct? If it is, does that mean I must always call Add on the main goroutine to avoid this kind of race? But that would also mean I need to know in advance how many tasks the workers will handle, which kind of sucks if I need the code to handle any number of tasks that may arrive once the workers are running...
The code:
func Test(t *testing.T) {
    t.Run("", func(t *testing.T) {
        var wg sync.WaitGroup
        queuedTaskC := make(chan func())

        for i := 0; i < 5; i++ {
            wID := i + 1
            go func(workerID int) {
                for task := range queuedTaskC {
                    wg.Add(1)
                    task()
                }
            }(wID)
        }

        taskFn := func() {
            fmt.Println("executing task...")
            wg.Done()
        }

        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn

        wg.Wait()
        close(queuedTaskC)
        fmt.Println(len(queuedTaskC))
    })
}
The report:
==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================
The WaitGroup implementation is based on an internal counter that is changed by the Add and Done methods. The Wait method will not return until the counter is zeroed. It is also possible to reuse a WaitGroup, but only under certain conditions described in the documentation:
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
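To make that rule concrete, here is a minimal sketch of legitimate reuse, where each new Add happens only after the previous Wait has returned:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    // First set of events.
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("first batch")
    }()
    wg.Wait() // returns once the first batch is done

    // Safe reuse: this Add happens after the previous Wait returned.
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println("second batch")
    }()
    wg.Wait()
}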
Although your code is not reusing wg, it is able to zero the WaitGroup counter multiple times. This happens whenever no tasks are being processed at a given moment, which is entirely possible in concurrent code. And since your code does not wait for Wait to return before calling Add again, you get the data race error.
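Distilled to its essence, the hazard is an Add in one goroutine with nothing guaranteeing it runs before a Wait in another. A minimal sketch (the sleeps only make the racy interleaving likely, so that go run -race flags it):

package main

import (
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    go func() {
        time.Sleep(10 * time.Millisecond) // make it likely that Wait runs first
        wg.Add(1)                         // nothing orders this before the Wait below
        wg.Done()
    }()

    wg.Wait()                         // can observe the counter at zero
    time.Sleep(20 * time.Millisecond) // keep main alive so the Add actually runs
}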
As everyone suggests in the comments, you should abandon the idea of tracking tasks with the WaitGroup in favor of tracking the running worker goroutines themselves. Here is a code proposal:
func Test(t *testing.T) {
    var wg sync.WaitGroup
    queuedTaskC := make(chan func(), 10)

    for i := 0; i < 5; i++ {
        wID := i + 1
        wg.Add(1) // count workers, not tasks; every Add happens before Wait
        go func(workerID int) {
            defer wg.Done() // a worker is done once the channel is closed and drained
            for task := range queuedTaskC {
                task()
            }
        }(wID)
    }

    for i := 0; i < 10; i++ {
        queuedTaskC <- func() {
            fmt.Println("executing task...")
        }
    }

    close(queuedTaskC) // signal the workers that no more tasks are coming
    wg.Wait()          // block until all workers have exited
    fmt.Println(len(queuedTaskC))
}
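That said, if you really cannot know the number of tasks in advance and still want to wait on the tasks themselves, a commonly suggested variant keeps the per-task WaitGroup but moves the Add to the producer side, before each send. Every Add then happens before Wait on the same goroutine, so the race disappears. A sketch under the same imports as above (the test name is just illustrative):

func TestProducerSideAdd(t *testing.T) {
    var wg sync.WaitGroup
    queuedTaskC := make(chan func())

    for i := 0; i < 5; i++ {
        go func() {
            for task := range queuedTaskC {
                task()
            }
        }()
    }

    for i := 0; i < 9; i++ {
        wg.Add(1) // before the send, on the producing goroutine
        queuedTaskC <- func() {
            defer wg.Done()
            fmt.Println("executing task...")
        }
    }

    wg.Wait() // every Add above is ordered before this Wait
    close(queuedTaskC)
}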