I'm trying to write a WebSocket client that connects to a server using the Boost library. The situation is that the server sometimes sends a pre-determined number of JSON messages, but sometimes more.
From stack overflow I have a solution posted by @sehe, which can be found here. This works well for me if I know for sure the amount of messages sent back is 1,2,3, etc.
However it does not work well if:
I have done a little digging and tested the async example client from the Boost website. It works "well", for 1 message. Using that example inside a thread or timer will trigger the assert from Boost.
The ideal solution for me would be what @sehe posted, short, simple; but it should read "all" the messages sent back. I realise this can be done only if you "know" when the message stream "ends", but with my lack of experience in using Boost and web sockets in C++ I am lost.
Please advise what would be the solution for this purpose. To re-iterate:
Many thanks
In response to the comments/chat I have cooked up¹ an example of a straight-forward translation of the example from e.g. https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions into C++ using Beast.
Note that it uses the command IDs to correlate responses to requests. Note as well that these are session-specific, so if you must support multiple sessions, you will need to account for that.
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/signals2.hpp>
#include <iostream>
#include <deque>
#include <ranges>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
// Diagnostic stream used by the client for its progress messages. It shares
// std::cerr's stream buffer; construct it with nullptr instead to discard them.
static std::ostream debug(/*nullptr*/ std::cerr.rdbuf());
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// JSON-over-WebSocket client.
//
// Outgoing commands are queued and written one at a time; every JSON object
// that is received is published through `onMessage`. All socket work runs on
// a strand created from the executor handed to the constructor, so the public
// member functions merely post onto that strand and return immediately.
class CWebSocket_Sync {
    websocket::stream<tcp::socket> ws_;

  public:
    using executor_type = net::any_io_executor;
    executor_type get_executor() { return ws_.get_executor(); }

    // The stream gets its own strand on top of `ex`.
    explicit CWebSocket_Sync(executor_type ex) : ws_(make_strand(ex)) {}

    // call backs are on the strand, not on the main thread
    boost::signals2::signal<void(json::object const&)> onMessage;

    // public functions not assumed to be on the strand

    // Resolves `host`:`port`, connects, performs the WebSocket handshake for
    // `path` and then starts the receive loop. The arguments are copied into
    // the posted task, so they need not outlive the call.
    void Connect(std::string const& host, std::string const& port, std::string const& path)
    {
        post(get_executor(), [=, this] {
            tcp::resolver resolver_(get_executor());
            // TODO async_connect prevents potential blocking wait
            // TODO async_handshake (idem)
            auto ep = net::connect(ws_.next_layer(), //
                                   resolver_.resolve(host, port));
            ws_.handshake(host + ':' + std::to_string(ep.port()), path);
            do_receive_loop();
        });
    }

    // Serializes `cmd` and appends it to the outgoing queue; starts the send
    // loop when the queue was empty.
    void ServerCommand(json::object const& cmd)
    {
        post(get_executor(), [text = serialize(cmd), this] {
            outbox_.push_back(text);
            if (outbox_.size() == 1) // not already sending?
                do_send_loop();
        });
    }

    // Cancels outstanding socket operations and starts a normal WebSocket
    // close; the outcome is only logged.
    void CloseConnection() {
        post(get_executor(), [this] {
            ws_.next_layer().cancel();
            ws_.async_close(websocket::close_code::normal, [](error_code ec) {
                debug << "CloseConnection (" << ec.message() << ")" << std::endl;
            });
        });
    }

  private:
    // do_XXXX functions assumed to be on the strand
    beast::flat_buffer inbox_;

    // Reads one message, publishes it when it is a JSON object, and re-arms
    // itself. Only a transport error (including cancellation) ends the loop;
    // a message that cannot be used is logged and skipped.
    void do_receive_loop() {
        debug << "do_receive_loop..." << std::endl;
        ws_.async_read(inbox_, [this](error_code ec, size_t n) {
            debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
            if (ec)
                return;

            auto text = inbox_.cdata();
            auto parsed = json::parse(
                {static_cast<char const*>(text.data()), text.size()}, ec);
            inbox_.clear();

            if (ec) {
                debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
            } else if (auto const* message = parsed.if_object()) {
                onMessage(*message); // exceptions will blow up
            } else {
                // Valid JSON but not an object: subscribers only take objects.
                debug << "Ignore non-object message" << std::endl;
            }
            // Keep listening even when this particular message was skipped.
            do_receive_loop();
        });
    }

    std::deque<std::string> outbox_;

    // Writes the front of the queue and continues with the next entry once
    // the write has completed successfully.
    void do_send_loop() {
        debug << "do_send_loop " << outbox_.size() << std::endl;
        if (outbox_.empty())
            return;
        ws_.async_write( //
            net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
                debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (ec) {
                    // Leaving the failed entry queued would keep size() above
                    // one forever, so no later command could restart the loop.
                    outbox_.clear();
                    return;
                }
                outbox_.pop_front();
                do_send_loop();
            });
    }
};
// Demo: list the browser's targets, attach to the first "page" target and
// navigate it, correlating replies to requests through their "id" field.
int main()
{
    net::thread_pool ioc(1);
    CWebSocket_Sync client(ioc.get_executor());
    client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");

    auto trace = client.onMessage.connect([&](json::object const& obj) {
        debug << "Received " << obj << std::endl;
    });

    unsigned id = 1; // TODO make per session

    scoped_connection sub = client.onMessage.connect([&](json::object const& obj) {
        if ((obj.contains("id") && obj.at("id") == 1)) {
            auto& infos = obj.at("result").at("targetInfos").as_array();
            // find_if returns an iterator; "not found" is infos.end(), which
            // does not convert to false, so compare against it explicitly.
            if (auto pageTarget = r::find_if(infos,
                    [](auto& info) { return info.at("type") == "page"; });
                pageTarget != infos.end())
            {
                std::cout << "pageTarget " << *pageTarget << std::endl;

                // Replace this subscription with one waiting for the attach reply.
                sub = client.onMessage.connect([&](json::object const& obj) {
                    // idea:
                    // if(obj.contains("method") && obj.at("method") == "Target.attachedToTarget"))
                    if (obj.contains("id") && obj.at("id") == 2) {
                        auto sessionId = value_to<std::string>(obj.at("result").at("sessionId"));
                        std::cout << "sessionId: " << sessionId << std::endl;
                        // stop expecting a specific response; release() would
                        // leave the slot connected, disconnect() removes it
                        sub.disconnect();

                        client.ServerCommand({
                            {"sessionId", sessionId},
                            {"id", 1}, // IDs are independent between sessions
                            {"method", "Page.navigate"},
                            {"params", json::object{
                                {"url", "https://stackoverflow.com/q/70768742/85371"},
                            }},
                        });
                    }
                });

                client.ServerCommand(
                    {{"id", id++},
                     {"method", "Target.attachToTarget"},
                     {
                         "params",
                         json::object{
                             {"targetId", pageTarget->at("targetId")},
                             {"flatten", true},
                         },
                     }});
            }
        }
    });

    client.ServerCommand({
        {"id", id++},
        {"method", "Target.getTargets"},
    });

    // Give the exchange some time to play out before shutting down.
    std::this_thread::sleep_for(5s);
    client.CloseConnection();
    ioc.join();
}
When testing (I hardcoded the websocket URL for now);
The complete output is:
do_receive_loop...
do_send_loop 1
Sent 37 bytes (Success)
do_send_loop 0
Received 10138 bytes (Success)
Received {"id":1,"result":{"targetInfos":[{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"D945FE9AC3EBF060805A90097DF2D7EF","type":"page","title":"(1) WhatsApp","url":"https://web.whatsapp.com/","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"6DBC2EDCADF891A4A68FA9A878AAA574","type":"page","title":"aslushnikov/getting-started-with-cdp: Getting Started With Chrome DevTools Protocol","url":"https://github.com/aslushnikov/getting-started-with-cdp#targets--sessions","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnome-shell-integratie","url":"chrome-extension://gphhapmejobijbbhgpjhcjognlahblep/extension.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"477A0D3805F436D95C9D6DC0760862C1","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},{"targetId":"B1371BC4FA5117900C2ABF28C69E3098","type":"page","title":"On Software and Languages: Holy cow, I wrote a book!","url":"http://ib-krajewski.blogspot.com/2019/02/holy-cow-i-wrote-book.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read until no more data - Stack 
Overflow","url":"https://stackoverflow.com/questions/70768742/boost-beast-websocket-send-and-read-until-no-more-data","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"A89EBECFD804FD9D4FF899274CB1E4C5","type":"background_page","title":"Dark Reader","url":"chrome-extension://eimadpbcbfnmbkopoojfekhnkhdbieeh/background/index.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"},{"targetId":"9612E681CCF4E4E47D400B0849FA05E6","type":"background_page","title":"uBlock Origin","url":"chrome-extension://cjpalhdlnbpafiamejdnhcphjbkeiagm/background.html","attached":false,"canAccessOpener":false,"browserContextId":"9806733E4CD80888448B20DA32A515F6"}]}}
pageTarget {"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":false,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"}
do_receive_loop...
do_send_loop 1
Sent 113 bytes (Success)
do_send_loop 0
Received 339 bytes (Success)
Received {"method":"Target.attachedToTarget","params":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000","targetInfo":{"targetId":"53AC5A92902F306C626CF3B3A2BB1878","type":"page","title":"Google","url":"https://www.google.com/","attached":true,"canAccessOpener":false,"browserContextId":"15E97D88D0D1417314CBCB24D4A0FABA"},"waitingForDebugger":false}}
do_receive_loop...
Received 66 bytes (Success)
Received {"id":2,"result":{"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}}
sessionId: 29AD9FFD2EAE70BAF10076A9E05DD000
do_receive_loop...
do_send_loop 1
Sent 142 bytes (Success)
do_send_loop 0
Received 157 bytes (Success)
Received {"id":1,"result":{"frameId":"53AC5A92902F306C626CF3B3A2BB1878","loaderId":"A3680FBE84DEBDA3444FFA6CD7C5A5A5"},"sessionId":"29AD9FFD2EAE70BAF10076A9E05DD000"}
do_receive_loop...
Received 0 bytes (Operation canceled)
CloseConnection (Operation canceled)
I created a Request
method that returns a future like the nodejs example:
// Sends `cmd` and returns a future for the message whose msgId equals that of
// `cmd`.
std::future<json::object> Request(json::object const& cmd)
{
    // Subscribe first and send second, so the reply cannot arrive before
    // anything is listening for it.
    auto reply = Expect([requestKey = msgId(cmd)](json::object const& incoming) {
        return requestKey == msgId(incoming);
    });

    Send(cmd);
    return reply;
}
Note how it got a bit more elegant with the addition of the msgId
extraction helper:
// Reduces a message to the fields that identify it: "id" and, when present,
// "sessionId".
static json::object msgId(json::object const& message)
{
    auto identity = filtered(message, {"id", "sessionId"}); // TODO more ?
    return identity;
}
This neatly facilitates multi-session responses where the "id"
need not be unique across different "sessionId"
s. The condition stays a simple if (msgId(msg) == id)
.
It also uses Send
and Expect
as building blocks:
// Queues `cmd` for transmission. The command is serialized when the task is
// created; only the queue update is deferred to the client's executor.
void Send(json::object const& cmd)
{
    post(get_executor(), [payload = serialize(cmd), this] {
        bool const wasIdle = outbox_.empty();
        outbox_.push_back(payload);
        if (wasIdle) // not already sending?
            do_send_loop();
    });
}
// Returns a future that receives the first message for which `pred` returns
// true. The subscription removes itself once that message has been delivered.
template <typename F> std::future<json::object> Expect(F&& pred)
{
    // Kept alive by the slot: the promise to fulfil and the handle needed to
    // unsubscribe from inside the slot.
    struct State {
        boost::signals2::connection _subscription;
        std::promise<json::object> _promise;
    };

    auto state = std::make_shared<State>();

    state->_subscription = onMessage.connect( //
        [state, pred = std::forward<F>(pred)](json::object const& msg) {
            if (!pred(msg))
                return;
            state->_promise.set_value(msg);
            state->_subscription.disconnect();
        });

    return state->_promise.get_future();
}
Now the main program can be written less backwards:
// 1. Ask for all targets and keep the "targetInfos" array of the reply.
auto targets = client.Request({
    {"id", id++},
    {"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");

// 2. Pick the first target of type "page". find_if returns an iterator, so
//    "nothing found" must be detected by comparing against end().
auto& targetList = targets.as_array();
auto pageTarget = r::find_if(targetList, [](auto& info) {
    return info.at("type") == "page";
});

if (pageTarget == targetList.end()) {
    std::cerr << "No page target\n";
    return 0;
}

std::cout << "pageTarget " << *pageTarget << std::endl;

// 3. Attach to it; the reply carries the session id.
auto sessionId = client.Request(
        {{"id", id++},
         {"method", "Target.attachToTarget"},
         {"params", json::object{
             {"targetId", pageTarget->at("targetId")},
             {"flatten", true},
           },
         }})
    .get().at("result").at("sessionId");

std::cout << "sessionId: " << sessionId << std::endl;

// 4. Navigate within that session.
auto response = client.Request({
        {"sessionId", sessionId},
        {"id", 1}, // IDs are independent between sessions
        {"method", "Page.navigate"},
        {"params", json::object{
             {"url", "https://stackoverflow.com/q/70768742/85371"},
         }},
    }) .get();

std::cout << "Navigation response: " << response << std::endl;
Which leads to output like:
-- trace {"id":1,"result":{"targetInfos":[{"targetId":"35BE8DA1EE5A0F51EDEF9AA71738968C","type":"background_page","title":"Gnom....
pageTarget {"targetId":"1F3A58D579C18DDD819EF46EBBB0AD4C","type":"page","title":"c++ - Boost Beast Websocket - Send and Read unt....
-- trace {"method":"Target.attachedToTarget","params":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601","targetInfo":{"targetId":....
-- trace {"id":2,"result":{"sessionId":"58931793102C2A5576E4D5D6CDC3D601"}}
sessionId: "58931793102C2A5576E4D5D6CDC3D601"
-- trace {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3"},"session....
Navigation response: {"id":1,"result":{"frameId":"1F3A58D579C18DDD819EF46EBBB0AD4C","loaderId":"9E70C5AAF0B5A503BA2770BB73A4FEC3....
I would have one last question if you do not mind? Can I use somehow
std::future<T>::wait_until
so I can find out if the page was loaded completely? (for example checking for Network.loadingFinished
object)?
Sure, just code it:
{
    // Signals the waiting thread once a "Network.loadingFinished" message
    // has been seen.
    std::promise<void> promise;
    bool fulfilled = false; // only read and written inside the slot

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns nullptr when the key is absent, so the
            // pointer has to be checked before it is dereferenced.
            auto m = msg.if_contains("method");
            if (!fulfilled && m && *m == "Network.loadingFinished") {
                // Setting the promise a second time would throw std::future_error.
                fulfilled = true;
                promise.set_value();
            }
        });

    auto loadingFinished = promise.get_future();
    loadingFinished.wait(); // OR:
    loadingFinished.wait_for(5s); // OR:
    loadingFinished.wait_until(std::chrono::steady_clock::now() + 1min);
}
To also have the message:
{
    // Same as waiting for the event, but the promise also carries the
    // "Network.loadingFinished" message itself.
    std::promise<json::object> promise;
    bool fulfilled = false; // only read and written inside the slot

    scoped_connection sub =
        client.onMessage.connect([&](json::object const& msg) {
            // if_contains returns nullptr when the key is absent, so the
            // pointer has to be checked before it is dereferenced.
            auto m = msg.if_contains("method");
            if (!fulfilled && m && *m == "Network.loadingFinished") {
                // Setting the promise a second time would throw std::future_error.
                fulfilled = true;
                promise.set_value(msg);
            }
        });

    auto message = promise.get_future().get();
}
Of course you could/should consider encapsulating in a class method again.
UPDATE - I have since refactored the original futures code to use these as building blocks (`Expect` and `Send` together make `Request`).
Now you can just
// Block until Expect delivers a message accepted by the
// isMethod("Network.loadingFinished") predicate, then print that message.
auto loadingFinished = client.Expect(isMethod("Network.loadingFinished")).get();
std::cout << "Got: " << loadingFinished << "\n";
Of course, assuming a tiny helper like:
// Builds a predicate that accepts a message only when it has a "method"
// member equal to `value`.
auto isMethod = [](auto value) {
    return [value](json::object const& msg) -> bool {
        if (auto method = msg.if_contains("method"))
            return *method == value;
        return false; // no "method" member at all
    };
};
As a bonus, to monitor continuously for specific messages:
// What a monitor action returns to keep or drop its subscription.
enum ActionResult { ContinueMonitoring, StopMonitoring };

// Filter used when Monitor is called without one: accepts every message.
struct AcceptAll {
    template <typename T> bool operator()(T&&) const noexcept { return true; }
};

// Calls `action` for every message accepted by `filter` until `action`
// returns StopMonitoring. The returned callable disconnects the monitor from
// the outside.
//
// F needs a default *type*: a default argument alone does not take part in
// template argument deduction, so Monitor(action) would not compile without it.
template <typename A, typename F = AcceptAll>
auto Monitor(A action, F&& filter = F{})
{
    // Shared by the slot and the returned stop function.
    struct State {
        boost::signals2::connection _subscription;
    };

    auto state = std::make_shared<State>();
    auto stop  = [state] { state->_subscription.disconnect(); };

    state->_subscription = onMessage.connect( //
        [action, stop, filter = std::forward<F>(filter)](json::object const& msg) {
            if (filter(msg) && StopMonitoring == action(msg))
                stop();
        });

    return stop; // gives the caller an "external" way to stop the monitor
}
A contrived example of usage:
// monitor until 3 messages having an id divisible by 7 have been received
std::atomic_int count = 0;

auto stopMonitor = client.Monitor(
    [&count](json::object const& msg) {
        std::cout << "Divisable by 7: " << msg << "\n";
        return ++count >= 3 ? CDPClient::StopMonitoring
                            : CDPClient::ContinueMonitoring;
    },
    [](json::object const& msg) {
        // if_int64 returns nullptr for an id that is not stored as int64,
        // whereas as_int64 would throw from inside the signal invocation.
        auto id = msg.if_contains("id");
        auto number = id ? id->if_int64() : nullptr;
        return number && (0 == *number % 7);
    });

std::this_thread::sleep_for(5s);

stopMonitor(); // even if 3 messages had not been reached, stop the monitor

std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
Sadly Exceeds Compiler Explorer Limits:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/json.hpp>
//#include <boost/json/src.hpp> // for header-only
#include <boost/signals2.hpp>
#include <deque>
#include <iostream>
#include <ranges>
namespace json = boost::json;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace r = std::ranges;
// Diagnostic stream used by the client for its progress messages. Built
// without a stream buffer here, so nothing is printed; pass std::cerr.rdbuf()
// instead to see the messages.
static std::ostream debug(nullptr); // std::cerr.rdbuf()
static const auto filtered(json::object const& obj,
std::initializer_list<json::string_view> props)
{
boost::json::object result;
for (auto prop : props)
if (auto const* v = obj.if_contains(prop))
result[prop] = *v;
return result;
}
using namespace std::chrono_literals;
using boost::signals2::scoped_connection;
using boost::system::error_code;
using net::ip::tcp;
// JSON-over-WebSocket client with request/response helpers.
//
// Outgoing commands are queued and written one at a time; every JSON object
// that is received is published through `onMessage`. All socket work runs on
// a strand created from the executor handed to the constructor, so the public
// member functions merely post onto that strand and return immediately.
class CDPClient {
    websocket::stream<tcp::socket> ws_;

  public:
    using executor_type = net::any_io_executor;
    executor_type get_executor() { return ws_.get_executor(); }

    // The stream gets its own strand on top of `ex`.
    explicit CDPClient(executor_type ex) : ws_(make_strand(ex)) {}

    // call backs are on the strand, not on the main thread
    boost::signals2::signal<void(json::object const&)> onMessage;

    // public functions not assumed to be on the strand

    // Resolves `host`:`port`, connects, performs the WebSocket handshake for
    // `path` and then starts the receive loop. The arguments are copied into
    // the posted task, so they need not outlive the call.
    void Connect(std::string const& host, std::string const& port, std::string const& path)
    {
        post(get_executor(), [=, this] {
            tcp::resolver resolver_(get_executor());
            // TODO async_connect prevents potential blocking wait
            // TODO async_handshake (idem)
            auto ep = net::connect(ws_.next_layer(), //
                                   resolver_.resolve(host, port));
            ws_.handshake(host + ':' + std::to_string(ep.port()), path);
            do_receive_loop();
        });
    }

    // Serializes `cmd` and appends it to the outgoing queue; starts the send
    // loop when the queue was empty.
    void Send(json::object const& cmd)
    {
        post(get_executor(), [text = serialize(cmd), this] {
            outbox_.push_back(text);
            if (outbox_.size() == 1) // not already sending?
                do_send_loop();
        });
    }

    // Returns a future that receives the first message for which `pred`
    // returns true; the subscription removes itself after delivering it.
    template <typename F> std::future<json::object> Expect(F&& pred)
    {
        // Kept alive by the slot: the promise to fulfil and the handle
        // needed to unsubscribe from inside the slot.
        struct State {
            boost::signals2::connection _subscription;
            std::promise<json::object> _promise;
        };

        auto state = std::make_shared<State>();
        state->_subscription = onMessage.connect( //
            [state, pred = std::forward<F>(pred)](json::object const& msg) {
                if (pred(msg)) {
                    state->_promise.set_value(msg);
                    state->_subscription.disconnect();
                }
            });

        return state->_promise.get_future();
    }

    // Reduces a message to the fields that identify it: "id" and, when
    // present, "sessionId".
    static json::object msgId(json::object const& message) {
        return filtered(message, {"id", "sessionId"}); // TODO more ?
    }

    // Sends `cmd` and returns a future for the message whose msgId equals
    // that of `cmd`. The subscription is made before sending, so the reply
    // cannot arrive before anything is listening for it.
    std::future<json::object> Request(json::object const& cmd)
    {
        auto fut = Expect([id = msgId(cmd)](json::object const& resp) {
            return msgId(resp) == id;
        });

        Send(cmd);
        return fut;
    }

    // What a monitor action returns to keep or drop its subscription.
    enum ActionResult { ContinueMonitoring, StopMonitoring };

    // Filter used when Monitor is called without one: accepts every message.
    struct AcceptAll {
        template <typename T> bool operator()(T&&) const noexcept { return true; }
    };

    // Calls `action` for every message accepted by `filter` until `action`
    // returns StopMonitoring. The returned callable disconnects the monitor
    // from the outside.
    //
    // F needs a default *type*: a default argument alone does not take part
    // in template argument deduction, so Monitor(action) would not compile
    // without it.
    template <typename A, typename F = AcceptAll>
    auto Monitor(A action, F&& filter = F{})
    {
        // Shared by the slot and the returned stop function.
        struct State {
            boost::signals2::connection _subscription;
        };

        auto state = std::make_shared<State>();
        auto stop  = [state] { state->_subscription.disconnect(); };

        state->_subscription = onMessage.connect( //
            [action, stop, filter = std::forward<F>(filter)](json::object const& msg) {
                if (filter(msg) && StopMonitoring == action(msg))
                    stop();
            });

        return stop; // gives the caller an "external" way to stop the monitor
    }

    // Cancels outstanding socket operations and starts a normal WebSocket
    // close. When the close completes, all subscribers are disconnected.
    void CloseConnection() {
        post(get_executor(), [this] {
            ws_.next_layer().cancel();
            ws_.async_close( //
                websocket::close_code::normal, [this](error_code ec) {
                    debug << "CloseConnection (" << ec.message() << ")" << std::endl;
                    onMessage.disconnect_all_slots();
                });
        });
    }

  private:
    // do_XXXX functions assumed to be on the strand
    beast::flat_buffer inbox_;

    // Reads one message, publishes it when it is a JSON object, and re-arms
    // itself. Only a transport error (including cancellation) ends the loop;
    // a message that cannot be used is logged and skipped.
    void do_receive_loop() {
        debug << "do_receive_loop..." << std::endl;
        ws_.async_read(inbox_, [this](error_code ec, size_t n) {
            debug << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
            if (ec)
                return;

            auto text = inbox_.cdata();
            auto parsed = json::parse(
                {static_cast<char const*>(text.data()), text.size()}, ec);
            inbox_.clear();

            if (ec) {
                debug << "Ignore failed parse (" << ec.message() << ")" << std::endl;
            } else if (auto const* message = parsed.if_object()) {
                onMessage(*message); // exceptions will blow up
            } else {
                // Valid JSON but not an object: subscribers only take objects.
                debug << "Ignore non-object message" << std::endl;
            }
            // Keep listening even when this particular message was skipped.
            do_receive_loop();
        });
    }

    std::deque<std::string> outbox_;

    // Writes the front of the queue and continues with the next entry once
    // the write has completed successfully.
    void do_send_loop() {
        debug << "do_send_loop " << outbox_.size() << std::endl;
        if (outbox_.empty())
            return;
        ws_.async_write( //
            net::buffer(outbox_.front()), [this](error_code ec, size_t n) {
                debug << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (ec) {
                    // Leaving the failed entry queued would keep size() above
                    // one forever, so no later command could restart the loop.
                    outbox_.clear();
                    return;
                }
                outbox_.pop_front();
                do_send_loop();
            });
    }
};
int main()
{
net::thread_pool ioc(1);
CDPClient client(ioc.get_executor());
client.Connect("localhost", "9222", "/devtools/browser/bb8efece-b445-42d0-a4cc-349fccd8514d");
auto trace = client.onMessage.connect([&](json::object const& obj) {
std::cerr << " -- trace " << obj << std::endl;
});
unsigned id = 1; // TODO make per session
auto targets = client.Request({
{"id", id++},
{"method", "Target.getTargets"},
}).get().at("result").at("targetInfos");
auto pageTarget = r::find_if(targets.as_array(), [](auto& info) {
return info.at("type") == "page";
});
if (!pageTarget) {
std::cerr << "No page target\n";
return 0;
}
std::cout << "pageTarget " << *pageTarget << std::endl;
auto sessionId = client.Request(
{{"id", id++},
{"method", "Target.attachToTarget"},
{"params", json::object{
{"targetId", pageTarget->at("targetId")},
{"flatten", true},
},
}})
.get().at("result").at("sessionId");
std::cout << "sessionId: " << sessionId << std::endl;
auto response = client.Request({
{"sessionId", sessionId},
{"id", 1}, // IDs are independent between sessions
{"method", "Page.navigate"},
{"params", json::object{
{"url", "https://stackoverflow.com/q/70768742/85371"},
}},
}) .get();
std::cout << "Navigation response: " << response << std::endl;
auto isMethod = [](auto value) {
return [value](json::object const& msg) {
auto m = msg.if_contains("method");
return m && *m == value;
};
};
auto loadingFinished = client.Expect(isMethod("Network.loadingFinished")).get();
std::cout << "Got: " << loadingFinished << "\n";
// monitor until 3 messages having an id divisable by 7 have been received
std::atomic_int count = 0;
auto stopMonitor = client.Monitor(
[&count](json::object const& msg) {
std::cout << "Divisable by 7: " << msg << "\n";
return ++count >= 3 ? CDPClient::StopMonitoring
: CDPClient::ContinueMonitoring;
},
[](json::object const& msg) {
auto id = msg.if_contains("id");
return id && (0 == id->as_int64() % 7);
});
std::this_thread::sleep_for(5s);
stopMonitor(); // even if 3 messages had not been reached, stop the monitor
std::cout << count << " messages having an id divisable by 7 had been received in 5s\n";
client.CloseConnection();
ioc.join();
}
¹ besides some dinner