I have written producer-consumer pattern in golang. Reading multiple csv files and processing records. I am reading all records of csv file in one go.
I want to log percentage of processing completion in interval of 5% of total records including all csv files. for e.g I have 3 csv to process & each have 20,30,50 rows/records (so in total 100 records to process) want to log progress when 5 records are processed.
func processData(inputCSVFiles []string) {
producerCount := len(inputCSVFiles)
consumerCount := producerCount
link := make(chan []string, 100)
wp := &sync.WaitGroup{}
wc := &sync.WaitGroup{}
wp.Add(producerCount)
wc.Add(consumerCount)
for i := 0; i < producerCount; i++ {
go produce(link, inputCSVFiles[i], wp)
}
for i := 0; i < consumerCount; i++ {
go consume(link, wc)
}
wp.Wait()
close(link)
wc.Wait()
fmt.Println("Completed data migration process for all CSV data files.")
}
func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
defer wg.Done()
records := readCsvFile(filePath)
totalNumberOfRecords := len(records)
for _, record := range records {
link <- record
}
}
func consume(link <-chan []string, wg *sync.WaitGroup) {
defer wg.Done()
for record := range link {
// process csv record
}
}
I have used atomic variable & counter channel where consumer will push count when record is processed & other goroutine will read from channel & calculate total processed record percentage.
var progressPercentageStep float64 = 5.0
var totalRecordsToProcess int32
func processData(inputCSVFiles []string) {
producerCount := len(inputCSVFiles)
consumerCount := producerCount
link := make(chan []string, 100)
counter := make(chan int, 100)
defer close(counter)
wp := &sync.WaitGroup{}
wc := &sync.WaitGroup{}
wp.Add(producerCount)
wc.Add(consumerCount)
for i := 0; i < producerCount; i++ {
go produce(link, inputCSVFiles[i], wp)
}
go progressStats(counter)
for i := 0; i < consumerCount; i++ {
go consume(link, wc)
}
wp.Wait()
close(link)
wc.Wait()
}
func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
defer wg.Done()
records := readCsvFile(filePath)
atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))
for _, record := range records {
link <- record
}
}
func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for record := range link {
// process csv record
counter <- 1
}
}
func progressStats(counter <-chan int) {
var feedbackThreshold = progressPercentageStep
for count := range counter {
totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)
donePercent := 100.0 * processed / totalRemaining
// log progress
if donePercent >= feedbackThreshold {
log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)
feedbackThreshold += progressPercentageStep
}
}
}