Tags: go, goroutine

How to wait for in-progress operations before canceling a goroutine


I have a message consumer (e.g. Kafka) that runs in a goroutine. It uses a for-select loop whose default case processes the received message:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

type Consumer struct{}

func (c *Consumer) Start(ctx context.Context) {
    fmt.Println("Consumer is starting")
    defer func() {
        fmt.Println("Consumer is stopping")
    }()

    i := 0
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Context is done")
            return
        default:
            // msg, err := kafka.FetchMessage(ctx)
            fmt.Printf("[%d] Received message\n", i)
            <-time.After(3 * time.Second)
            fmt.Printf("[%d] Stopped processing\n", i)
            i++
        }
    }
}

In my main function, I have a channel that listens for cancellation signals. Whenever a signal is received, I want to stop the goroutine by calling the context's cancel function.

func main() {
    fmt.Println("Main Start")
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    consumer := &Consumer{}
    go func() {
        consumer.Start(ctx)
    }()

    <-sigchan
    fmt.Println("Received signal to shutdown")
    cancel()
}

The requisites of the system are:

However, when I stop the program by pressing CTRL + C while a message is being processed, the processing is not completed. How can I cancel the consumer only after the current message has been fully processed?

Go Playground: https://go.dev/play/p/kRUPPhwxwn7


Solution

  • The program exits immediately because when main() returns, all other goroutines are stopped abruptly. After cancel() is called, nothing waits for consumer.Start to finish.

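    Using signal.NotifyContext is a bit more straightforward, but it can be done your way too. In both versions below, main blocks in consumer.Start, so the program cannot exit until the consumer returns.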

    // Your way, modified:
    func main() {
        fmt.Println("Main Start")
        sigchan := make(chan os.Signal, 1)
        signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
    
        go func() {
            <-sigchan
            fmt.Println("Received signal to shutdown")
            cancel()
        }()
    
        // Block until the consumer exits; the goroutine above cancels ctx in the background
        consumer := &Consumer{}
        consumer.Start(ctx)
    }
    
    // Allowing signal.NotifyContext to deal with it
    func main() {
        fmt.Println("Main Start")
        ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer cancel()
    
        consumer := &Consumer{}
        consumer.Start(ctx)
        fmt.Println("Shutting down")
    }
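
If the consumer has to stay in its own goroutine (for example because main has other work to do), main can wait for it explicitly. The following is a minimal sketch of that idea using sync.WaitGroup, not the only way to do it. The names runConsumer and closeAfter are hypothetical helpers, and the work parameter and the started/finished counters are additions for the demo that are not in the original Consumer.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// Consumer mirrors the type from the question. The counters are an
// assumption added for the demo; read them only after Start has returned.
type Consumer struct {
	started, finished int
}

// Start checks ctx only between messages, so a message that has started
// processing is always finished before the loop exits.
func (c *Consumer) Start(ctx context.Context, work time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			c.started++
			time.Sleep(work) // simulated processing, not interrupted by ctx
			c.finished++
		}
	}
}

// runConsumer (hypothetical helper) runs the consumer in a goroutine,
// cancels it once shutdown is closed, then waits for Start to return.
func runConsumer(shutdown <-chan struct{}, work time.Duration) (started, finished int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &Consumer{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.Start(ctx, work)
	}()

	<-shutdown
	cancel()
	wg.Wait() // without this, returning here could cut the current message short
	return c.started, c.finished
}

// closeAfter (hypothetical helper) returns a channel that is closed after d.
// It stands in for a signal when trying the code without a terminal.
func closeAfter(d time.Duration) <-chan struct{} {
	ch := make(chan struct{})
	time.AfterFunc(d, func() { close(ch) })
	return ch
}

func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	shutdown := make(chan struct{})
	go func() {
		<-sigchan
		fmt.Println("Received signal to shutdown")
		close(shutdown)
	}()

	started, finished := runConsumer(shutdown, 3*time.Second)
	fmt.Printf("Started %d, finished %d\n", started, finished)
}
```

Because the counters are read only after wg.Wait() returns, started and finished should always be equal here. In a real consumer you would probably also want an upper bound on how long shutdown may take, which this sketch leaves out.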
    

    Also note that pressing CTRL+C is not exactly equivalent to kill -INT $PID. CTRL+C sends SIGINT to every process in the terminal's foreground process group, whereas kill signals only the given PID. This means child processes may be killed before the main process can shut them down gracefully.
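
One possible mitigation on Unix-like systems is to start child processes in their own process group, so that a terminal CTRL+C reaches only the parent, which can then stop the children itself. This is a sketch under that assumption: startDetached and sameGroup are hypothetical helper names, the sleep command is only a placeholder child, and the syscall fields used are not available on Windows.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
)

// startDetached (hypothetical helper) starts a command in a new process
// group, so a signal sent to the parent's group does not reach it directly.
func startDetached(name string, args ...string) (*exec.Cmd, error) {
	cmd := exec.Command(name, args...)
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd, nil
}

// sameGroup reports whether the child shares the parent's process group.
func sameGroup(cmd *exec.Cmd) (bool, error) {
	childPgid, err := syscall.Getpgid(cmd.Process.Pid)
	if err != nil {
		return false, err
	}
	return childPgid == syscall.Getpgrp(), nil
}

func main() {
	cmd, err := startDetached("sleep", "30") // placeholder child process
	if err != nil {
		fmt.Println("start failed:", err)
		return
	}

	same, err := sameGroup(cmd)
	if err != nil {
		fmt.Println("getpgid failed:", err)
	} else {
		fmt.Println("child shares our process group:", same)
	}

	// The parent is now responsible for stopping the child gracefully.
	_ = cmd.Process.Signal(syscall.SIGTERM)
	_ = cmd.Wait()
}
```

The trade-off is that the child no longer belongs to the terminal's foreground group, so the parent must forward shutdown signals to it explicitly, as the last lines of main do.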