c++boostboost-asioboost-beastboost-beast-websocket

Boost::beast how to close a (sync) reading websocket?


tl;dr: Is there a way to close a WebSocket that's currently doing (sync) read() operation, if server sends nothing for some time?

I wanted to make a simple WebSocket client with Boost::beast. When I realized that read() is a blocking operation, and that there's no way to tell if there's a message coming - I created a sleeper thread. All the thread does is read() and I can afford to have it blocked if no data is coming.

I want it to be able to close the connection so from non-blocked thread I shoot a websocket::close(). This causes read() to throw a BOOST_ASSERT() at me:

Assertion failed: ! impl.wr_close

How can I close the connection when (sync) read() is ongoing?

Code for reproduction of my scenario:


#include <string>
#include <thread>
#include <chrono>

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>


using namespace std::chrono_literals;

class HandlerThread {

    enum class Status {
        UNINITIALIZED,
        DISCONNECTED,
        CONNECTED,
        READING,
    };

    const std::string _host;
    const std::string _port;
    std::string _resolvedAddress;

    boost::asio::io_context        _ioc;
    boost::asio::ip::tcp::resolver _resolver;
    boost::beast::websocket::stream<boost::asio::ip::tcp::socket> _websocket;

    boost::beast::flat_buffer _buffer;

    bool isRunning = true;
    Status _connectionStatus = Status::UNINITIALIZED;

public:
    HandlerThread(const std::string& host, const uint16_t port)
    : _host(std::move(host))
    , _port(std::to_string(port))
    , _ioc()
    , _resolver(_ioc)
    , _websocket(_ioc) {}

    void Run() {
        // isRunning is also useless, due to blocking boost::beast operations.
        while(isRunning) {
            switch (_connectionStatus) {
                case Status::UNINITIALIZED:
                case Status::DISCONNECTED:
                    if (!connect()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
                case Status::CONNECTED:
                case Status::READING:
                    if (!read()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
            }
        }
    }

    void Close()
    {
         isRunning = false;
        _websocket.close(boost::beast::websocket::close_code::normal);
    }

private:
    bool connect()
    {
        // All here is copy-paste from the examples.
        boost::system::error_code errorCode;
        // Look up the domain name  
        auto const results = _resolver.resolve(_host, _port, errorCode);
        if (errorCode) return false;

        // Make the connection on the IP address we get from a lookup
        auto ep = boost::asio::connect(_websocket.next_layer(), results, errorCode);
        if (errorCode) return false;

        _resolvedAddress = _host + ':' + std::to_string(ep.port());

        _websocket.set_option(boost::beast::websocket::stream_base::decorator(
            [](boost::beast::websocket::request_type& req)
            {
                req.set(boost::beast::http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-coro");
            }));

        boost::beast::websocket::response_type res;
        _websocket.handshake(res, _resolvedAddress, "/", errorCode);

        if (errorCode) return false;

        _connectionStatus = Status::CONNECTED;
        return true;
    }

    bool read()
    {
        boost::system::error_code errorCode;
        _websocket.read(_buffer, errorCode);

        if (errorCode) return false;

        if (_websocket.is_message_done()) {
            _connectionStatus = Status::CONNECTED;
            // notifyRead(_buffer);
            _buffer.clear();    
        } else {
            _connectionStatus = Status::READING;
        }

        return true;
    }
};

int main() {
    HandlerThread handler("localhost", 8080);
    std::thread([&]{
        handler.Run();
    }).detach(); // bye!

    std::this_thread::sleep_for(3s);
    handler.Close(); // Bad idea...

    return 0;
}

Solution

  • There is no such thing. You might be able to force something at the TCP stack (so, operating system, usually) level. E.g. disabling the network interface involved.

    Note that most synchronous code can be trivially be transformed into asynchronous code with the exact same blocking semantics using asio::use_future. That means you can use async deadlines. And those are supported by beast out of the box (basic your websocket on beast::tcp_stream instead of asio::ip::tcp::socket)

    UPDATE

    To the added code

    Review/Simplify

    Reduced the code removing unneeded bits and adding some fixes and demonstration handler notification so we can test the functioning:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/signals2.hpp>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using namespace std::chrono_literals;
    
    class WsConnect {
        using tcp = net::ip::tcp;
        const std::string              _host, _port;
        net::io_context                _ioc;
        beast::flat_buffer             _buffer;
        websocket::stream<tcp::socket> _ws{_ioc};
    
        enum class Status { DISCONNECTED, CONNECTED } _status{Status::DISCONNECTED};
        std::atomic_bool _running{true}; // SEHE
      public:
        boost::signals2::signal<void(std::string)> callback; // SEHE
    
        WsConnect(std::string host, uint16_t port)
            : _host(std::move(host))
            , _port(std::to_string(port)) {}
    
        void Run() {
            while (_running)
                try {
                    switch (_status) {
                        case Status::DISCONNECTED: do_connect(); [[fallthrough]];
                        case Status::CONNECTED: do_read();
                    }
                } catch (boost::system::system_error const& se) {
                    // se.code() is the error_code
                    _status = Status::DISCONNECTED;
                    std::this_thread::sleep_for(50ms); // SEHE avoid tight reconnect loop
                }
        }
    
        void Close() {
            _running = false;
            beast::error_code ec;
            _ws.close(websocket::close_code::normal, ec);
        }
    
      private:
        void do_connect() {
            connect(beast::get_lowest_layer(_ws), tcp::resolver(_ioc).resolve(_host, _port));
    
            _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
            }));
    
            _ws.handshake(_host + ':' + _port, "/");
            _buffer.clear(); // SEHE this was missing
        }
    
