When we write data to a plain socket or an ssl::stream, it is recommended to use a message queue to send the data. In this case, I store the messages in a queue of type std::queue<std::string> and generally use the following implementation:
/// Asynchronous connection wrapper around a stream-like socket type.
///
/// Incoming bytes are read into a fixed 4096-byte buffer; outgoing messages
/// are kept in a FIFO queue and sent one at a time, so that at most one
/// async_write is outstanding. The object keeps itself alive for the duration
/// of every pending operation by capturing shared_from_this() in the
/// completion handlers, so it must be owned by a std::shared_ptr.
///
/// The class does no locking of its own.
template <typename socket_t>
class io : public std::enable_shared_from_this<io<socket_t>>
{
public:
    /// Takes ownership of an already-connected socket.
    io(socket_t socket) : m_socket(std::move(socket)) {}

    /// Begins the read loop.
    void start() { read(); }

    /// Enqueues a message; kicks off the write chain only if it was idle.
    void write(std::string data)
    {
        // An empty queue means no write chain is currently running.
        const bool idle = m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (idle)
            do_write();
    }

    /// Closes the underlying socket.
    void close() { m_socket.close(); }

private:
    socket_t m_socket;
    std::array<char, 4096> m_buffer;       // scratch space for async_read_some
    std::queue<std::string> m_write_queue; // front() is the message being sent

    /// Issues one async_read_some and re-arms itself on success.
    void read()
    {
        auto keep_alive = this->shared_from_this();
        m_socket.async_read_some(
            boost::asio::buffer(m_buffer),
            [this, keep_alive](boost::system::error_code ec, std::size_t bytes_transferred)
            {
                if (ec)
                    return; // any error ends the read loop

                // The received bytes; this is where they would be processed.
                std::string data(m_buffer.data(), bytes_transferred);
                read();
            });
    }

    /// Sends the front of the queue, then continues with the next message.
    void do_write()
    {
        auto keep_alive = this->shared_from_this();
        // The front element stays in the queue until the write completes, so
        // the memory handed to async_write remains valid for the whole operation.
        const std::string& outgoing = m_write_queue.front();
        boost::asio::async_write(
            m_socket, boost::asio::buffer(outgoing.c_str(), outgoing.size()),
            [this, keep_alive](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (ec)
                    return; // on error the message is left in the queue

                m_write_queue.pop();
                if (m_write_queue.empty())
                    return;
                do_write();
            });
    }
};
My question is: how do I write data to a boost::beast::websocket::stream asynchronously? Should I create a queue where each element is a boost::beast::flat_buffer, or do it some other way?
Ideally I would like to use a std::queue<std::string> queue, but I don't know exactly how to do this.
I would also be glad if someone could give an example of correctly reading data from a boost::beast::websocket::stream. I think that for reading I need to use boost::beast::flat_buffer. However, if this buffer is a class field, I don't know whether I need to clear it after each read operation.
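To answer the last part first: flat_buffer models the dynamic buffer concept. You can use its direct interface to prepare() and commit() new content, or to consume() available data. Because every read appends to the buffer, you do need to consume() (or otherwise clear) the data you have processed before the next read if the buffer is a class field.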
That said, here's a demo implementation that shows how to write this for a WS stream instead of a socket:
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using asio::ip::tcp;
class io : public std::enable_shared_from_this<io> {
public:
using error_code = beast::error_code;
using socket_t = websocket::stream<tcp::socket>;
io(socket_t socket) : m_ws(std::move(socket)) {}
void start() { do_handshake(); }
void queue_write(std::string data) {
asio::post(m_ws.get_executor(), //
[this, m = std::move(data), self = shared_from_this()]() mutable {
m_outgoing.push_back(std::move(m));
if (m_outgoing.size() == 1) // if this is the first write, start the write loop
do_write_loop();
});
}
// void close() { beast::beast_close_socket(beast::get_lowest_layer(m_socket)); }
void stop() {
asio::post(m_ws.get_executor(), [this, self = shared_from_this()]() { //
m_ws.async_close(websocket::close_code::going_away, asio::detached);
});
}
private:
socket_t m_ws;
std::string m_incoming;
std::deque<std::string> m_outgoing;
// beast weirdly requires dynamic buffers to be lvalues; asio does not
asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>> m_incoming_buffer =
asio::dynamic_buffer(m_incoming);
// all private methods are called from the strand
void do_handshake() {
m_ws.async_accept([this, self= shared_from_this()](error_code ec) {
if (ec) {
std::cerr << "Handshake failed: " << ec.message() << std::endl;
return;
}
do_read_loop(); // start reading
do_write_loop(); // just in case writes are queued before handshake
});
}
void do_read_loop() {
m_ws.async_read( //
m_incoming_buffer, [this, self = shared_from_this()](error_code ec, size_t n) {
if (ec) { // TODO partial reads and eof?
std::cerr << "Error reading: " << ec.message() << std::endl;
return;
}
// examples of processing:
#if 1
// directly clearing m_incoming
std::string processing = std::move(m_incoming);
std::cout << "Received: " << quoted(processing) << std::endl;
#else
// or consume (parts of) the buffer
m_incoming_buffer.consume(n);
#endif
do_read_loop();
});
}
void do_write_loop() {
if (m_outgoing.empty())
return;
m_ws.async_write( //
asio::buffer(m_outgoing.front()), [this, self = shared_from_this()](error_code ec, size_t /*n*/) {
if (ec) {
std::cerr << "Error writing: " << ec.message() << std::endl;
return;
}
m_outgoing.pop_front();
do_write_loop();
});
}
};
using std::this_thread::sleep_for;
using namespace std::chrono_literals;
int main() try {
asio::thread_pool ioc;
auto conn = std::make_shared<io>( //
websocket::stream<tcp::socket>( //
tcp::acceptor(ioc, {{}, 7878}) //
.accept(make_strand(ioc)))); //
conn->start();
sleep_for(500ms);
conn->queue_write("hello\n");
conn->queue_write("world\n");
sleep_for(500ms);
conn->stop();
ioc.join();
} catch (std::exception const& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
With a live demo: