c++ multithreading boost asio stdasync

Boost.Asio and std::async in multi-threading


I need to call a method that sends a request to a remote server. After that I want to wait for the answer, without the wait being blocked by other asynchronous functions/objects (timers, for example).

The method got_response(...) tells the user that an answer has arrived from the remote server; it also receives the data that came with the answer. My solution is below, but sometimes the timers can all be called on a single thread, which leads to got_response() hanging.

How can I guarantee that the timer simulating the answer is called on another thread? Is there any other solution to my problem?

#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;

namespace io = boost::asio;

/// Holder for one asynchronous reply.
///
/// call() starts a background waiter, got_response() publishes the value and
/// get() blocks the calling thread until the value has been published.
/// Because get() blocks, calling it on the only thread that could also run
/// got_response() will never return.
struct Reply
{
    std::atomic<bool> ready{false}; ///< becomes true once got_response() has stored a value
    std::atomic<int> result{0};     ///< value passed to got_response(); 0 until then

    std::future<void> future_result; ///< completes when the waiter started by call() observes `ready`

    Reply() = default;

    /// Prints a trace line and starts the waiter that spins until `ready` is set.
    void call()
    {
        std::cout << "retry called!" << std::endl;
        // Request launch::async explicitly so the waiter is never deferred
        // onto whichever thread later calls get().
        future_result = std::async(std::launch::async, [this]
                                   {
                                       while (!ready)
                                       {
                                           std::this_thread::yield();
                                       }
                                   });
    }

    /// Blocks until a response has been published, then returns it.
    int get()
    {
        if (future_result.valid())
        {
            future_result.wait();
        }
        else
        {
            // call() was never invoked, so there is no shared state to wait
            // on (wait() on an invalid future is undefined behaviour).
            // Wait on the flag directly instead.
            while (!ready)
            {
                std::this_thread::yield();
            }
        }
        return result.load();
    }

    /// Publishes the response: the value is stored first, then the flag is raised.
    void got_response(int res)
    {
        result = res;
        ready = true;
    }
};

int main()
{
    Reply reply;
    reply.call();

    io::io_context context(4);

    io::steady_timer timer1(context, std::chrono::seconds(2));
    timer1.async_wait([&](const boost::system::error_code &ec)
                      { cout << "timer 1, thread name: " << this_thread::get_id() << endl; });

    io::steady_timer timer2(context, std::chrono::seconds(3));
    timer2.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 2, thread name: " << this_thread::get_id() << endl;
                          cout << reply.get() << endl;
                      });

    io::steady_timer timer3(context, std::chrono::seconds(10));
    timer3.async_wait([&](const boost::system::error_code &ec)
                      {
                          cout << "timer 3, thread name: " << this_thread::get_id() << endl;
                          reply.got_response(1337);
                      });

    vector<thread> threads;
    auto count = 2;

    for (int n = 0; n < count; ++n)
    {
        threads.emplace_back([&]
                             { context.run(); });
    }

    for (auto &th : threads)
    {
        th.join();
    }
}

Result:

retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337

Solution

  • Wow. This is overcomplicated on several levels.

    Demo

    Here's my expanded but simplified take on the question code:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <future>
    #include <iostream>
    #include <thread>
    namespace io = boost::asio;
    using namespace std::chrono_literals;
    using boost::system::error_code;
    
    // not very useful in practice, but for debug output in main
    std::ostream& debug(error_code);
    // Non-blocking readiness probe for std::future / std::shared_future.
    // Returns true only when `fut` has a shared state whose result is already
    // available. A future without shared state (default-constructed or
    // moved-from) yields false instead of violating wait_for()'s precondition;
    // a deferred task that has not been started also yields false.
    template <typename Fut> bool is_ready(Fut const& fut) {
        return fut.valid() &&
            fut.wait_for(std::chrono::seconds{0}) == std::future_status::ready;
    }
    
    // Demo: ten polling timers (1 s .. 10 s) report whether the reply is
    // available yet, while one extra timer delivers the reply after 4.5 s.
    int main() {
        std::promise<int>  reply;
        std::shared_future got_value = reply.get_future();

        io::thread_pool              context(2);
        // deque: appending a timer leaves references to earlier timers valid
        std::deque<io::steady_timer> timers;

        // Shared handler for all polling timers: prints the value once it is there.
        auto report = [&got_value](error_code ec) {
            if (!is_ready(got_value)) {
                debug(ec) << " (reply not ready)" << std::endl;
                return;
            }
            debug(ec) << " Reply:" << got_value.get() << std::endl;
        };

        for (int seconds = 1; seconds <= 10; ++seconds) {
            auto& poll_timer = timers.emplace_back(context, seconds * 1s);
            poll_timer.async_wait(report);
        }

        // The timer that fulfils the promise.
        auto& reply_timer = timers.emplace_back(context, 4'500ms);
        reply_timer.async_wait([&reply](error_code ec) {
            debug(ec) << " setting value" << std::endl;
            reply.set_value(1337);
        });

        // Block until the pool has no more work.
        context.join();
    }
        
    // Reduces the calling thread's id to a small number in [0, 255] so that log
    // lines stay short. Different threads may map to the same number.
    int friendly_thread_id() {
        const std::thread::id            self = std::this_thread::get_id();
        const std::hash<std::thread::id> hasher{};
        return hasher(self) % 256;
    }
    
    #include <iomanip>
    std::ostream& debug(error_code ec) {
        auto        now           = std::chrono::system_clock::now;
        static auto program_start = now();
        return std::cout //
            << ((now() - program_start) / 1ms) << "ms\t"
            << "thread:" << std::hex << std::setfill('0') << std::showbase
            << std::setw(2) << friendly_thread_id() << std::dec << " ";
    }
    
    
    #include <iomanip>
    std::ostream& debug(error_code ec) {
        auto        now           = std::chrono::system_clock::now;
        static auto program_start = now();
        return std::cout //
            << ((now() - program_start) / 1ms) << "ms\t"
            << "thread:" << std::hex << std::setfill('0') << std::showbase
            << std::setw(2) << pretty_thread_id() << std::dec << " ";
    }
    

    Prints

    0ms     thread:0x5f  (reply not ready)
    999ms   thread:0xf3  (reply not ready)
    1999ms  thread:0x5f  (reply not ready)
    2999ms  thread:0x5f  (reply not ready)
    3499ms  thread:0xf3  setting value
    3999ms  thread:0x5f  Reply:1337
    4999ms  thread:0xf3  Reply:1337
    5999ms  thread:0xf3  Reply:1337
    6999ms  thread:0xf3  Reply:1337
    7999ms  thread:0xf3  Reply:1337
    8999ms  thread:0xf3  Reply:1337