c++ c++20 boost-asio c++-coroutine

Boost-asio - issues creating cancellable/restartable tasks


Background/Goal

I'm very much in the early stages of learning asio. My current goal is to have an io_context manage a number of long-running tasks that are worked on by several worker threads. Each task may become out of date or invalid at some point, so I keep an asio::cancellation_signal alongside a future, which is used to retrieve the result if the task completes successfully.

Progress:

The API I'm trying to implement is something like:

template <class Return> struct CancellableTask {
    // future/cancellation_signal are Non-CopyConstructible.
    // Keep shared_ptrs to allow CopyConstructible-ness
    std::shared_ptr<std::future<Return>> m_task;
    std::shared_ptr<asio::cancellation_signal> m_cancel;

    asio::cancellation_slot cancelSlot() { return m_cancel->slot(); }
    void cancel() { m_cancel->emit(asio::cancellation_type::all); }
};

namespace util {

template <class T, class... Rest> struct WrappedType;

template <template <class, class...> class Wrapper, class T, class... Rest>
struct WrappedType<Wrapper<T, Rest...>> {
    using type = T;
};

} // namespace util

namespace task {

template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
    using FutureReturn = util::WrappedType<Coroutine>::type;
    using FutureType = std::future<FutureReturn>;

    CancellableTask<FutureReturn> task{
        std::make_shared<FutureType>(), std::make_shared<asio::cancellation_signal>()
    };

    *task.m_task = co_spawn(
        io,
        std::forward<Coroutine>(coroutine),
        asio::bind_cancellation_slot(task.cancelSlot(), asio::use_future)
    );

    return task;
}
} // namespace task

The usage is as follows:

asio::awaitable<void> sleepTask(std::chrono::seconds duration) try {
    auto ex = co_await asio::this_coro::executor;
    std::cout << "running sleep task with duration: " << duration << std::endl;
    co_await (asio::steady_timer{ex, duration}.async_wait(asio::deferred));
} catch (asio::system_error& err) {
    std::cout << err.what() << std::endl;
}

TEST(LongRunningTask, IsCancellable) {
    asio::io_context io_context;
    auto work_guard = asio::make_work_guard(io_context);
    std::thread runner([&]() { io_context.run(); });

    const auto start = std::chrono::high_resolution_clock::now();
    auto long_running = task::spawn(io_context, sleepTask(10s));

    long_running.cancel();
    const auto end = std::chrono::high_resolution_clock::now();
    const std::chrono::duration<double> diff = end - start;

    work_guard.reset();
    runner.join();
    ASSERT_LT(diff, 1s);
}

This test works fine/as expected. long_running is canceled, and the test ends promptly.

TEST(LongRunningTask, IsRespawnable) {
    asio::io_context io_context;

    auto work_guard = asio::make_work_guard(io_context);
    std::thread runner([&]() {
        io_context.run();
        std::cout << "io_context.run() finished" << std::endl;
    });

    const auto start = std::chrono::high_resolution_clock::now();
    auto long_running = task::spawn(io_context, sleepTask(10s));

    for (auto _ : std::views::iota(0, 5)) {
        std::cout << _ << std::endl;
        long_running.cancel();
        auto copy = long_running;
        ASSERT_EQ(copy.m_cancel, long_running.m_cancel);

        long_running = task::spawn(io_context, sleepTask(1s));

        // Assert pointers transferred..
        ASSERT_NE(copy.m_cancel, long_running.m_cancel);
    }

    work_guard.reset();
    runner.join();
    std::cout << "joined..." << std::endl;
    const auto end = std::chrono::high_resolution_clock::now();
    const std::chrono::duration<double> diff = end - start;

    ASSERT_LT(diff, 1s);
}

This test, however, fails with a segfault. I think the reason is that the awaitable is still sitting around in the io_context, even after the cancellation is signaled. Something in the awaitable (the cancellation_slot, perhaps?) might be pointing to memory held by one of my shared_ptrs, which is reset before the awaitable has been fully destroyed.

Sorry for the length of the next section... TL;DR/Question at the bottom.

Things I've tried:

  1. I tried to fix this by having a "wrapper" coroutine, which gets access to copies of the shared_ptr, to make sure that they're ref counted until the awaitable fully deletes.
namespace task {

template <class Coroutine, class Inner = util::WrappedType<Coroutine>::type>
asio::awaitable<Inner> wrapper(
    Coroutine&& coroutine,
    std::shared_ptr<std::future<Inner>>,
    std::shared_ptr<asio::cancellation_signal>
) {
    auto data = co_await std::forward<Coroutine>(coroutine);
    co_return data;
}

template <class Coroutine>
asio::awaitable<void> wrapper(
    Coroutine&& coroutine,
    std::shared_ptr<std::future<void>>,
    std::shared_ptr<asio::cancellation_signal>
) {
    co_await std::forward<Coroutine>(coroutine);
    co_return;
}

template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
    using FutureReturn = util::WrappedType<Coroutine>::type;
    using FutureType = std::future<FutureReturn>;

    CancellableTask<FutureReturn> task{
        std::make_shared<FutureType>(), std::make_shared<asio::cancellation_signal>()
    };

    *task.m_task = co_spawn(
        io,
        // Forward coroutine to wrapper, with own-copies of the shared_ptrs
        wrapper(std::forward<Coroutine>(coroutine), task.m_task, task.m_cancel),
        asio::bind_cancellation_slot(task.cancelSlot(), asio::use_future)
    );

    return task;
}
} // namespace task

This did not work at all -- the first IsCancellable test doesn't even work with this.

  2. I tried keeping a shared_ptr to CancellableTask itself, and doing various things to keep it alive, including the wrapper above...
template <class Return>
struct CancellableTask : std::enable_shared_from_this<CancellableTask<Return>> {
// ...
};

namespace task {

template <class Coroutine, class Inner = util::WrappedType<Coroutine>::type>
asio::awaitable<Inner>
wrapper(Coroutine&& coroutine, std::shared_ptr<CancellableTask<Inner>> _) {
    auto data = co_await std::forward<Coroutine>(coroutine);
    co_return data;
}

template <class Coroutine>
asio::awaitable<void>
wrapper(Coroutine&& coroutine, std::shared_ptr<CancellableTask<void>>) {
    co_await std::forward<Coroutine>(coroutine);
    co_return;
}

template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
    using FutureReturn = util::WrappedType<Coroutine>::type;
    using FutureType = std::future<FutureReturn>;

    auto task = std::make_shared<CancellableTask<FutureReturn>>();
    task->m_task = std::make_shared<FutureType>();
    task->m_cancel = std::make_shared<asio::cancellation_signal>();

    *task->m_task = co_spawn(
        io,
        wrapper(std::forward<Coroutine>(coroutine), task),
        asio::bind_cancellation_slot(task->cancelSlot(), asio::use_future)
    );

    return task;
}
} // namespace task
// ...

TEST(LongRunningTask, IsRespawnable) {
    asio::io_context io_context;

    auto work_guard = asio::make_work_guard(io_context);
    std::thread runner([&]() {
        io_context.run();
        std::cout << "io_context.run() finished" << std::endl;
    });

    const auto start = std::chrono::high_resolution_clock::now();
    auto long_running = task::spawn(io_context, sleepTask(10s));

    for (auto _ : std::views::iota(0, 5)) {
        std::cout << _ << std::endl;
        long_running->cancel();
        auto copy = long_running;
        ASSERT_EQ(copy->m_cancel, long_running->m_cancel);

        long_running = task::spawn(io_context, sleepTask(1s));

        // Assert pointers transferred..
        ASSERT_NE(copy->m_cancel, long_running->m_cancel);
    }

    work_guard.reset();
    runner.join();
    std::cout << "joined..." << std::endl;
    const auto end = std::chrono::high_resolution_clock::now();
    const std::chrono::duration<double> diff = end - start;

    ASSERT_LT(diff, 1s);
}

In the version above, both tests segfault again... (seemingly because of the coroutine wrapper). Without the coroutine wrapper, it's the same as before...

  3. What DOES work (but I do not necessarily want to do) is storing all copies of CancellableTask in a storage that exists until it is fully cleaned up by the io_context:

TEST(LongRunningTask, IsRespawnable) {
    asio::io_context io_context;

    auto work_guard = asio::make_work_guard(io_context);
    std::thread runner([&]() {
        io_context.run();
        std::cout << "io_context.run() finished" << std::endl;
    });

    const auto start = std::chrono::high_resolution_clock::now();
    CancellableTask<void> long_running;
    std::vector<decltype(long_running)> tasks;
    for (auto _ : std::views::iota(0, 5)) {
        std::cout << _ << std::endl;

        if (ore::task::getStatus(long_running.task()) == ore::task::Status::Running) {
            long_running.cancel();
        }

        tasks.push_back(task::spawn(io_context, sleepTask(1s)));
        long_running = tasks.back();
    }

    long_running.cancel();

    work_guard.reset();
    runner.join();
    std::cout << "joined..." << std::endl;
    const auto end = std::chrono::high_resolution_clock::now();
    const std::chrono::duration<double> diff = end - start;

    ASSERT_LT(diff, 1s);
}

If this were the proper solution, I'm sure I could periodically scan through a list to purge entries that have been fully handled... But surely there is a better way?

Question/TLDR:

What is the best way to accomplish the goal of maintaining a cancellable/restartable asio coroutine-based task?


Solution

  • Having arrived at the TL;DR warning, I came up with this simplified repro. The crucial line is:

    tmp.get(); // this is required! need to await the task before destroying it
    

    (Of course, you could also wait for the task before moving it.)

    This simplified version:

    See it Live On Coliru

    #include <boost/asio.hpp>
    #include <cassert>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <ranges>
    #include <thread>
    namespace asio = boost::asio;
    using boost::system::error_code; // asio::error_code for standalone Asio
    static auto constexpr now = std::chrono::high_resolution_clock::now;
    using namespace std::chrono_literals;
    
    template <class T> struct CancellableTask {
        struct Impl { // non-copyables:
            std::future<T>            m_task;
            asio::cancellation_signal m_cancel;
        };
        std::unique_ptr<Impl> impl_ = std::make_unique<Impl>();
    
        void cancel() const { impl_->m_cancel.emit(asio::cancellation_type::all); }
        T    get() const { return impl_->m_task.get(); }
    
        template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
            impl_->m_task = co_spawn(                                            //
                io, std::forward<Coro>(f),                                       //
                bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future) //
            );
        }
    };
    
    namespace task {
        template <typename Executor, typename Coro> auto spawn(Executor io, Coro&& f) {
            using F = decltype(co_spawn(io, std::forward<Coro>(f), asio::use_future));
            using T = decltype(std::declval<F>().get());
            return CancellableTask<T>(io, std::forward<Coro>(f));
        }
    } // namespace task
    
    asio::awaitable<void> sleepTask(auto duration) {
        error_code ec;
        co_await asio::steady_timer(co_await asio::this_coro::executor, duration)
            .async_wait(asio::redirect_error(ec));
        std::cout << " -> sleepTask(" << duration / 1.s << "s) completed: " << ec.message() << std::endl;
    }
    
    void testLongRunningTaskIsCancellable() {
        asio::thread_pool ioc(1);
    
        auto            start = now();
        CancellableTask task  = task::spawn(ioc.get_executor(), sleepTask(10s));
    
        std::this_thread::sleep_for(100ms);
    
        task.cancel();
    
        task.get();
        assert(now() - start < 1s);
    
        ioc.join();
        assert(now() - start < 1s);
    }
    
    void testLongRunningTaskIsRespawnable() {
        asio::thread_pool ioc(1);
    
        auto            start = now();
        CancellableTask task  = task::spawn(ioc.get_executor(), sleepTask(10s));
    
        for (auto _ : std::views::iota(0, 5)) {
            std::cout << _ << std::endl;
            task.cancel();
            auto tmp = std::move(task);
            // tmp.cancel(); // alternatively
            tmp.get(); // this is required! need to await the task before destroying it
    
            task = task::spawn(ioc.get_executor(), sleepTask(1s));
    
            // Assert pointers transferred..
            assert(tmp.impl_ != task.impl_);
        }
    
        task.cancel();
        ioc.join();
    
        std::cout << "joined..." << std::endl;
    
        assert(now() - start < 1s);
    }
    
    int main() {
        testLongRunningTaskIsCancellable();
        testLongRunningTaskIsRespawnable();
        std::cout << "Test completed successfully." << std::endl;
    }
    

    Printing:

     -> sleepTask(10s) completed: Operation canceled
    0
     -> sleepTask(10s) completed: Operation canceled
    1
     -> sleepTask(1s) completed: Operation canceled
    2
     -> sleepTask(1s) completed: Operation canceled
    3
     -> sleepTask(1s) completed: Operation canceled
    4
     -> sleepTask(1s) completed: Operation canceled
     -> sleepTask(1s) completed: Operation canceled
    joined...
    Test completed successfully.
    

    BUT WHAT IF ...

    We actually wanted to "detach" the future and let Asio clean up instead?

    In that case I'd indeed suggest sharing the implementation details and consigning the shared pointer to the completion handler, so that the shared state stays alive until the handler has been invoked.

    template <class T> struct CancellableTask {
        struct Impl { // non-copyables:
            std::future<T>            m_task;
            asio::cancellation_signal m_cancel;
        };
        std::shared_ptr<Impl> impl_ = std::make_shared<Impl>();
    
        void cancel() const { impl_->m_cancel.emit(asio::cancellation_type::all); }
        T    get() const { return impl_->m_task.get(); }
        auto ref() const { return std::static_pointer_cast<void const>(impl_); } // opaque
    
        template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
            impl_->m_task = co_spawn(                                                            //
                io, std::forward<Coro>(f),                                                       //
                consign(bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future), ref()) //
            );
        }
    };
    

    Note how the opaque shared_ptr<void const> helps limit template instantiations if many different CancellableTask types are instantiated. Also it expresses the intent that ONLY ownership is shared, no details are accessible.
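
The ownership-only idea can be checked in isolation, without Asio. The following is a minimal sketch using only the standard library; `Tracked` and `opaqueOwnerKeepsAlive` are hypothetical names made up for this illustration and are not part of the code above. It assumes nothing beyond `std::shared_ptr` semantics: a `shared_ptr<void const>` obtained via `std::static_pointer_cast` shares the same control block, so the object is destroyed (with its real destructor) only when the last owner, opaque or not, lets go.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for Impl: flips a flag when it is destroyed.
struct Tracked {
    explicit Tracked(bool& flag) : destroyed(flag) {}
    ~Tracked() { destroyed = true; }
    bool& destroyed;
};

// Returns true if the opaque pointer alone kept the object alive
// and the object was destroyed once that last owner was released.
bool opaqueOwnerKeepsAlive() {
    bool destroyed = false;
    auto impl = std::make_shared<Tracked>(destroyed);

    // Same cast as ref() above: shares ownership, hides the type.
    std::shared_ptr<void const> opaque = std::static_pointer_cast<void const>(impl);
    assert(impl.use_count() == 2);

    impl.reset(); // the typed "handle" goes away first
    bool aliveWhileConsigned = !destroyed;

    opaque.reset(); // last owner released: ~Tracked runs here
    return aliveWhileConsigned && destroyed;
}
```

In the answer's code, the role of `opaque` is played by the value handed to `consign`, which travels with the completion handler.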

    Now things work out fine without explicitly awaiting the usurped/canceled tasks:

        for (auto _ : std::views::iota(0, 5)) {
            std::cout << _ << std::endl;
    
            task.cancel();
            task = task::spawn(ioc.get_executor(), sleepTask(1s));
        }
    

    However, you could go the extra mile and make CancellableTask self-cancel on destruction and assignment:

    ~CancellableTask() { cancel(); }
    
    CancellableTask& operator=(CancellableTask rhs) {
        rhs.impl_.swap(impl_);
        return *this;
    }
    

    Now it becomes: Live On Coliru

    #include <boost/asio.hpp>
    #include <cassert>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <ranges>
    #include <thread>
    #include <utility>
    namespace asio = boost::asio;
    using boost::system::error_code; // asio::error_code for standalone Asio
    static auto constexpr now = std::chrono::high_resolution_clock::now;
    using namespace std::chrono_literals;
    
    template <class T> struct CancellableTask {
        struct Impl { // non-copyables:
            std::future<T>            m_task;
            asio::cancellation_signal m_cancel;
        };
        std::shared_ptr<Impl> impl_ = std::make_shared<Impl>();
    
        ~CancellableTask() { cancel(); }
    
        CancellableTask& operator=(CancellableTask rhs) {
            rhs.impl_.swap(impl_);
            return *this;
        }
    
        void cancel() {
            if (auto tmp = std::exchange(impl_, nullptr)) {
                std::cout << "cancelling task..." << std::endl;
                tmp->m_cancel.emit(asio::cancellation_type::all);
            } else {
                std::cout << "task already cancelled" << std::endl;
            }
        }
    
        T get() const {
            if (!impl_)
                throw std::future_error(std::future_errc::no_state);
            return impl_->m_task.get();
        }
    
        auto ref() const { // opaque
            return std::static_pointer_cast<void const>(impl_);
        }
    
        template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
            impl_->m_task = co_spawn(                                                            //
                io, std::forward<Coro>(f),                                                       //
                consign(bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future), ref()) //
            );
        }
    };
    
    namespace task {
        template <typename Executor, typename Coro> auto spawn(Executor io, Coro&& f) {
            using F = decltype(co_spawn(io, std::forward<Coro>(f), asio::use_future));
            using T = decltype(std::declval<F>().get());
            return CancellableTask<T>(io, std::forward<Coro>(f));
        }
    } // namespace task
    
    asio::awaitable<void> sleepTask(auto duration) {
        error_code ec;
        co_await asio::steady_timer(co_await asio::this_coro::executor, duration)
            .async_wait(asio::redirect_error(ec));
        std::cout << " -> sleepTask(" << duration / 1.s << "s) completed: " << ec.message() << std::endl;
    }
    namespace Tests::LongRunningTasks {
        void Cancellable() {
            asio::thread_pool ioc(1);
    
            auto            start = now();
            CancellableTask task  = task::spawn(ioc.get_executor(), sleepTask(10s));
    
            std::this_thread::sleep_for(100ms);
    
            task.cancel();
    
            // semantics of cancellation changed to release the future, task.get() would throw now
    
            ioc.join();
            assert(now() - start < 1s);
        }
    
        void Respawnable() {
            asio::thread_pool ioc(1);
    
            auto start = now();
            {
                CancellableTask task = task::spawn(ioc.get_executor(), sleepTask(10s));
    
                for (auto i : std::views::iota(0, 5)) {
                    std::cout << i << std::endl;
                    task = task::spawn(ioc.get_executor(), sleepTask(i * 1s));
                }
            } // auto-cancels task on destruction
            ioc.join();
    
            std::cout << "joined..." << std::endl;
    
            assert(now() - start < 1s);
        }
    } // namespace Tests::LongRunningTasks
    
    int main() {
        Tests::LongRunningTasks::Cancellable();
        Tests::LongRunningTasks::Respawnable();
        std::cout << "Test completed successfully." << std::endl;
    }
    

    Printing

    cancelling task...
     -> sleepTask(10s) completed: Operation canceled
    task already cancelled
    0
    cancelling task...
    1
    cancelling task...
    2
    cancelling task...
    3
    cancelling task...
    4
    cancelling task...
    cancelling task...
     -> sleepTask(10s) completed: Operation canceled
     -> sleepTask(0s) completed: Operation canceled
     -> sleepTask(1s) completed: Operation canceled
     -> sleepTask(2s) completed: Operation canceled
     -> sleepTask(3s) completed: Operation canceled
     -> sleepTask(4s) completed: Operation canceled
    joined...
    Test completed successfully.
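
To see why plain assignment is enough to cancel the previous task, the swap-based assignment can be isolated from Asio. This is a rough sketch under stated assumptions, not the code above: `FakeImpl`, `SelfCancellingHandle` and `respawnThreeTimes` are hypothetical names, and "cancelling" is reduced to appending the task's name to a log. The by-value parameter of `operator=` ends up holding the old state after the swap, and its destructor cancels it at the end of the assignment expression.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Impl: cancellation is only recorded in a log.
struct FakeImpl {
    std::string name;
    std::vector<std::string>* log;
};

class SelfCancellingHandle {
  public:
    SelfCancellingHandle(std::string name, std::vector<std::string>& log)
        : impl_(std::make_shared<FakeImpl>(FakeImpl{std::move(name), &log})) {}

    // Move-only in this sketch (an assumption): a copy would share the
    // state and cancel it as soon as the copy is destroyed.
    SelfCancellingHandle(SelfCancellingHandle&&) noexcept = default;

    ~SelfCancellingHandle() { cancel(); }

    // rhs receives the old state via swap; its destructor then cancels it.
    SelfCancellingHandle& operator=(SelfCancellingHandle rhs) {
        rhs.impl_.swap(impl_);
        return *this;
    }

    void cancel() {
        if (auto tmp = std::exchange(impl_, nullptr))
            tmp->log->push_back(tmp->name); // "emit" the cancellation once
    }

  private:
    std::shared_ptr<FakeImpl> impl_;
};

// Replaces the handle twice, then lets it go out of scope.
std::vector<std::string> respawnThreeTimes() {
    std::vector<std::string> log;
    {
        SelfCancellingHandle task("first", log);
        task = SelfCancellingHandle("second", log); // cancels "first"
        task = SelfCancellingHandle("third", log);  // cancels "second"
    } // destructor cancels "third"
    return log;
}
```

Calling `respawnThreeTimes()` should yield the names in the order first, second, third, mirroring the sequence of "cancelling task..." lines in the output above.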
    

    BONUS

    I wrote my answer before reading anything beyond the TL;DR warning, as mentioned. Regarding your attempts: the wrapping coro probably needs to manually forward any cancellations. An example of forwarding connected cancellation slots can be seen in this answer: (Boost) ASIO custom timeout token and custom async operation cancellation propagation