I want to build a producer-consumer application (with shutdown by signals) in Go.
A producer constantly generates messages into a queue with a limit of 10. Several consumers read and process this channel. If the number of messages in the queue is 0, the producer generates 10 messages again. When a stop signal is received, the producer stops generating new messages, and the consumers process everything that is still in the channel.
I found some code, but I can't tell whether it works correctly because I noticed some strange behavior:
Result:
Code example:
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	in := make(chan int, 10)
	p := Producer{&in}
	c := Consumer{&in, make(chan int, nConsumers)}
	go p.Produce()
	ctx, cancelFunc := context.WithCancel(context.Background())
	go c.Consume(ctx)
	wg := &sync.WaitGroup{}
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(wg, i)
	}
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan
	cancelFunc()
	wg.Wait()
}

type Consumer struct {
	in   *chan int
	jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.jobs {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
	for {
		select {
		case job := <-*c.in:
			c.jobs <- job
		case <-ctx.Done():
			close(c.jobs)
			fmt.Println("Consumer close channel")
			return
		}
	}
}

type Producer struct {
	in *chan int
}

func (p Producer) Produce() {
	task := 1
	for {
		*p.in <- task
		fmt.Printf("Send value %d\n", task)
		task++
		time.Sleep(time.Millisecond * 500)
	}
}
Why are not all messages from the queue processed after the program is stopped? It seems that part of the data is lost.
That's because when the ctx is done, (Consumer).Consume stops reading from the in channel, so any messages still buffered in in are never handed to the workers. Meanwhile, the goroutine created by go p.Produce() still writes to the in channel.
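To see the underlying channel behavior in isolation, here is a minimal sketch (the function name drainAfterClose is made up for illustration and is not part of the code above). Values that are still buffered when the reader walks away simply stay in the channel, whereas closing the channel and ranging over it delivers every buffered value before the loop ends:

```go
package main

import "fmt"

// drainAfterClose is a hypothetical helper: it buffers n values, closes
// the channel, and collects everything a range loop still receives.
func drainAfterClose(n int) []int {
	ch := make(chan int, n)
	for i := 1; i <= n; i++ {
		ch <- i
	}
	close(ch) // no more sends are allowed, but buffered values stay readable

	var got []int
	for v := range ch { // the loop ends only after the buffer is empty
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClose(5)) // prints [1 2 3 4 5]
}
```

So a clean shutdown needs the sender to close the channel and the receivers to keep ranging over it until it is drained.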
The demo below fixes this issue and simplifies the source code.
Notes:
1. Produce stops when the ctx is done, and it closes the in channel.
2. The field jobs is removed from the Consumer, and the workers read from the in channel directly.
3. The following requirement is ignored because it's weird: "If the number of messages in the queue is 0, the producer again generates 10 messages". A common behavior is that when a job is produced and the in channel is not full, the job is sent to the in channel immediately; when it's full, the send operation blocks until a job is read from the in channel.
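The buffered-channel behavior described in the last note can be observed with a small sketch. The helper trySend is hypothetical and uses a non-blocking send only to make the "full" state visible; the demo itself uses an ordinary send, which would block at that point instead of returning false:

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send and
// reports whether the value was accepted. A plain `ch <- v` would block
// in the case where this function returns false.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer is full and no receiver is waiting
	}
}

func main() {
	in := make(chan int, 2) // small capacity to keep the output short

	fmt.Println(trySend(in, 1)) // true
	fmt.Println(trySend(in, 2)) // true
	fmt.Println(trySend(in, 3)) // false: the buffer is full
	<-in                        // a consumer takes one job
	fmt.Println(trySend(in, 3)) // true: there is room again
}
```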
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	// ctx is cancelled when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	in := make(chan int, 10)
	p := Producer{in}
	c := Consumer{in}
	go p.Produce(ctx)

	var wg sync.WaitGroup
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(&wg, i)
	}

	<-ctx.Done()
	// len(in) is only a snapshot of the buffer at this moment.
	fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
	// The workers return once the in channel is closed and drained.
	wg.Wait()
}

type Consumer struct {
	in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	// The loop keeps receiving until the in channel is closed and empty.
	for job := range c.in {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
	in chan int
}

func (p *Producer) Produce(ctx context.Context) {
	task := 1
	for {
		select {
		case p.in <- task: // blocks while the buffer is full
			fmt.Printf("Send value %d\n", task)
			task++
			time.Sleep(time.Millisecond * 500)
		case <-ctx.Done():
			// Only the producer sends, so it is the one that closes the channel.
			close(p.in)
			return
		}
	}
}
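If the batch requirement from the question does matter, one possible way to approximate it is sketched below. This is not part of the demo above: the names refillIfEmpty and produceBatches, the polling interval, and the timeout used in main are all assumptions made for illustration. The sketch assumes a single producer and a batch size no larger than the channel capacity, so the batch sends cannot block.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// refillIfEmpty is a hypothetical helper: when the queue is empty it sends
// a full batch starting at next and returns the next unused task number.
// It assumes a single producer and batch <= cap(in), so sends do not block.
func refillIfEmpty(in chan int, next, batch int) int {
	if len(in) != 0 {
		return next
	}
	for i := 0; i < batch; i++ {
		in <- next
		next++
	}
	return next
}

// produceBatches polls the queue and refills it until ctx is done, then
// closes the channel so that workers ranging over it can drain it.
func produceBatches(ctx context.Context, in chan int, batch int, poll time.Duration) {
	defer close(in)
	ticker := time.NewTicker(poll)
	defer ticker.Stop()

	next := 1
	for {
		next = refillIfEmpty(in, next, batch)
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	in := make(chan int, 10)
	// A timeout stands in for the stop signal to keep the sketch short.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	go produceBatches(ctx, in, 10, 5*time.Millisecond)

	count := 0
	for range in { // ends once the producer closes the channel
		count++
	}
	fmt.Println("processed a multiple of 10:", count%10 == 0)
}
```

Note that len(in) == 0 only says the buffer is empty; workers may still be busy with jobs they have already received. A batch that has started is also sent in full even if the stop signal arrives midway, because ctx is checked only between batches.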