I'm trying to send a class over boost::message queue
using boost::serialization
, boost::Arcive
, and boost::split members
(load and save)
the problem is when I'm trying to deserialize I'm getting the input stream error
exception
#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/version.hpp>
#include <random>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>
class Data{
public:
int a_ ;
double b_ ;
std::string s ;
template<class Archive>
void serialize(Archive & ar, const unsigned int version) {
boost::serialization::split_member(ar, *this, version);
}
template<class Archive> void save(Archive & ar, unsigned int version) const {
// ar << order_request_type_;
ar << a_;
ar << b_;
ar << s; }
template<class Archive> void load(Archive & ar, unsigned int version) {
// ar >> order_request_type_;
ar >> a_;
ar >> b_;
ar >> s;
}
private:
// friend class boost::archive::access;
friend class boost::archive::save_access;
};
[[nodiscard]]bool RunChiled() {
using namespace boost::interprocess; try {
message_queue mq(open_only //open or create
, "message_queue" //name
);
unsigned int priority = 0;
message_queue::size_type recvd_size;
Data d;
std::stringstream iss;
std::string serialized_string;
serialized_string.resize(150);
long long number = 0;
while(true)
{
mq.receive(&serialized_string[0], 150, recvd_size , priority);
std::cout << serialized_string << "\n";
iss << serialized_string;
try{
boost::archive::text_iarchive ia(iss); // <-- getting the exception
ia >> d;
} catch (const std::exception& ex) {
std::cout << ex.what() << "\n";
}
number++;
std::cout << d.a_ << " " << d.b_ << " " << d.s << "\n";
} }catch(const interprocess_exception &ex) {
message_queue::remove("message_queue");
std::cout << "interprocess_exception " << ex.what() << std::endl;
return 1; } catch (const std::exception& e) {
std::cout << "exception " << e.what() << std::endl;
message_queue::remove("message_queue");
return 1; }
message_queue::remove("message_queue"); return true; }
int main() { std::cout << "1\n"; std::default_random_engine generator; std::uniform_real_distribution<double> distribution(0,15);
using namespace std;
cout << "Boost version: " << BOOST_LIB_VERSION << endl; using namespace boost::interprocess; message_queue::remove("message_queue"); auto pid = fork();
if(pid > 0) {
std::cout << "2\n";
sleep(2);
try {
auto res = RunChiled();
std::cout << res;
} catch (...) {
std::cout << "error\n";
}
} else if(pid == 0) {
try{
boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
std::stringstream oss;
Data request;
request.b_ = 17.5;
request.a_ = I;
request.s = to_string(17.5) + " " + to_string(i);
try {
boost::archive::text_oarchive oa(oss);
oa << request;
} catch (const std::exception& e) {
std::cout << "serialzation:" << e.what() ;
}
try{
// std::cout << "oss " << oss.str() << "\n";
std::string serialized_string(oss.str());
std::cout << "serialized_string " << oss.str().size() << "\n";
mq.send(&serialized_string, serialized_string.size(), 0);
}catch(const std::exception& e){
std::cout << "\n send exeption " << e.what() << "\n";
}
}
}catch (const std::exception& e){
message_queue::remove("message_queue");
std::cout << e.what() ;
}
}
return 0;
}
A number of big issues.
Firstly
mq.send(&serialized_string, serialized_string.size(), 0);
That's Undefined
Behaviour because
serialzed_string
isn't POD and the size doesn't match either. You
probbably meant something like on the receive side:
mq.send(serialized_string.data(), serialized_string.size(), 0);
You're resizing your serialized_string
message to 150, and never
back to the actual size. This means it will not work correctly as
there will be trailing data.
Fixing it:
std::string buffer(buffer_size, '\0');
unsigned int priority = 0;
message_queue::size_type recvd_size;
mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
buffer.resize(recvd_size);
The serialization can be simpler without splitting:
class Data {
public:
int a_;
double b_;
std::string s;
template <class Archive> void serialize(Archive& ar, unsigned) {
ar & a_ & b_ & s;
}
private:
friend class boost::serialization::access; // not required
};
Other notes
return 1
and return true
from bool runChiled
look iffy - one of them is probably a bug
[[nodiscard]]
seems a little bit tricky on a function that normally will not return ([[noreturn]]
might be more apt, optionally just passing exceptions out?)
you will want to seed your random generator with something actually random:
std::default_random_engine generator { std::random_device{}() };
#include <iostream>
#include <iomanip>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <boost/core/demangle.hpp>
#include <boost/version.hpp>
#include <random>
#include <thread> // this_thread
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/split_member.hpp>
namespace {
namespace bip = boost::interprocess;
using bip::message_queue;
auto constexpr queuename = "message_queue";
auto constexpr max_queued = 5;
auto constexpr buffer_size = 150;
auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };
using namespace std::chrono_literals;
}
class Data {
public:
int a_{};
double b_{};
std::string s;
template <class Ar> void serialize(Ar& ar, unsigned /*unused*/) {
ar& a_& b_& s;
}
};
[[noreturn]] void RunChild()
{
std::cout << "2" << std::endl;
sleep_for(2s);
try {
message_queue mq(bip::open_only,queuename);
size_t number = 0;
for (std::string buffer(buffer_size, '\0');; buffer.resize(buffer_size)) {
{
unsigned int priority = 0;
message_queue::size_type recvd_size = 0;
#ifdef COLIRU
// make the process terminate for online compiler
{
using namespace boost::posix_time;
auto deadline = second_clock::universal_time() + seconds(3);
if (not mq.timed_receive(buffer.data(), buffer.size(),
recvd_size, priority, deadline))
{
throw std::runtime_error("no more messages");
}
}
#else
mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
#endif
buffer.resize(recvd_size);
}
//std::cout << buffer << std::endl;
Data d;
try {
std::stringstream iss(buffer);
boost::archive::text_iarchive ia(iss);
ia >> d;
} catch (const std::exception& ex) {
std::cout << ex.what() << std::endl;
}
++number;
std::cout << "Received: " << d.a_ << " " << d.b_ << " "
<< std::quoted(d.s) << std::endl;
}
} catch (const std::exception& e) {
std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
<< std::endl;
message_queue::remove(queuename);
throw; // re-raise
}
message_queue::remove(queuename);
}
static void RunParent()
{
std::cout << "1" << std::endl;
std::default_random_engine generator { std::random_device{}() };
std::uniform_real_distribution<double> distribution(0, 15);
message_queue mq(bip::create_only, queuename, max_queued, buffer_size);
for (auto i = 0; i < 10; ++i) {
auto value = distribution(generator);
Data const request {
i,
value,
std::to_string(value) + " " + std::to_string(i)
};
std::stringstream oss;
try {
boost::archive::text_oarchive oa(oss);
oa << request;
std::string buffer = std::move(oss).str();
std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
mq.send(buffer.data(), buffer.size(), 0);
} catch (const std::exception& e) {
std::cout << "\nsend exeption " << e.what() << std::endl;
}
}
}
int main() {
std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
message_queue::remove(queuename);
try {
if (auto pid = ::fork(); pid > 0) {
RunChild();
} else if (pid == 0) {
RunParent();
}
} catch (const std::exception& e) {
std::cout << e.what() << std::endl;
message_queue::remove(queuename);
}
}
Prints
Boost version: 1_75
2
1
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Sending 73 bytes
Sending 72 bytes
Sending 72 bytes
Received: 0 1.12642 "1.126420 0"
Received: 1 14.2474 "14.247412 1"
Received: 2 3.22163 "3.221631 2"
Sending 73 bytes
Sending 72 bytes
Received: 3 3.20471 "3.204709 3"
Sending 72 bytes
Received: 4 10.7838 "10.783761 4"
Received: 5 5.74063 "5.740629 5"
Received: 6 6.98008 "6.980078 6"
Received: 7 11.6643 "11.664257 7"
Received: 8 3.80561 "3.805614 8"
Received: 9 7.79641 "7.796408 9"
std::runtime_error no more messages
no more messages