tl;dr: Is there a way to close a WebSocket that is currently blocked in a (sync) read() operation when the server sends nothing for some time?
I wanted to make a simple WebSocket client with Boost::beast. When I realized that read() is a blocking operation, and that there's no way to tell if there's a message coming - I created a sleeper thread. All the thread does is read() and I can afford to have it blocked if no data is coming.
I want to be able to close the connection, so from a non-blocked thread I call websocket::close(). This causes read() to throw a BOOST_ASSERT() at me:
Assertion failed: ! impl.wr_close
How can I close the connection when (sync) read() is ongoing?
Code for reproduction of my scenario:
#include <string>
#include <thread>
#include <chrono>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
using namespace std::chrono_literals;
// Synchronous WebSocket client used to reproduce the problem described above.
// Run() loops over connect()/read() using blocking Beast calls; Close() is
// called from a different thread in main() below.
class HandlerThread {
// Connection state driving the switch in Run().
enum class Status {
UNINITIALIZED,
DISCONNECTED,
CONNECTED,
// Set by read() when a read returned but the message is not complete yet.
READING,
};
const std::string _host;
const std::string _port;
// "host:port" string built in connect() and passed to the handshake.
std::string _resolvedAddress;
boost::asio::io_context _ioc;
boost::asio::ip::tcp::resolver _resolver;
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> _websocket;
boost::beast::flat_buffer _buffer;
// NOTE(review): plain bool, written by Close() and read by Run(); main()
// calls these from different threads without any synchronisation.
bool isRunning = true;
Status _connectionStatus = Status::UNINITIALIZED;
public:
// NOTE(review): `host` is a const reference, so std::move(host) still copies.
HandlerThread(const std::string& host, const uint16_t port)
: _host(std::move(host))
, _port(std::to_string(port))
, _ioc()
, _resolver(_ioc)
, _websocket(_ioc) {}
// Connect/read loop. Runs until isRunning becomes false, but the flag is only
// checked between the blocking calls.
void Run() {
// isRunning is also useless, due to blocking boost::beast operations.
while(isRunning) {
switch (_connectionStatus) {
case Status::UNINITIALIZED:
case Status::DISCONNECTED:
if (!connect()) {
_connectionStatus = Status::DISCONNECTED;
break;
}
// No break here: after a successful connect() control falls through into
// the read case below (the fallthrough is not annotated).
case Status::CONNECTED:
case Status::READING:
if (!read()) {
_connectionStatus = Status::DISCONNECTED;
break;
}
}
}
}
// Clears the run flag and performs a blocking WebSocket close on the same
// stream object that Run() may currently be blocked on in read(); this is the
// call that leads to the assertion quoted in the question.
void Close()
{
isRunning = false;
_websocket.close(boost::beast::websocket::close_code::normal);
}
private:
// Resolves host/port, connects the TCP socket and performs the WebSocket
// handshake. Returns false as soon as any step reports an error code; on
// success sets the state to CONNECTED and returns true.
bool connect()
{
// All here is copy-paste from the examples.
boost::system::error_code errorCode;
// Look up the domain name
auto const results = _resolver.resolve(_host, _port, errorCode);
if (errorCode) return false;
// Make the connection on the IP address we get from a lookup
auto ep = boost::asio::connect(_websocket.next_layer(), results, errorCode);
if (errorCode) return false;
_resolvedAddress = _host + ':' + std::to_string(ep.port());
// Decorate the upgrade request with a User-Agent header.
_websocket.set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type& req)
{
req.set(boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-coro");
}));
boost::beast::websocket::response_type res;
_websocket.handshake(res, _resolvedAddress, "/", errorCode);
if (errorCode) return false;
_connectionStatus = Status::CONNECTED;
return true;
}
// Performs one blocking read into _buffer. Returns false on error. When the
// message is complete the buffer is cleared and the state becomes CONNECTED;
// otherwise the state becomes READING and the partial data stays in _buffer.
bool read()
{
boost::system::error_code errorCode;
_websocket.read(_buffer, errorCode);
if (errorCode) return false;
if (_websocket.is_message_done()) {
_connectionStatus = Status::CONNECTED;
// notifyRead(_buffer);
_buffer.clear();
} else {
_connectionStatus = Status::READING;
}
return true;
}
};
// Reproduction driver: runs the handler on a detached thread, waits three
// seconds, then calls Close() from the main thread.
int main() {
HandlerThread handler("localhost", 8080);
// The lambda captures `handler` by reference and the thread is detached, so
// nothing waits for Run() to finish before main() returns.
std::thread([&]{
handler.Run();
}).detach(); // bye!
std::this_thread::sleep_for(3s);
// Called while the detached thread may be blocked inside read().
handler.Close(); // Bad idea...
return 0;
}
There is no such thing. You might be able to force something at the TCP stack (so, operating system, usually) level. E.g. disabling the network interface involved.
Note that most synchronous code can trivially be transformed into asynchronous code with the exact same blocking semantics using asio::use_future. That means you can use async deadlines, and those are supported by Beast out of the box (base your websocket on beast::tcp_stream instead of asio::ip::tcp::socket).
To the added code
Reduced the code removing unneeded bits and adding some fixes and demonstration handler notification so we can test the functioning:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// Reduced synchronous WebSocket client.
///
/// Run() keeps a connection alive with blocking Beast calls and publishes each
/// complete message through `callback`. Close() clears the run flag and issues
/// a WebSocket close on the same stream.
class WsConnect {
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    net::io_context _ioc;
    beast::flat_buffer _buffer;
    websocket::stream<tcp::socket> _ws{_ioc};
    enum class Status { DISCONNECTED, CONNECTED } _status{Status::DISCONNECTED};
    std::atomic_bool _running{true}; // SEHE

  public:
    /// Invoked with the payload of every completely received message.
    boost::signals2::signal<void(std::string)> callback; // SEHE

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Connect/read loop. Any boost::system::system_error marks the connection
    /// as lost and triggers a reconnect after a short pause. Returns once
    /// `_running` has been cleared and the current blocking call has ended.
    void Run() {
        while (_running)
            try {
                switch (_status) {
                case Status::DISCONNECTED: do_connect(); [[fallthrough]];
                case Status::CONNECTED: do_read();
                }
            } catch (boost::system::system_error const&) {
                // se.code() would be the error_code
                _status = Status::DISCONNECTED;
                std::this_thread::sleep_for(50ms); // SEHE avoid tight reconnect loop
            }
    }

    /// Requests Run() to end and sends a normal close; errors from the close
    /// are collected in a local error_code and ignored.
    void Close() {
        _running = false;
        beast::error_code ec;
        _ws.close(websocket::close_code::normal, ec);
    }

  private:
    /// Resolves, connects and performs the WebSocket handshake; throws
    /// boost::system::system_error on failure.
    void do_connect() {
        connect(beast::get_lowest_layer(_ws), tcp::resolver(_ioc).resolve(_host, _port));
        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));
        _ws.handshake(_host + ':' + _port, "/");
        _buffer.clear(); // SEHE this was missing
        // Record the established connection. Without this the state stays
        // DISCONNECTED and Run() would reconnect after every single message.
        _status = Status::CONNECTED;
    }

    /// Blocks until one complete message is in `_buffer`, hands it to
    /// `callback` and clears the buffer.
    void do_read() {
        do {
            _ws.read(_buffer);
        } while (!_ws.is_message_done());
        callback(beast::buffers_to_string(_buffer.cdata())); // SEHE just for demo
        _buffer.clear();
    }
};
#include <iomanip>
#include <iostream>
/// Demo slot: prints the received message, quoted, on its own line and flushes stdout.
void handle_message(std::string_view msg) {
    std::cout << "Handling ";
    std::cout << std::quoted(msg);
    std::cout << std::endl;
}
// Demo driver: runs the client on a detached thread, subscribes the handler,
// waits three seconds and then closes from the main thread.
int main() {
    WsConnect conn("localhost", 8989);

    std::thread runner([&] { conn.Run(); });
    runner.detach(); // bye!

    boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);

    std::this_thread::sleep_for(3s);
    conn.Close(); // Bad idea...
}
Exercising it for demonstration:
I'd argue there are three design issues:
- HandlerThread is not a thread.
- detach()-ing threads makes them ungovernable by definition.

Here's a fix, naively only time-limiting operations like in your example:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// WebSocket client whose operations share one deadline, implemented with
/// async operations that are waited on through futures.
struct WsConnect {
    /// Invoked with the payload of every completely received message.
    boost::signals2::signal<void(std::string)> callback;

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Sets the stream expiry to `d` from now, then connects and reads until a
    /// timeout error is reported. Other errors are printed and followed by a
    /// reconnect after 50ms.
    void Run(std::chrono::steady_clock::duration d) {
        _ws.next_layer().expires_after(d);

        while (true) {
            try {
                do_connect();
                while (true)
                    do_read();
            } catch (boost::system::system_error const& se) {
                auto const code = se.code();
                std::cerr << "Error: " << code.message() << std::endl;
                if (code == beast::error::timeout)
                    return;
            }
            std::this_thread::sleep_for(50ms);
        }
    }

  private:
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{1};
    websocket::stream<beast::tcp_stream> _ws{_ioc};

    /// Resolve, connect and handshake; each async step is awaited via its future.
    void do_connect() {
        auto connected = _ws.next_layer().async_connect(
            tcp::resolver(_ioc).resolve(_host, _port), net::use_future);
        connected.get();

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        auto upgraded = _ws.async_handshake(_host + ':' + _port, "/", net::use_future);
        upgraded.get();

        _buffer.clear();
    }

    /// Reads until a whole message is buffered, publishes it and resets the buffer.
    void do_read() {
        for (bool complete = false; !complete; complete = _ws.is_message_done())
            _ws.async_read(_buffer, net::use_future).get();

        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};
/// Demo slot: writes the received message to stdout (no newline added) and flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling ";
    std::cout << msg;
    std::cout.flush();
}
int main() {
WsConnect conn("localhost", 8989);
boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
conn.Run(3s);
} // close implied by destructors
Note how the code got simpler and shorter, and we even print error information. This is achieved by using use_future together with beast::tcp_stream::expires_after:
To allow for /externally triggered/ cancellation (instead of a fixed deadline), we can cheat a little by using 2 threads, so one can be "clogged" doing blocking waits on the futures:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// WebSocket client that runs a blocking connect/read loop on a two-thread
/// pool: one pool thread is occupied by do_run() waiting on futures, the other
/// executes the completion handlers and the stop request.
struct BackgroundWs {
    /// Invoked with the payload of every completely received message.
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Posts the run loop onto the pool and returns immediately.
    void Start() {
        post(_ws.get_executor(), [this] { do_run(); });
    }

    /// Requests a stop and waits for the request to be processed; an exception
    /// thrown while stopping is rethrown by get().
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

  private:
    using tcp = net::ip::tcp;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{2};
    websocket::stream<tcp::socket> _ws{_ioc};
    // Written by the stop task on one pool thread and polled by do_run() on the
    // other (also right after its 50ms sleep, where nothing else orders the two
    // accesses), so it has to be atomic.
    std::atomic_bool _stopped{false};

    /// Reconnect loop: connect, then read until an error is thrown; pause 50ms
    /// and retry unless a stop was requested in the meantime.
    void do_run() {
        for (; !_stopped; std::this_thread::sleep_for(50ms))
            try {
                do_connect();
                for (;;)
                    do_read();
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
    }

    /// Queues the stop request on the pool: sets the flag, cancels pending
    /// socket operations and sends a normal close. The returned future becomes
    /// ready once the request has run (or thrown).
    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stopped = true;
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    /// Resolve, connect and handshake; each async step is awaited via its future.
    void do_connect() {
        async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
            .get();
        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));
        _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
        _buffer.clear();
    }

    /// Reads until a whole message is buffered, publishes it and resets the buffer.
    void do_read() {
        do
            _ws.async_read(_buffer, net::use_future).get();
        while (!_ws.is_message_done());
        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};
/// Demo slot: writes the received message to stdout (no newline added) and flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling ";
    std::cout << msg;
    std::cout.flush();
}
// Demo driver: keeps the client running in the background until the user
// types "Stop" (or stdin ends); leaving the inner scope destroys the client.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        std::string line;
        for (;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
Now, the thing runs, reconnecting happily until the user enters Stop on the terminal:
Using C++20 coroutines you can have basically identical code that is truly asynchronous. This gets rid of the "cheating extra thread":
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
struct BackgroundWs {
boost::signals2::signal<void(std::string)> callback;
BackgroundWs(std::string host, uint16_t port)
: _host(std::move(host))
, _port(std::to_string(port)) {}
void Start() { net::co_spawn(_ws.get_executor(), do_run(), net::detached); }
void Stop() { do_stop().get(); }
~BackgroundWs() {
do_stop().wait(); // noexcept, please
}
private:
using tcp = net::ip::tcp;
const std::string _host, _port;
beast::flat_buffer _buffer;
net::thread_pool _ioc{1};
websocket::stream<tcp::socket> _ws{_ioc};
bool _stopped{false};
net::awaitable<void> do_run() {
for (; !_stopped; co_await async_sleep(50ms))
try {
co_await do_connect();
for (;;)
co_await do_read();
} catch (boost::system::system_error const& se) {
std::cerr << "Error: " << se.code().message() << std::endl;
}
}
net::awaitable<void> async_sleep(auto duration) {
co_await net::steady_timer(_ws.get_executor(), duration).async_wait(net::use_awaitable);
}
std::future<void> do_stop() {
return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
_stopped = true;
_ws.next_layer().cancel();
_ws.close(websocket::normal);
}));
}
net::awaitable<void> do_connect() {
co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
net::use_awaitable);
_ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
}));
co_await _ws.async_handshake(_host + ':' + _port, "/", net::use_awaitable);
_buffer.clear();
}
net::awaitable<void> do_read() {
do
co_await _ws.async_read(_buffer, net::use_awaitable);
while (!_ws.is_message_done());
callback(beast::buffers_to_string(_buffer.cdata()));
_buffer.clear();
}
};
/// Demo slot: writes the received message to stdout (no newline added) and flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling ";
    std::cout << msg;
    std::cout.flush();
}
// Demo driver: keeps the client running in the background until the user
// types "Stop" (or stdin ends); leaving the inner scope destroys the client.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        std::string line;
        for (;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
This is conceptually identical, but more tedious as it requires explicit callback functions:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;
/// WebSocket client implemented as a callback-driven state machine on a
/// single-threaded pool. All handlers and the stop request execute on that one
/// pool thread.
struct BackgroundWs {
    /// Invoked with the payload of every completely received message.
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    /// Posts the first state-machine step onto the pool and returns immediately.
    /// NOTE(review): _status is not reset here, so once the machine has reached
    /// STOPPED a later Start() does not resume it.
    void Start() {
        net::post(_ws.get_executor(), [this] {
            _stop_requested = false;
            do_step_machine();
        });
    }

    /// Requests a stop and waits for the request to be processed; an exception
    /// thrown while stopping is rethrown by get().
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

  private:
    using tcp        = net::ip::tcp;
    using error_code = boost::system::error_code;

    const std::string _host, _port;
    beast::flat_buffer _buffer;
    net::thread_pool _ioc{1};
    websocket::stream<tcp::socket> _ws{_ioc};
    net::steady_timer _timer{_ws.get_executor()};
    bool _stop_requested = false;
    enum class Status { INIT, CONNECTED, DISCONNECTED, STOPPED } _status{};

    /// Common error path: logs the error, moves to STOPPED when a stop was
    /// requested and to DISCONNECTED otherwise, then takes the next step.
    void handle(error_code ec) {
        std::cerr << "Error: " << ec.message() << std::endl;
        if (ec.failed())
            _status = _stop_requested ? Status::STOPPED : Status::DISCONNECTED;
        do_step_machine();
    }

    /// Starts the operation that belongs to the current state.
    void do_step_machine() {
        // A stop request wins over whatever state a handler just selected.
        // Without this check a reconnect-delay timer that expires after the stop
        // request would put the machine back into INIT and reconnect.
        if (_stop_requested)
            _status = Status::STOPPED;

        switch (_status) {
        case Status::INIT: return do_connect();
        case Status::CONNECTED: return do_read();
        case Status::DISCONNECTED: return do_reconnect_delay(50ms);
        case Status::STOPPED: break;
        }
    }

    /// Waits `d`, then restarts the machine from INIT.
    void do_reconnect_delay(std::chrono::steady_clock::duration d) {
        _timer.expires_after(d);
        _timer.async_wait([this](error_code ec) {
            if (ec) return handle(ec);
            _status = Status::INIT;
            do_step_machine();
        });
    }

    /// Queues the stop request on the pool: sets the flag, cancels a pending
    /// reconnect delay and pending socket operations, and sends a normal close.
    /// The returned future becomes ready once the request has run (or thrown).
    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stop_requested = true;
                            _timer.cancel(); // wake a pending reconnect delay right away
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    /// Resolve and connect, then handshake; on success the state becomes CONNECTED.
    void do_connect() {
        async_connect( //
            _ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
            [this](error_code ec, tcp::endpoint /*ep*/) {
                if (ec) return handle(ec);
                _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(beast::http::field::user_agent,
                            BOOST_BEAST_VERSION_STRING " WsConnect");
                }));
                _ws.async_handshake(_host + ':' + _port, "/", [this](error_code ec) {
                    if (ec) return handle(ec);
                    _status = Status::CONNECTED;
                    _buffer.clear();
                    do_step_machine();
                });
            });
    }

    /// Reads until a whole message is buffered, publishes it, resets the buffer
    /// and takes the next step.
    void do_read() {
        _ws.async_read(_buffer, [this](error_code ec, size_t) {
            if (ec) return handle(ec);
            if (_ws.is_message_done()) {
                callback(beast::buffers_to_string(_buffer.cdata()));
                _buffer.clear();
                do_step_machine();
            } else {
                do_read();
            }
        });
    }
};
/// Demo slot: writes the received message to stdout (no newline added) and flushes.
void handle_message(std::string_view msg) {
    std::cout << "Handling ";
    std::cout << msg;
    std::cout.flush();
}
// Demo driver: keeps the client running in the background until the user
// types "Stop" (or stdin ends); leaving the inner scope destroys the client.
int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        std::string line;
        for (;;) {
            if (!(std::cout << "Enter 'Stop' to stop... "))
                break;
            if (!std::getline(std::cin, line))
                break;
            if (line == "Stop")
                break;
        }
        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}
Again, same behaviour: