gochannelconsumerproducer

golang producer consumer number of messages received


I have implemented the producer-consumer pattern in Go. The program reads multiple CSV files and processes their records; each CSV file's records are read all at once.

I want to log the percentage of processing completed at intervals of 5% of the total number of records across all CSV files. For example, if I have 3 CSV files to process with 20, 30, and 50 rows/records respectively (so 100 records to process in total), I want to log progress each time another 5 records have been processed.

// processData starts one producer goroutine per input CSV file and the same
// number of consumer goroutines, all sharing one buffered channel of records.
// It returns after every producer has finished, the channel has been closed,
// and every consumer has returned.
func processData(inputCSVFiles []string) {
    var producers, consumers sync.WaitGroup

    workers := len(inputCSVFiles)
    records := make(chan []string, 100)

    // Register all goroutines before any of them is started.
    producers.Add(workers)
    consumers.Add(workers)

    for _, path := range inputCSVFiles {
        go produce(records, path, &producers)
    }
    for n := workers; n > 0; n-- {
        go consume(records, &consumers)
    }

    // The channel may only be closed once no producer can send on it again.
    producers.Wait()
    close(records)
    consumers.Wait()

    fmt.Println("Completed data migration process for all CSV data files.")
}

// produce loads the records of the CSV file at filePath through readCsvFile
// and sends them one at a time on link. wg.Done is deferred so the caller's
// WaitGroup is released when the function returns.
//
// The previously declared totalNumberOfRecords local was never used, which Go
// rejects at compile time ("declared and not used"), so it has been removed.
func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
    defer wg.Done()
    for _, record := range readCsvFile(filePath) {
        link <- record
    }
}

// consume receives records from link until the channel is closed and drained,
// then signals completion through wg.
func consume(link <-chan []string, wg *sync.WaitGroup) {
    defer wg.Done()
    for record := range link {
        // Placeholder for the per-record processing. The blank assignment
        // keeps the snippet compiling: Go rejects a loop variable that is
        // declared but never used.
        _ = record
    }
}

Solution

  • I used an atomic variable and a counter channel: each consumer pushes a count onto the channel when a record has been processed, and a separate goroutine reads from the channel and calculates the percentage of total records processed.

    // Package-level state shared by the producer, consumer and progress goroutines.
    var (
        // progressPercentageStep is the spacing, in percent, between progress log lines.
        progressPercentageStep float64 = 5.0
        // totalRecordsToProcess is updated through sync/atomic by the goroutines below.
        totalRecordsToProcess int32
    )
    
    // processData starts one producer per CSV file, an equal number of
    // consumers, and a single progress-reporting goroutine, then blocks until
    // all of them have finished.
    //
    // Shutdown order matters:
    //  1. wait for the producers, then close link so the consumers' loops end;
    //  2. wait for the consumers, then close counter (no sender is left);
    //  3. wait for progressStats to drain counter so the final progress lines
    //     are logged before this function returns.
    func processData(inputCSVFiles []string) {
        producerCount := len(inputCSVFiles)
        consumerCount := producerCount
        link := make(chan []string, 100)
        counter := make(chan int, 100)
        wp := &sync.WaitGroup{}
        wc := &sync.WaitGroup{}

        wp.Add(producerCount)
        wc.Add(consumerCount)

        for i := 0; i < producerCount; i++ {
            go produce(link, inputCSVFiles[i], wp)
        }

        // statsDone is closed once progressStats has consumed every count.
        statsDone := make(chan struct{})
        go func() {
            defer close(statsDone)
            progressStats(counter)
        }()

        for i := 0; i < consumerCount; i++ {
            // consume needs the counter channel to report processed records.
            go consume(link, counter, wc)
        }

        wp.Wait()
        close(link)
        wc.Wait()
        close(counter)
        <-statsDone
    }
        
        // produce loads all records of the CSV file at filePath via readCsvFile,
        // atomically adds their number to totalRecordsToProcess, and then sends
        // the records one by one on link. wg.Done runs when the function returns.
        func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
            defer wg.Done()

            rows := readCsvFile(filePath)
            rowCount := int32(len(rows))
            atomic.AddInt32(&totalRecordsToProcess, rowCount)

            for i := range rows {
                link <- rows[i]
            }
        }
        
        // consume receives records from link until it is closed and drained.
        // After each record it sends 1 on counter so a listener can track how
        // many records have been processed. wg.Done runs on return.
        func consume(link <-chan []string, counter chan<- int, wg *sync.WaitGroup) {
            defer wg.Done()
            for record := range link {
                // Placeholder for the per-record processing. The blank
                // assignment keeps the snippet compiling: Go rejects a loop
                // variable that is declared but never used.
                _ = record
                counter <- 1
            }
        }
        
    // progressStats reads per-record completion counts from counter until the
    // channel is closed. It logs a progress line whenever the processed
    // percentage reaches the next multiple of progressPercentageStep.
    //
    // The running count is kept in a local variable and totalRecordsToProcess
    // is only read, never decremented. The percentage is therefore
    // processed/total rather than processed/remaining.
    //
    // NOTE(review): producers add to totalRecordsToProcess while consumers are
    // already running, so early percentages can be overstated until every file
    // has been read. Confirm whether that is acceptable.
    func progressStats(counter <-chan int) {
        feedbackThreshold := progressPercentageStep
        processed := 0
        for count := range counter {
            processed += count

            total := atomic.LoadInt32(&totalRecordsToProcess)
            if total <= 0 {
                // Defensive: without a known total the percentage is undefined.
                continue
            }

            donePercent := 100.0 * float64(processed) / float64(total)
            if donePercent < feedbackThreshold {
                continue
            }

            log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", total, processed, donePercent)
            feedbackThreshold += progressPercentageStep
        }
    }