I'm trying to execute a bunch of boost::asio::awaitable co-routines that I don't know the exact amount of at runtime (it varies based on conditions). I need them to run in parallel AND I need them to all cancel and return an error if the whole operation times out.
I have come up with the following code+pseudo code to demonstrate what I'm trying to do.
Note: Please feel free to critique my deadline timer idea. I couldn't think of anything cleaner.
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <iostream>
#include <chrono>
#include <system_error>
using namespace std::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;
boost::asio::awaitable<void> do_work( int index, std::chrono::steady_clock::time_point tp )
{
std::cout << "Starting work." << std::endl;
boost::asio::steady_timer timer( co_await boost::asio::this_coro::executor );
timer.expires_at( tp );
co_await timer.async_wait( boost::asio::deferred );
std::cout << "Work " << index << " finished!" << std::endl;
}
boost::asio::awaitable<void> test()
{
try
{
auto executor = co_await boost::asio::this_coro::executor;
boost::asio::steady_timer deadline_timer( executor );
auto schedule_deadline = [ &deadline_timer ]( std::chrono::steady_clock::time_point tp ) -> boost::asio::awaitable<void>
{
std::cout << "Starting deadline" << std::endl;
try
{
deadline_timer.expires_at( tp );
co_await deadline_timer.async_wait( boost::asio::deferred );
throw boost::system::system_error( std::make_error_code( std::errc::timed_out ) );
}
catch( const boost::system::system_error& e )
{
if( e.code() == boost::asio::error::operation_aborted )
std::cout << "Timer was canceled." << std::endl;
else
std::cerr << "Unexpected error: " << e.what() << std::endl;
}
std::cout << "Ending deadline" << std::endl;
};
auto execute_coro = [ &deadline_timer ]( auto &&coro ) -> boost::asio::awaitable<void>
{
co_await std::forward<decltype(coro)>(coro);
deadline_timer.cancel();
};
std::cout << "Scenario 1." << std::endl;
auto now = std::chrono::steady_clock::now();
// Pesudo code from here!!!
// Collection of N co-routine tasks
std::vector<awaitables> coros;
// Add these all in a loop (no loop in this example)
coros.push_back( do_work( 1, now + 1s ) );
coros.push_back( do_work( 2, now + 2s ) );
coros.push_back( do_work( 3, now + 3s ) );
// Await them all and let them execute in parallel
auto all_coros = parallel_coros_group( coros );
co_await (
// Wrap in a timer canceling function
execute_coro( all_coros ) &&
// Schedule a deadline
schedule_deadline( now + 5s )
);
}
catch( const std::exception &ex )
{
std::cout << "Exception: " << ex.what() << std::endl;
}
}
int main()
{
boost::asio::thread_pool ioc(1);
co_spawn(
ioc,
test(),
boost::asio::detached
);
ioc.join();
}
Is something like this possible and how can it be done?
You can use the ranged make_parallel_group
(see also Coordinating coros)
asio::awaitable<void> scenario(unsigned n) try {
auto ex = co_await asio::this_coro::executor;
using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
std::vector<Task> tasks;
for (unsigned i = 1; i <= n; ++i)
tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
// Await them all and let them execute in parallel
auto grp = asio::experimental::make_parallel_group(std::move(tasks));
auto rr = co_await ( //
grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
);
if (rr.index() == 0) {
std::cout << "All tasks finished!" << std::endl;
} else {
std::cout << "Timeout!" << std::endl;
}
} catch (std::exception const& ex) {
std::cout << "Exception: " << ex.what() << std::endl;
}
See a live demo with two scenarios:
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;
asio::awaitable<void> do_work(int index, std::chrono::steady_clock::duration d) {
std::cout << "Starting work." << std::endl;
co_await asio::steady_timer(co_await asio::this_coro::executor, d).async_wait(asio::deferred);
std::cout << "Work " << index << " finished!" << std::endl;
}
asio::awaitable<void> scenario(unsigned n) try {
auto ex = co_await asio::this_coro::executor;
using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
std::vector<Task> tasks;
for (unsigned i = 1; i <= n; ++i)
tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
// Await them all and let them execute in parallel
auto grp = asio::experimental::make_parallel_group(std::move(tasks));
auto rr = co_await ( //
grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
);
if (rr.index() == 0) {
std::cout << "All tasks finished!" << std::endl;
} else {
std::cout << "Timeout!" << std::endl;
}
} catch (std::exception const& ex) {
std::cout << "Exception: " << ex.what() << std::endl;
}
asio::awaitable<void> test() {
std::cout << "Scenario 1." << std::endl;
co_await scenario(3);
std::cout << "Scenario 2." << std::endl;
co_await scenario(6);
}
int main() {
asio::thread_pool ioc(1);
co_spawn(ioc, test(), asio::detached);
ioc.join();
}
Printing:
Scenario 1.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
All tasks finished!
Scenario 2.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Starting work.
Work 1 finished!
Work 2 finished!
Work 3 finished!
Work 4 finished!
Work 5 finished!
Timeout!