c++boostboost-serialization

getting "input stream error" when trying to desirealize the object using boost::serialization and arcive


I'm trying to send a class over boost::message queue using boost::serialization, boost::Arcive, and boost::split members (load and save) the problem is when I'm trying to deserialize I'm getting the input stream error exception

#include <iostream>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include <boost/version.hpp>
#include <random>


#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/serialization/access.hpp>
#include <boost/archive/impl/basic_text_oarchive.ipp>
#include <boost/archive/impl/text_oarchive_impl.ipp>
#include <boost/archive/impl/text_iarchive_impl.ipp>
#include <boost/serialization/split_member.hpp>


class Data{ 
    public:
    int a_ ;   
    double b_ ;
    std::string s ;
    template<class Archive>   
    void serialize(Archive & ar, const unsigned int version)   {
    boost::serialization::split_member(ar, *this, version); 
    }

  template<class Archive>   void save(Archive & ar, unsigned int version) const   {
    // ar << order_request_type_;
    ar << a_;
    ar << b_;
    ar << s;   }

  template<class Archive>   void load(Archive & ar, unsigned int version)   {
    // ar >> order_request_type_;
    ar >> a_;
    ar >> b_;
    ar >> s;
  }


 private:
  // friend class boost::archive::access;
  friend class   boost::archive::save_access;

};


[[nodiscard]]bool RunChiled() { 
    using namespace boost::interprocess; try {
    message_queue mq(open_only   //open or create
        , "message_queue"     //name
    );
    unsigned int priority = 0;
    message_queue::size_type recvd_size;
    Data d;
    std::stringstream iss;
    std::string serialized_string;
    serialized_string.resize(150);
    long long number = 0;
    while(true)
    {
        mq.receive(&serialized_string[0], 150, recvd_size , priority);
        std::cout << serialized_string << "\n";
        iss << serialized_string;
        try{
          boost::archive::text_iarchive ia(iss);  // <-- getting the exception
          ia >> d;
        } catch  (const std::exception& ex) {
          std::cout << ex.what() << "\n";
        }
        number++;
        std::cout << d.a_ << " " << d.b_ << " " << d.s << "\n";
      }   }catch(const interprocess_exception &ex) {
    message_queue::remove("message_queue");
    std::cout << "interprocess_exception " << ex.what() << std::endl;
    return 1;   } catch (const std::exception& e)   {
    std::cout << "exception " << e.what() << std::endl;
    message_queue::remove("message_queue");
    return 1;   }

  message_queue::remove("message_queue");   return true; }


int main() {   std::cout << "1\n";   std::default_random_engine generator;   std::uniform_real_distribution<double> distribution(0,15);

  using namespace std;
    cout << "Boost version: " << BOOST_LIB_VERSION << endl;   using namespace boost::interprocess;   message_queue::remove("message_queue");   auto pid = fork();

  if(pid > 0)   {
    std::cout << "2\n";
    sleep(2);
    try {
      auto res = RunChiled();
      std::cout << res;
    } catch (...) {
      std::cout << "error\n";
    } 
 } else if(pid == 0)   {
    try{    
        boost::interprocess::message_queue mq(create_only,"message_queue", 100, 150);
        std::stringstream oss;
        Data request;       
        request.b_ = 17.5;      
        request.a_ = I;         
        request.s = to_string(17.5) + " " + to_string(i);       
        try {           
            boost::archive::text_oarchive oa(oss);
            oa << request;
        } catch (const std::exception& e) {
            std::cout << "serialzation:" << e.what() ;
        }
        try{    
           // std::cout << "oss " << oss.str() << "\n";         
           std::string serialized_string(oss.str());
           std::cout << "serialized_string " << oss.str().size() << "\n";                   
           mq.send(&serialized_string, serialized_string.size(), 0); 
        }catch(const std::exception& e){
            std::cout << "\n send     exeption " << e.what() << "\n";
        } 
    }
    }catch (const std::exception& e){             
        message_queue::remove("message_queue");
        std::cout << e.what() ;
    }
}   
return 0;
 }

Solution

  • A number of big issues.

    1. Firstly

      mq.send(&serialized_string, serialized_string.size(), 0);
      

      That's Undefined Behaviour because serialzed_string isn't POD and the size doesn't match either. You probbably meant something like on the receive side:

      mq.send(serialized_string.data(), serialized_string.size(), 0);
      
    2. You're resizing your serialized_string message to 150, and never back to the actual size. This means it will not work correctly as there will be trailing data.

      Fixing it:

      std::string buffer(buffer_size, '\0');
      unsigned int priority = 0;
      message_queue::size_type recvd_size;
      
      mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
      buffer.resize(recvd_size);
      

    Other Notes

    The serialization can be simpler without splitting:

    class Data {
      public:
        int a_;
        double b_;
        std::string s;
    
        template <class Archive> void serialize(Archive& ar, unsigned) {
            ar & a_ & b_ & s;
        }
    
      private:
        friend class boost::serialization::access; // not required
    };
    

    Other notes

    Live Demo With Fixes

    Live On Coliru

    Live On Wandbox

    #include <iostream>
    #include <iomanip>
    #include <boost/interprocess/shared_memory_object.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/date_time.hpp>
    
    #include <boost/core/demangle.hpp>
    #include <boost/version.hpp>
    #include <random>
    #include <thread> // this_thread
    
    #include <boost/archive/text_oarchive.hpp>
    #include <boost/archive/text_iarchive.hpp>
    #include <boost/serialization/access.hpp>
    #include <boost/serialization/split_member.hpp>
    
    namespace {
        namespace bip = boost::interprocess;
        using bip::message_queue;
        auto constexpr queuename = "message_queue";
        auto constexpr max_queued = 5;
        auto constexpr buffer_size = 150;
    
        auto sleep_for = [](auto d) { std::this_thread::sleep_for(d); };
        using namespace std::chrono_literals;
    }
    
    class Data {
      public:
        int a_{};
        double b_{};
        std::string s;
    
        template <class Ar> void serialize(Ar& ar, unsigned /*unused*/) {
            ar& a_& b_& s;
        }
    };
    
    [[noreturn]] void RunChild()
    {
        std::cout << "2" << std::endl;
        sleep_for(2s);
    
        try {
            message_queue mq(bip::open_only,queuename);
            size_t number = 0;
            for (std::string buffer(buffer_size, '\0');; buffer.resize(buffer_size)) {
                {
                    unsigned int priority = 0;
                    message_queue::size_type recvd_size = 0;
    #ifdef COLIRU
                    // make the process terminate for online compiler
                    {
                        using namespace boost::posix_time;
                        auto deadline = second_clock::universal_time() + seconds(3);
                        if (not mq.timed_receive(buffer.data(), buffer.size(),
                                recvd_size, priority, deadline))
                        {
                            throw std::runtime_error("no more messages");
                        }
                    }
    #else
                    mq.receive(buffer.data(), buffer.size(), recvd_size, priority);
    #endif
                    buffer.resize(recvd_size);
                }
                //std::cout << buffer << std::endl;
    
                Data d;
                try {
                    std::stringstream iss(buffer);
                    boost::archive::text_iarchive ia(iss);
                    ia >> d;
                } catch (const std::exception& ex) {
                    std::cout << ex.what() << std::endl;
                }
                ++number;
                std::cout << "Received: " << d.a_ << " " << d.b_ << " "
                          << std::quoted(d.s) << std::endl;
            }
        } catch (const std::exception& e) {
            std::cout << boost::core::demangle(typeid(e).name()) << " " << e.what()
                      << std::endl;
            message_queue::remove(queuename);
            throw; // re-raise
        }
    
        message_queue::remove(queuename);
    }
    
    static void RunParent()
    {
        std::cout << "1" << std::endl;
        std::default_random_engine generator { std::random_device{}() };
        std::uniform_real_distribution<double> distribution(0, 15);
    
        message_queue mq(bip::create_only, queuename, max_queued, buffer_size);
    
        for (auto i = 0; i < 10; ++i) {
            auto value = distribution(generator);
            Data const request {
                i,
                value,
                std::to_string(value) + " " + std::to_string(i)
            };
    
            std::stringstream oss;
            try {
                boost::archive::text_oarchive oa(oss);
                oa << request;
    
                std::string buffer = std::move(oss).str();
                std::cout << "Sending " << buffer.size() << " bytes" << std::endl;
                mq.send(buffer.data(), buffer.size(), 0);
            } catch (const std::exception& e) {
                std::cout << "\nsend exeption " << e.what() << std::endl;
            }
        }
    }
    
    int main() {
        std::cout << "Boost version: " << BOOST_LIB_VERSION << std::endl;
        message_queue::remove(queuename);
    
        try {
            if (auto pid = ::fork(); pid > 0) {
                RunChild();
            } else if (pid == 0) {
                RunParent();
            }
        } catch (const std::exception& e) {
            std::cout << e.what() << std::endl;
            message_queue::remove(queuename);
        }
     }
    

    Prints

    Boost version: 1_75
    2
    1
    Sending 72 bytes
    Sending 73 bytes
    Sending 72 bytes
    Sending 72 bytes
    Sending 73 bytes
    Sending 72 bytes
    Sending 72 bytes
    Received: 0 1.12642 "1.126420 0"
    Received: 1 14.2474 "14.247412 1"
    Received: 2 3.22163 "3.221631 2"
    Sending 73 bytes
    Sending 72 bytes
    Received: 3 3.20471 "3.204709 3"
    Sending 72 bytes
    Received: 4 10.7838 "10.783761 4"
    Received: 5 5.74063 "5.740629 5"
    Received: 6 6.98008 "6.980078 6"
    Received: 7 11.6643 "11.664257 7"
    Received: 8 3.80561 "3.805614 8"
    Received: 9 7.79641 "7.796408 9"
    std::runtime_error no more messages
    no more messages