c++ boost c++20 boost-asio coroutine

Boost.Asio: how to cancel a synchronous task


The following code simulates short bursts of work by sleeping 1ms at a time in order to achieve a cancellable task that takes a total of 2s. That work is then launched in three different ways.

  1. The first co_spawn (labelled Task-1) explicitly uses bind_cancellation_slot and successfully cancels the work after 1s.
  2. The second attempt (WorkWithTimeout, labelled Task-3 in the code) uses awaitable_operators and steady_timer::async_wait to deliver the cancellation after 200ms, but the cancellation has no effect and the work runs for the full 2s.
  3. The third attempt (DeferredWorkWithTimeout, labelled Task-2 in the code) uses a variation of the work that explicitly calls boost::asio::defer before starting the work, which resolves the issue with the previous attempt.

Is the third attempt at using co_spawn correct? It seems wrong to me that my task's implementation needs to defensively defer at the very beginning in order to work with the awaitable_operators. Is there a better way to implement cancellation of such a long running task?

As a side note: I am aware that using co_await on an asynchronous task inside SyncWork would enable cancellation at suspension points. What I am really looking for is a way to be more proactive about cancellation between suspension points. I believe this issue is mostly relevant to the period between the coroutine starting and the first suspension point being hit.

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <chrono>
#include <iostream>
#include <thread>

// Simulate work that lasts for 2s, and that checks for cancellation at every 1ms
boost::asio::awaitable<void>
SyncWork()
{
    using namespace std::chrono_literals;
    auto before = std::chrono::steady_clock::now();
    auto limit = before + 2s;
    while (std::chrono::steady_clock::now() < limit) {
        auto cs = co_await boost::asio::this_coro::cancellation_state;
        if (cs.cancelled() != boost::asio::cancellation_type::none) break;
        std::this_thread::sleep_for(1ms);
    }
    std::cout << "SyncWork - elapsed: " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - before).count() << "ms\n";
}

boost::asio::awaitable<void>
DeferredSyncWork()
{
    auto executor = co_await boost::asio::this_coro::executor;
    co_await boost::asio::defer(executor, boost::asio::use_awaitable);
    
    co_await SyncWork();
}

boost::asio::awaitable<void>
timeout()
{
    auto executor = co_await boost::asio::this_coro::executor;
    boost::asio::steady_timer timer{executor};
    timer.expires_after(std::chrono::milliseconds{200}); // expires_from_now is the deprecated spelling
    co_await timer.async_wait(boost::asio::use_awaitable);
    std::cout << "timeout\n";
}

boost::asio::awaitable<void>
WorkWithTimeout()
{
    using namespace boost::asio::experimental::awaitable_operators;
    co_await (SyncWork() || timeout());
}

boost::asio::awaitable<void>
DeferredWorkWithTimeout()
{
    using namespace boost::asio::experimental::awaitable_operators;
    co_await (DeferredSyncWork() || timeout());
}

void HandleException(std::exception_ptr p)
{
    if (p) {
        try {
            std::rethrow_exception(p);
        } catch (const std::exception& e) {
            std::cout << e.what() << '\n';
        } catch (...) {
            std::cout << "catch\n";
        }
    } else {
        std::cout << "success\n";
    }
}

int main()
{
    boost::asio::thread_pool pool{8};
    
    boost::asio::cancellation_signal signal;
    boost::asio::co_spawn(pool, SyncWork(), boost::asio::bind_cancellation_slot(signal.slot(), HandleException)); // Task-1
    boost::asio::co_spawn(pool, DeferredWorkWithTimeout(), HandleException); // Task-2
    boost::asio::co_spawn(pool, WorkWithTimeout(), HandleException); // Task-3
    
    std::this_thread::sleep_for(std::chrono::seconds{1});
    signal.emit(boost::asio::cancellation_type_t::terminal);
    
    pool.join();
}

Here is the program output:

timeout
SyncWork - elapsed: 202ms
success
SyncWork - elapsed: 1005ms
success
SyncWork - elapsed: 2000ms
success
Program ended with exit code: 0

Solution

  • Is the third attempt at using co_spawn correct? It seems wrong to me that my task's implementation needs to defensively defer at the very beginning in order to work with the awaitable_operators.

    No, that has no effect. You simply incur some runtime overhead to land back on the same executor you were already on.

    The Problem

    Your problem is that you are ... running a blocking function. In addition, another thread is calling emit() on the signal without any synchronization in place. With optimization enabled, it looks like everything inlines away into a tight loop, because the await-transform of cancellation_state_t is always ready: https://godbolt.org/z/bPGE4rna9

    Some strategies to fix the situation:

    1. Replace sleep_for with an async wait:

      //sleep_for(1ms);
      co_await delay(1ms);
      

      where delay is a simple

      asio::awaitable<void> delay(auto dur) {
          co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
      }
      

      As a bonus, you no longer have to check the cancellation state manually at all, because awaitable<> has throw_if_cancelled on by default:

      assert(co_await asio::this_coro::throw_if_cancelled());
      
    2. Of course, if sleep_for was actually a stand-in for real blocking work, you could follow it with a post:

      sleep_for(1ms);
      co_await post(co_await asio::this_coro::executor);
      

    Both of these fix the problem without needing "magic" defer calls.

    DEMO

    Restructured to show the above, with some simplifications, and removing the unneeded DeferredWorkWithTimeout version:

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <iostream>
    namespace asio = boost::asio;
    using namespace asio::experimental::awaitable_operators;
    using namespace std::chrono_literals;
    using std::this_thread::sleep_for;
    auto constexpr now = std::chrono::steady_clock::now;
    
    asio::awaitable<void> delay(auto dur) {
        co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::use_awaitable);
    }
    
    asio::awaitable<void> timeout(auto dur) {
        co_await delay(dur);
        std::cout << "timeout" << std::endl;
    }
    
    // Simulate 2s of work, checking cancellation every 1ms
    boost::asio::awaitable<void> SyncWork() try {
        assert(co_await asio::this_coro::throw_if_cancelled());
    
        for (auto limit = now() + 2s; now() < limit;) {
            sleep_for(1ms);
            co_await post(co_await asio::this_coro::executor);
    
            //// or, combined:
            // co_await delay(1ms);
        }
    
        std::cout << "SyncWork - elapsed" << std::endl;
    } catch (boost::system::system_error const& se) {
        std::cout << "SyncWork - " << se.code().message() << std::endl;
    }
    
    auto HandleException() {
        return [before = now()](std::exception_ptr p) {
            try {
                std::cout << "Completion in " << (now() - before) / 1ms << "ms" << std::endl;
                if (p)
                    std::rethrow_exception(p);
                else
                    std::cout << "success" << std::endl;
            } catch (std::exception const& e) {
                std::cout << e.what() << std::endl;
            } catch (...) {
                std::cout << "catch" << std::endl;
            }
        };
    }
    
    void task1() {
        std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
        asio::thread_pool pool{8};
    
        asio::cancellation_signal signal;
        co_spawn(pool, SyncWork(), bind_cancellation_slot(signal.slot(), HandleException()));
    
        sleep_for(1s);
        signal.emit(asio::cancellation_type_t::terminal);
    
        pool.join();
        std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
    }
    
    static asio::awaitable<void> WorkWithTimeout() { co_await (SyncWork() || timeout(200ms)); }
    
    void task3() {
        std::cout << " ----- " << __FUNCTION__ << " start " << std::endl;
        asio::thread_pool pool{8};
    
        co_spawn(pool, WorkWithTimeout(), HandleException());
    
        pool.join();
        std::cout << " ----- " << __FUNCTION__ << " exit\n" << std::endl;
    }
    
    int main() {
        task1();
        task3();
    }
    

    Printing e.g.

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp && ./a.out
     ----- task1 start 
    SyncWork - Operation canceled
    Completion in 1001ms
    success
     ----- task1 exit
    
     ----- task3 start 
    timeout
    SyncWork - Operation canceled
    Completion in 202ms
    success
     ----- task3 exit
    

    Side Notes

    Perhaps a more natural way to control purely blocking work is something like std::stop_token. That way you don't even need to make it a coroutine. This could be important if, for example, you cannot afford to have SyncWork resumed on a different thread.