boost boost-asio

make_parallel_group: Can it be used with thread pool 'post'?


I'd like to have a function that applies a handler to each element of an input range in parallel. A possible implementation is:

// Applies `handler` to every element of `input` by posting one task per
// element to `executor`, then blocks the calling thread until every task has
// run.
//
// Notes for callers (not checked here):
//  - if `executor` runs tasks on several threads, `handler` is invoked
//    concurrently on the same object, so calling it must be thread-safe;
//  - the caller blocks in f.get(), so the posted tasks presumably have to be
//    run by other threads (e.g. a thread pool) or this never returns.
//
// If `post` (or iterating `input`) throws part-way through, the tasks that
// were already queued still reference this function's locals, so we wait for
// them to finish before letting the exception escape.
template <typename Ex, typename Handler, typename Range>
void for_each_parallel(Ex executor, Handler handler, Range const& input)
{
    using std::size; // lets size() also resolve for built-in arrays
    std::size_t const total = size(input);
    if (total == 0)
        return;

    // Number of elements whose task has not finished (or was never posted).
    std::atomic<std::size_t> left{total};
    std::promise<void> p;
    auto f = p.get_future();

    std::size_t posted = 0;
    try
    {
        for (auto& el : input)
        {
            post(executor, [&left, &p, &handler, &el] {
                handler(el);
                if (--left == 0)
                {
                    // As soon as the shared state becomes ready the caller
                    // may return and destroy `p`. Move the promise into this
                    // task first so set_value() never runs on an object
                    // that is being destroyed.
                    std::promise<void> done = std::move(p);
                    done.set_value();
                }
            });
            ++posted;
        }
    }
    catch (...)
    {
        // Account for the elements that were never posted. The queued tasks
        // alone cannot reach zero (unposted >= 1), so exactly one of us or
        // them releases the wait.
        std::size_t const unposted = total - posted;
        if (left.fetch_sub(unposted) == unposted)
            p.set_value();
        f.wait();
        throw;
    }
    f.get();
}

Is it possible to implement it using the ranged form of make_parallel_group(wait_for_all(),...)? No matter what I try, I get compiler errors.


Solution

  • Yeah, your godbolt makes sense to me. I think the core problem is that post only posts "a handler", not an async task with a completion (the handler *is* the completion). What you want is an async initiation, so to speak: async_post. Let's say you implement async_post like so:

    // Asynchronous initiating function: runs `op()` on `executor` and then
    // completes the operation (completion signature `void()`) via `token`.
    auto async_post(auto executor, auto op, auto&& token) {
        auto initiate = [executor, work = std::move(op)](auto& self) mutable {
            // Hand both the work and the composed-operation handle to the
            // executor; completion happens only after the work has run.
            auto run_then_complete = [work = std::move(work), handle = std::move(self)]() mutable {
                work();
                std::move(handle).complete();
            };
            asio::post(executor, std::move(run_then_complete));
        };
        return asio::async_compose<decltype(token), void()>(std::move(initiate), token, executor);
    }
    

    Now you can create deferred ops from it:

    void for_each_parallel(auto ex, auto action, auto& input_range) {
        auto&& ops =
            input_range //
            | ranges::views::transform(
                  [&](auto& elem) {                                                    //
                      return async_post(                                               //
                          ex, [&elem, &action]() { action(elem); }, asio::deferred); //
                  });
    

    Sadly, Asio doesn't recognize that view as an is_async_operation_range. Let's cop out for now and convert it to a vector:

        using Op = std::decay_t<decltype(ops.front())>;
        std::vector<Op> vops(ops.begin(), ops.end());
    

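    Now your original group works. Note that, unlike your std::promise version, async_wait does not block: for_each_parallel returns before the operations have run, so everything they reference must outlive them. elem refers into the caller's range, which is fine. action, however, is a by-value parameter of for_each_parallel, so capturing it by reference (as above) dangles; capture it by value or through a std::shared_ptr instead: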

        namespace X = asio::experimental;
        auto grp    = X::make_parallel_group(vops);
    
        std::move(grp).async_wait(                                    //
            X::wait_for_all(), [](std::vector<size_t> const& order) { //
                fmt::print("[{}]: finished in order {}\n", tid, order);
            });
    }
    

    See it Live On Coliru

    #include <memory>

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/parallel_group.hpp>
    #include <fmt/ranges.h>
    #include <range/v3/all.hpp>
    namespace asio = boost::asio;
    
    // Shared counter from which each thread draws its small numeric id.
    static std::atomic_int tid_gen{0};
    // This thread's id; ids are 1, 2, 3, ... in the order threads initialize it.
    thread_local int const tid = tid_gen.fetch_add(1) + 1;
    
    // Asynchronous initiating function: runs `op()` on `executor` and then
    // completes the operation (completion signature `void()`) via `token`.
    auto async_post(auto executor, auto op, auto&& token) {
        auto initiate = [executor, work = std::move(op)](auto& self) mutable {
            // Hand both the work and the composed-operation handle to the
            // executor; completion happens only after the work has run.
            auto run_then_complete = [work = std::move(work), handle = std::move(self)]() mutable {
                work();
                std::move(handle).complete();
            };
            asio::post(executor, std::move(run_then_complete));
        };
        return asio::async_compose<decltype(token), void()>(std::move(initiate), token, executor);
    }
    
    // Runs `action(elem)` for every element of `input_range` on executor `ex`.
    // It creates one deferred async_post per element and launches them together
    // as a ranged parallel_group with wait_for_all().
    //
    // This only *starts* the work. async_wait() returns immediately, and the
    // group's completion handler later prints the order in which the operations
    // finished. Because the operations outlive this call:
    //  - `input_range` must stay alive until they have run (each operation
    //    refers to its element by reference);
    //  - `action` is moved into shared ownership instead of being captured by
    //    reference, because this function's parameter is destroyed on return.
    //    All operations invoke that single object, possibly concurrently.
    void for_each_parallel(auto ex, auto action, auto& input_range) {
        auto shared_action = std::make_shared<decltype(action)>(std::move(action));

        auto&& ops =
            input_range //
            | ranges::views::transform([&](auto& elem) {
                  return async_post(
                      ex, [&elem, shared_action]() { (*shared_action)(elem); }, asio::deferred);
              });

        // Asio does not recognize the lazy view as an async-operation range,
        // so materialize the deferred operations into a vector.
        using Op = std::decay_t<decltype(ops.front())>;
        std::vector<Op> vops(ops.begin(), ops.end());

        namespace X = asio::experimental;
        // The group takes ownership of the operations; no need to copy them.
        auto grp = X::make_parallel_group(std::move(vops));

        std::move(grp).async_wait(
            X::wait_for_all(), [](std::vector<size_t> const& order) {
                fmt::print("[{}]: finished in order {}\n", tid, order);
            });
    }
    
    int main() {
        fmt::print("[{}]: main\n", tid);
        asio::thread_pool tp(3);
    
        std::vector input = {1, 2, 3, 4, 5};
        for_each_parallel(
            tp.get_executor(),                                 //
            [](int i) { fmt::print("[{}]: i={}\n", tid, i); }, //
            input);
    
        fmt::print("[{}]: done\n", tid);
    
        tp.join();
    }
    

    Printing, locally: