I'm very much in the early stages of learning asio. My goal currently is to have an io_context manage a number of long-running tasks that will be worked on by a number of worker threads. Each task has the possibility to become out of date/invalid at some point in time, so I'm maintaining an asio::cancellation_signal alongside a future which is used to get results if it did successfully complete.
The API I'm trying to implement is something like:
template <class Return> struct CancellableTask {
// future/cancellation_signal are Non-CopyConstructible.
// Keep shared_ptrs to allow CopyConstructible-ness
std::shared_ptr<std::future<Return>> m_task;
std::shared_ptr<asio::cancellation_signal> m_cancel;
asio::cancellation_slot cancelSlot() { return m_cancel->slot(); }
void cancel() { m_cancel->emit(asio::cancellation_type::all); }
};
namespace util {
template <class T, class... Rest> struct WrappedType;
template <template <class, class...> class Wrapper, class T, class... Rest>
struct WrappedType<Wrapper<T, Rest...>> {
using type = T;
};
} // namespace util
namespace task {
template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
using FutureReturn = util::WrappedType<Coroutine>::type;
using FutureType = std::future<FutureReturn>;
CancellableTask<FutureReturn> task{
std::make_shared<FutureType>(), std::make_shared<asio::cancellation_signal>()
};
*task.m_task = co_spawn(
io,
std::forward<Coroutine>(coroutine),
asio::bind_cancellation_slot(task.cancelSlot(), asio::use_future)
);
return task;
}
} // namespace task
The usage is as follows:
asio::awaitable<void> sleepTask(std::chrono::seconds duration) try {
auto ex = co_await asio::this_coro::executor;
std::cout << "running sleep task with duration: " << duration << std::endl;
co_await (asio::steady_timer{ex, duration}.async_wait(asio::deferred));
} catch (asio::system_error& err) {
std::cout << err.what() << std::endl;
}
TEST(LongRunningTask, IsCancellable) {
asio::io_context io_context;
auto work_guard = asio::make_work_guard(io_context);
std::thread runner([&]() { io_context.run(); });
const auto start = std::chrono::high_resolution_clock::now();
auto long_running = task::spawn(io_context, sleepTask(10s));
long_running.cancel();
const auto end = std::chrono::high_resolution_clock::now();
const std::chrono::duration<double> diff = end - start;
work_guard.reset();
runner.join();
ASSERT_LT(diff, 1s);
}
This test works fine/as expected. long_running
is canceled, and the test ends promptly.
TEST(LongRunningTask, IsRespawnable) {
asio::io_context io_context;
auto work_guard = asio::make_work_guard(io_context);
std::thread runner([&]() {
io_context.run();
std::cout << "io_context.run() finished" << std::endl;
});
const auto start = std::chrono::high_resolution_clock::now();
auto long_running = task::spawn(io_context, sleepTask(10s));
for (auto _ : std::views::iota(0, 5)) {
std::cout << _ << std::endl;
long_running.cancel();
auto copy = long_running;
ASSERT_EQ(copy.m_cancel, long_running.m_cancel);
long_running = task::spawn(io_context, sleepTask(1s));
// Assert pointers transferred..
ASSERT_NE(copy.m_cancel, long_running.m_cancel);
}
work_guard.reset();
runner.join();
std::cout << "joined..." << std::endl;
const auto end = std::chrono::high_resolution_clock::now();
const std::chrono::duration<double> diff = end - start;
ASSERT_LT(diff, 1s);
}
This test, however, fails in a segfault. I think the reason is due the awaitable
is still sitting around in the io_context
, even after the cancellation is signaled. Something in the awaitable (the cancellation_slot
perhaps?) might be pointing to memory held by one of my shared_ptrs, which is getting reset before the awaitable has a chance to delete fully.
Sorry for the length of the next section... TL;DR/Question at the bottom.
namespace task {
template <class Coroutine, class Inner = util::WrappedType<Coroutine>::type>
asio::awaitable<Inner> wrapper(
Coroutine&& coroutine,
std::shared_ptr<std::future<Inner>>,
std::shared_ptr<asio::cancellation_signal>
) {
auto data = co_await std::forward<Coroutine>(coroutine);
co_return data;
}
template <class Coroutine>
asio::awaitable<void> wrapper(
Coroutine&& coroutine,
std::shared_ptr<std::future<void>>,
std::shared_ptr<asio::cancellation_signal>
) {
co_await std::forward<Coroutine>(coroutine);
co_return;
}
template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
using FutureReturn = util::WrappedType<Coroutine>::type;
using FutureType = std::future<FutureReturn>;
CancellableTask<FutureReturn> task{
std::make_shared<FutureType>(), std::make_shared<asio::cancellation_signal>()
};
*task.m_task = co_spawn(
io,
// Forward coroutine to wrapper, with own-copies of the shared_ptrs
wrapper(std::forward<Coroutine>(coroutine), task.m_task, task.m_cancel),
asio::bind_cancellation_slot(task.cancelSlot(), asio::use_future)
);
return task;
}
} // namespace task
This did not work at all -- the first IsCancellable
test doesn't even work with this.
shared_ptr
to CancellableTask
itself, and doing various things to keep it alive, including the wrapper above...template <class Return>
struct CancellableTask : std::enable_shared_from_this<CancellableTask<Return>> {
// ...
}
namespace task {
template <class Coroutine, class Inner = util::WrappedType<Coroutine>::type>
asio::awaitable<Inner>
wrapper(Coroutine&& coroutine, std::shared_ptr<CancellableTask<Inner>> _) {
auto data = co_await std::forward<Coroutine>(coroutine);
co_return data;
}
template <class Coroutine>
asio::awaitable<void>
wrapper(Coroutine&& coroutine, std::shared_ptr<CancellableTask<void>>) {
co_await std::forward<Coroutine>(coroutine);
co_return;
}
template <class Coroutine> auto spawn(asio::io_context& io, Coroutine&& coroutine) {
using FutureReturn = util::WrappedType<Coroutine>::type;
using FutureType = std::future<FutureReturn>;
auto task = std::make_shared<CancellableTask<FutureReturn>>();
task->m_task = std::make_shared<FutureType>();
task->m_cancel = std::make_shared<asio::cancellation_signal>();
*task->m_task = co_spawn(
io,
wrapper(std::forward<Coroutine>(coroutine), task),
asio::bind_cancellation_slot(task->cancelSlot(), asio::use_future)
);
return task;
}
} // namespace task
// ...
TEST(LongRunningTask, IsRespawnable) {
asio::io_context io_context;
auto work_guard = asio::make_work_guard(io_context);
std::thread runner([&]() {
io_context.run();
std::cout << "io_context.run() finished" << std::endl;
});
const auto start = std::chrono::high_resolution_clock::now();
auto long_running = task::spawn(io_context, sleepTask(10s));
for (auto _ : std::views::iota(0, 5)) {
std::cout << _ << std::endl;
long_running->cancel();
auto copy = long_running;
ASSERT_EQ(copy->m_cancel, long_running->m_cancel);
long_running = task::spawn(io_context, sleepTask(1s));
// Assert pointers transfered..
ASSERT_NE(copy->m_cancel, long_running->m_cancel);
}
work_guard.reset();
runner.join();
std::cout << "joined..." << std::endl;
const auto end = std::chrono::high_resolution_clock::now();
const std::chrono::duration<double> diff = end - start;
ASSERT_LT(diff, 1s);
}
In the version above, both tests segfault again... (seemingly because of the coroutine wrapper). Without the coroutine wrapper, it's the same as before...
TEST(LongRunningTask, IsRespawnable) {
asio::io_context io_context;
auto work_guard = asio::make_work_guard(io_context);
std::thread runner([&]() {
io_context.run();
std::cout << "io_context.run() finished" << std::endl;
});
const auto start = std::chrono::high_resolution_clock::now();
CancellableTask<void> long_running;
std::vector<decltype(long_running)> tasks;
for (auto _ : std::views::iota(0, 5)) {
std::cout << _ << std::endl;
if (ore::task::getStatus(long_running.task()) == ore::task::Status::Running) {
long_running.cancel();
}
tasks.push_back(task::spawn(io_context, sleepTask(1s)));
long_running = tasks.back();
}
long_running.cancel();
work_guard.reset();
runner.join();
std::cout << "joined..." << std::endl;
const auto end = std::chrono::high_resolution_clock::now();
const std::chrono::duration<double> diff = end - start;
ASSERT_LT(diff, 1s);
}
If this were the proper solution, I'm sure I could periodically scan through a list to purge entries that have been fully handled... But surely there is a better way?
What is the best way to accomplish the goal of maintaining a cancellable/restartable asio coroutine-based task?
Having arrived at the TL;DR
warning, I came up with this simplified repro. The line
tmp.get(); // this is required! need to await the task before destroying it
(of course you could also wait for task
before moving).
This simplified version:
WrappedType
detectorCancellableTask
move-only instead of copyable; logically a future is simply never copyable. What shared_ptr
does is ... making things shared, not copied.See it Live On Coliru
#include <boost/asio.hpp>
#include <iostream>
#include <ranges>
namespace asio = boost::asio;
using boost::system::error_code; // asio::error_code for standalone Asio
static auto constexpr now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
template <class T> struct CancellableTask {
struct Impl { // non-copyables:
std::future<T> m_task;
asio::cancellation_signal m_cancel;
};
std::unique_ptr<Impl> impl_ = std::make_unique<Impl>();
void cancel() const { impl_->m_cancel.emit(asio::cancellation_type::all); }
T get() const { return impl_->m_task.get(); }
template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
impl_->m_task = co_spawn( //
io, std::forward<Coro>(f), //
bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future) //
);
}
};
namespace task {
template <typename Executor, typename Coro> auto spawn(Executor io, Coro&& f) {
using F = decltype(co_spawn(io, std::forward<Coro>(f), asio::use_future));
using T = decltype(std::declval<F>().get());
return CancellableTask<T>(io, std::forward<Coro>(f));
}
} // namespace task
asio::awaitable<void> sleepTask(auto duration) {
error_code ec;
co_await asio::steady_timer(co_await asio::this_coro::executor, duration)
.async_wait(asio::redirect_error(ec));
std::cout << " -> sleepTask(" << duration / 1.s << "s) completed: " << ec.message() << std::endl;
}
void testLongRunningTaskIsCancellable() {
asio::thread_pool ioc(1);
auto start = now();
CancellableTask task = task::spawn(ioc.get_executor(), sleepTask(10s));
std::this_thread::sleep_for(100ms);
task.cancel();
task.get();
assert(now() - start < 1s);
ioc.join();
assert(now() - start < 1s);
}
void testLongRunningTaskIsRespawnable() {
asio::thread_pool ioc(1);
auto start = now();
CancellableTask task = task::spawn(ioc.get_executor(), sleepTask(10s));
for (auto _ : std::views::iota(0, 5)) {
std::cout << _ << std::endl;
task.cancel();
auto tmp = std::move(task);
// tmp.cancel(); // alternatively
tmp.get(); // this is required! need to await the task before destroying it
task = task::spawn(ioc.get_executor(), sleepTask(1s));
// Assert pointers transferred..
assert(tmp.impl_ != task.impl_);
}
task.cancel();
ioc.join();
std::cout << "joined..." << std::endl;
assert(now() - start < 1s);
}
int main() {
testLongRunningTaskIsCancellable();
testLongRunningTaskIsRespawnable();
std::cout << "Test completed successfully." << std::endl;
}
Printing:
-> sleepTask(10s) completed: Operation canceled
0
-> sleepTask(10s) completed: Operation canceled
1
-> sleepTask(1s) completed: Operation canceled
2
-> sleepTask(1s) completed: Operation canceled
3
-> sleepTask(1s) completed: Operation canceled
4
-> sleepTask(1s) completed: Operation canceled
-> sleepTask(1s) completed: Operation canceled
joined...
Test completed successfully.
We actually wanted to "detach" the future and let Asio clean up instead?
In that case I'd indeed suggest sharing the implementation details, and consigning the shared pointer to the handler posted.
template <class T> struct CancellableTask {
struct Impl { // non-copyables:
std::future<T> m_task;
asio::cancellation_signal m_cancel;
};
std::shared_ptr<Impl> impl_ = std::make_shared<Impl>();
void cancel() const { impl_->m_cancel.emit(asio::cancellation_type::all); }
T get() const { return impl_->m_task.get(); }
auto ref() const { return std::static_pointer_cast<void const>(impl_); } // opaque
template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
impl_->m_task = co_spawn( //
io, std::forward<Coro>(f), //
consign(bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future), ref()) //
);
}
};
Note how the opaque
shared_ptr<void const>
helps limit template instantiations if many differentCancellableTask
types are instantiated. Also it expresses the intent that ONLY ownership is shared, no details are accessible.
Now things work out fine without explicitly awaiting the usurped/canceled tasks:
for (auto _ : std::views::iota(0, 5)) {
std::cout << _ << std::endl;
task.cancel();
task = task::spawn(ioc.get_executor(), sleepTask(1s));
}
However, you could go the extra mile and make CancellableTask
self-cancel on destruction and assignment:
~CancellableTask() { cancel(); }
CancellableTask& operator=(CancellableTask rhs) {
rhs.impl_.swap(impl_);
return *this;
}
Now it becomes: Live On Coliru
#include <boost/asio.hpp>
#include <iostream>
#include <ranges>
namespace asio = boost::asio;
using boost::system::error_code; // asio::error_code for standalone Asio
static auto constexpr now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
template <class T> struct CancellableTask {
struct Impl { // non-copyables:
std::future<T> m_task;
asio::cancellation_signal m_cancel;
};
std::shared_ptr<Impl> impl_ = std::make_shared<Impl>();
~CancellableTask() { cancel(); }
CancellableTask& operator=(CancellableTask rhs) {
rhs.impl_.swap(impl_);
return *this;
}
void cancel() {
if (auto tmp = std::exchange(impl_, nullptr)) {
std::cout << "cancelling task..." << std::endl;
tmp->m_cancel.emit(asio::cancellation_type::all);
} else {
std::cout << "task already cancelled" << std::endl;
}
}
T get() const {
if (!impl_)
throw std::future_error(std::future_errc::no_state);
return impl_->m_task.get();
}
auto ref() const { // opaque
return std::static_pointer_cast<void const>(impl_);
}
template <typename Executor, typename Coro> CancellableTask(Executor io, Coro&& f) {
impl_->m_task = co_spawn( //
io, std::forward<Coro>(f), //
consign(bind_cancellation_slot(impl_->m_cancel.slot(), asio::use_future), ref()) //
);
}
};
namespace task {
template <typename Executor, typename Coro> auto spawn(Executor io, Coro&& f) {
using F = decltype(co_spawn(io, std::forward<Coro>(f), asio::use_future));
using T = decltype(std::declval<F>().get());
return CancellableTask<T>(io, std::forward<Coro>(f));
}
} // namespace task
asio::awaitable<void> sleepTask(auto duration) {
error_code ec;
co_await asio::steady_timer(co_await asio::this_coro::executor, duration)
.async_wait(asio::redirect_error(ec));
std::cout << " -> sleepTask(" << duration / 1.s << "s) completed: " << ec.message() << std::endl;
}
namespace Tests::LongRunningTasks {
void Cancellable() {
asio::thread_pool ioc(1);
auto start = now();
CancellableTask task = task::spawn(ioc.get_executor(), sleepTask(10s));
std::this_thread::sleep_for(100ms);
task.cancel();
// semantics of cancellation changed to release the future, task.get() would throw now
ioc.join();
assert(now() - start < 1s);
}
void Respawnable() {
asio::thread_pool ioc(1);
auto start = now();
{
CancellableTask task = task::spawn(ioc.get_executor(), sleepTask(10s));
for (auto i : std::views::iota(0, 5)) {
std::cout << i << std::endl;
task = task::spawn(ioc.get_executor(), sleepTask(i * 1s));
}
} // auto-cancels task on destruction
ioc.join();
std::cout << "joined..." << std::endl;
assert(now() - start < 1s);
}
} // namespace Tests::LongRunningTasks
int main() {
Tests::LongRunningTasks::Cancellable();
Tests::LongRunningTasks::Respawnable();
std::cout << "Test completed successfully." << std::endl;
}
Printing
cancelling task...
-> sleepTask(10s) completed: Operation canceled
task already cancelled
0
cancelling task...
1
cancelling task...
2
cancelling task...
3
cancelling task...
4
cancelling task...
cancelling task...
-> sleepTask(10s) completed: Operation canceled
-> sleepTask(0s) completed: Operation canceled
-> sleepTask(1s) completed: Operation canceled
-> sleepTask(2s) completed: Operation canceled
-> sleepTask(3s) completed: Operation canceled
-> sleepTask(4s) completed: Operation canceled
joined...
Test completed successfully.
I wrote my answer before reading anything beyond the TL;DR warning as mentioned. Regarding your attempts: the wrapping coro probably needs to manually forward any cancellations. An example of forwarding connected cancellation slots can bee seen in this answer: (Boost) ASIO custom timeout token and custom async operation cancellation propagation