goconcurrencydeadlock

Deadlock while implementing a pipeline concurrency pattern


About the program: I am trying to implement pipeline pattern via my own first-class type intJob. The main func, which aggregates the pipelines is ExecutePipeline2 and as I can tell, exactly it causing problems.

Why am I keep getting deadlocks? To me I seem to close all channels, that are used by readers, after every goroutine. Also creating channels with buffer doesn't help, so I've completely ran out of ideas and will be really grateful for your help.

IMPORTANT: I can't change the main function and implement this idea from others functions only, when the base (mentioned function) remains constant and unchanged.

type intJob func(a, b chan int)

func ExecutePipeline2(jobs ...intJob) {
    outs := make([]chan int, len(jobs)+1)
    wg := sync.WaitGroup{}

    for i := 0; i < len(outs); i++ {
        outs[i] = make(chan int)
    }

    for i, job := range jobs {
        job := job
        in, out := outs[i], outs[i+1]
        i := i
        wg.Add(1)
        go func() {
            job(in, out)
            fmt.Printf("job %d closed\n", i)
            close(out)
            wg.Done()
        }()
    }

    wg.Wait()
}

func pipe(_, b chan int) {
    for i := 0; i < 5; i++ {
        b <- i
    }
}

func main() {
    inputData := []int{0, 1, 1, 2, 3, 5, 8}

    hashSignJobs := []intJob{
        intJob(func(in, out chan int) {
            for _, fibNum := range inputData {
                out <- fibNum
            }
        }),
        intJob(pipe),
        intJob(func(in, out chan int) {
            for val := range in {
                fmt.Println(val)
            }
        }),
    }

    ExecutePipeline2(hashSignJobs...)
}


Solution

  • I think the key issue is with your second pipeline pipe(), which instead of reading from the output of the previous pipeline, just started generating numbers on a loop. It should have been written as below to read from in as

    func pipe(in, out chan int) {
        for i := range in { 
            out <- i
        }
    }
    

    Also you could rewrite the execute function to something like below. Full example at https://go.dev/play/p/K_-UFzz0zt5

    func ExecutePipeline2(jobs ...intJob) {
        outs := make([]chan int, len(jobs)+1)
        for i := 0; i < len(outs); i++ {
            outs[i] = make(chan int)
        }
    
        var wg sync.WaitGroup
    
        for i, job := range jobs {
            wg.Add(1)
            go func(i int, job intJob) {
                defer wg.Done()
                // Close the output channel when done
                defer close(outs[i+1]) 
                job(outs[i], outs[i+1])
            }(i, job)
        }
    
        // Close the first channel after all input is written
        close(outs[0])
    
        wg.Wait()
    }