go shutdown

How to limit a producer and have consumers read all messages?


I want to build a producer-consumer application (with shutdown on signals) in Go.

A producer constantly generates messages into a queue with a limit of 10. Several consumers read and process messages from this channel. If the number of messages in the queue is 0, the producer generates another 10 messages. When a stop signal is received, the producer stops generating new messages, and the consumers process everything that is still in the channel.

I found some code, but I can't tell whether it works correctly, because I noticed some strange things:

  1. Why were not all messages from the queue processed after the program stopped? It seems that part of the data was lost (in the screenshot, 15 messages were sent, but only 5 were processed).
  2. How do I correctly limit the queue to 10 messages? That is, write 10 messages, wait until they are processed and the queue length drops to 0, and then write 10 more.
  3. Is it possible to inform the producer after the stop signal so that it no longer sends new messages to the channel? (In the screenshot, the producer managed to write 12, 13, 14, and 15 to the queue.)

Result:

[Screenshot of the program output]

Code example:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}

Solution

  • Why were not all messages from the queue processed after the program stopped? It seems that part of the data was lost.

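    That's because when ctx is done, (Consumer).Consume stops reading from the in channel and closes jobs, so anything still buffered in the in channel is never handed to a worker. Meanwhile, the goroutine started by go p.Produce() keeps writing to the in channel, and main exits as soon as the workers return.

    The demo below fixes this issue and simplifies the source code.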

    Notes:

    1. Produce stops when ctx is done, and it closes the in channel before returning.

    2. The jobs field is removed from Consumer, and the workers read from the in channel directly. Because the producer closes the in channel, each worker's range loop ends only after every buffered job has been received.

    3. The following requirement is ignored because it is unusual. The common behavior is this: when a job is produced and the in channel is not full, the job is sent to the channel immediately; when the channel is full, the send blocks until a job is read from it.

      If the number of messages in the queue is 0, the producer again generates 10 messages
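
    Before the full demo, here is a tiny standalone sketch of the blocking behavior described in note 3. It is only an illustration: trySend is a hypothetical helper (not part of the demo) that uses a select with a default case to show the moment at which a plain send would block, and the buffer size of 3 is chosen just to keep the output short.

```go
package main

import "fmt"

// trySend is a hypothetical helper for illustration only. It attempts a
// non-blocking send and reports whether the value was queued.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full: a plain `ch <- v` would block at this point
		// until some consumer receives a value.
		return false
	}
}

func main() {
	in := make(chan int, 3) // small buffer so the limit is reached quickly

	for i := 1; i <= 5; i++ {
		ok := trySend(in, i)
		fmt.Printf("send %d queued=%v len=%d\n", i, ok, len(in))
	}

	<-in // a consumer takes one job, which frees one slot

	ok := trySend(in, 6)
	fmt.Printf("send 6 queued=%v len=%d\n", ok, len(in))
}
```

    With a buffer of 3, sends 1 to 3 are queued, sends 4 and 5 are rejected, and send 6 is queued once a slot has been freed. In the demo the producer uses a blocking send instead, so it simply waits for a free slot, which already caps the queue at 10 messages.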

    package main
    
    import (
        "context"
        "fmt"
        "math/rand"
        "os/signal"
        "sync"
        "syscall"
        "time"
    )
    
    func main() {
        const nConsumers = 2
    
        ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer stop()
    
        in := make(chan int, 10)
        p := Producer{in}
        c := Consumer{in}
        go p.Produce(ctx)
    
        var wg sync.WaitGroup
        wg.Add(nConsumers)
        for i := 1; i <= nConsumers; i++ {
            go c.Work(&wg, i)
        }
    
        <-ctx.Done()
        fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
        wg.Wait()
    }
    
    type Consumer struct {
        in chan int
    }
    
    func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
        defer wg.Done()
        for job := range c.in {
            fmt.Printf("Worker #%d start job %d\n", i, job)
            time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
            fmt.Printf("Worker #%d finish job %d\n", i, job)
        }
        fmt.Printf("Worker #%d interrupted\n", i)
    }
    
    type Producer struct {
        in chan int
    }
    
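    // Produce sends increasing task numbers to p.in until ctx is done,
    // then closes p.in so that the workers' range loops can finish.
    func (p *Producer) Produce(ctx context.Context) {
        task := 1
        for {
            select {
            // Blocks while the buffer is full (10 queued jobs).
            case p.in <- task:
                fmt.Printf("Send value %d\n", task)
                task++
                time.Sleep(time.Millisecond * 500)
            // select picks among ready cases at random, so a value may
            // still be sent after the signal if the buffer has room.
            case <-ctx.Done():
                close(p.in)
                return
            }
        }
    }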