The following code simulates short bursts of work by sleeping 1ms at a time in order to achieve a cancellable task that takes a total of 2s. That work is then launched in three different ways.
co_spawn
(labelled Task-1) explicitly uses bind_cancellation_slot
and successfully cancels the work after 1s.co_spawn
uses awaitable_operators
and steady_timer::async_wait
to deliver the cancellation after 200ms, but that attempt fails, resulting in the full 2s execution duration.co_spawn
uses a variation of the work that explicitly uses boost::asio::defer
before starting the work to resolve the issue with the previous attempt.Is the third attempt at using co_spawn
correct? It seems wrong to me that my task's implementation needs to defensively defer
at the very beginning in order to work with the awaitable_operators
. Is there a better way to implement cancellation of such a long running task?
As a sidenote: I am aware that using co_await
on an asynchronous task inside SyncWork
would enable cancellation at suspension points. But I really am looking for a way to be more proactive with cancellation in between suspension points. I believe this issue is mostly relevant to the period of time when the coroutine starts and before the first suspension point is eventually hit.
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
// Simulate work that lasts for 2s, and that checks for cancellation at every 1ms
boost::asio::awaitable<void>
SyncWork()
{
using namespace std::chrono_literals;
auto before = std::chrono::steady_clock::now();
auto limit = before + 2s;
while (std::chrono::steady_clock::now() < limit) {
auto cs = co_await boost::asio::this_coro::cancellation_state;
if (cs.cancelled() != boost::asio::cancellation_type::none) break;
std::this_thread::sleep_for(1ms);
}
std::cout << "SyncWork - elapsed: " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before).count() << "ms\n";
}
boost::asio::awaitable<void>
DeferredSyncWork()
{
auto executor = co_await boost::asio::this_coro::executor;
co_await boost::asio::defer(executor, boost::asio::use_awaitable);
co_await SyncWork();
}
boost::asio::awaitable<void>
timeout()
{
auto executor = co_await boost::asio::this_coro::executor;
boost::asio::steady_timer timer{executor};
timer.expires_from_now(std::chrono::milliseconds{200});
co_await timer.async_wait(boost::asio::use_awaitable);
std::cout << "timeout\n";
}
boost::asio::awaitable<void>
WorkWithTimeout()
{
using namespace boost::asio::experimental::awaitable_operators;
co_await (SyncWork() || timeout());
}
boost::asio::awaitable<void>
DeferredWorkWithTimeout()
{
using namespace boost::asio::experimental::awaitable_operators;
co_await (DeferredSyncWork() || timeout());
}
void HandleException(std::exception_ptr p)
{
if (p) {
try {
std::rethrow_exception(p);
} catch (const std::exception& e) {
std::cout << e.what() << '\n';
} catch (...) {
std::cout << "catch\n";
}
} else {
std::cout << "success\n";
}
}
int main()
{
boost::asio::thread_pool pool{8};
boost::asio::cancellation_signal signal;
boost::asio::co_spawn(pool, SyncWork(), boost::asio::bind_cancellation_slot(signal.slot(), HandleException)); // Task-1
boost::asio::co_spawn(pool, DeferredWorkWithTimeout(), HandleException); // Task-2
boost::asio::co_spawn(pool, WorkWithTimeout(), HandleException); // Task-3
std::this_thread::sleep_for(std::chrono::seconds{1});
signal.emit(boost::asio::cancellation_type_t::terminal);
pool.join();
}
Here is the program output:
timeout
SyncWork - elapsed: 202ms
success
SyncWork - elapsed: 1005ms
success
SyncWork - elapsed: 2000ms
success
Program ended with exit code: 0
Is the third attempt at using co_spawn correct? It seems wrong to me that my task's implementation needs to defensively defer at the very beginning in order to work with the awaitable_operators.
No that has no effect. You simply incur some runtime overhead to land back on the same executor you were already on.
Your problem is that you are ... running a blocking function. Also, some other thread is emit()
-ing a signal, but there's no synchronization in place. It looks like with optimization enabled, since the await-transform of cancellation_state_t
is always ready, that all inlines away to a tight loop: https://godbolt.org/z/bPGE4rna9
Some strategies to fix the situation:
replace sleep_for
with an async wait
//sleep_for(1ms);
co_await delay(1ms);
where delay is a simple
asio::awaitable<void> delay(auto dur) {
co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
}
As bonus, you don't have to do manual checking of the cancellation state at all anymore, because awaitable<>
has throw_if_cancelled
on by default:
assert(co_await asio::this_coro::throw_if_cancelled());
of course, if sleep_for
was actually the stand in for actual blocking work, you could follow that by a post
:
asio::awaitable<void> delay(auto dur) {
co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
}
Both these fix the problem without needing "magic" defer
calls.
Restructured to show the above, with some simplifications, and removing the unneeded DeferredWorkWithTimeout
version:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace asio::experimental::awaitable_operators;
using namespace std::chrono_literals;
using std::this_thread::sleep_for;
auto constexpr now = std::chrono::steady_clock::now;
asio::awaitable<void> delay(auto dur) {
co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
}
asio::awaitable<void> timeout(auto dur) {
co_await delay(dur);
std::cout << "timeout" << std::endl;
}
// Simulate 2s of work, checking cancellation every 1ms
boost::asio::awaitable<void> SyncWork() try {
assert(co_await asio::this_coro::throw_if_cancelled());
for (auto limit = now() + 2s; now() < limit;) {
sleep_for(1ms);
co_await post(co_await asio::this_coro::executor);
//// or, combined:
// co_await delay(1ms);
}
std::cout << "SyncWork - elapsed" << std::endl;
} catch (boost::system::system_error const& se) {
std::cout << "SyncWork - " << se.code().message() << std::endl;
}
auto HandleException() {
return [before = now()](std::exception_ptr p) {
try {
std::cout << "Completion in " << (now() - before) / 1ms << "ms" << std::endl;
if (p)
std::rethrow_exception(p);
else
std::cout << "success" << std::endl;
} catch (std::exception const& e) {
std::cout << e.what() << std::endl;
} catch (...) {
std::cout << "catch" << std::endl;
}
};
}
void task1() {
std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
asio::thread_pool pool{8};
asio::cancellation_signal signal;
co_spawn(pool, SyncWork(), bind_cancellation_slot(signal.slot(), HandleException()));
sleep_for(1s);
signal.emit(asio::cancellation_type_t::terminal);
pool.join();
std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
}
static asio::awaitable<void> WorkWithTimeout() { co_await (SyncWork() || timeout(200ms)); }
void task3() {
std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
asio::thread_pool pool{8};
co_spawn(pool, WorkWithTimeout(), HandleException());
pool.join();
std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
}
int main() {
task1();
task3();
}
Printing e.g.
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp && ./a.out
----- task1 start
SyncWork - Operation canceled
Completion in 1001ms
success
----- task1 exit
----- task3 start
timeout
SyncWork - Operation canceled
Completion in 202ms
success
----- task3 exit
Perhaps a more natural way to control purely blocking work is e.g. std::stop_token
.
That way you don't even need to make it a coroutine. This could be important in case you can e.g. not afford to be resuming SyncWork
on a different thread.