c++ boost-asio

Execute and wait for unknown number of coroutines with asio


I'm trying to execute a number of C++20 coroutines (number provided at runtime), in parallel, and wait for them all.

My current design is to post a task to the pool N times; each task calls co_spawn to start a coroutine and pushes the result into a container, which I then pass to make_parallel_group. I'm not sure this entirely makes sense; feedback is welcome.

Here's my attempt (not compiling)

#include <asio/co_spawn.hpp>
#include <asio/detached.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <asio/experimental/parallel_group.hpp>
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <asio/thread_pool.hpp>
#include <asio/use_awaitable.hpp>
#include <fmt/core.h>
#include <fmt/ranges.h>
#include <iostream>
#include <list>

// Asker's attempt (stated above to not compile): spawn five coroutines on a
// four-thread pool, collect the co_spawn results in a list, and wait for all
// of them through a parallel group.
int main(int, char* argv[])
{
    // Coroutine body for each task: fetches its executor and completes with 1.
    auto task = [&]() -> asio::awaitable<int> {
        auto ex = co_await asio::this_coro::executor;
        // NOTE(review): the coroutines are spawned on `pool` (a thread_pool)
        // below, so it is doubtful that this executor targets an io_context;
        // the dereference presumably hits a null pointer — confirm.
        auto& io = *ex.target<asio::io_context>();

        // do coroutine stuff using io ...

        co_return 1; // int is pointless, just an attempt to see if it was making a difference
    };
    asio::thread_pool pool{ 4 };

    // Completion token handed to every co_spawn call.
    auto end_way = asio::use_awaitable; // using asio::deferred causes a different error
    // Whatever co_spawn returns for that token; this is the element type stored in `ops`.
    using op_type = decltype(asio::co_spawn(pool, task(), end_way));
    std::list<op_type> ops;
    // Serialises the push_back calls made from the pool threads.
    // NOTE(review): <mutex> is not among the includes listed above.
    std::mutex m;

    // 5 is hard coded for the moment
    for(auto i = 0; i < 5; ++i)
    {
        // Each posted job starts one coroutine and records the result under the lock.
        asio::post(pool, [&ops, &task, &pool, &m, &end_way]() {
            std::lock_guard<std::mutex> l{m};
            ops.push_back(asio::co_spawn(pool, task(), end_way));
        });
    }
    // NOTE(review): `ops` is read here without taking `m` and without waiting
    // for the posted jobs, so it may still be empty or mid-modification — confirm.
    // `ops` is also passed as an lvalue, i.e. the range would have to be copied.
    asio::experimental::make_parallel_group(ops)
        .async_wait(
            asio::experimental::wait_for_all(),
            // Only the completion order is used; the remaining results are ignored.
            [](std::vector<std::size_t> completion_order, auto&&...) {
                for(std::size_t i = 0; i < completion_order.size(); ++i)
                {
                    std::cout << "task " << completion_order[i] << " finished: ";
                }
            }
        );
    // Wait for the work submitted to the pool before leaving main.
    pool.join();

    return 0;
}


Solution

  • Deferred async operations are move-only. You need to move the range into the parallel group:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <boost/asio/experimental/parallel_group.hpp>
    #include <fmt/ranges.h>
    #include <iostream>
    #include <vector>
    namespace asio = boost::asio;
    
    // Minimal unit of work: asks for the executor the coroutine runs on (kept
    // only as a placeholder for real work) and then completes with the value 1.
    asio::awaitable<int> task() {
        [[maybe_unused]] auto const executor = co_await asio::this_coro::executor;
        co_return 1;
    }
    
    int main() {
        asio::thread_pool pool{4};
    
        static constexpr auto token = asio::deferred;
    
        using op_type = decltype(asio::co_spawn(pool, task, token));
        std::vector<op_type> ops;
    
        unsigned n = 5;
        for (unsigned i = 0; i < n; ++i)
            ops.push_back(asio::co_spawn(pool, task, token));
    
        auto g = asio::experimental::make_parallel_group(std::move(ops));
        g.async_wait(asio::experimental::wait_for_all(),
                     [](std::vector<std::size_t> completion_order, auto&&...) {
                         std::cout << __PRETTY_FUNCTION__ << std::endl;
                         for (auto index : completion_order) {
                             std::cout << "task " << index << " finished: " << std::endl;
                         }
                     });
    
        pool.join();
    }
    

    Printing e.g.

    task 0 finished: 
    task 4 finished: 
    task 3 finished: 
    task 1 finished: 
    task 2 finished:
    

    Demo

    This demo shows actual output, a random task load, and error handling and reporting. It is also simpler, because recent Boost versions make deferred the default completion token:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <boost/asio/experimental/parallel_group.hpp>
    #include <fmt/ranges.h>
    #include <iomanip>
    #include <iostream>
    #include <random>
    #include <stdexcept>
    #include <vector>
    namespace asio = boost::asio;
    
    // Completes with ten times `input`. Inputs divisible by seven fail with
    // std::runtime_error instead, which exercises the error-reporting path.
    asio::awaitable<int> task(int input) {
        if (input % 7 == 0)
            throw std::runtime_error("seven is not a lucky number");

        co_return 10 * input;
    }
    
    // Spawns a random number of tasks (9 to 13, with inputs starting at 1) on a
    // four-thread pool, waits for all of them through a parallel group, and
    // prints each task's result or error in the order the tasks completed.
    int main() {
        asio::thread_pool pool{4};

        // Element type is whatever co_spawn returns with the default completion token.
        std::vector<decltype(co_spawn(pool, task(1)))> ops;

        unsigned n = 10 + std::random_device{}() % 5;
        for (unsigned i = 1; i < n; ++i)
            ops.push_back(co_spawn(pool, task(i)));

        // The operations are move-only, so the range is moved into the group.
        auto g = asio::experimental::make_parallel_group(std::move(ops));
        std::move(g).async_wait( //
            asio::experimental::wait_for_all(),
            [](auto const& order, auto&& exceptions, auto&& rvals) {
                // `order` holds operation indices sorted by completion time, whereas
                // `exceptions` and `rvals` are laid out by operation index. Results
                // must therefore be looked up through the index stored in `order`,
                // not through the loop position, or tasks that finish out of order
                // get paired with another task's result.
                for (auto const index : order) {
                    try {
                        std::cout << "task " << std::setw(2) << index << " finished: ";
                        if (exceptions.at(index))
                            std::rethrow_exception(exceptions.at(index));

                        std::cout << rvals.at(index) << std::endl;
                    } catch (std::exception const& e) {
                        std::cout << "ERROR " << std::quoted(e.what()) << std::endl;
                    }
                }
            });

        pool.join();
    }
    

    With local demos