I'm currently experimenting with C++20 coroutines as an abstraction over io_uring. For that I have the following class:
class io_service final {
public:
explicit io_service(unsigned size, threadpool& pool) : pool_(pool) {
if (auto ret = io_uring_queue_init(size, &ring_, 0); ret < 0) {
throw std::runtime_error{"Liburing error!"};
}
}
~io_service() {
io_uring_queue_exit(&ring_);
}
void message_pump() {
io_uring_cqe* cqe = nullptr;
while (true) {
auto ret = io_uring_wait_cqe(&ring_, &cqe);
auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
if (ret < 0) {
std::cerr << "Fatal error in io_uring_wait_cqe!\n";
throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
}
if (cqe->res < 0) {
std::cerr << "Error while doing an asynchronous request: "
<< -cqe->res << " (" << strerror(-cqe->res) << ")\n";
throw std::runtime_error{"Error while doing an asynchronous request : "
+ std::string(strerror(-cqe->res))};
}
data->status_code = cqe->res;
pool_.push_task([handle = data->handle] { handle.resume(); });
io_uring_cqe_seen(&ring_, cqe);
}
}
[[nodiscard]] auto accept_async(int socket, sockaddr_in& in, socklen_t& socket_length) {
return uring_awaitable{
&ring_,
io_result::operation_type::accept,
io_uring_prep_accept,
socket,
reinterpret_cast<sockaddr*>(&in),
&socket_length,
0
};
}
private:
struct uring_awaiter {
io_uring* ring_;
io_uring_sqe* entry;
io_result request_data{};
explicit uring_awaiter(io_result::operation_type op_type, io_uring* ring, io_uring_sqe* sqe) : ring_(ring), entry(sqe), request_data{op_type} {}
[[nodiscard]] bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) noexcept {
request_data.handle = handle;
io_uring_sqe_set_data(entry, &request_data);
// SUBMITTING HERE LATER CAUSES ERRORS ==============================
io_uring_submit(ring_);
// ==================================================================
}
[[nodiscard]] int await_resume() const noexcept {
return request_data.status_code;
}
};
class uring_awaitable {
public:
template <typename F, typename... Args>
requires requires(F f) { f(std::declval<io_uring_sqe*>(), std::declval<Args>()...); }
uring_awaitable(io_uring* ring, io_result::operation_type op, F function, Args&&... args)
: ring_(ring), sqe_(io_uring_get_sqe(ring_)), op_(op) {
function(sqe_, std::forward<Args>(args)...);
}
auto operator co_await() const {
return uring_awaiter{op_, ring_, sqe_};
}
private:
io_uring* ring_;
io_uring_sqe* sqe_;
io_result::operation_type op_;
};
io_uring ring_{};
bool interrupted_ = false;
threadpool& pool_;
};
This class is meant to be used like this:
// Usage sketch. Some thread must be running s.message_pump(); otherwise no
// co_await on s ever completes.
threadpool p{};
io_service s{128, p};  // 128 = size passed to io_uring_queue_init; completions are resumed on p
// In another thread, later (inside a coroutine: co_await is only valid there)
co_await s.accept_async(/* ... */);
The problem occurs when I call `io_uring_submit()` in `await_suspend()`, as marked in the code snippet above. Then I get the output "Error while doing an asynchronous request: 125 (Operation canceled)". However, if I change my `message_pump()` to something like this (and remove the submission from `await_suspend()`):
// Workaround variant: io_uring_submit is only called on the pump thread,
// which avoids the -ECANCELED errors seen when submitting elsewhere.
// NOTE(review): this is still not a real fix.
//  * SQEs are still obtained and prepared on other threads without any
//    synchronisation.
//  * An SQE prepared while this thread is blocked in io_uring_wait_cqe is
//    not submitted until some other completion arrives.
//  * The 1 s sleep only papers over these races.
void message_pump() {
    using namespace std::chrono_literals;
    while (true) {
        // SUBMITTING HERE ==================================================
        std::this_thread::sleep_for(1s);
        io_uring_submit(&ring_);
        // ==================================================================
        io_uring_cqe* cqe = nullptr;
        const int ret = io_uring_wait_cqe(&ring_, &cqe);
        if (ret == -EINTR) {
            continue;  // a signal interrupted the wait; no CQE was returned
        }
        // Check ret before touching cqe: it is only valid on success.
        if (ret < 0) {
            std::cerr << "Fatal error in io_uring_wait_cqe!\n";
            throw std::runtime_error{"Fatal error in io_uring_wait_cqe!"};
        }
        auto* data = static_cast<io_result*>(io_uring_cqe_get_data(cqe));
        const int res = cqe->res;
        // Mark the CQE seen before the throw below can leave it unconsumed.
        io_uring_cqe_seen(&ring_, cqe);
        if (res < 0) {
            std::cerr << "Error while doing an asynchronous request: " << -res << " (" << strerror(-res) << ")\n";
            throw std::runtime_error{"Error while doing an asynchronous request : " + std::string(strerror(-res))};
        }
        if (data == nullptr) {
            continue;  // CQE without user data: nothing to resume
        }
        data->status_code = res;
        pool_.push_task([handle = data->handle] { handle.resume(); });
    }
}
Now everything works as expected. Obviously this is not the proper way to do things.
Why is the first approach not working?
Operations that complete through a kernel task have to run on the thread that called `io_uring_submit()`. That means this thread can't terminate before the CQE has been completed in the kernel, so you risk losing completions if you submit SQEs from a dynamic thread pool.
I'm not 100% certain that accept completes through a kernel task, or that this case returns `-ECANCELED`, but I had to switch to dedicated submission threads for `uring_cmd`s because of this.
The recommendation in the liburing feature request "submit requests from any thread" (scroll to the very bottom) is either to have a single thread do all submissions or to give each thread its own ring.