        void do_read() {
            do {
                _ws.read(_buffer);
            } while (!_ws.is_message_done());
    
            callback(beast::buffers_to_string(_buffer.cdata())); // SEHE just for demo
            _buffer.clear();
        }
    };
    
    #include <iomanip>
    #include <iostream>
    void handle_message(std::string_view msg) { std::cout << "Handling " << quoted(msg) << std::endl; }
    
    int main() {
        WsConnect conn("localhost", 8989);
        std::thread([&] { conn.Run(); }).detach(); // bye!
        boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
    
        std::this_thread::sleep_for(3s);
        conn.Close(); // Bad idea...
    }
    

    Exercising it for demonstration:

    enter image description here

    Design Issues

    I'd argue there are three design issues:

    Here's a fix, naively only time-limiting operations like in your example:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/signals2.hpp>
    #include <iostream>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using namespace std::chrono_literals;
    
    struct WsConnect {
        boost::signals2::signal<void(std::string)> callback;
    
        WsConnect(std::string host, uint16_t port)
            : _host(std::move(host))
            , _port(std::to_string(port)) {}
    
        void Run(std::chrono::steady_clock::duration d) {
            _ws.next_layer().expires_after(d);
            for (;; std::this_thread::sleep_for(50ms))
                try {
                    do_connect();
                    for (;;)
                        do_read();
                } catch (boost::system::system_error const& se) {
                    std::cerr << "Error: " << se.code().message() << std::endl;
                    if (se.code() == beast::error::timeout)
                        break;
                }
        }
    
      private:
        using tcp = net::ip::tcp;
        const std::string                    _host, _port;
        beast::flat_buffer                   _buffer;
        net::thread_pool                     _ioc{1};
        websocket::stream<beast::tcp_stream> _ws{_ioc};
    
        void do_connect() {
            _ws.next_layer()
                .async_connect(tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
                .get();
    
            _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
            }));
    
            _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
            _buffer.clear();
        }
    
        void do_read() {
            do
                _ws.async_read(_buffer, net::use_future).get();
            while (!_ws.is_message_done());
    
            callback(beast::buffers_to_string(_buffer.cdata()));
            _buffer.clear();
        }
    };
    
    void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }
    
    int main() {
        WsConnect conn("localhost", 8989);
        boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
        conn.Run(3s);
    } // close implied by destructors
    

    Note how the code got simpler, shorter, and we even print error information. It's achieved by using use_future together with beast::tcp_stream::expires_after:

    enter image description here

    Full Cancellation

    To allow for /externally triggered/ cancellation (instead of a fixed deadline), we can cheat a little by using 2 threads, so one can be "clogged" doing blocking waits on the futures:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/signals2.hpp>
    #include <iostream>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using namespace std::chrono_literals;
    
    struct BackgroundWs {
        boost::signals2::signal<void(std::string)> callback;
    
        BackgroundWs(std::string host, uint16_t port)
            : _host(std::move(host))
            , _port(std::to_string(port)) {}
    
        void Start() {
            post(_ws.get_executor(), [this] { do_run(); });
        }
    
        void Stop() { do_stop().get(); }
    
        ~BackgroundWs() {
            do_stop().wait(); // noexcept, please
        }
    
      private:
        using tcp = net::ip::tcp;
        const std::string              _host, _port;
        beast::flat_buffer             _buffer;
        net::thread_pool               _ioc{2};
        websocket::stream<tcp::socket> _ws{_ioc};
        bool                           _stopped{false};
    
        void do_run() {
            for (; !_stopped; std::this_thread::sleep_for(50ms))
                try {
                    do_connect();
                    for (;;)
                        do_read();
                } catch (boost::system::system_error const& se) {
                    std::cerr << "Error: " << se.code().message() << std::endl;
                }
        }
    
        std::future<void> do_stop() {
            return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                                _stopped = true;
                                _ws.next_layer().cancel();
                                _ws.close(websocket::normal);
                            }));
        }
    
        void do_connect() {
            async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
                .get();
    
            _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
            }));
    
            _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
            _buffer.clear();
        }
    
        void do_read() {
            do
                _ws.async_read(_buffer, net::use_future).get();
            while (!_ws.is_message_done());
    
            callback(beast::buffers_to_string(_buffer.cdata()));
            _buffer.clear();
        }
    };
    
    void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }
    
    int main() {
        {
            BackgroundWs client("localhost", 8989);
            boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
            client.Start();
    
            std::string input;
            while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
                if (input == "Stop") break;
    
            // client.Stop(); // or just rely on destructor
        } // destructor
    
        std::cout << "Press Enter to quit... ";
        std::cin.ignore(1024, '\n');
    }
    

    Now, the thing runs, reconnecting happily until the user enters Stop on the terminal:

    enter image description here

    No Cheats, C++20

    In C++ using coro's you can have basically identical code being true async. This gets rid of the "cheating extra thread":

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/signals2.hpp>
    #include <iostream>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using namespace std::chrono_literals;
    
    struct BackgroundWs {
        boost::signals2::signal<void(std::string)> callback;
    
        BackgroundWs(std::string host, uint16_t port)
            : _host(std::move(host))
            , _port(std::to_string(port)) {}
    
        void Start() { net::co_spawn(_ws.get_executor(), do_run(), net::detached); }
        void Stop() { do_stop().get(); }
    
        ~BackgroundWs() {
            do_stop().wait(); // noexcept, please
        }
    
      private:
        using tcp = net::ip::tcp;
        const std::string              _host, _port;
        beast::flat_buffer             _buffer;
        net::thread_pool               _ioc{1};
        websocket::stream<tcp::socket> _ws{_ioc};
        bool                           _stopped{false};
    
        net::awaitable<void> do_run() {
            for (; !_stopped; co_await async_sleep(50ms))
                try {
                    co_await do_connect();
                    for (;;)
                        co_await do_read();
                } catch (boost::system::system_error const& se) {
                    std::cerr << "Error: " << se.code().message() << std::endl;
                }
        }
    
        net::awaitable<void> async_sleep(auto duration) {
            co_await net::steady_timer(_ws.get_executor(), duration).async_wait(net::use_awaitable);
        }
    
        std::future<void> do_stop() {
            return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                                _stopped = true;
                                _ws.next_layer().cancel();
                                _ws.close(websocket::normal);
                            }));
        }
    
        net::awaitable<void> do_connect() {
            co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
                                   net::use_awaitable);
    
            _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
            }));
    
            co_await _ws.async_handshake(_host + ':' + _port, "/", net::use_awaitable);
            _buffer.clear();
        }
    
        net::awaitable<void> do_read() {
            do
                co_await _ws.async_read(_buffer, net::use_awaitable);
            while (!_ws.is_message_done());
    
            callback(beast::buffers_to_string(_buffer.cdata()));
            _buffer.clear();
        }
    };
    
    void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }
    
    int main() {
        {
            BackgroundWs client("localhost", 8989);
            boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
    
            client.Start();
    
            std::string input;
            while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
                if (input == "Stop") break;
    
            // client.Stop(); // or just rely on destructor
        } // destructor
    
        std::cout << "Press Enter to quit... ";
        std::cin.ignore(1024, '\n');
    }
    

    Not Using C++20

    This is conceptually identical, but more tedious as it requires explicit callback functions:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <boost/signals2.hpp>
    #include <iostream>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using namespace std::chrono_literals;
    
    struct BackgroundWs {
        boost::signals2::signal<void(std::string)> callback;
    
        BackgroundWs(std::string host, uint16_t port)
            : _host(std::move(host))
            , _port(std::to_string(port)) {}
    
        void Start() {
            net::post(_ws.get_executor(), [this] {
                _stop_requested = false;
                do_step_machine();
            });
        }
        void Stop() { do_stop().get(); }
    
        ~BackgroundWs() {
            do_stop().wait(); // noexcept, please
        }
    
      private:
        using tcp = net::ip::tcp;
        using error_code = boost::system::error_code;
        const std::string              _host, _port;
        beast::flat_buffer             _buffer;
        net::thread_pool               _ioc{1};
        websocket::stream<tcp::socket> _ws{_ioc};
        net::steady_timer              _timer{_ws.get_executor()};
    
        bool _stop_requested = false;
        enum class Status { INIT, CONNECTED, DISCONNECTED, STOPPED } _status{};
    
        void handle(error_code ec) {
            std::cerr << "Error: " << ec.message() << std::endl;
            if (ec.failed())
                _status = _stop_requested ? Status::STOPPED : Status::DISCONNECTED;
            do_step_machine();
        }
    
        void do_step_machine() {
            switch(_status) {
                case Status::INIT:         return do_connect();
                case Status::CONNECTED:    return do_read();
                case Status::DISCONNECTED: return do_reconnect_delay(50ms);
                case Status::STOPPED:      break;
            };
        }
    
        void do_reconnect_delay(std::chrono::steady_clock::duration d) {
            _timer.expires_after(d);
            _timer.async_wait([this](error_code ec) {
                if (ec) return handle(ec);
                _status = Status::INIT;
                do_step_machine();
            });
        }
    
        std::future<void> do_stop() {
            return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                                _stop_requested = true;
                                _ws.next_layer().cancel();
                                _ws.close(websocket::normal);
                            }));
        }
    
        void do_connect() {
            async_connect( //
                _ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
                [this](error_code ec, tcp::endpoint /*ep*/) {
                    if (ec) return handle(ec);
    
                    _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                        req.set(beast::http::field::user_agent,
                                BOOST_BEAST_VERSION_STRING " WsConnect");
                    }));
    
                    _ws.async_handshake(_host + ':' + _port, "/", [this](error_code ec) {
                        if (ec) return handle(ec);
                        _status = Status::CONNECTED;
                        _buffer.clear();
                        do_step_machine();
                    });
                });
        }
    
        void do_read() {
            _ws.async_read(_buffer, [this](error_code ec, size_t) {
                if (ec) return handle(ec);
    
                if (_ws.is_message_done()) {
                    callback(beast::buffers_to_string(_buffer.cdata()));
                    _buffer.clear();
                    do_step_machine();
                } else {
                    do_read();
                }
            });
        }
    };
    
    void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }
    
    int main() {
        {
            BackgroundWs client("localhost", 8989);
            boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
    
            client.Start();
    
            std::string input;
            while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
                if (input == "Stop") break;
    
            // client.Stop(); // or just rely on destructor
        } // destructor
    
        std::cout << "Press Enter to quit... ";
        std::cin.ignore(1024, '\n');
    }
    

    Again, same behaviour:

    enter image description here