go zeromq

ZMQ Subscriber not receiving all the messages


I am trying out ZMQ PUB/SUB communication in Go. I send a number of messages (for example 10000) from PUB to SUB and measure the time SUB needs to receive all of them. But SUB never receives all the messages.

What could be the reason and how can I fix this?

Pub code -

package main

import (
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    zmq "github.com/pebbe/zmq4"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func SampleStrOfSize(size int) string {
    b := make([]byte, size)
    idx := 0
    for i := 0; i < size; i++ {
        b[i] = letters[idx]
        idx = (idx + 1) % len(letters)
    }
    // fmt.Println("message: ", string(b))
    return string(b)
}

func main() {
    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        // os.Exit requires an exit code; log.Fatal prints the error and exits with status 1
        log.Fatal(err)
    }
    defer publisher.Close()
    connectionStr := "tcp://127.0.0.1:5555"

    err = publisher.Bind(connectionStr)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Bound to", connectionStr)

    // Build a fixed 1024-byte sample message string
    message := SampleStrOfSize(1024)
    msgCount := 0

    if len(os.Args) > 1 {
        if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
            return
        }
    } else {
        fmt.Println("No msg count provided")
        return
    }

    if msgCount <= 0 {
        fmt.Printf("Invalid msg count (%v) provided\n", msgCount)
        return
    }

    // Give SUB some time to connect
    time.Sleep(time.Second * 5)
    fmt.Println("Starting message sending")
    start := time.Now()

    for i := 0; i < msgCount; i++ {
        // Send the same message every time
        _, err = publisher.Send(message, 0)
        if err != nil {
            fmt.Printf("Error occurred while sending message. %v\n", err)
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("Sending %d messages took %s\n", msgCount, elapsed)
    // Do not close immediately
    time.Sleep(time.Second * 30)
}

SUB code -

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        fmt.Println("Failed to open socket")
        os.Exit(1)
    }
    defer subscriber.Close()

    err = subscriber.Connect("tcp://127.0.0.1:5555")
    if err != nil {
        fmt.Println("Connect failed")
        os.Exit(1)
    }

    msgCount := 0
    count := 0
    if len(os.Args) > 1 {
        if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
            return
        }
    } else {
        fmt.Println("No msg count provided")
        os.Exit(1)
    }
    fmt.Printf("Expecting %d messages\n", msgCount)

    // Subscribe for all messages
    err = subscriber.SetSubscribe("")
    if err != nil {
        fmt.Println("Failed to subscribe for all messages")
        os.Exit(1)
    }

    var start time.Time
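    for {
        _, err := subscriber.Recv(0)
        if err != nil {
            // A failed receive is not a message, so do not count it
            fmt.Println("Receive failed:", err)
            os.Exit(1)
        }

        if count == 0 {
            // Start timing at the first received message
            start = time.Now()
        }

        count++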

        if count == msgCount {
            break
        } else if 0 == count%1000 {
            // Print time for every 1000 messages
            elapsed := time.Since(start)
            fmt.Printf("Received %d messages in %s\n", count, elapsed)
        }
    }

    elapsed := time.Since(start)
    fmt.Printf("Received %d messages in %s\n", msgCount, elapsed)
}

When I run both programs with, say, msgCount 10000, I do not get 10000 messages in SUB. For example, one run produced the following output in SUB -

Expecting 10000 messages
Received 1000 messages in 17.435321ms
Received 2000 messages in 25.530057ms
Received 3000 messages in 27.80558ms
Received 4000 messages in 1m40.583143061s
Received 5000 messages in 1m40.590513201s
Received 6000 messages in 1m40.597145666s

What could cause the unusual delay after 3000 messages?


Solution

  • ZMQ has a High water mark

    The high water mark is a hard limit on the maximum number of outstanding messages ZeroMQ is queuing in memory for any single peer that the specified socket is communicating with.

    If this limit has been reached the socket enters an exceptional state and depending on the socket type, ZeroMQ will take appropriate action such as blocking or dropping sent messages.

    https://zeromq.org/socket-api/#high-water-mark

    The default value for this is 1000. You can change it using the pebbe/zmq4 lib: https://pkg.go.dev/github.com/pebbe/zmq4#Socket.SetSndhwm

    The high water mark causes messages to be dropped

    When a PUB socket enters the mute state due to having reached the high water mark for a subscriber, then any messages that would be sent to the subscriber in question shall instead be dropped until the mute state ends.

    https://zeromq.org/socket-api/#pub-socket

    Your subscriber is receiving messages, but not as fast as the publisher produces them. This causes the number of outstanding messages to grow. It looks like the HWM is hit after about 3000 messages have been received, and the mute state then lasts until about 1m40.

    I assume you never receive all 10,000 messages? That would be because the middle 4000 are being dropped by the publisher.
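
To make the dropping behaviour concrete, here is a minimal pure-Go sketch of the idea. It is not ZeroMQ itself. It only models one per-subscriber queue as a buffered channel whose capacity plays the role of the high water mark. The names boundedQueue and simulate, and the burst sizes used in main, are made up for illustration.

```go
package main

import "fmt"

// boundedQueue loosely mimics a per-subscriber send queue with a high
// water mark: once it holds hwm messages, further sends are dropped.
// (Hypothetical type, not part of any ZeroMQ binding.)
type boundedQueue struct {
	ch      chan string
	dropped int
}

func newBoundedQueue(hwm int) *boundedQueue {
	return &boundedQueue{ch: make(chan string, hwm)}
}

// send never blocks. It reports false and counts a drop when the queue is full.
func (q *boundedQueue) send(msg string) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		q.dropped++
		return false
	}
}

// drain removes up to n queued messages and returns how many it removed.
func (q *boundedQueue) drain(n int) int {
	got := 0
	for got < n {
		select {
		case <-q.ch:
			got++
		default:
			return got
		}
	}
	return got
}

// simulate sends total messages. After every sendBurst sends (assumed > 0),
// a slower consumer drains at most recvBurst messages. Whatever is still
// queued at the end is drained as well.
func simulate(total, hwm, sendBurst, recvBurst int) (received, dropped int) {
	q := newBoundedQueue(hwm)
	for i := 1; i <= total; i++ {
		q.send("msg")
		if i%sendBurst == 0 {
			received += q.drain(recvBurst)
		}
	}
	received += q.drain(total)
	return received, q.dropped
}

func main() {
	// Producer outpaces consumer (100 sent per 60 received), HWM of 1000.
	received, dropped := simulate(10000, 1000, 100, 60)
	fmt.Printf("received=%d dropped=%d\n", received, dropped)
}
```

Every message is either received or dropped, and drops begin as soon as the consumer falls a full queue behind. In the real programs the corresponding knobs would be publisher.SetSndhwm and subscriber.SetRcvhwm from pebbe/zmq4. As far as I know they need to be called before Bind/Connect to affect those connections. A larger HWM only postpones the drops if the subscriber stays slower than the publisher.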