I am trying to send a struct of data over shared memory, using boost interprocess. I want to serialize the data and send it as binary.
I am basing my approach on this answer:
Send complex data structure via boost message queue
However, once I switched to my struct type, my code stopped sending the data reliably. Everything compiles and runs fine at first; however, when I close and re-open the receiver application, the data is no longer sent or received. How can I do this in such a way that the receiver can be closed and re-opened, and will automatically pick up the connection again?
File datastruct.hpp
#ifndef DATASTRUCT_HPP
#define DATASTRUCT_HPP
#include <string>
#include <vector>
#include <boost/serialization/vector.hpp>
// Size limit, in bytes, passed to the message_queue constructor by both the
// sender and the receiver; the receiver also uses it as its buffer size.
#define MAX_SIZE 150000
namespace dataStruct {

/// A single 3D point stored as three single-precision coordinates.
struct VUserPoint {
    float PositionX;
    float PositionY;
    float PositionZ;

    /// Serialization hook: passes the coordinates to the archive in X, Y, Z order.
    template <typename Archive>
    void serialize(Archive& ar, unsigned int const /*version*/) {
        ar & PositionX & PositionY & PositionZ;
    }
};

/// One frame of data: the frame's points followed by its frame number.
struct FvpData {
    // Array of current points in frame
    std::vector<VUserPoint> UserPoints;
    int frameNumber;

    /// Serialization hook: passes the point list, then the frame number.
    template <typename Archive>
    void serialize(Archive& ar, unsigned int const /*version*/) {
        ar & UserPoints & frameNumber;
    }
};

} // namespace dataStruct
#endif // DATASTRUCT_HPP
File sender.cpp
#include "DataSender.h"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
using namespace boost::interprocess;
// Running frame counter; incremented once per Grab() call before sending.
int num = 1;

/// Packs `points` into a dataStruct::FvpData frame, serializes the frame with
/// a binary archive and sends the bytes on the "mq" message queue.
///
/// @param points  Points of the current frame; each coordinate is narrowed
///                from double to float when copied into the frame.
///
/// Interprocess errors (opening the queue, sending) are reported on std::cerr
/// and otherwise swallowed.
// NOTE(review): `points` stays by-value so the signature keeps matching the
// declaration in DataSender.h (not visible here).
void DataSender::Grab(std::vector<Eigen::Vector3d> points) {
    dataStruct::FvpData data;

    // add userpoints
    data.UserPoints.reserve(points.size());
    for (auto const& p : points) {
        dataStruct::VUserPoint pnt;
        pnt.PositionX = static_cast<float>(p.x());
        pnt.PositionY = static_cast<float>(p.y());
        pnt.PositionZ = static_cast<float>(p.z());
        data.UserPoints.push_back(pnt);
    }

    num += 1;
    data.frameNumber = num;

    try {
        message_queue mq(open_or_create, "mq", 100, MAX_SIZE);

        std::stringstream oss;
        {
            // The archive must be destroyed before the stream contents are
            // read, so that everything it wrote is complete.
            boost::archive::binary_oarchive oa(oss);
            oa << data;
        }

        std::string const serialized_string(oss.str());
        mq.send(serialized_string.data(), serialized_string.size(), 1);
        std::cout << data.frameNumber << std::endl;
    } catch (interprocess_exception const& ex) {
        std::cerr << ex.what() << std::endl;
    }
}
File receiver.cpp
#include <iostream>
#include <string>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include "DataStruct.hpp"
using namespace boost::interprocess;
int main() {
try {
message_queue mq(open_or_create, "mq", 100, MAX_SIZE);
message_queue::size_type recvd_size;
unsigned int priority;
while (true) {
dataStruct::FvpData me;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(MAX_SIZE);
mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
iss << serialized_string;
boost::archive::binary_iarchive ia(iss);
ia >> me;
std::cout << me.frameNumber << std::endl;
// std::cout << me.name << std::endl;
}
} catch (interprocess_exception& ex) {
std::cerr << ex.what() << std::endl;
}
// message_queue::remove("mq");
}
Firstly I simplified and extended the reproducer:
File datastruct.hpp
#pragma once
#include <boost/serialization/vector.hpp>
// Size limit, in bytes, passed to the message_queue constructor by both the
// sender and the receiver; the receiver also uses it as its buffer size.
static constexpr inline size_t MAX_SIZE = 150'000;
namespace dataStruct {

/// A single 3D point stored as three single-precision coordinates.
struct VUserPoint {
    float PositionX;
    float PositionY;
    float PositionZ;

