multithreadinggogoroutinewaitgroup

Safe way to terminate an endlessly looping goroutine?


I have a goroutine that is acting as a listener. A stream of input is coming in on a certain buffered channel, and I want my goroutine to process data as it comes in on that channel. However, there may come a time where no data comes in on that channel for awhile. If the channel has nothing to offer for one second, I want my goroutine to do something different. The function looks like this:

func main () {
    var wg sync.WaitGroup
    arr := make([]*myObject, 0)
    wg.Add(1)
    go listener(c, arr, &wg)
    for {
        // sending stuff to c
    }
}

func listener(c chan *myObject, arr []*myObject, wg *sync.WaitGroup) {
    for {
        select {
        case value := <- c:
            arr = append(arr, value)
        case <- time.After(1 * time.Second):
            fmt.Println(arr)
        }
}

The problem is that I want to see everything that went through the channel printed out. If main finishes suddenly, there might be some stuff left in arr that hasn't been printed out yet, and I won't see it. So I need to make sure that this goroutine is finished processing all the data in the channel before the program finishes. That means, I think, that I need to use a WaitGroup and use Wait() to ensure that the program doesn't close before my goroutine is done doing what it needs to do. But I can't figure out where to call Done() on my WaitGroup.

Basically, I need a safe way to "pause" the goroutine at the end, before the program finishes, and print out whatever is left. How can I do that?

Even worse, for things like unit tests, I send the data to the channel myself, and after I've sent a certain amount, I want to see the array. But if I just check the array immediately after the code to send the data to the channel, the goroutine may not have had a chance to process all the data yet. In that case, I want to wait for the goroutine to process all the data I've sent, and then I want to pause it and ask it to show me the array. But how do I know when the goroutine is finished processing? I could sleep for awhile and give it a chance to finish, and then stop it and have a look, but that feels rather hackish. I'm thinking there's a best-practice way to handle this, but I'm not figuring it out.

Here are some ideas I had, zero of which work.

  1. Call Done() outside of the infinite for loop. This doesn't work, because that code, as far as I know, is unreachable.

  2. Call Done() in the timeout case. This doesn't work, because there may be more data on the way after a timeout, which I want my goroutine to keep listening for.


Solution

  • Modify the listener to return when the channel is closed. Call wg.Done() on return:

    func listener(c chan *myObject, arr []*myObject, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case value, ok := <- c:
                if !ok { return }
                arr = append(arr, value)
            case <- time.After(1 * time.Second):
                fmt.Println(arr)
            }
    }
    

    Modify the main to close the channel when done sending. Wait for the goroutine to complete before returning from main.

    var wg sync.WaitGroup
    arr := make([]*myObject, 0)
    wg.Add(1)
    go listener(c, arr, &wg)
    for {
        // sending stuff to c
    }
    close(c)
    wg.Wait()