
Batching channel reads with error handling in Go


Consider the following Go program:

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
)

func concurrentProducer() (<-chan int, <-chan error) {
    prodCh, errCh := make(chan int, 5), make(chan error, 1)

    go func() {
        defer close(prodCh)
        for range 13 {
            prodCh <- rand.Int()
        }
    }()

    return prodCh, errCh
}

func batchConsumer(batch []int) {
    fmt.Println(batch)
}

func batchCollector(ctx context.Context, batchSize int) error {
    prodCh, errCh := concurrentProducer()
    batch := make([]int, 0, batchSize)

    // ... Read a batch but also handle errors!
    batchConsumer(batch)

    return nil
}

func main() {
    err := batchCollector(context.Background(), 5)
    if err != nil {
        log.Fatalln(err)
    }
}

I want to implement batchCollector so that values from prodCh are collected into batches and passed to batchConsumer whenever a batch reaches batchSize (or when no more values are available).

At the same time, I want to honor context cancellation and handle any error arriving on errCh.

The best I could come up with is:

func batchCollector(ctx context.Context, batchSize int) error {
    prodCh, errCh := concurrentProducer()
    batch := make([]int, 0, batchSize)

    elem, consuming := 0, true
    for consuming {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-errCh:
            return err
        case elem, consuming = <-prodCh:
            if !consuming {
                break
            }
            batch = append(batch, elem)
            if len(batch) < batchSize {
                continue
            }
        }

        batchConsumer(batch)
        batch = batch[:0]
    }

    return nil
}

(https://goplay.tools/snippet/ohPVw8YW2-C)

But this seems overly involved, and the control flow is hard to follow. Is there an idiomatic pattern I'm unaware of that handles this gracefully?


Solution

  • The logic is mostly correct. However, you should handle context cancellation in the producer rather than in the consumer: if the consumer returns on cancellation while the producer keeps running, the producer goroutine blocks forever on its next send and leaks. On cancellation or error, the producer simply returns, and its deferred close(prodCh) ends the consumer's loop:

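    func concurrentProducer(ctx context.Context) (<-chan int, <-chan error) {
        prodCh, errCh := make(chan int, 5), make(chan error, 1)

        go func() {
            defer close(prodCh)
            for range 13 {
                // produceValue is a placeholder for whatever computes a
                // value and may fail.
                value, err := produceValue()
                if err != nil {
                    errCh <- err // buffered, so this never blocks
                    return
                }
                select {
                case prodCh <- value:
                case <-ctx.Done():
                    // Report the cancellation so the consumer does not
                    // mistake it for a normal end of input.
                    errCh <- ctx.Err()
                    return
                }
            }
        }()

        return prodCh, errCh
    }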
    

    The batch collector then becomes much simpler:

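    func batchCollector(ctx context.Context, batchSize int) error {
        prodCh, errCh := concurrentProducer(ctx)
        batch := make([]int, 0, batchSize)
        // The range loop ends once the producer closes prodCh, whether it
        // finished normally, hit an error, or saw the context canceled.
        for value := range prodCh {
            batch = append(batch, value)
            if len(batch) < batchSize {
                continue
            }
            batchConsumer(batch)
            batch = batch[:0]
        }

        // The producer sends at most one error before closing prodCh, so a
        // non-blocking receive is enough to see whether it stopped early.
        var err error
        select {
        case err = <-errCh:
        default:
        }
        // If an error happened, throw away the accumulated partial batch.
        // Flush the batch before this check if you don't want to lose it.
        if err != nil {
            return err
        }
        if len(batch) > 0 {
            batchConsumer(batch)
        }
        return nil
    }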