c++boostinterprocess

What is the right way to send a serialized struct over Boost.Interprocess?


I am trying to send a struct of data over shared memory, using boost interprocess. I want to serialize the data and send it as binary.

I am basing my approach on this answer:

Send complex data structure via boost message queue

However once I have switched to my struct type, my code does not send the data. Everything compiles and runs fine. However when I close and re-open the receiver application, the data no longer sends / receives. How can I do this in such a way that the receiver can be closed and opened, and will automatically pick up the connection again?


Solution

  • Firstly I simplified and extended the reproducer:

    Next up, I verified that normal operation is okay.

    The Problem: Safe Stop

    You don't say how you stop the program(s). Since there's nothing in the code shown, I might assume that you use Ctrl-C (or worse):

    As you can see you get a lock exception. You need to cleanly exit. You can implement your own graceful shutdown, or use signals to intercept Ctrl-C/termination:

    std::atomic_bool shutdown{false};
    
    boost::asio::thread_pool io{1};
    boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
    sigs.async_wait([&](boost::system::error_code const& ec, int num) {
        std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
        shutdown = true;
    });
    

    e.g. Adjusted sender.cpp

    #include "datastruct.hpp"
    #include <Eigen/Core>
    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/archive/binary_oarchive.hpp>
    #include <boost/archive/text_oarchive.hpp>
    #include <boost/asio.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <iostream>
    
    namespace bip = boost::interprocess;
    
    static int num = 1;
    
    void Grab(std::vector<Eigen::Vector3d> points) {
        dataStruct::FvpData data{{}, ++num};
        for (auto const& p : points)
            data.UserPoints.push_back(dataStruct::VUserPoint{
                .PositionX = static_cast<float>(p.x()),
                .PositionY = static_cast<float>(p.y()),
                .PositionZ = static_cast<float>(p.z()),
            });
    
        try {
            bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
    
            std::stringstream oss;
    
            {
                boost::archive::binary_oarchive oa(oss);
                oa << data;
            } // completes the archive!
    
            std::string serialized_string(oss.str());
            mq.send(serialized_string.data(), serialized_string.size(), 1);
    
            std::cout << data.frameNumber << std::endl;
        } catch (bip::interprocess_exception const& ex) {
            std::cerr << ex.what() << std::endl;
        }
    }
    
    #include <random>
    #include <thread>
    int main() {
        std::cout << std::fixed << std::setprecision(3);
    
        std::mt19937                           gen(std::random_device{}());
        std::uniform_real_distribution<double> dist(-10, 10);
    
        std::atomic_bool shutdown{false};
    
        boost::asio::thread_pool io{1};
        boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
        sigs.async_wait([&](boost::system::error_code const& ec, int num) {
            std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
            shutdown = true;
        });
    
        try {
            for (; !shutdown; std::this_thread::sleep_for(std::chrono::seconds(1))) {
                std::vector<Eigen::Vector3d> points;
                for (int i = 0; i < 3; ++i) {
                    auto x = dist(gen), y = dist(gen), z = dist(gen);
                    points.emplace_back(x, y, z);
                    std::cout << x << " " << y << " " << z << std::endl;
                }
    
                Grab(points);
            }
        } catch (std::exception const& ex) {
            std::cerr << ex.what() << std::endl;
        }
    
        io.join();
    }
    

    See it live:

    Observations

    You serialize doubles as floats. That loses precision.

    You copy everything multiple times: first from Eigen to your data struct, then to a string stream, from the stream to a string then to the queue, etc.

    I'd suggest using serialization directly on the Eigen data. It seems to me it could be zero-copy:

    // Layout preconditions for treating a Vector3d as raw bytes: it must be
    // standard-layout and occupy exactly three doubles.
    static_assert(std::is_standard_layout_v<Eigen::Vector3d>);
    static_assert(sizeof(Eigen::Vector3d) == 3 * sizeof(double));
    

    Sadly, Eigen's implementation is somehow not trivially-copyable (I think this may be incidental?). However you can see a lot of answers on the net (and on this site) showing how to serialize Eigen vectors and matrices directly.

    BONUS

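    Here's a version that serializes the Eigen vectors directly into a fixed-size buffer (no intermediate struct or string copies), uses timed send/receive so both sides can notice a shutdown request, and combines sender and receiver in a single program: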

    Live On Compiler Explorer

    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/archive/binary_oarchive.hpp>
    #include <boost/asio.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/iostreams/device/array.hpp>
    #include <boost/iostreams/stream.hpp>
    #include <boost/serialization/is_bitwise_serializable.hpp>
    #include <boost/serialization/vector.hpp>
    
    #include <Eigen/Core>
    #include <iomanip>
    #include <iostream>
    #include <random>
    #include <thread>
    // Size in bytes of the send/receive buffers; also passed to the queue as its
    // maximum message size.
    static constexpr inline size_t MAX_SIZE = 150'000;
    
    // tentative despite the types not being trivially-copyable
    // static_assert(std::is_trivially_copyable_v<Eigen::Vector3f>);
    // Layout checks backing the bitwise-serializable declaration for Vector3f.
    static_assert(std::is_standard_layout_v<Eigen::Vector3f>);
    static_assert(sizeof(Eigen::Vector3f) == 3 * sizeof(float));
    BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3f);
    
    // static_assert(std::is_trivially_copyable_v<Eigen::Vector3d>);
    // Same layout checks and declaration for Vector3d.
    static_assert(std::is_standard_layout_v<Eigen::Vector3d>);
    static_assert(sizeof(Eigen::Vector3d) == 3 * sizeof(double));
    BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3d);
    
    // Archive flags passed to both the output archive (sender) and the input
    // archive (receiver).
    static auto constexpr ar_flags =                //
        boost::archive::archive_flags::no_header |  //
        boost::archive::archive_flags::no_codecvt | //
        boost::archive::archive_flags::no_tracking;
    
    // Set by on_signal(); polled by the sender and receiver loops.
    static std::atomic_bool g_shutdown{false};
    using namespace std::chrono_literals;
    namespace bip = boost::interprocess;
    namespace bio = boost::iostreams;
    
    // Name of the queue shared by sender and receiver.
    static auto constexpr QUEUE_NAME = "7be6183b-de45-4c06-9fae-f1fbdcd24b66";
    // Opened or created during static initialization, i.e. before main() runs:
    // up to 10 queued messages of at most MAX_SIZE bytes each.
    bip::message_queue g_mq(bip::open_or_create, QUEUE_NAME, 10, MAX_SIZE);
    
    // Current time on the high-resolution clock; used to compute queue deadlines.
    static auto now() { return std::chrono::high_resolution_clock::now(); }
    // Sender pacing interval and the step by which timed queue deadlines advance.
    static constexpr auto timeres = 500ms;
    
    /// Consumer loop: waits for messages on the global queue in `timeres` steps,
    /// deserializes each one as a frame number followed by a vector of
    /// Eigen::Vector3f, and prints them. Returns once g_shutdown is set.
    void receiver() {
        for (std::string buf; !g_shutdown;) {
            buf.resize(MAX_SIZE);
            bip::message_queue::size_type recvd_size = 0;
            unsigned int                  priority   = 0;
    
            // Poll with a moving deadline so a shutdown request is noticed within
            // roughly one `timeres` even when no message ever arrives.
            bool received = false;
            for (auto t = now() + timeres; !received && !g_shutdown; t += timeres)
                received = g_mq.timed_receive(&buf[0], buf.size(), recvd_size, priority, t);
    
            // Shutdown was requested before anything arrived: there is no message
            // to decode, and recvd_size must not be used.
            if (!received)
                break;
    
            buf.resize(recvd_size);
    
            bio::stream<bio::array_source>  arrstr(buf.data(), buf.size());
            boost::archive::binary_iarchive ia(arrstr, ar_flags);
    
            int                          frameNum = 0;
            std::vector<Eigen::Vector3f> points;
    
            ia >> frameNum >> points;
    
            std::cout << frameNum << std::endl;
            for (auto const& p : points)
                std::cout << p.x() << " " << p.y() << " " << p.z() << "\n";
        }
    }
    
    void sender() {
        std::mt19937                           gen(std::random_device{}());
        std::uniform_real_distribution<double> dist(-10, 10);
        std::array<char, MAX_SIZE>             buf;
    
        for (int frameNum = 1; !g_shutdown; std::this_thread::sleep_for(timeres)) {
            std::vector<Eigen::Vector3f> points;
            for (int i = 0; i < 3; ++i) {
                auto x = dist(gen), y = dist(gen), z = dist(gen);
                points.emplace_back(x, y, z);
                std::cout << x << " " << y << " " << z << std::endl;
            }
    
            frameNum += 1;
            bio::stream<bio::array_sink> arrstr(buf.data(), buf.size());
            boost::archive::binary_oarchive(arrstr, ar_flags) << frameNum << points;
    
            for (auto t = now() + timeres; !g_shutdown; t += timeres) {
                if (g_mq.timed_send(buf.data(), arrstr.tellp(), 0, t)) {
                    std::cout << "Sent " << arrstr.tellp() << " bytes, frameNum " << frameNum << std::endl;
                    break;
                } else {
                    std::this_thread::sleep_until(t);
                    std::cout << "Waiting..." << std::endl;
                }
            }
        }
    }
    
    /// Completion handler for the signal wait: reports the signal and the wait's
    /// status on stderr, then raises the global shutdown flag.
    void on_signal(boost::system::error_code ec, int signum) {
        auto const* const signal_name = ::strsignal(signum);
        std::cerr << "Signal " << signal_name << " (" << ec.message() << ")" << std::endl;
        g_shutdown.store(true);
    }
    
    /// Entry point: runs as receiver when any command-line argument is given,
    /// otherwise as sender. SIGINT/SIGTERM are routed to on_signal(); any
    /// std::exception is reported on stderr.
    int main(int argc, char**) {
        try {
            std::cout << std::fixed << std::setprecision(3);
            boost::asio::signal_set sigs(boost::asio::system_executor{}, SIGINT, SIGTERM);
            sigs.async_wait(on_signal);
    
            bool const run_as_receiver = argc > 1;
            if (run_as_receiver) {
                receiver();
                return 0;
            }
    
            sender();
            // bip::message_queue::remove(QUEUE_NAME);
        } catch (std::exception const& ex) {
            std::cerr << "Error: " << ex.what() << std::endl;
        }
    }
    

    With a proper stress-test: