Tags: c++, websocket, boost, boost-asio, boost-beast

Correct way to read & write data to boost::beast::websocket::stream


When we write data to a plain socket or an ssl::stream, it is recommended to use a message queue to send the data. In this case, I store messages in a queue of type std::queue<std::string> and generally use the following implementation:

/// Asynchronous reader/writer over a stream-like socket (for example a plain
/// TCP socket or an ssl::stream).
///
/// Outgoing messages are kept in a FIFO queue so that only one
/// boost::asio::async_write is in flight at a time; the next message is sent
/// from the completion handler of the previous one.
///
/// Instances must be owned by a std::shared_ptr, because every asynchronous
/// operation captures shared_from_this() to keep the object alive until its
/// handler has run.
///
/// NOTE(review): nothing here is synchronized, so write(), close() and the
/// completion handlers are presumably expected to run on one thread or one
/// strand -- confirm against the callers.
template<typename socket_t>
class io : public std::enable_shared_from_this<io<socket_t>>
{
public:
    /// Takes ownership of `socket`.
    io(socket_t socket) : m_socket(std::move(socket)) {}

    /// Starts the read loop.
    void start()
    {
        read();
    }

    /// Queues `data` for sending. If no write is currently in flight (the
    /// queue was empty), the write chain is started immediately.
    void write(std::string data)
    {
        const bool write_in_progress = !m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (!write_in_progress)
        {
            do_write();
        }
    }

    /// Closes the underlying socket.
    void close()
    {
        m_socket.close();
    }

private:
    socket_t m_socket;
    std::array<char, 4096> m_buffer;       // scratch space for async_read_some
    std::queue<std::string> m_write_queue; // front() is the message being sent

    /// Reads whatever is available into m_buffer and re-arms itself on
    /// success. The loop ends on the first error.
    void read()
    {
        auto self = this->shared_from_this();

        m_socket.async_read_some(boost::asio::buffer(m_buffer),
            [this, self](boost::system::error_code ec, std::size_t bytes_transferred)
            {
                if (ec)
                {
                    return;
                }

                // Placeholder: the received bytes are copied out here but not
                // yet handed to any consumer.
                std::string data(m_buffer.data(), bytes_transferred);
                read();
            });
    }

    /// Sends the message at the front of the queue, then continues with the
    /// next one. Precondition: m_write_queue is not empty.
    void do_write()
    {
        auto self = this->shared_from_this();

        // The front element stays in the queue (and therefore alive and at a
        // stable address) until the completion handler pops it.
        const std::string& message = m_write_queue.front();

        boost::asio::async_write(m_socket, boost::asio::buffer(message),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (ec)
                {
                    // Without this, the failed message would stay at the front
                    // forever: every later write() would see a non-empty queue,
                    // assume a write is in flight, and only grow the queue.
                    std::queue<std::string>().swap(m_write_queue);
                    return;
                }

                m_write_queue.pop();
                if (!m_write_queue.empty())
                {
                    do_write();
                }
            });
    }
};

My question is, how to write data to boost::beast::websocket::stream asynchronously? Should I create a queue where each element is boost::beast::flat_buffer or do it some other way?

Ideally I would like to use a std::queue<std::string> queue, but I don't know how exactly to do this.

I would also be glad if someone could give an example of correctly reading data from boost::beast::websocket::stream. I think I need to use boost::beast::flat_buffer for reading. However, if this buffer is a class field, I don't know whether I need to clear it after each read operation.


Solution

  • To answer the last part first: the flat_buffer models the dynamic buffer concept.

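    You can use its direct interface to prepare(), commit() new content or consume() available data. Because each read appends to whatever the buffer already holds, a buffer kept as a class member should be emptied with consume() once a message has been processed.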

    That said, here's a demo implementation that shows how to write this for a WS stream instead of a socket:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <deque>
    #include <iostream>
    
    namespace asio      = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using asio::ip::tcp;
    
    class io : public std::enable_shared_from_this<io> {
      public:
        using error_code = beast::error_code;
        using socket_t   = websocket::stream<tcp::socket>;
        io(socket_t socket) : m_ws(std::move(socket)) {}
    
        void start() { do_handshake(); }
    
        void queue_write(std::string data) {
            asio::post(m_ws.get_executor(), //
                       [this, m = std::move(data), self = shared_from_this()]() mutable {
                           m_outgoing.push_back(std::move(m));
                           if (m_outgoing.size() == 1) // if this is the first write, start the write loop
                               do_write_loop();
                       });
        }
    
        // void close() { beast::beast_close_socket(beast::get_lowest_layer(m_socket)); }
        void stop() {
            asio::post(m_ws.get_executor(), [this, self = shared_from_this()]() { //
                m_ws.async_close(websocket::close_code::going_away, asio::detached);
            });
        }
    
      private:
        socket_t                m_ws;
        std::string             m_incoming;
        std::deque<std::string> m_outgoing;
    
        // beast weirdly requires dynamic buffers to be lvalues; asio does not 
        asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>> m_incoming_buffer =
            asio::dynamic_buffer(m_incoming);
    
        // all private methods are called from the strand
        void do_handshake() {
            m_ws.async_accept([this, self= shared_from_this()](error_code ec) {
                if (ec) {
                    std::cerr << "Handshake failed: " << ec.message() << std::endl;
                    return;
                }
    
                do_read_loop();  // start reading
                do_write_loop(); // just in case writes are queued before handshake
            });
        }
    
        void do_read_loop() {
            m_ws.async_read( //
                m_incoming_buffer, [this, self = shared_from_this()](error_code ec, size_t n) {
                    if (ec) { // TODO partial reads and eof?
                        std::cerr << "Error reading: " << ec.message() << std::endl;
                        return;
                    }
    
                    // examples of processing:
    #if 1
                    // directly clearing m_incoming
                    std::string processing  = std::move(m_incoming);
                    std::cout << "Received: " << quoted(processing) << std::endl;
    #else
                    // or consume (parts of) the buffer
                    m_incoming_buffer.consume(n);
    #endif
    
                    do_read_loop();
                });
        }
    
        void do_write_loop() {
            if (m_outgoing.empty())
                return;
    
            m_ws.async_write( //
                asio::buffer(m_outgoing.front()), [this, self = shared_from_this()](error_code ec, size_t /*n*/) {
                    if (ec) {
                        std::cerr << "Error writing: " << ec.message() << std::endl;
                        return;
                    }
    
                    m_outgoing.pop_front();
                    do_write_loop();
                });
        }
    };
    
    using std::this_thread::sleep_for;
    using namespace std::chrono_literals;
    
    /// Demo driver: blocks until one TCP client connects on port 7878, wraps the
    /// connection in a WebSocket session, sends two messages and then closes.
    ///
    /// Returns 0 on success and 1 if an exception aborted the demo, so that a
    /// failure (for example the port being unavailable) is visible in the
    /// process exit status instead of being reported as success.
    int main() try {
        asio::thread_pool ioc;

        // The accepted socket is bound to a strand on the pool; the session
        // uses that executor for all of its handlers.
        auto conn = std::make_shared<io>(        //
            websocket::stream<tcp::socket>(      //
                tcp::acceptor(ioc, {{}, 7878})   //
                    .accept(make_strand(ioc)))); //

        conn->start();

        sleep_for(500ms); // demo-only pause before queuing messages

        conn->queue_write("hello\n");
        conn->queue_write("world\n");

        sleep_for(500ms); // demo-only pause before closing

        conn->stop();

        ioc.join(); // wait for the pool's outstanding work to finish
        return 0;
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    

    With a live demo: