I have a class io which is an API for reading and sending data to a boost::asio socket. Whenever there is new data to send, that data is queued in a std::queue<std::string>, and from there it is sent over the network via boost::asio::async_write. The length of the std::string elements that are added to the std::queue is not known in advance.
I need to add new functionality that would send data with a given delay. For example, if I set io_obj->set_delay(delay_info::1kb_per_second), then the io object should send no more than 1 KB of data every second.
How can such functionality be implemented? The current implementation of the io class is given below:
template<typename socket_t>
class io : public std::enable_shared_from_this<io<socket_t>>
{
public:
    io(socket_t socket) : m_socket(std::move(socket)) {}

    void start()
    {
        read();
    }

    void write(std::string data)
    {
        bool write_in_progress = !m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (!write_in_progress)
        {
            do_write();
        }
    }

    void close()
    {
        m_socket.close();
    }

private:
    socket_t m_socket;
    std::array<char, 4096> m_buffer;
    std::queue<std::string> m_write_queue;

    void read()
    {
        auto self = this->shared_from_this();
        m_socket.async_read_some(boost::asio::buffer(m_buffer),
            [this, self](boost::system::error_code ec, std::size_t bytes_transferred)
            {
                if (!ec)
                {
                    std::string data(m_buffer.data(), bytes_transferred);
                    read();
                }
            });
    }

    void do_write()
    {
        auto self = this->shared_from_this();
        boost::asio::async_write(m_socket,
            boost::asio::buffer(m_write_queue.front().c_str(), m_write_queue.front().size()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                if (!ec)
                {
                    m_write_queue.pop();
                    if (!m_write_queue.empty())
                    {
                        do_write();
                    }
                }
            });
    }
};
What you describe is not a delay (you cannot wait for "1 kilobyte per second"). What you describe is rate limiting.
Rate limiting is best implemented on the stream object. In your code you already have a template type argument socket_t, so you can pass in a stream type that has rate-limiting capability. That means you can use the rate-limiting implementation from Boost.Beast.
So for your example you would convert the rate limit to bytes per second:
m_socket.rate_policy().write_limit(1024); // 1 KiB per second
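Here rate_policy() is the accessor exposed by Beast's basic_stream: when the stream is parameterized with simple_rate_policy, it provides read_limit()/write_limit() setters that take bytes per second. The stream type used in the full example below is therefore:

using Stream = boost::beast::basic_stream<
    boost::asio::ip::tcp,                // protocol
    boost::asio::any_io_executor,        // executor type
    boost::beast::simple_rate_policy>;   // enables rate_policy().read_limit()/write_limit()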
Also renaming the socket stuff to stream to better match the concept requirements:
#include <boost/asio.hpp>
#include <boost/beast/core/basic_stream.hpp>
#include <boost/beast/core/rate_policy.hpp>
#include <queue>

namespace asio = boost::asio;
using asio::ip::tcp;

template <typename stream_t> class io : public std::enable_shared_from_this<io<stream_t>> {
public:
    io(stream_t s) : m_stream(std::move(s)) {}

    void start() { read(); }

    void write(std::string data) {
        bool write_in_progress = !m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (!write_in_progress) {
            do_write();
        }
    }

    void close() { m_stream.close(); }

private:
    using error_code = boost::system::error_code;

    stream_t m_stream;
    std::array<char, 4096> m_buffer;
    std::queue<std::string> m_write_queue;

    void read() {
        m_stream.async_read_some(
            asio::buffer(m_buffer),
            [this, self = this->shared_from_this()](error_code ec, size_t bytes_transferred) {
                if (!ec) {
                    write({m_buffer.data(), bytes_transferred}); // TODO real processing
                    read();
                }
            });
    }

    void do_write() {
        if (m_write_queue.empty())
            return;
        async_write(m_stream, asio::buffer(m_write_queue.front()),
                    [this, self = this->shared_from_this()](error_code ec, size_t) {
                        if (!ec) {
                            m_write_queue.pop();
                            do_write();
                        }
                    });
    }
};

asio::awaitable<void> run_server() {
    using Stream = boost::beast::basic_stream<tcp, asio::any_io_executor, boost::beast::simple_rate_policy>;

    for (auto acc = tcp::acceptor(co_await asio::this_coro::executor, {{}, 7878});;) {
        Stream s(co_await acc.async_accept(asio::use_awaitable));
        s.rate_policy().write_limit(1024); // max 1 KiB per second

        std::make_shared<io<Stream>>(std::move(s))->start();
    }
}

int main() {
    asio::io_context io_context(1); // single threaded required for io::write()

    asio::co_spawn(io_context, run_server(), asio::detached);
    io_context.run();
}
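If incoming traffic should be throttled as well, simple_rate_policy has a matching read_limit (by default basic_stream uses unlimited_rate_policy, which does no throttling). A minimal sketch, assuming you also want to cap inbound data at the same 1 KiB per second:

s.rate_policy().read_limit(1024); // max 1 KiB per second accepted from the peer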