Tags: c++, boost, boost-asio, boost-beast

Is it possible to apply rate limiting to UDP sockets in Boost?


As is well known, Boost.Beast lets us apply a rate limit to restrict the read and write speed of a TCP stream, as described in the documentation: https://www.boost.org/doc/libs/latest/libs/beast/doc/html/beast/using_io/rate_limiting.html

My question is: How can I apply the same rate limit to reading from and writing to a UDP socket?


Solution

  • You can't. Besides, such a limit would only apply to the specific udp::socket instance being used, which is somewhat arbitrary, because any other socket is just as connection-less and would arguably have to count toward the same limit.

    However, you can employ any rate limiting algorithm (e.g. "Leaky bucket") of your choosing by queuing up reads/writes.

    See the standard descriptions of rate-limiting algorithms (token bucket, leaky bucket) for examples and details.

    One interesting consideration for datagram protocols is that you cannot re-fragment packets at all. Therefore it might be convenient to measure rate in datagrams/s, not bytes/s.
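
    For illustration, a minimal, synchronous token-bucket gate (a close relative of the leaky bucket) that does its accounting in datagrams/s could look like the snippet below. The names TokenBucket and try_send are made up for this sketch; the asynchronous, queue-based sketch further below is the one that actually integrates with Asio.

    // Hedged sketch: token-bucket accounting in datagrams/s, not bytes/s.
    // Not part of the Asio example below; names are illustrative only.
    #include <algorithm>
    #include <chrono>

    class TokenBucket {
        using clock = std::chrono::steady_clock;
        double            rate_;                // refill rate, in datagrams per second
        double            capacity_;            // maximum burst size, in datagrams
        double            tokens_;              // datagrams currently allowed
        clock::time_point last_ = clock::now(); // time of the last refill

      public:
        TokenBucket(double rate, double burst) : rate_(rate), capacity_(burst), tokens_(burst) {}

        // Returns true if one datagram may go out now, consuming one token
        bool try_send() {
            auto t  = clock::now();
            tokens_ = std::min(capacity_, tokens_ + rate_ * std::chrono::duration<double>(t - last_).count());
            last_   = t;
            if (tokens_ < 1.0)
                return false;
            tokens_ -= 1.0;
            return true;
        }
    };

    A caller would queue, delay or drop a datagram whenever try_send() returns false; the sketch below instead queues everything and "drips" sends at a fixed interval.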

    Sketch

    A quick sketch using the leaky-bucket approach to limit to N datagrams per second.

    Using a channel with a fixed capacity (max_buffer_size) as well, to demonstrate back-pressure. You can see this in action in the reported latency between enqueueing and completion of the send operation.

    Using coroutines for ease of implementation, but showing a general-purpose initiation for async_send_to so you can also consume it outside of coroutines (see parallel_producer).

    Using a concurrent channel and a strand to be thread-safe.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/any_completion_handler.hpp>
    #include <boost/asio/experimental/concurrent_channel.hpp>
    #include <atomic>
    #include <iomanip>
    #include <iostream>
    #include <syncstream>
    #include <thread>
    
    namespace asio = boost::asio;
    namespace aex  = asio::experimental;
    using namespace std::literals; // _sv and chrono
    using asio::ip::udp;
    using boost::system::error_code;
    
    using duration            = std::chrono::high_resolution_clock::duration;
    static constexpr auto now = std::chrono::high_resolution_clock::now;
    
    auto trace() {
        static auto const       start = now();
        thread_local auto const t_id  = [] {
            static std::atomic_uint32_t gen{};
            return gen++;
        }();
    
        return std::osyncstream(std::cout)
            << "T:" << t_id << " " << std::setw(6) << (now() - start) / 1ms << "ms ";
    }
    
    struct MyUdp {
        MyUdp(size_t rate, asio::any_io_executor ex, udp::endpoint local_ep = {}, size_t max_buffer_size = 100)
            : sock_(ex, local_ep)                                            //
            , queue_(ex, max_buffer_size)                                    //
            , interval_(duration_cast<std::chrono::nanoseconds>(1.s) / rate) //
        {
            co_spawn(queue_.get_executor(), std::bind_front(&MyUdp::drip, this), asio::detached);
        }
    
        using Sig     = void(error_code, size_t, duration);
        using Handler = asio::any_completion_handler<Sig>;
    
        template <asio::completion_token_for<Sig> Token = asio::deferred_t>
        auto async_send_to(udp::endpoint to, asio::const_buffer buf, Token&& token = {}) {
            return asio::async_initiate<Token, Sig>(
                [](auto handler, queue& q, udp::endpoint to, asio::const_buffer buf) {
                    auto wrap = [heap = std::make_shared<decltype(handler)>(std::move(handler))] //
                        (auto&&... args) mutable {                                               //
                            std::move (*heap)(std::forward<decltype(args)>(args)...);
                        };
    
                    q.async_send({}, to, buf, wrap, now(), [wrap](error_code ec) mutable {
                        if (ec)
                            std::move(wrap)(ec, 0, duration{});
                    });
                },
                token, std::ref(queue_), to, buf);
        }
    
        void close() { queue_.close(); }
    
      private:
        using stamp   = decltype(now());
        using message = void(error_code, udp::endpoint, asio::const_buffer, Handler, stamp);
        using queue   = aex::concurrent_channel<message>;
        udp::socket    sock_;
        queue          queue_;
        duration const interval_;
    
        asio::awaitable<void> drip() {
            trace() << "drip: interval_ = " << interval_ << std::endl;
            asio::high_resolution_timer next_drip{co_await asio::this_coro::executor};
    
            error_code ec;
            for (auto token = asio::redirect_error(ec); !ec; co_await next_drip.async_wait(token)) {
                auto [to, buf, h, ts] = co_await queue_.async_receive(token);
    
                if (ec)
                    break;
                next_drip.expires_after(interval_);
                auto n = co_await sock_.async_send_to(buf, to, token);
                // trace() << "drip: sent " << n << " bytes (" << ec.message() << ")" << std::endl;
                asio::dispatch(std::bind(std::move(h), std::move(ec), n, now() - ts));
            }
            trace() << "drip: " << ec.message() << std::endl;
        }
    };
    
    static asio::awaitable<void> sequential_producer(auto id, MyUdp& udp) {
        auto trace = [id] { return ::trace() << "sequential_producer:" << id << " "; };
        try {
            for (auto i = 0; i < 10; ++i) {
                // trace() << "Initiating send #" << i << std::endl;
                auto [ec, n, latency] = co_await udp.async_send_to( //
                    {{}, 8989}, asio::buffer("hello world!\n"sv), asio::as_tuple);
    
                if (!ec)
                    trace() << "Message #" << i << " completion (" << ec.message() << ")" << " n:" << n
                            << " latency:" << latency / 1ms << "ms" << std::endl;
                else
                    trace() << "Message #" << i << " completion (" << ec.message() << ")" << std::endl;
            }
        } catch (boost::system::system_error const& se) {
            trace() << se.code().message() << std::endl;
        }
    }
    
    static void parallel_producer(auto id, MyUdp& udp) {
        auto trace = [id] { return ::trace() << "parallel_producer:" << id << " "; };
        try {
            for (auto i = 0; i < 10; ++i) {
                udp.async_send_to( //
                    {{}, 8989}, asio::buffer("hello world!\n"sv),
                    [trace, i](error_code ec, size_t n, duration latency) {
                        if (!ec)
                            trace() << "Message #" << i << " completion (" << ec.message() << ")" << " n:" << n
                                    << " latency:" << latency / 1ms << "ms" << std::endl;
                        else
                            trace() << "Message #" << i << " completion (" << ec.message() << ")" << std::endl;
                    });
            }
    
        } catch (boost::system::system_error const& se) {
            trace() << se.code().message() << std::endl;
        }
    }
    
    int main() {
        trace() << "main: Enter\n";
        asio::thread_pool ioc{4};
        MyUdp             udp(5, make_strand(ioc), {}, 5);
    
    #if 1
        co_spawn(ioc, sequential_producer("A", udp), asio::detached);
        co_spawn(ioc, sequential_producer("B", udp), asio::detached);
    #else
        parallel_producer("A", udp);
        parallel_producer("B", udp);
    #endif
        parallel_producer("C", udp);
    
        std::this_thread::sleep_for(10s);
        trace() << "main: Closing\n";
        udp.close();
    
        trace() << "main: Joining\n";
        ioc.join();
    
        trace() << "main: Goodbye\n";
    }
    

    Printing e.g.

    sehe@workstation:~/Projects/stackoverflow$ ./build/sotest
    T:0      0ms main: Enter
    T:1      0ms drip: interval_ = 200000000ns
    T:1      0ms parallel_producer:C Message #0 completion (Success) n:13 latency:0ms
    T:2    200ms parallel_producer:C Message #1 completion (Success) n:13 latency:200ms
    T:3    400ms parallel_producer:C Message #2 completion (Success) n:13 latency:400ms
    T:4    600ms parallel_producer:C Message #3 completion (Success) n:13 latency:600ms
    T:2    801ms parallel_producer:C Message #4 completion (Success) n:13 latency:800ms
    T:1   1001ms parallel_producer:C Message #5 completion (Success) n:13 latency:1001ms
    T:2   1201ms parallel_producer:C Message #6 completion (Success) n:13 latency:1201ms
    T:4   1401ms parallel_producer:C Message #7 completion (Success) n:13 latency:1401ms
    T:3   1601ms parallel_producer:C Message #8 completion (Success) n:13 latency:1601ms
    T:4   1801ms parallel_producer:C Message #9 completion (Success) n:13 latency:1801ms
    T:3   2001ms sequential_producer:A Message #0 completion (Success) n:13 latency:2001ms
    T:4   2201ms sequential_producer:B Message #0 completion (Success) n:13 latency:2201ms
    T:3   2401ms sequential_producer:A Message #1 completion (Success) n:13 latency:399ms
    T:4   2602ms sequential_producer:B Message #1 completion (Success) n:13 latency:400ms
    T:2   2802ms sequential_producer:A Message #2 completion (Success) n:13 latency:400ms
    T:2   3002ms sequential_producer:B Message #2 completion (Success) n:13 latency:400ms
    T:3   3202ms sequential_producer:A Message #3 completion (Success) n:13 latency:400ms
    T:2   3402ms sequential_producer:B Message #3 completion (Success) n:13 latency:400ms
    T:3   3602ms sequential_producer:A Message #4 completion (Success) n:13 latency:400ms
    T:4   3802ms sequential_producer:B Message #4 completion (Success) n:13 latency:400ms
    T:3   4002ms sequential_producer:A Message #5 completion (Success) n:13 latency:400ms
    T:4   4202ms sequential_producer:B Message #5 completion (Success) n:13 latency:400ms
    T:2   4402ms sequential_producer:A Message #6 completion (Success) n:13 latency:400ms
    T:1   4602ms sequential_producer:B Message #6 completion (Success) n:13 latency:400ms
    T:3   4802ms sequential_producer:A Message #7 completion (Success) n:13 latency:400ms
    T:1   5002ms sequential_producer:B Message #7 completion (Success) n:13 latency:400ms
    T:3   5203ms sequential_producer:A Message #8 completion (Success) n:13 latency:400ms
    T:2   5403ms sequential_producer:B Message #8 completion (Success) n:13 latency:400ms
    T:2   5603ms sequential_producer:A Message #9 completion (Success) n:13 latency:400ms
    T:2   5803ms sequential_producer:B Message #9 completion (Success) n:13 latency:400ms
    T:0  10000ms main: Closing
    T:0  10000ms main: Joining
    T:4  10000ms drip: Channel closed
    T:0  10000ms main: Goodbye
    sehe@workstation:~/Projects/stackoverflow$ 
    

    Note how increasing the max buffer size to, say, 100 makes all messages enqueue immediately. Note how the choice of rate=5 causes the drip interval to be 200ms.
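
    For example (the values are arbitrary), constructing the limiter as follows would yield a 50ms drip interval with enough queue capacity for all of the demo's messages to enqueue immediately:

    // hypothetical parameters: 20 datagrams/s => interval_ = 1s / 20 = 50ms,
    // and a channel capacity of 100 so the producers never block on back-pressure
    MyUdp udp(20, make_strand(ioc), {}, 100);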

    Notes: