go, tcp, proto, protobuf-go

GoLang Protobuf: How to send multiple messages using the same tcp connection?


I am using GoLang protobuf to encode (and decode) messages that are sent over a single TCP connection.

The .proto definition:

message Prepare{
   int64 instance = 1;
   int64 round = 2;
   int64 nodeId = 3;
}

Then I use the protoc tool to generate the corresponding stubs.

This is how I write the contents to the wire.

func (t *Prepare) Marshal(wire io.Writer) {

    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

And this is how I read and unmarshal on the receiver side.

func (t *Prepare) Unmarshal(wire io.Reader) error {
    data := make([]byte, 8*1024*1024) 
    length, err := wire.Read(data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}

If a new TCP connection is opened for each protobuf message, the above approach works fine. But when a single TCP connection is used to transmit multiple messages (a persistent connection), the unmarshalling fails with the error proto: invalid field number

This problem occurs because protobuf messages sent over a single connection carry no message boundaries. When reading with length, err := wire.Read(data), the data buffer can therefore contain bytes belonging to 1) multiple protobuf messages and 2) partial protobuf messages.

The protobuf documentation mentions the following as a solution.

If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself. When you read the messages back in, you read the size, then read the bytes into a separate buffer, then parse from that buffer. (If you want to avoid copying bytes to a separate buffer, check out the CodedInputStream class (in both C++ and Java) which can be told to limit reads to a certain number of bytes.)

While this is an intuitive method, it boils down to a chicken-and-egg problem. The length of the byte array written to the wire (taken from data, err := proto.Marshal(t); len(data)) is not fixed, and it's not known how many bytes are required to represent this number (len(data)). So the same problem reappears: how can the length of the byte array be sent when the receiver does not know how many bytes the length field itself occupies?

Any suggestions for this?

Thanks


Solution

  • I would recommend using gRPC, but you already stated you don't want that. I can also recommend sending simple UDP packets, since UDP doesn't need a connection at all.

    If you want to stick to your current approach, the solution is simple: after marshalling the protobuf message to a byte array, you know its length. It's len(data), and that's the value you want to write first. The number of bytes written by wire.Write() will be the same; if it is not, Write returns an error, meaning there was a problem with the connection and the message was only partially written, so the receiver can't unmarshal it anyway.

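    When receiving, first read the length, then prepare a buffer of exactly that size and fill it (for example with io.ReadFull, or by reading through an io.LimitedReader). Since proto.Unmarshal takes a byte slice rather than a reader, unmarshal from that buffer.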

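    The number of bytes should be encoded as a fixed-width integer. You can use either a 32-bit or a 64-bit value, and you also need to decide between little and big endian. Which one you use is irrelevant, as long as the size and endianness are the same on the sender and receiver side. Because the length field always occupies the same number of bytes, the receiver knows exactly how many bytes to read before the message itself, which resolves the chicken-and-egg problem.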

    Take a look at https://pkg.go.dev/encoding/binary and the functions defined on ByteOrder:

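    // w and r are 8-byte slices ([]byte), not an io.Writer / io.Reader
    binary.LittleEndian.PutUint64(w, uint64(len(data))) // sender: encode the length
    length := int64(binary.LittleEndian.Uint64(r))      // receiver: decode the length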
    

    Of course, if there is even a simple bug or you are off by only one byte, all the remaining data on the connection is effectively useless. By sending each message as its own UDP datagram you can avoid this issue, at the cost of UDP's lack of delivery and ordering guarantees.