c++boostc++20boost-asiocoroutine

What is the idiomatic way for a coroutine to transition from a strand to its parent executor?


Let's assume I have a coroutine running on a strand. I would like the completion to be posted to the parent executor of the strand (i.e. : a thread_pool) while also leaving the strand. What is the best way to achieve this result without incurring undue overhead?

Here is some sample code, that illustrates the issue. It seems to me that f2 incurs overhead and breaks locality of reasoning for the code. But f3 cannot do what I want in the specific case where the bound executor is the parent of the strand. Binding f3 to any other unrelated executor would have been fine, just not the strand's parent.

boost::asio::awaitable<void> f2(auto& pool, auto& strand)
{
    assert(strand.running_in_this_thread());
    co_await boost::asio::post(bind_executor(pool, boost::asio::deferred));
}

boost::asio::awaitable<void> f3([[maybe_unused]] auto& pool, auto& strand)
{
    assert(strand.running_in_this_thread());
    co_return;
}

int main(int argc, char* argv[])
{
    boost::asio::thread_pool pool{std::thread::hardware_concurrency()};
    auto strand = boost::asio::make_strand(pool);
    
    co_spawn(strand, f2(pool, strand), bind_executor(pool, [&](std::exception_ptr exception) {
        assert(!strand.running_in_this_thread()); // Succeeds, but required an explicit `post` at the end of `f2`
    }));
    co_spawn(strand, f3(pool, strand), bind_executor(pool, [&](std::exception_ptr exception) {
        assert(strand.running_in_this_thread()); // actual result : succeeds
        assert(!strand.running_in_this_thread()); // expected result, but currently fails
    }));

    pool.join();
}

Edit

@sehe I'm missing the point I guess. First, in your sample code, f1 and f3 are identical. I'm not sure if they are meant to be or not. Second, the completion passed to co_spawn is always bound to a strand, but never to the strand's parent executor (the pool).

I'll try to express my issue in a different way. I can simplify the sample code by removing coroutines altogether. Dispatching to pool from within strand executes inline because it is both true that we are already inside strand and inside pool. If I want to free up strand, I can stop using dispatch and use post or defer.

Now back with coroutines, my current understanding is that the awaitable continuation will be dispatched to the associated executor. Meaning it'll execute inline if my associated executor is the parent of the strand because I'm already running inside it.

In my initial sample code f2 successfully gets off the strand by adding an extra post at the very end. Afterwards, the coroutine's completion is dispatched to that same pool and never returns on strand. But it seems to me there is undue overhead in having both post and dispatch for the same continuation.

https://gcc.godbolt.org/z/c98s8EMs9

int main()
{
    boost::asio::thread_pool pool{1};
    auto strand = boost::asio::make_strand(pool);

    // Similar to my initial `f3`
    boost::asio::dispatch(bind_executor(strand, [&]() {
        boost::asio::dispatch(bind_executor(pool, [&]() {
            assert(strand.running_in_this_thread());
        }));
    }));

    // Similar to my initial `f2`
    boost::asio::dispatch(bind_executor(strand, [&]() {
        boost::asio::post(bind_executor(pool, [&]() {
            boost::asio::dispatch(bind_executor(pool, [&]() {
                assert(!strand.running_in_this_thread());
            }));
        }));
    }));

    // What I'm hoping for is the following
    boost::asio::dispatch(bind_executor(strand, [&]() {
        boost::asio::post(bind_executor(pool, [&]() {
            assert(!strand.running_in_this_thread());
        }));
    }));

    pool.join();
}

Solution

  • What resource are you protecting with the strand?

    I ask, because a coroutine is always its own implicit strand (it's a purely sequential control flow) so, unless you are sharing resources between concurrent copies of the coroutine, a strand is redundant here.

    In the general case, Asio communicates what executor to dispatch a (completion) handler to by means of the associated executor. You already use this mechanism anyways (bind_executor).

    So in your specific case, this ought to suffice:

    co_spawn(strand, fX, bind_executor(pool.get_executor(), asio::detached));
    

    CAVEATS

    Beware, strands are not deterministic and not exclusive. See for background here: Boost Asio: How to run a single handler in multiple independent strands?

    A live sample with near-exhaustive illustration of the possibilities:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <iostream>
    #include <syncstream>
    namespace asio = boost::asio;
    
    using Strand = asio::strand<asio::thread_pool::executor_type>;
    static std::atomic<Strand const*> //
        p_strand1 = nullptr,
        p_strand2 = nullptr;
    
    void report(auto const& caption) {
        std::string where;
        if (p_strand1 && p_strand1.load()->running_in_this_thread())
            where += " strand1";
        if (p_strand2 && p_strand2.load()->running_in_this_thread())
            where += " strand2";
    
        if (where.empty())
            where = " (none)";
    
        std::osyncstream(std::cerr) << caption << " on" << where << '\n';
    }
    
    asio::awaitable<void> f1() {
        report("--\nf1");
        co_return;
    }
    
    asio::awaitable<void> f2(bool use_dispatch) {
        report("--\nf2 (" + std::string(use_dispatch ? "dispatch" : "post") + ")");
    
        asio::any_io_executor ex   = co_await asio::this_coro::executor,
                              pool = ex.target<Strand>()->get_inner_executor();
        if (use_dispatch)
            co_await dispatch(bind_executor(pool, asio::deferred));
        else
            co_await post(bind_executor(pool, asio::deferred));
    }
    
    asio::awaitable<void> f3() {
        report("--\nf3");
        co_return;
    }
    
    void complete(std::exception_ptr e) { return e ? std::rethrow_exception(e) : report("completion"); };
    
    enum strand_select { one, two };
    void test(auto f, strand_select select) {
        asio::thread_pool pool(4);
    
        auto strand1 = make_strand(pool);
        auto strand2 = make_strand(pool);
        p_strand1    = &strand1; // for debug output
        p_strand2    = &strand2; // for debug output
    
        auto& which = (select == one) ? strand1 : strand2;
        co_spawn(strand1, std::move(f), bind_executor(which, complete));
    
        pool.join(); // wait for all tasks to finish
    
        p_strand1 = p_strand2 = nullptr;
    }
    
    int main() {
        test(f1, one);
        test(f1, two);
    
        test(f2(false), one);
        test(f2(false), two);
    
        test(f2(true), one);
        test(f2(true), two);
    
        test(f3, one);
        test(f3, two);
    }
    

    With typical output (in this strictly isolating test() setup the output is deterministic, of course):

    --
    f1 on strand1
    completion on strand1
    --
    f1 on strand1
    completion on strand1 strand2
    --
    f2 (post) on strand1
    completion on strand1
    --
    f2 (post) on strand1
    completion on strand2
    --
    f2 (dispatch) on strand1
    completion on strand1
    --
    f2 (dispatch) on strand1
    completion on strand1 strand2
    --
    f3 on strand1
    completion on strand1
    --
    f3 on strand1
    completion on strand1 strand2