About the program: I am trying to implement the pipeline pattern using my own function type, intJob. The function that wires the stages together is ExecutePipeline2, and as far as I can tell, that is where the problem lies.
Why do I keep getting deadlocks? As far as I can see, I close every channel that readers use once the goroutine writing to it finishes. Creating the channels with a buffer doesn't help either, so I've completely run out of ideas and would be really grateful for your help.
IMPORTANT: I can't change the main function. The fix has to be made in the other functions only, while main stays exactly as it is.
package main

import (
	"fmt"
	"sync"
)

type intJob func(a, b chan int)

func ExecutePipeline2(jobs ...intJob) {
	// One channel per boundary between stages, plus one at each end.
	outs := make([]chan int, len(jobs)+1)
	wg := sync.WaitGroup{}
	for i := 0; i < len(outs); i++ {
		outs[i] = make(chan int)
	}
	for i, job := range jobs {
		job := job
		in, out := outs[i], outs[i+1]
		i := i
		wg.Add(1)
		go func() {
			job(in, out)
			fmt.Printf("job %d closed\n", i)
			close(out)
			wg.Done()
		}()
	}
	wg.Wait()
}

func pipe(_, b chan int) {
	for i := 0; i < 5; i++ {
		b <- i
	}
}

func main() {
	inputData := []int{0, 1, 1, 2, 3, 5, 8}
	hashSignJobs := []intJob{
		intJob(func(in, out chan int) {
			for _, fibNum := range inputData {
				out <- fibNum
			}
		}),
		intJob(pipe),
		intJob(func(in, out chan int) {
			for val := range in {
				fmt.Println(val)
			}
		}),
	}
	ExecutePipeline2(hashSignJobs...)
}
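I think the key issue is your second stage, pipe(), which ignores its input channel and just generates numbers in a loop instead of reading the output of the previous stage. Nothing ever receives from the first stage's output channel, so the first goroutine blocks on its send and wg.Wait() never returns. pipe() should read from in, like this: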
func pipe(in, out chan int) {
	for i := range in {
		out <- i
	}
}
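To check that this change is enough on its own, here is a minimal sketch that runs the question's three stages through the question's ExecutePipeline2 (debug Printf omitted) with the corrected pipe. The collect helper is a hypothetical name introduced only for this illustration; it gathers the values the last stage receives instead of printing them.

```go
package main

import (
	"fmt"
	"sync"
)

type intJob func(in, out chan int)

// ExecutePipeline2 follows the question's version (debug Printf omitted):
// each job runs in its own goroutine and its output channel is closed
// once the job returns.
func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := range outs {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		in, out := outs[i], outs[i+1]
		wg.Add(1)
		go func(job intJob) {
			defer wg.Done()
			job(in, out)
			close(out)
		}(job)
	}
	wg.Wait()
}

// pipe is the corrected stage: it forwards everything it reads.
func pipe(in, out chan int) {
	for v := range in {
		out <- v
	}
}

// collect is a hypothetical helper: it runs the three stages from the
// question and returns what the final stage received.
func collect(input []int) []int {
	var got []int
	ExecutePipeline2(
		func(_, out chan int) {
			for _, v := range input {
				out <- v
			}
		},
		pipe,
		func(in, _ chan int) {
			for v := range in {
				got = append(got, v)
			}
		},
	)
	return got
}

func main() {
	fmt.Println(collect([]int{0, 1, 1, 2, 3, 5, 8})) // prints [0 1 1 2 3 5 8]
}
```

Every send now has a matching receiver, so each stage finishes, closes its output, and wg.Wait() returns.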
You could also rewrite the execute function along these lines. Full example at https://go.dev/play/p/K_-UFzz0zt5
func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := 0; i < len(outs); i++ {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job intJob) {
			defer wg.Done()
			// Close the output channel when done
			defer close(outs[i+1])
			job(outs[i], outs[i+1])
		}(i, job)
	}
	// Nothing writes to the first channel, so close it: a first job
	// that reads its input then sees end of input instead of blocking
	close(outs[0])
	wg.Wait()
}
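One thing the close(outs[0]) call changes is what happens when the first job reads from its input. The sketch below is an illustration under assumptions, not part of the linked playground: forward and countReceived are hypothetical names. It puts a stage that ranges over its input at the head of the pipeline and counts what reaches the last stage.

```go
package main

import (
	"fmt"
	"sync"
)

type intJob func(in, out chan int)

// ExecutePipeline2 mirrors the rewritten version above.
func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := range outs {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job intJob) {
			defer wg.Done()
			defer close(outs[i+1])
			job(outs[i], outs[i+1])
		}(i, job)
	}
	// The head of the pipeline has no writer, so mark it as finished.
	close(outs[0])
	wg.Wait()
}

// forward copies values from in to out until in is closed.
func forward(in, out chan int) {
	for v := range in {
		out <- v
	}
}

// countReceived is a hypothetical helper: the first stage reads from the
// head channel, and the last stage counts what arrives.
func countReceived() int {
	n := 0
	ExecutePipeline2(
		forward, // ranges over outs[0], which is closed and empty
		func(in, _ chan int) {
			for range in {
				n++
			}
		},
	)
	return n
}

func main() {
	fmt.Println(countReceived()) // prints 0
}
```

Because outs[0] is closed, the range loop in the first stage ends immediately and the pipeline shuts down cleanly. Without that close, the first stage would wait forever for a value that never arrives, and the program would deadlock again.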