c++network-programmingboostboost-asio

Sending data over the network with a specified delay


I have a class io which is an API for reading and sending data to a boost::asio socket. Whenever there is new data to send, that data is queued to std::queue<std::string>, which then sends that data over the network via boost::asio::async_write. The length of std::string strings that are added to std::queue is not known in advance.

I need to add a new functionality that would send data with a given delay. For example, if I set io_obj->set_delay(delay_info::1kb_per_second), then the io object should send no more than 1kb of data every second.

How can such functionality be implemented? The current implementation of the io class is given below

template<typename socket_t>
class io : public std::enable_shared_from_this<io<socket_t>>
{
public:
    io(socket_t socket) : m_socket(std::move(socket)) {}

    void start()
    {
        read();
    }

    void write(std::string data)
    {
        bool write_in_progress = !m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (!write_in_progress)
        {
            do_write();
        }
    }

    void close()
    {
        m_socket.close();
    }

private:
    socket_t m_socket;
    std::array<char, 4096> m_buffer;
    std::queue<std::string> m_write_queue;

    void read()
    {
        auto self = this->shared_from_this();

        m_socket.async_read_some(boost::asio::buffer(m_buffer),
            [this, self](boost::system::error_code ec, std::size_t bytes_transferred) 
        {
            if (!ec)
            {
                std::string data(m_buffer.data(), bytes_transferred);
                read();
            }
        });
    }

    void do_write()
    {
        auto self = this->shared_from_this();

        boost::asio::async_write(m_socket, boost::asio::buffer(m_write_queue.front().c_str(), m_write_queue.front().size()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (!ec)
            {
                m_write_queue.pop();
                if (!m_write_queue.empty())
                {
                    do_write();
                }
            }
        });
    }
};

Solution

  • What you describe is not a delay (you cannot wait for "1 kilobyte per second"). What you describe is rate limiting.

    Rate limiting is best implemented on the Stream object. In your code, you already have a template type argument socket_t that you can pass a stream type that has ratelimiting capability.

    So, you can use the Rate Limiting implementation from Boost Beast.

    So for your example you could convert the rate limit to bytes per second:

    m_socket.rate_policy().write_limit(1024); // 1 kibibyte
    

    Demo

    Also renaming the socket stuff to stream to better match the concept requirements:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast/core/basic_stream.hpp>
    #include <boost/beast/core/rate_policy.hpp>
    #include <queue>
    
    namespace asio = boost::asio;
    using asio::ip::tcp;
    
    template <typename stream_t> class io : public std::enable_shared_from_this<io<stream_t>> {
      public:
        io(stream_t s) : m_stream(std::move(s)) {}
    
        void start() { read(); }
    
        void write(std::string data) {
            bool write_in_progress = !m_write_queue.empty();
            m_write_queue.push(std::move(data));
            if (!write_in_progress) {
                do_write();
            }
        }
    
        void close() { m_stream.close(); }
    
      private:
        using error_code = boost::system::error_code;
    
        stream_t                m_stream;
        std::array<char, 4096>  m_buffer;
        std::queue<std::string> m_write_queue;
    
        void read() {
            m_stream.async_read_some(
                asio::buffer(m_buffer),
                [this, self = this->shared_from_this()](error_code ec, size_t bytes_transferred) {
                    if (!ec) {
                        write({m_buffer.data(), bytes_transferred}); // TODO real processing
                        read();
                    }
                });
        }
    
        void do_write() {
            if (m_write_queue.empty())
                return;
    
            async_write(m_stream, asio::buffer(m_write_queue.front()),
                        [this, self = this->shared_from_this()](error_code ec, size_t) {
                            if (!ec) {
                                m_write_queue.pop();
                                do_write();
                            }
                        });
        }
    };
    
    asio::awaitable<void> run_server() {
        using Stream = boost::beast::basic_stream<tcp, asio::any_io_executor, boost::beast::simple_rate_policy>;
    
        for (auto acc = tcp::acceptor(co_await asio::this_coro::executor, {{}, 7878});;) {
            Stream s(co_await acc.async_accept(asio::use_awaitable));
            s.rate_policy().write_limit(1024); // max 1 KiB per second
    
            std::make_shared<io<Stream>>(std::move(s))->start();
        }
    }
    
    int main() {
        asio::io_context io_context(1); // single threaded required for io::write()
    
        asio::co_spawn(io_context, run_server(), asio::detached);
        io_context.run();
    }
    

    Local demo: