sqlitegogo-colly

Golang concurrent R/W to database


I'm writing some Go software that is responsible for downloading and parsing a large number of JSON files and writing that parsed data to a sqlite database. My current design has 10 go routines simultaneously downloading/parsing these JSONs and communicating them to another go routine whose sole job is to listen on a specific channel and write the channel contents to the DB.

The system does some additional read operations after all writing should have been completed, which leads to an issue where queries return incorrect results because not all of the data has been written to the table. Because the JSON data I'm pulling is dynamic, I have no easy way to know when all the data has been written.

I've considered two possibilities for solving this, though I'm not super happy with either solution:

  1. Listen on the channel and wait for it to be empty. This should work in principle, however, it does not ensure that the data has been written, all it ensures is it's been received on the channel.
  2. Synchronize access to the DB. This again should work in principle, however, I would still need to order the query operation to be after all the write operations.

Are there any other design decisions I should consider to rectify this issue? For reference the libraries I'm using to pull this data are go-colly and the go-sqlite3. Appreciate all the help!


Solution

  • You can use a sync.WaitGroup

    e.g.

    package main
    
    import "sync"
    
    func main() {
        // Some sort of job queue for your workers to process. This job queue should be closed by the process
        // that populates it with items. Once the job channel is closed, any for loops ranging over the channel
        // will read items until there are no more items, and then break.
        jobChan := make(chan JobInfo)
    
        // Populate the job queue here...
        // ...
        close(jobChan)
    
        // We now have a full queue of jobs that can't accept new jobs because the channel is closed.
    
        // Number of concurrent workers.
        workerCount := 10
    
        // Initialize the WaitGroup.
        wg := sync.WaitGroup{}
        wg.Add(workerCount)
    
        // Create the worker goroutines.
        for i := 0; i < workerCount; i++ {
            go func() {
                // When the jobChan is closed, and no more jobs are available on the queue, the for loop
                // will exit, causing wg.Done() to be called, and the anonymous function to exit.
                for job := range jobChan {
                    // Process job.
                }
                wg.Done()
            }()
        }
    
        // Wait for all workers to call wg.Done()
        wg.Wait()
    
        // Whatever you want to do after all queue items have been processed goes here.
        // ...
    }