boost websocket concurrency asio beast

Why is boost::asio::io_context::run() still blocking after boost::beast::websocket::stream::close()?


I have a pool of threads that concurrently run the same io_context to drive a websocket stream. I do this because, first, I actually have 2 websocket streams (I abstracted this away because testing suggests it is not the problem), and second, I want to run other I/O operations besides the websocket ones, which are namely async_read and async_write.

Each websocket stream uses its own strand, and additional locking ensures that an async_read (resp. async_write) is not started before the previous one has reached its handler.

So basically:

io_context context;
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);
...
wss.async_read(&loop_read_handler);
...
for(auto& th:pool)
    th=std::thread([&]{
        try{
            start_read_loop();//give work to do to each thread
            context.run();
        }catch(...){}
        wss.close(...);//closing the websocket stream, expected to cancel all threads
        context.stop();//with or without it, no change
    });
for(auto& th:pool)
    th.join();//hangs here since the other threads did not return from run()

When I want the program to stop, I call close(boost::beast::websocket::close_code::normal,ec) on the stream, which effectively cancels the I/O operations in the current thread (an empty message with error code boost::beast::websocket::error::closed is received), but not in the other threads: instead of being cancelled, they hang.

Digging into the code, I ruled out a deadlock of my own and found that context.run() simply does not notice that the websocket stream was closed and keeps waiting for an incoming message.

Of course the problem disappears when the pool is limited to a single thread.

Calling close(...) from outside or inside an I/O operation does not change the problem. Calling context.stop() has no effect on the problem either, whether it is called from outside or inside.

What can be the problem, and how am I supposed to make the context stop running on a graceful websocket close?

================================= EDIT WITH SOLUTION

I managed to fix my code thanks to sehe's answer. Instead of starting the read loop in each thread, I now start it once after the pool initialization, and I added auto work=make_work_guard(context); and work.reset():

io_context context;
auto work=make_work_guard(context);//<<<<<<<<<<<<<<
std::vector<std::thread> pool(std::thread::hardware_concurrency());
...
wss(make_strand(context),ssl);//I keep it because I will add other streams
...
for(auto& th:pool)
    th=std::thread([&]{
        try{ context.run(); }catch(...){} //<<<<<<<<<<<<<<<<<<<
        close_wss_streams_once_each(...);//cancels all threads
    });
start_async_read_loop();//<<<<<<<<<<<<<<
work.reset();//<<<<<<<<<<<<<<<<<
for(auto& th:pool)
    th.join();

Apparently I shouldn't post an I/O operation in each thread, which I had decided to do to give all threads work to do. Instead, using the work guard prevents the threads from returning prematurely.


Solution

  • the same io_context to run a websocket stream

    A stream is not a process (or even an operation). You cannot "run a [websocket] stream". You basically only ever run an event loop that executes enqueued handlers, aside from the synchronous code.

    the other threads : Instead of being cancelled, they hang

    The code shown raises the opposite question: why don't all the threads return immediately (because no work exists before starting the threads)? Clearly your actual code is sufficiently different for this not to happen.

    Perhaps you even have an explicit work_guard around. If so, that of course explains why things are not shutting down.

    Of course the problem disappears when the pool is limited to a single thread.

    That doesn't quite make sense to me. Logically, the chances of a deadlock increase with fewer threads. Regardless, that wasn't your problem.

    Imagined Problem Code, But Working

    Here's what I imagine, just adding that work-guard to make it so that the threads don't all complete before you even post the first async_read:

    net::io_context ioc;
    std::vector<std::thread> pool(std::thread::hardware_concurrency());
    
    auto work = make_work_guard(ioc);
    
    for (auto& th : pool)
        th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};
    

    Now, let's construct, connect, ssl handshake and ws handshake a websocket client (synchronously for simplicity):

    sctx ctx(sctx::tlsv13_client);
    Ws wss(make_strand(ioc), ctx);
    
    auto& s = beast::get_lowest_layer(wss);
    s.connect({{}, 8989});
    wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
    wss.handshake("localhost", "/");
    

    Now let's add your loop_read_handler. Apparently that is some kind of (member) function, but we don't have a class here. So let's drop in a closure:

    std::function<void(error_code, size_t)> loop_read_handler;
    
    beast::flat_buffer buf;
    loop_read_handler = [&](error_code ec, size_t n) {
        std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
        if (n)
            std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;
    
        if (!ec) {
            buf.consume(n);
            wss.async_read(buf, loop_read_handler);
        }
    };
    

    Of course, we have to kick off the first read:

    wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool
    

    Now, I could use a timer, but realistically you want a graceful shutdown when your application receives a signal to terminate, so let's do that for the demo:

    net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
    ss.async_wait([&](error_code ec, int sig) {
        std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
        if (!ec) {
            // on strand:
            post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
        }
    });
    

    That's all! Now, all we have to do is wait. So, we can remove the scaffolding:

    // from this point we're okay returning, as soon as the read loop stops
    work.reset();
    std::cout << "waiting for graceful shutdown" << std::endl;
    
    for (auto& th : pool)
        th.join();
    
    std::cout << "graceful shutdown complete" << std::endl;
    

    Full Listing

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/websocket/ssl.hpp>
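    #include <csignal>    // SIGINT, SIGTERM
    #include <functional> // std::function
    #include <iomanip>
    #include <iostream>
    #include <string.h>   // ::strsignal (POSIX)
    #include <thread>
    #include <vector>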
    namespace net       = boost::asio;
    namespace ssl       = net::ssl;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    
    using boost::system::error_code;
    using net::ip::tcp;
    using sctx = ssl::context;
    using Ws   = websocket::stream<ssl::stream<tcp::socket>>;
    
    int main() {
        net::io_context ioc;
        std::vector<std::thread> pool(std::thread::hardware_concurrency());
    
        auto work = make_work_guard(ioc);
    
        for (auto& th : pool)
            th = std::thread{[&ioc] { try { ioc.run(); } catch (...) { } }};
    
        sctx ctx(sctx::tlsv13_client);
        Ws wss(make_strand(ioc), ctx);
    
        auto& s = beast::get_lowest_layer(wss);
        s.connect({{}, 8989});
        wss.next_layer().handshake(Ws::next_layer_type::handshake_type::client);
        wss.handshake("localhost", "/");
    
        std::function<void(error_code, size_t)> loop_read_handler;
    
        beast::flat_buffer buf;
        loop_read_handler = [&](error_code ec, size_t n) {
            std::cout << "loop_read_handler " << ec.message() << ", " << n << std::endl;
            if (n)
                std::cout << "Received " << quoted(beast::buffers_to_string(buf.cdata())) << std::endl;
    
            if (!ec) {
                buf.consume(n);
                wss.async_read(buf, loop_read_handler);
            }
        };
        wss.async_read(buf, loop_read_handler); // not on strand, because nothing is yet on the pool
    
        net::signal_set ss(ioc, SIGINT, SIGTERM); // SIGINT e.g. from Ctrl-C in a terminal
        ss.async_wait([&](error_code ec, int sig) {
            std::cout << "signal " << ::strsignal(sig) << " (" << ec.message() << ")" << std::endl;
            if (!ec) {
                // on strand:
                post(wss.get_executor(), [&wss] { wss.close(websocket::normal); });
            }
        });
    
        // from this point we're okay returning, as soon as the read loop stops
        work.reset();
        std::cout << "waiting for graceful shutdown" << std::endl;
    
        for (auto& th : pool)
            th.join();
    
        std::cout << "graceful shutdown complete" << std::endl;
    }
    

    Running it against a simple demo wss server:

    websocketd -port 8989 -ssl --sslcert server.pem --sslkey server.pem ping www.google.com
    

    Then terminate the client either with Ctrl-C in a terminal or by sending it a SIGTERM signal.

    BONUS

    The whole thread pool can be replaced (more correctly!) with asio::thread_pool:

    int main() {
        net::thread_pool ioc;
    
        // ...
        Ws wss(make_strand(ioc), ctx);
    
        // ...
    
        // from this point we're okay returning, as soon as the read loop stops
        std::cout << "waiting for graceful shutdown" << std::endl;
        ioc.join();
        std::cout << "graceful shutdown complete" << std::endl;
    }
    

    That way, you don't have to meddle with a work-guard at all (or worry about correct handling of exceptions).