    /// Serialization hook: passes the coordinates to the archive in X, Y, Z order.
    template <typename Archive>
    void serialize(Archive& ar, unsigned /*version*/) {
        ar & PositionX;
        ar & PositionY;
        ar & PositionZ;
    }
};

/// One frame of data: the frame's points followed by its frame number.
struct FvpData {
    std::vector<VUserPoint> UserPoints;
    int frameNumber;

    /// Serialization hook: passes the point list, then the frame number.
    template <typename Archive>
    void serialize(Archive& ar, unsigned /*version*/) {
        ar & UserPoints;
        ar & frameNumber;
    }
};

} // namespace dataStruct
File sender.cpp
#include "datastruct.hpp"
#include <Eigen/Core>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
namespace bip = boost::interprocess;
static int num = 1;
void Grab(std::vector<Eigen::Vector3d> points) {
dataStruct::FvpData data{{}, ++num};
for (auto const& p : points)
data.UserPoints.push_back(dataStruct::VUserPoint{
.PositionX = static_cast<float>(p.x()),
.PositionY = static_cast<float>(p.y()),
.PositionZ = static_cast<float>(p.z()),
});
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
std::stringstream oss;
{
boost::archive::binary_oarchive oa(oss);
oa << data;
} // completes the archive!
std::string serialized_string(oss.str());
mq.send(serialized_string.data(), serialized_string.size(), 1);
std::cout << data.frameNumber << std::endl;
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
}
#include <random>
#include <thread>
int main() {
std::cout << std::fixed << std::setprecision(3);
std::mt19937 gen(std::random_device{}());
std::uniform_real_distribution<double> dist(-10, 10);
for (int n = 100; n--; std::this_thread::sleep_for(std::chrono::seconds(1))) {
std::vector<Eigen::Vector3d> points;
for (int i = 0; i < 3; ++i) {
auto x = dist(gen), y = dist(gen), z = dist(gen);
points.emplace_back(x, y, z);
std::cout << x << " " << y << " " << z << std::endl;
}
Grab(points);
}
}
File receiver.cpp
#include "datastruct.hpp"
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
namespace bip = boost::interprocess;
int main() {
std::cout << std::fixed << std::setprecision(3);
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
bip::message_queue::size_type recvd_size;
unsigned int priority;
while (true) {
dataStruct::FvpData me;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(MAX_SIZE);
mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
iss << serialized_string;
boost::archive::binary_iarchive ia(iss);
ia >> me;
std::cout << me.frameNumber << std::endl;
for (auto const& p : me.UserPoints)
std::cout << p.PositionX << " " << p.PositionY << " " << p.PositionZ << "\n";
}
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
// message_queue::remove("mq");
}
Next up, I verified that normal operation is okay.
You don't say how you stop the program(s). Since nothing in the code shown handles shutdown, I assume that you use Ctrl-C (or worse):
As you can see, you get a lock exception. You need to exit cleanly. You can implement your own graceful shutdown, or use signals to intercept Ctrl-C/termination:
std::atomic_bool shutdown{false};
boost::asio::thread_pool io{1};
boost::asio::signal_set sigs(io, SIGINT, SIGTERM);
sigs.async_wait([&](boost::system::error_code const& ec, int num) {
std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
shutdown = true;
});
e.g. Adjusted sender.cpp
#include "datastruct.hpp"
#include <Eigen/Core>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
namespace bip = boost::interprocess;
static int num = 1;
void Grab(std::vector<Eigen::Vector3d> points) {
dataStruct::FvpData data{{}, ++num};
for (auto const& p : points)
data.UserPoints.push_back(dataStruct::VUserPoint{
.PositionX = static_cast<float>(p.x()),
.PositionY = static_cast<float>(p.y()),
.PositionZ = static_cast<float>(p.z()),
});
try {
bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
std::stringstream oss;
{
boost::archive::binary_oarchive oa(oss);
oa << data;
} // completes the archive!
std::string serialized_string(oss.str());
mq.send(serialized_string.data(), serialized_string.size(), 1);
std::cout << data.frameNumber << std::endl;
} catch (bip::interprocess_exception const& ex) {
std::cerr << ex.what() << std::endl;
}
}
#include <random>
#include <thread>
int main() {
std::cout << std::fixed << std::setprecision(3);
std::mt19937 gen(std::random_device{}());
std::uniform_real_distribution<double> dist(-10, 10);
std::atomic_bool shutdown{false};
boost::asio::thread_pool io{1};
boost::asio::signal_set sigs(io, SIGINT, SIGTERM);
sigs.async_wait([&](boost::system::error_code const& ec, int num) {
std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
shutdown = true;
});
try {
for (; !shutdown; std::this_thread::sleep_for(std::chrono::seconds(1))) {
std::vector<Eigen::Vector3d> points;
for (int i = 0; i < 3; ++i) {
auto x = dist(gen), y = dist(gen), z = dist(gen);
points.emplace_back(x, y, z);
std::cout << x << " " << y << " " << z << std::endl;
}
Grab(points);
}
} catch (std::exception const& ex) {
std::cerr << ex.what() << std::endl;
}
io.join();
}
See it live:
You serialize doubles as floats. That loses precision.
You copy everything multiple times: first from Eigen to your data struct, then to a string stream, from the stream to a string then to the queue, etc.
I'd suggest using serialization directly on the Eigen data. It seems to me it could be zero-copy:
// Compile-time checks: Eigen::Vector3d is a standard-layout type and is
// exactly the size of three doubles.
static_assert(std::is_standard_layout_v<Eigen::Vector3d>);
static_assert(sizeof(Eigen::Vector3d) == 3 * sizeof(double));
Sadly, Eigen's implementation is somehow not trivially-copyable (I think this may be incidental?). However you can see a lot of answers on the net (and on this site) showing how to serialize Eigen vectors and matrices directly.
Here's a version that
avoids the double->float conversion
avoids unnecessary copies
avoids unnecessary archive overhead:
// Combined archive flags handed to the binary archive constructors; per the
// accompanying text they are there to avoid unnecessary archive overhead.
static auto constexpr ar_flags =
boost::archive::archive_flags::no_header |
boost::archive::archive_flags::no_codecvt |
boost::archive::archive_flags::no_tracking;
avoids deadlocking on a full/empty queue by using timed_send and timed_receive
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/serialization/is_bitwise_serializable.hpp>
#include <boost/serialization/vector.hpp>
#include <Eigen/Core>
#include <iomanip>
#include <iostream>
#include <random>
#include <thread>
// Maximum message size, in bytes, given to the queue; also the size of the
// sender's and the receiver's buffers.
static constexpr inline size_t MAX_SIZE = 150'000;
// tentative despite the types not being trivially-copyable
// static_assert(std::is_trivially_copyable_v<Eigen::Vector3f>);
static_assert(std::is_standard_layout_v<Eigen::Vector3f>);
static_assert(sizeof(Eigen::Vector3f) == 3 * sizeof(float));
// Marks Vector3f as bitwise serializable for Boost.Serialization; no
// serialize() function is defined for it in this file.
BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3f);
// static_assert(std::is_trivially_copyable_v<Eigen::Vector3d>);
static_assert(std::is_standard_layout_v<Eigen::Vector3d>);
static_assert(sizeof(Eigen::Vector3d) == 3 * sizeof(double));
// Same marking for Vector3d.
BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3d);
// Flags passed to both the output and the input archive, so the two sides
// agree on the stream format.
static auto constexpr ar_flags = //
boost::archive::archive_flags::no_header | //
boost::archive::archive_flags::no_codecvt | //
boost::archive::archive_flags::no_tracking;
// Set by on_signal(); polled by the loops in sender() and receiver().
static std::atomic_bool g_shutdown{false};
using namespace std::chrono_literals;
namespace bip = boost::interprocess;
namespace bio = boost::iostreams;
// Name of the message queue used by both the sender and the receiver mode.
static auto constexpr QUEUE_NAME = "7be6183b-de45-4c06-9fae-f1fbdcd24b66";
// Queue shared by sender() and receiver().
// NOTE(review): this object is constructed during static initialization,
// i.e. before main()'s try block is entered, so an exception thrown by this
// constructor is not caught by main()'s handler.
bip::message_queue g_mq(bip::open_or_create, QUEUE_NAME, 10, MAX_SIZE);
// Current time on std::chrono::high_resolution_clock.
static auto now() { return std::chrono::high_resolution_clock::now(); }
// Time step used for the timed_send/timed_receive deadlines and for the
// sender's pause between frames.
static constexpr auto timeres = 500ms;
/// Receive loop: waits for messages on g_mq in `timeres` steps so that
/// g_shutdown is polled regularly, deserializes each message into a frame
/// number plus a list of points, and prints them.
///
/// Returns once g_shutdown is set. Deserialization errors propagate to the
/// caller as exceptions.
void receiver() {
    for (std::string buf; !g_shutdown;) {
        buf.resize(MAX_SIZE);
        bip::message_queue::size_type recvd_size = 0;
        unsigned int priority = 0;

        bool received = false;
        for (auto t = now() + timeres; !g_shutdown; t += timeres) {
            if (g_mq.timed_receive(&buf[0], MAX_SIZE, recvd_size, priority, t)) {
                received = true;
                break;
            }
        }
        // Shutdown was requested while waiting: no message arrived, so there
        // is nothing to decode (recvd_size was never set by the queue).
        if (!received)
            break;

        buf.resize(recvd_size);
        bio::stream<bio::array_source> arrstr(buf.data(), buf.size());
        boost::archive::binary_iarchive ia(arrstr, ar_flags);

        int frameNum;
        std::vector<Eigen::Vector3f> points;
        ia >> frameNum >> points;

        std::cout << frameNum << std::endl;
        for (auto const& p : points)
            std::cout << p.x() << " " << p.y() << " " << p.z() << "\n";
    }
}
void sender() {
std::mt19937 gen(std::random_device{}());
std::uniform_real_distribution<double> dist(-10, 10);
std::array<char, MAX_SIZE> buf;
for (int frameNum = 1; !g_shutdown; std::this_thread::sleep_for(timeres)) {
std::vector<Eigen::Vector3f> points;
for (int i = 0; i < 3; ++i) {
auto x = dist(gen), y = dist(gen), z = dist(gen);
points.emplace_back(x, y, z);
std::cout << x << " " << y << " " << z << std::endl;
}
frameNum += 1;
bio::stream<bio::array_sink> arrstr(buf.data(), buf.size());
boost::archive::binary_oarchive(arrstr, ar_flags) << frameNum << points;
for (auto t = now() + timeres; !g_shutdown; t += timeres) {
if (g_mq.timed_send(buf.data(), arrstr.tellp(), 0, t)) {
std::cout << "Sent " << arrstr.tellp() << " bytes, frameNum " << frameNum << std::endl;
break;
} else {
std::this_thread::sleep_until(t);
std::cout << "Waiting..." << std::endl;
}
}
}
}
/// Completion handler for the signal set: logs the signal name together with
/// the error code's message on std::cerr, then raises the shutdown flag.
void on_signal(boost::system::error_code ec, int signum) {
    std::cerr << "Signal " << ::strsignal(signum);
    std::cerr << " (" << ec.message() << ")" << std::endl;
    g_shutdown.store(true);
}
/// Entry point: registers on_signal for SIGINT/SIGTERM, then runs as receiver
/// when any command-line argument is given and as sender otherwise.
/// Exceptions derived from std::exception are reported on std::cerr.
int main(int argc, char**) {
    try {
        std::cout << std::fixed << std::setprecision(3);

        boost::asio::signal_set sigs(boost::asio::system_executor{}, SIGINT, SIGTERM);
        sigs.async_wait(on_signal);

        bool const run_as_receiver = argc > 1;
        if (run_as_receiver) {
            receiver();
        } else {
            sender();
            // bip::message_queue::remove(QUEUE_NAME);
        }
    } catch (std::exception const& ex) {
        std::cerr << "Error: " << ex.what() << std::endl;
    }
}
With a proper stress-test: