c++concurrencyboost-beastboost-beast-websocket

Websocket boost/beast example with client/server


Maybe somebody can provide simple example(or references) on how to setup client and server using websocket from boost/beast library? I need an example on how to handle input message on server and respond/not to it(such that client wont crash), how to send some data to specific "subscribed" connections and how to handle it on client side. I found this example, but it doesn't work as intended: Client side:

#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

// Thread-safe queue class
template <typename T>
class ThreadSafeQueue {
public:
    void push(const T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) {
            return false;
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_var_.wait(lock, [this] { return !queue_.empty(); });
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};

class Singleton {
public:
    ThreadSafeQueue<json>* data_queue;
    websocket::stream<tcp::socket>* ws;

    Singleton(
        ThreadSafeQueue<json>* data_queue_,
        websocket::stream<tcp::socket>* ws_
    ) :
        data_queue(data_queue_),
        ws(ws_)
    {}
};

void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);

            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";

            buffer.consume(buffer.size());
        }
    }
    catch (std::exception e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}

void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";
            std::getline(std::cin, input);

            if (input == "stop") break;

            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };

            data.ws->write(asio::buffer(message.dump()));
        }
    }
    catch (std::exception e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}

int main() {
    std::string const host = "127.0.0.1";
    std::string const port = "9002";

    asio::io_context ioc;
    tcp::resolver resolver(ioc);
    websocket::stream<tcp::socket> ws(ioc);

    auto const results = resolver.resolve(host, port);
    asio::connect(ws.next_layer(), results);

    ws.handshake(host, "/");

    ThreadSafeQueue<json> data_queue;
    Singleton single(&data_queue, &ws);

    std::thread reader(readData, std::ref(single));
    std::thread sender(sendData, std::ref(single));

    reader.join();
    sender.join();

    return 0;
}

Server side:

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
    WebSocketSession(tcp::socket socket)
        : ws_(std::move(socket)) {}

    void run() {
        ws_.async_accept(
            beast::bind_front_handler(
                &WebSocketSession::on_accept,
                shared_from_this()
            )
        );
    }

private:
    void on_accept(beast::error_code ec) {
        if (ec) {
            std::cerr << "Accept error: " << ec.message() << std::endl;
            return;
        }
        do_read();
    }

    void do_read() {
        ws_.async_read(
            buffer_,
            beast::bind_front_handler(
                &WebSocketSession::on_read,
                shared_from_this()
            )
        );
    }

    void on_read(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            if (ec == websocket::error::closed) {
                return;
            }
            std::cerr << "Read error: " << ec.message() << std::endl;
            return;
        }

        try {
            auto received_message = beast::buffers_to_string(buffer_.data());
            json received_json = json::parse(received_message);

            std::string response_message;

            if (received_json.contains("echo")) {
                json response_json = {
                    {"type", "response"},
                    {"original", received_json}
                };
                response_message = response_json.dump();
            }
            else {
                buffer_.consume(buffer_.size());
                do_read();
                return;
            }

            response_ptr_ = std::make_shared<std::string>(std::move(response_message));
            ws_.text(ws_.got_text());
            ws_.async_write(
                asio::buffer(*response_ptr_),
                beast::bind_front_handler(
                    &WebSocketSession::on_write,
                    shared_from_this()));
        }
        catch (const std::exception& e) {
            std::cerr << "Processing error: " << e.what() << std::endl;
        }
    }

    void on_write(beast::error_code ec, std::size_t bytes_transferred) {
        boost::ignore_unused(bytes_transferred);

        if (ec) {
            std::cerr << "Write error: " << ec.message() << std::endl;
            return;
        }

        buffer_.consume(buffer_.size());
        response_ptr_.reset();
        do_read();
    }

    websocket::stream<tcp::socket> ws_;
    beast::flat_buffer buffer_;
    std::shared_ptr<std::string> response_ptr_;
};

class WebSocketServer {
public:
    WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
        : acceptor_(ioc, endpoint) {
        do_accept();
    }

private:
    void do_accept() {
        acceptor_.async_accept(
            beast::bind_front_handler(
                &WebSocketServer::on_accept,
                this
            )
        );
    }

    void on_accept(beast::error_code ec, tcp::socket socket) {
        if (ec) {
            std::cerr << "Accept error: " << ec.message() << std::endl;
        }
        else {
            std::make_shared<WebSocketSession>(std::move(socket))->run();
        }
        do_accept();
    }

    tcp::acceptor acceptor_;
};

int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), 9002);
        WebSocketServer server(ioc, endpoint);
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}

Solution

  • Looks to me that client doesn't crash, but instead simply only sends one message correctly https://i.imgur.com/0zrvFBP.jpeg

    Running the code under ASan/UBSan reveals no glaring issues (good).

    For the client to be full-duplex you will need threads OR async. I suggest async because it fits the library choice.

    Be careful about consuming from dynamic buffers: don't just consume the entire buffer unless it was all part of your current message.

    You need to queue outgoing writes, because you cannot have overlapping writes and apparently your interface is multi-threaded.

    On the server-side you made it simple by making sure that you don't read the next request until the response has been sent. I'd note that response_ptr_ is redundant because it is already part of shared_from_this() object. So, I'd just make it std::string response_;.

    Server notes:

    Combining I'd suggest:

    With customary demos: