Maybe somebody can provide a simple example (or references) on how to set up a client and a server using WebSocket from the Boost.Beast library? I need an example of how to handle an incoming message on the server and respond (or not respond) to it in such a way that the client won't crash, how to send some data to specific "subscribed" connections, and how to handle it on the client side. I found this example, but it doesn't work as intended: Client side:
#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
// Thread-safe queue class
/// FIFO container whose operations are serialized by an internal mutex, so
/// that several threads can push to and pop from the same instance.
template <typename T>
class ThreadSafeQueue {
public:
    /// Appends a copy of `value` and signals one waiter of wait_and_pop().
    void push(const T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    /// Non-blocking pop.
    /// @param value receives the oldest element; left untouched when empty.
    /// @return true if an element was removed, false if the queue was empty.
    bool try_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = queue_.front();
            queue_.pop();
        }
        return has_item;
    }

    /// Blocking pop: sleeps until an element is available, then removes the
    /// oldest one and stores it in `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        // Re-check after every wake-up; condition variables may wake spuriously.
        while (queue_.empty()) {
            cond_var_.wait(guard);
        }
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};
class Singleton {
public:
ThreadSafeQueue<json>* data_queue;
websocket::stream<tcp::socket>* ws;
Singleton(
ThreadSafeQueue<json>* data_queue_,
websocket::stream<tcp::socket>* ws_
) :
data_queue(data_queue_),
ws(ws_)
{}
};
/// Blocking receive loop: reads one WebSocket message at a time from
/// `data.ws`, parses it as JSON, pushes the parsed value onto
/// `data.data_queue` and echoes the raw text to stdout.
///
/// The loop only ends when an exception is thrown (read failure, closed
/// connection, or a payload that is not valid JSON); the exception is
/// reported on stdout and the function returns.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);
            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";
            // The buffer is local and holds exactly the message just read,
            // so dropping all of it is safe here.
            buffer.consume(buffer.size());
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // thrown object, so what() would no longer report the derived message.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}
/// Interactive send loop: prompts on stdout, reads one line from stdin and
/// sends it to the server as a single-key JSON object. Lines starting with
/// 'h' are sent under the key "echo", all others under "null".
///
/// The loop ends when the user types "stop", when stdin reaches EOF or
/// fails, or when writing throws (reported on stdout).
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";
            // Without this check an EOF on stdin would leave `input` empty and
            // the loop would spin forever sending empty messages.
            if (!std::getline(std::cin, input)) break;
            if (input == "stop") break;
            // input[0] is well-defined for an empty string (yields '\0').
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };
            data.ws->write(asio::buffer(message.dump()));
        }
    }
    // Catch by const reference to avoid slicing and keep the real what() text.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}
int main() {
std::string const host = "127.0.0.1";
std::string const port = "9002";
asio::io_context ioc;
tcp::resolver resolver(ioc);
websocket::stream<tcp::socket> ws(ioc);
auto const results = resolver.resolve(host, port);
asio::connect(ws.next_layer(), results);
ws.handshake(host, "/");
ThreadSafeQueue<json> data_queue;
Singleton single(&data_queue, &ws);
std::thread reader(readData, std::ref(single));
std::thread sender(sendData, std::ref(single));
reader.join();
sender.join();
return 0;
}
Server side:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket)
: ws_(std::move(socket)) {}
void run() {
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()
)
);
}
private:
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
return;
}
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()
)
);
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
auto received_message = beast::buffers_to_string(buffer_.data());
json received_json = json::parse(received_message);
std::string response_message;
if (received_json.contains("echo")) {
json response_json = {
{"type", "response"},
{"original", received_json}
};
response_message = response_json.dump();
}
else {
buffer_.consume(buffer_.size());
do_read();
return;
}
response_ptr_ = std::make_shared<std::string>(std::move(response_message));
ws_.text(ws_.got_text());
ws_.async_write(
asio::buffer(*response_ptr_),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
catch (const std::exception& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
std::cerr << "Write error: " << ec.message() << std::endl;
return;
}
buffer_.consume(buffer_.size());
response_ptr_.reset();
do_read();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<std::string> response_ptr_;
};
/// Listens on a TCP endpoint and spawns a WebSocketSession for every
/// connection that is accepted.
class WebSocketServer {
public:
    /// Opens and binds the acceptor on `endpoint` and queues the first accept.
    WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
        : acceptor_(ioc, endpoint) {
        do_accept();
    }

private:
    /// Queues one asynchronous accept. The handler captures a raw `this`, so
    /// the server object has to outlive the pending operation.
    void do_accept() {
        acceptor_.async_accept(
            [this](beast::error_code ec, tcp::socket socket) {
                on_accept(ec, std::move(socket));
            });
    }

    /// Accept completion: start a session on success, log on failure, and
    /// in both cases keep accepting.
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            auto session = std::make_shared<WebSocketSession>(std::move(socket));
            session->run();
        }
        else {
            std::cerr << "Accept error: " << ec.message() << std::endl;
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
};
/// Server entry point: listens on TCP port 9002 (IPv4) and runs the I/O
/// loop on the calling thread. Returns EXIT_FAILURE if setup or the loop
/// throws a std::exception, EXIT_SUCCESS otherwise.
int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        WebSocketServer server(ioc, tcp::endpoint(tcp::v4(), 9002));
        ioc.run();
        return EXIT_SUCCESS;
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
    }
    return EXIT_FAILURE;
}
Looks to me that client doesn't crash, but instead simply only sends one message correctly https://i.imgur.com/0zrvFBP.jpeg
Running the code under ASan/UBSan reveals no glaring issues (good).
For the client to be full-duplex you will need threads OR async. I suggest async because it fits the library choice.
Be careful about consuming from dynamic buffers: don't just consume the entire buffer unless it was all part of your current message.
You need to queue outgoing writes, because you cannot have overlapping writes and apparently your interface is multi-threaded.
On the server side you made it simple by making sure that you don't read the next request until the response has been sent. I'd note that `response_ptr_` is redundant because it is already part of the `shared_from_this()` object. So, I'd just make it `std::string response_;`.
Server notes: there is no need to consult `ws_.got_text()`, since you're absolutely guaranteeing JSON text.
Combining, I'd suggest:
File client.cpp
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <nlohmann/json.hpp>
#include <queue>
#include <string>
#include <utility>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
// Thread-safe queue class
/// FIFO queue guarded by a mutex so that it can be shared between threads.
///
/// Elements can be pushed by copy or by move, and are moved out on pop, so
/// the queue also works with move-only element types and avoids copying
/// large payloads.
template <typename T> class ThreadSafeQueue {
public:
    /// Appends a copy of `value` and wakes one thread blocked in wait_and_pop().
    void push(T const& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    /// Appends `value` by moving it into the queue and wakes one waiter.
    void push(T&& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(std::move(value));
        cond_var_.notify_one();
    }

    /// Non-blocking pop.
    /// @param value receives the oldest element; left untouched when empty.
    /// @return true if an element was removed, false if the queue was empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) {
            return false;
        }
        // The front element is discarded right after, so moving from it is safe.
        value = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    /// Blocking pop: waits until the queue is non-empty, then moves the
    /// oldest element into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_var_.wait(lock, [this] { return !queue_.empty(); });
        value = std::move(queue_.front());
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};
using MsgQueue = ThreadSafeQueue<json>;
struct WSClient {
WSClient(std::string const& host, std::string const& port, MsgQueue& inbox) : inbox_(inbox) {
// connect, handshake
connect(ws.next_layer(), tcp::resolver (ioc).resolve(host, port));
ws.handshake(host, "/");
// start async read chain
do_read_loop();
};
void stop() {
beast::get_lowest_layer(ws).cancel();
ioc.join();
}
void send(json&& message) {
asio::post(ws.get_executor(), [this, m = std::move(message)]() mutable { //
outbox_.push_back(std::move(m).dump());
if (outbox_.size() == 1)
do_write_loop(); // start the pump
});
}
private:
asio::thread_pool ioc{1}; // single thread should suffice
beast::flat_buffer incoming_;
MsgQueue& inbox_;
std::deque<std::string> outbox_; // serialized form for buffer stability
websocket::stream<tcp::socket> ws{ioc};
void do_read_loop() {
ws.async_read(incoming_, [this](beast::error_code ec, size_t n) {
std::cout << "Received " << n << " bytes (" << ec.message() << ")" << std::endl;
if (ec)
return;
auto received_message = beast::buffers_to_string(incoming_.data()).substr(0, n);
json received_json = json::parse(received_message);
inbox_.push(received_json);
std::cout << "Received from server: " << received_message << std::endl;
incoming_.consume(n);
do_read_loop();
});
}
void do_write_loop() {
if (outbox_.empty())
return;
ws.async_write(asio::buffer(outbox_.front()), [this](beast::error_code ec, size_t n) {
std::cout << "Sent " << n << " bytes (" << ec.message() << ")" << std::endl;
if (!ec) {
outbox_.pop_front();
do_write_loop();
}
});
}
};
/// Client entry point: connects to 127.0.0.1:9002, then sends every
/// non-empty line typed on stdin until "stop" or EOF. Lines starting with
/// 'h' go out under the key "echo", others under "null". After each send,
/// any messages received so far are drained from the queue and printed.
/// Returns 1 if a std::exception escapes, 0 otherwise.
int main() try {
    MsgQueue received;
    WSClient client("127.0.0.1", "9002", received);

    std::cout << "To send: ";
    for (std::string input; std::getline(std::cin, input); std::cout << "To send: ") {
        if (input == "stop")
            break;
        if (input.empty())
            continue;

        client.send({{input.starts_with('h') ? "echo" : "null", std::move(input)}});

        for (json msg; received.try_pop(msg);)
            std::cout << " - Processing queued: " << msg << std::endl;
    }

    client.stop();
} catch (std::exception const& e) {
    std::cerr << "Error: " << e.what() << std::endl;
    // Falling off the end of this handler would exit with status 0.
    return 1;
}
File server.cpp
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <iostream>
#include <nlohmann/json.hpp>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
/// One accepted WebSocket connection.
///
/// The session keeps itself alive through the shared_ptr bound into each
/// pending asynchronous operation and is destroyed when no operation is
/// pending any more. It processes one message at a time: a message whose
/// JSON contains an "echo" key is answered, any other message is ignored.
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
    /// Takes ownership of an already-connected TCP socket and records the
    /// peer address for logging.
    WebSocketSession(tcp::socket socket) : ws_(std::move(socket)) {
        // Use the non-throwing overload: a peer that has already disconnected
        // would otherwise throw out of the server's accept handler and end
        // ioc.run(). On failure peer_ keeps its default-constructed value.
        beast::error_code ec;
        auto const peer = ws_.next_layer().remote_endpoint(ec);
        if (!ec)
            peer_ = peer;
    }

    /// Starts the WebSocket handshake; reading begins once it succeeds.
    void run() {
        ws_.async_accept(beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
    }

    ~WebSocketSession() { std::cerr << "Session " << peer_ << " closed" << std::endl; }

private:
    /// Handshake completion: log the result and start reading on success.
    void on_accept(beast::error_code ec) {
        std::cerr << "Accept: " << ec.message() << " for " << peer_ << std::endl;
        if (!ec)
            do_read_loop();
    }

    /// Queues an asynchronous read of the next complete message.
    void do_read_loop() {
        ws_.async_read(buffer_, beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
    }

    /// Read completion: parse the n bytes just received, then either reply
    /// or go straight back to reading.
    void on_read(beast::error_code ec, size_t n) {
        if (ec) {
            if (ec == websocket::error::closed) {
                return;
            }
            std::cerr << "Read error: " << ec.message() << std::endl;
            return;
        }

        try {
            response_.clear();

            // Parse and consume exactly the bytes that belong to this message.
            auto it = buffers_begin(buffer_.data());
            json msg = json::parse(it, it + n);
            buffer_.consume(n);

            if (msg.contains("echo"))
                response_ = json{{"type", "response"}, {"original", std::move(msg)}}.dump();

            if (response_.empty()) {
                do_read_loop();
            } else {
                ws_.text(true);
                ws_.async_write(asio::buffer(response_),
                                beast::bind_front_handler(&WebSocketSession::on_write, shared_from_this()));
            }
        } catch (std::exception const& e) {
            // No further read is queued here, so the session ends after this.
            std::cerr << "Processing error: " << e.what() << std::endl;
        }
    }

    /// Write completion: log the result and resume reading on success.
    void on_write(beast::error_code ec, size_t n) {
        std::cerr << "Write: " << n << " bytes (" << ec.message() << ")" << std::endl;
        if (!ec)
            do_read_loop();
    }

    websocket::stream<tcp::socket> ws_;
    beast::flat_buffer buffer_;
    std::string response_; // reply text; must stay alive while the write is pending
    tcp::endpoint peer_;   // remote address captured at construction, used in log lines
};
/// Accepts TCP connections on one endpoint and hands each of them to a new
/// WebSocketSession.
class WebSocketServer {
public:
    /// Opens and binds the acceptor on `endpoint` and queues the first accept.
    WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint) : acceptor_(ioc, endpoint) {
        do_accept();
    }

private:
    /// Queues one asynchronous accept. A raw `this` is captured, so the
    /// server object has to outlive the pending operation.
    void do_accept() {
        acceptor_.async_accept([this](beast::error_code ec, tcp::socket socket) {
            on_accept(ec, std::move(socket));
        });
    }

    /// Accept completion: start a session on success, log on failure, and
    /// queue the next accept either way.
    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            auto session = std::make_shared<WebSocketSession>(std::move(socket));
            session->run();
        } else {
            std::cerr << "Accept error: " << ec.message() << std::endl;
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
};
/// Server entry point: listens on TCP port 9002 (IPv4) and runs the I/O
/// loop on the calling thread. Returns 1 if a std::exception is thrown
/// during setup or while running, 0 otherwise.
int main() {
    std::cout << "Web server is running:\n";

    int exit_code = 0;
    try {
        asio::io_context ioc;
        WebSocketServer server(ioc, tcp::endpoint(tcp::v4(), 9002));
        ioc.run();
    } catch (std::exception const& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        exit_code = 1;
    }
    return exit_code;
}
With customary demos: