As is well known, in Boost we can use a rate limit to restrict the read and write speed of a TCP stream, as indicated in the documentation: https://www.boost.org/doc/libs/latest/libs/beast/doc/html/beast/using_io/rate_limiting.html
My question is: How can I apply the same rate limit to reading from and writing to a UDP socket?
You can't. Besides, it would only apply to the specific udp socket instance being used, which is kinda arbitrary, because any other socket is just as connection-less, so would count.
However, you can employ any rate limiting algorithm (e.g. "Leaky bucket") of your choosing by queuing up reads/writes.
See for examples and descriptions:
One interesting consideration for datagram protocols is that you cannot re-fragment packets at all. Therefore it might be convenient to measure rate in datagrams/s, not bytes/s.
A quick sketch using the leaky-bucket approach to limit to N datagrams per second.
Using a channel with a fixed capacity (max_buffer_size) as well to show back-pressure. You can see this in action by the reported latency between enqueueing and send operation completion.
Using co-routines for ease of implementation, but showing general purpose initiation for async_send_to so you can consume it in non-coroutines as well (see parallel_producer).
Using concurrent channels and strand to be thread safe.
#include <boost/asio.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <iostream>
#include <syncstream>
namespace asio = boost::asio;
namespace aex = asio::experimental;
using namespace std::literals; // _sv and chrono
using asio::ip::udp;
using boost::system::error_code;
using duration = std::chrono::high_resolution_clock::duration;
static constexpr auto now = std::chrono::high_resolution_clock::now;
auto trace() {
static auto const start = now();
thread_local auto const t_id = [] {
static std::atomic_uint32_t gen{};
return gen++;
}();
return std::osyncstream(std::cout)
<< "T:" << t_id << " " << std::setw(6) << (now() - start) / 1ms << "ms ";
}
struct MyUdp {
MyUdp(size_t rate, asio::any_io_executor ex, udp::endpoint local_ep = {}, size_t max_buffer_size = 100)
: sock_(ex, local_ep) //
, queue_(ex, max_buffer_size) //
, interval_(duration_cast<std::chrono::nanoseconds>(1.s) / rate) //
{
co_spawn(queue_.get_executor(), std::bind_front(&MyUdp::drip, this), asio::detached);
}
using Sig = void(error_code, size_t, duration);
using Handler = asio::any_completion_handler<Sig>;
template <asio::completion_token_for<Sig> Token = asio::deferred_t>
auto async_send_to(udp::endpoint to, asio::const_buffer buf, Token&& token = {}) {
return asio::async_initiate<Token, Sig>(
[](auto handler, queue& q, udp::endpoint to, asio::const_buffer buf) {
auto wrap = [heap = std::make_shared<decltype(handler)>(std::move(handler))] //
(auto&&... args) mutable { //
std::move (*heap)(std::forward<decltype(args)>(args)...);
};
q.async_send({}, to, buf, wrap, now(), [wrap](error_code ec) mutable {
if (ec)
std::move(wrap)(ec, 0, duration{});
});
},
token, std::ref(queue_), to, buf);
}
void close() { queue_.close(); }
private:
using stamp = decltype(now());
using message = void(error_code, udp::endpoint, asio::const_buffer, Handler, stamp);
using queue = aex::concurrent_channel<message>;
udp::socket sock_;
queue queue_;
duration const interval_;
asio::awaitable<void> drip() {
trace() << "drip: interval_ = " << interval_ << std::endl;
asio::high_resolution_timer next_drip{co_await asio::this_coro::executor};
error_code ec;
for (auto token = asio::redirect_error(ec); !ec; co_await next_drip.async_wait(token)) {
auto [to, buf, h, ts] = co_await queue_.async_receive(token);
if (ec)
break;
next_drip.expires_after(interval_);
auto n = co_await sock_.async_send_to(buf, to, token);
// trace() << "drip: sent " << n << " bytes (" << ec.message() << ")" << std::endl;
asio::dispatch(std::bind(std::move(h), std::move(ec), n, now() - ts));
}
trace() << "drip: " << ec.message() << std::endl;
}
};
static asio::awaitable<void> sequential_producer(auto id, MyUdp& udp) {
auto trace = [id] { return ::trace() << "sequential_producer:" << id << " "; };
try {
for (auto i = 0; i < 10; ++i) {
// trace() << "Initiating send #" << i << std::endl;
auto [ec, n, latency] = co_await udp.async_send_to( //
{{}, 8989}, asio::buffer("hello world!\n"sv), asio::as_tuple);
if (!ec)
trace() << "Message #" << i << " completion (" << ec.message() << ")" << " n:" << n
<< " latency:" << latency / 1ms << "ms" << std::endl;
else
trace() << "Message #" << i << " completion (" << ec.message() << ")" << std::endl;
}
} catch (boost::system::system_error const& se) {
trace() << se.code().message() << std::endl;
}
}
static void parallel_producer(auto id, MyUdp& udp) {
auto trace = [id] { return ::trace() << "parallel_producer:" << id << " "; };
try {
for (auto i = 0; i < 10; ++i) {
udp.async_send_to( //
{{}, 8989}, asio::buffer("hello world!\n"sv),
[trace, i](error_code ec, size_t n, duration latency) {
if (!ec)
trace() << "Message #" << i << " completion (" << ec.message() << ")" << " n:" << n
<< " latency:" << latency / 1ms << "ms" << std::endl;
else
trace() << "Message #" << i << " completion (" << ec.message() << ")" << std::endl;
});
}
} catch (boost::system::system_error const& se) {
trace() << se.code().message() << std::endl;
}
}
int main() {
trace() << "main: Enter\n";
asio::thread_pool ioc{4};
MyUdp udp(5, make_strand(ioc), {}, 5);
#if 1
co_spawn(ioc, sequential_producer("A", udp), asio::detached);
co_spawn(ioc, sequential_producer("B", udp), asio::detached);
#else
parallel_producer("A", udp);
parallel_producer("B", udp);
#endif
parallel_producer("C", udp);
std::this_thread::sleep_for(10s);
trace() << "main: Closing\n";
udp.close();
trace() << "main: Joining\n";
ioc.join();
trace() << "main: Goodbye\n";
}
Printing e.g.
sehe@workstation:~/Projects/stackoverflow$ ./build/sotest
T:0 0ms main: Enter
T:1 0ms drip: interval_ = 200000000ns
T:1 0ms parallel_producer:C Message #0 completion (Success) n:13 latency:0ms
T:2 200ms parallel_producer:C Message #1 completion (Success) n:13 latency:200ms
T:3 400ms parallel_producer:C Message #2 completion (Success) n:13 latency:400ms
T:4 600ms parallel_producer:C Message #3 completion (Success) n:13 latency:600ms
T:2 801ms parallel_producer:C Message #4 completion (Success) n:13 latency:800ms
T:1 1001ms parallel_producer:C Message #5 completion (Success) n:13 latency:1001ms
T:2 1201ms parallel_producer:C Message #6 completion (Success) n:13 latency:1201ms
T:4 1401ms parallel_producer:C Message #7 completion (Success) n:13 latency:1401ms
T:3 1601ms parallel_producer:C Message #8 completion (Success) n:13 latency:1601ms
T:4 1801ms parallel_producer:C Message #9 completion (Success) n:13 latency:1801ms
T:3 2001ms sequential_producer:A Message #0 completion (Success) n:13 latency:2001ms
T:4 2201ms sequential_producer:B Message #0 completion (Success) n:13 latency:2201ms
T:3 2401ms sequential_producer:A Message #1 completion (Success) n:13 latency:399ms
T:4 2602ms sequential_producer:B Message #1 completion (Success) n:13 latency:400ms
T:2 2802ms sequential_producer:A Message #2 completion (Success) n:13 latency:400ms
T:2 3002ms sequential_producer:B Message #2 completion (Success) n:13 latency:400ms
T:3 3202ms sequential_producer:A Message #3 completion (Success) n:13 latency:400ms
T:2 3402ms sequential_producer:B Message #3 completion (Success) n:13 latency:400ms
T:3 3602ms sequential_producer:A Message #4 completion (Success) n:13 latency:400ms
T:4 3802ms sequential_producer:B Message #4 completion (Success) n:13 latency:400ms
T:3 4002ms sequential_producer:A Message #5 completion (Success) n:13 latency:400ms
T:4 4202ms sequential_producer:B Message #5 completion (Success) n:13 latency:400ms
T:2 4402ms sequential_producer:A Message #6 completion (Success) n:13 latency:400ms
T:1 4602ms sequential_producer:B Message #6 completion (Success) n:13 latency:400ms
T:3 4802ms sequential_producer:A Message #7 completion (Success) n:13 latency:400ms
T:1 5002ms sequential_producer:B Message #7 completion (Success) n:13 latency:400ms
T:3 5203ms sequential_producer:A Message #8 completion (Success) n:13 latency:400ms
T:2 5403ms sequential_producer:B Message #8 completion (Success) n:13 latency:400ms
T:2 5603ms sequential_producer:A Message #9 completion (Success) n:13 latency:400ms
T:2 5803ms sequential_producer:B Message #9 completion (Success) n:13 latency:400ms
T:0 10000ms main: Closing
T:0 10000ms main: Joining
T:4 10000ms drip: Channel closed
T:0 10000ms main: Goodbye
sehe@workstation:~/Projects/stackoverflow$
Note how increasing the max buffer size to, say, 100 makes all messages enqueue immediately. Note how the choice of rate=5 causes the drip interval to be 200ms.
Notes:
asio::experimental::promise to somehow prevent the shared state and having to cancel broken promises manuallyasio::deferred([](...) { }) for composition to avoid having to shared-allocate the handler as currently shown