
Idiomatic variable-size worker pool in Go


I'm trying to implement a pool of workers in Go. The go-wiki (and Effective Go, in its Channels section) features excellent examples of bounding resource use: make a channel with a buffer as large as the worker pool, fill that channel with workers, and send each worker back into the channel when it's done. Receiving from the channel blocks until a worker is available, so a channel and a loop are the entire implementation. Very cool!

Alternatively, one could block on sending into the channel, but it's the same idea.
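For reference, the "block on sending" variant can be sketched roughly as follows. This is only a minimal illustration, not the wiki's code: `runBounded` and its peak counter are hypothetical names added here to make the bound observable.

```go
package main

import (
	"fmt"
	"sync"
)

// runBounded (hypothetical helper) runs every job, but never more than
// limit of them at the same time. It returns the highest number of jobs
// that were observed running at once.
func runBounded(limit int, jobs []func()) int {
	sem := make(chan struct{}, limit) // buffer size == pool size
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, job := range jobs {
		sem <- struct{}{} // blocks while limit jobs are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot (runs before wg.Done)

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job()

			mu.Lock()
			running--
			mu.Unlock()
		}(job)
	}
	wg.Wait()
	return peak
}

func main() {
	jobs := make([]func(), 10)
	for i := range jobs {
		jobs[i] = func() {}
	}
	fmt.Println("peak concurrency:", runBounded(3, jobs))
}
```

The limit is baked into the channel's capacity when `make` is called, which is exactly why this form is awkward to resize at runtime.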

My question is about changing the size of the worker pool while it's running. I don't believe there's a way to change the capacity of a channel after it has been created. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for "golang semaphore").


Solution

  • I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and using a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type Task string
    
    func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case task, ok := <-tasks:
                if !ok {
                    return
                }
                fmt.Println("processing task", task)
            case <-quit:
                return
            }
        }
    }
    
    func main() {
        tasks := make(chan Task, 128)
        quit := make(chan bool)
        var wg sync.WaitGroup
    
        // spawn 5 workers
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go worker(tasks, quit, &wg)
        }
    
        // distribute some tasks
        tasks <- Task("foo")
        tasks <- Task("bar")
    
        // remove two workers
        quit <- true
        quit <- true
    
        // add three more workers
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go worker(tasks, quit, &wg)
        }
    
        // distribute more tasks
        for i := 0; i < 20; i++ {
            tasks <- Task(fmt.Sprintf("additional_%d", i+1))
        }
    
        // end of tasks. the workers should quit afterwards
        close(tasks)
        // use "close(quit)", if you do not want to wait for the remaining tasks
    
        // wait for all workers to shut down properly
        wg.Wait()
    }
    

    It might be a good idea to create a separate WorkerPool type with some convenient methods. Also, instead of type Task string, it is quite common to use a struct that also contains a done channel, which is used to signal that the task has been executed.

    Edit: I've played around a bit more and came up with the following: http://play.golang.org/p/VlEirPRk8V. It's basically the same example, with a nicer API.
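To make the WorkerPool suggestion a bit more concrete, here is a rough sketch of what such a type could look like. It is not the code from the playground link; `WorkerPool`, `NewWorkerPool`, `Resize`, `Size`, `Submit`, `Close` and the `Done` field are all names assumed for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Task carries a payload plus a channel that is closed once the task
// has been processed.
type Task struct {
	Name string
	Done chan struct{}
}

// WorkerPool (hypothetical) wraps the tasks/quit channels from the example.
type WorkerPool struct {
	tasks chan Task
	quit  chan struct{}
	wg    sync.WaitGroup
	mu    sync.Mutex // guards size
	size  int
}

func NewWorkerPool(size, backlog int) *WorkerPool {
	p := &WorkerPool{tasks: make(chan Task, backlog), quit: make(chan struct{})}
	p.Resize(size)
	return p
}

func (p *WorkerPool) worker() {
	defer p.wg.Done()
	for {
		select {
		case t, ok := <-p.tasks:
			if !ok {
				return // tasks channel closed and drained
			}
			fmt.Println("processing task", t.Name)
			close(t.Done)
		case <-p.quit:
			return
		}
	}
}

// Resize grows or shrinks the pool to n workers. Shrinking blocks until
// enough workers are idle to receive the quit signal. It must not be
// called after Close.
func (p *WorkerPool) Resize(n int) {
	if n < 0 {
		n = 0
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for ; p.size < n; p.size++ {
		p.wg.Add(1)
		go p.worker()
	}
	for ; p.size > n; p.size-- {
		p.quit <- struct{}{}
	}
}

// Size reports the current number of workers.
func (p *WorkerPool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.size
}

// Submit queues a task and returns a channel that is closed when it is done.
func (p *WorkerPool) Submit(name string) <-chan struct{} {
	t := Task{Name: name, Done: make(chan struct{})}
	p.tasks <- t
	return t.Done
}

// Close stops accepting tasks and waits for the queued ones to finish.
func (p *WorkerPool) Close() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	pool := NewWorkerPool(5, 128)
	foo := pool.Submit("foo")
	pool.Resize(3) // remove two workers
	pool.Resize(6) // add three more
	<-foo          // wait for one specific task
	pool.Close()
}
```

Closing Done instead of sending on it means the submitter can ignore the channel without blocking the worker. If a task can fail, an error channel or a result field would be a natural replacement.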