c++ c++11 zeromq msgpack messagepack

Sending data through ZeroMQ (zmqpp) using MsgPack gives 'msgpack::v1::insufficient_bytes' error


I made a PUB/SUB connection using zmqpp and now I want to send data from the publisher to the subscribers using the header-only, C++11 version of msgpack-c.

The publisher has to send 2 int64_t numbers -- header_1 and header_2 -- followed by a std::vector<T> -- data --, where T is determined by the (header_1, header_2) combination.

Since there aren't many examples of how to combine msgpack and zmqpp, the idea I came up with is to send a 3-part message using zmqpp::message::add/add_raw, with each part packed/unpacked using msgpack.

The publisher packs a single data part as follows:

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

And the receiver unpacks it like this:

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);

When I run the code, I get the following error on the subscriber side:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
  what():  insufficient bytes
Aborted

Also, it seems that zmqpp has generated a 5-part message, even though I called add()/add_raw() only 3 times in total.

Q1: Am I packing/unpacking the data correctly?

Q2: Is this the proper method for sending msgpack buffers using zmqpp?

Here are the important parts of the code:

Publisher

zmqpp::socket publisherSock;
/* connection setup stuff ...*/

// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;

    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);


    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl;  // header_1:1234567
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl;  // header_2:89
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
    }

    std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
    publisherSock.send(msg);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

Subscriber

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
int64_t header_2;
std::vector<double> data;

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }

}

EDIT: Problem solved. As pointed out by @Jens, the correct way to add the packed data to the message is zmqpp::message::add_raw():

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
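
For intuition on the insufficient_bytes error, here is a minimal sketch that uses neither library. MessagePack's "uint 32" format is a 0xce marker followed by four big-endian bytes, and I am assuming that is the format chosen for a value like 1234567. The helpers encode_uint32 and decode_uint32 are hypothetical names, not part of msgpack-c. The decoder throws when the part it receives is shorter than the encoding, which is roughly the situation the real unpacker reports.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical helper, NOT part of msgpack-c: encodes a value in
// MessagePack's "uint 32" format (0xce marker + 4 big-endian bytes).
std::vector<unsigned char> encode_uint32(std::uint32_t v) {
    return {0xce,
            static_cast<unsigned char>(v >> 24),
            static_cast<unsigned char>(v >> 16),
            static_cast<unsigned char>(v >> 8),
            static_cast<unsigned char>(v)};
}

// Hypothetical decoder: needs the marker plus all four payload bytes.
std::uint32_t decode_uint32(unsigned char const* data, std::size_t size) {
    if (size < 5) {
        throw std::length_error("insufficient bytes");
    }
    if (data[0] != 0xce) {
        throw std::invalid_argument("not a uint 32");
    }
    return (static_cast<std::uint32_t>(data[1]) << 24) |
           (static_cast<std::uint32_t>(data[2]) << 16) |
           (static_cast<std::uint32_t>(data[3]) << 8) |
            static_cast<std::uint32_t>(data[4]);
}
```

Calling decode_uint32(bytes.data(), bytes.size()) on the output of encode_uint32(1234567) returns the value, while passing a size of 1 throws. The encoded bytes also contain a zero byte right after the marker, so the byte count has to travel with the pointer, which is what add_raw(pointer, size) does.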

Solution

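  • I think the calls to msg.add(buffer.data(), buffer.size()) do not add an array of buffer.size() bytes. Instead they call the variadic message::add(Type const& part, Args &&...args), which does the following:

    1. It evaluates msg << buffer.data(), which probably calls message::operator<<(bool), since a pointer converts to bool.
    2. It then calls add(buffer.size()), which evaluates msg << buffer.size() and adds a size_t value as the next part.

    Each add() call would therefore produce two parts instead of one, which would explain the 5 parts you see: 2 + 2 from the two add() calls, plus 1 from add_raw().

    Looking at the zmqpp::message class, using message::add_raw should do the trick.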

    PS: This is all without any guarantee because I have never used zmqpp or msgpack.
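
The part-count effect described in this answer can be reproduced without zmqpp. The sketch below uses a hypothetical MockMessage class (not zmqpp's real implementation) whose variadic add streams each argument as its own part, next to an add_raw that takes a pointer and a length. It assumes zmqpp's add behaves like the template signature quoted above; which operator<< overload the real library picks for a pointer is not modelled here.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for a multipart message; NOT zmqpp's implementation.
class MockMessage {
public:
    // Variadic add: every argument becomes its own part.
    template <typename T, typename... Args>
    void add(T const& part, Args&&... args) {
        *this << part;
        add(std::forward<Args>(args)...);
    }

    // Raw add: one part made of exactly `size` bytes.
    void add_raw(void const* data, std::size_t size) {
        auto const* p = static_cast<char const*>(data);
        parts_.emplace_back(p, p + size);
    }

    std::size_t parts() const { return parts_.size(); }

    std::size_t size(std::size_t i) const {
        assert(i < parts_.size());
        return parts_[i].size();
    }

private:
    void add() {}  // ends the variadic recursion

    // Stores the object representation of `value` as a new part.
    // For a pointer argument this copies the pointer itself, not the
    // bytes it points to.
    template <typename T>
    MockMessage& operator<<(T const& value) {
        auto const* p = reinterpret_cast<char const*>(&value);
        parts_.emplace_back(p, p + sizeof(T));
        return *this;
    }

    std::vector<std::vector<char>> parts_;
};

// Repeats the call pattern from the question: add, add, add_raw.
MockMessage build_like_question() {
    std::vector<char> buffer(5, 'x');  // stand-in for 5 packed bytes
    MockMessage msg;
    msg.add(buffer.data(), buffer.size());      // 2 parts: pointer, size
    msg.add(buffer.data(), buffer.size());      // 2 more parts
    msg.add_raw(buffer.data(), buffer.size());  // 1 part holding the 5 bytes
    return msg;
}
```

With this mock, build_like_question().parts() is 5, and only the last part holds the 5 payload bytes. Replacing both add calls with add_raw gives the intended 3 parts.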