Tags: c++, c++20, boost-asio, boost-coroutine, deadline-timer

Can I run N boost::asio::awaitable co-routines (or a vector of co-routines) in parallel and tack on a deadline timer?


I'm trying to execute a number of boost::asio::awaitable coroutines whose exact count is only known at runtime (it varies based on conditions). I need them to run in parallel, AND I need all of them to be cancelled and an error returned if the whole operation times out.

I have come up with the following mix of real code and pseudo-code to demonstrate what I'm trying to do.

Note: Please feel free to critique my deadline timer idea. I couldn't think of anything cleaner.

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>  

#include <iostream>
#include <chrono>
#include <system_error>

using namespace std::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;

/// Simulates one unit of work that lasts until the absolute time point `tp`.
///
/// Prints a start line, suspends on a steady_timer bound to the awaiting
/// coroutine's executor, and prints a completion line tagged with `index`
/// once the wait returns.
boost::asio::awaitable<void> do_work( int index, std::chrono::steady_clock::time_point tp )
{
    std::cout << "Starting work." << std::endl;

    // Arm the timer at construction rather than via a separate expires_at() call.
    auto executor = co_await boost::asio::this_coro::executor;
    boost::asio::steady_timer wake_up( executor, tp );
    co_await wake_up.async_wait( boost::asio::deferred );

    std::cout << "Work " << index << " finished!" << std::endl;
}

/// Sketch of running a collection of coroutines in parallel under one deadline.
///
/// The part between the "Pseudo code" marker and the final co_await is
/// illustrative only: `awaitables` and `parallel_coros_group` are placeholders
/// that are not declared anywhere in this listing.
///
/// Any std::exception that escapes the combined wait -- including the
/// timed_out error raised by the deadline helper -- is reported on std::cout.
boost::asio::awaitable<void> test()
{
    try
    {
        auto executor  = co_await boost::asio::this_coro::executor;

        boost::asio::steady_timer deadline_timer( executor );

        // Waits on deadline_timer until `tp`.
        //  - timer cancelled first: logs it and completes normally;
        //  - deadline reached: throws system_error(timed_out) to the awaiter.
        auto schedule_deadline = [ &deadline_timer ]( std::chrono::steady_clock::time_point tp ) -> boost::asio::awaitable<void>
        {
            std::cout << "Starting deadline" << std::endl;

            bool expired = false;
            try
            {
                deadline_timer.expires_at( tp );
                co_await deadline_timer.async_wait( boost::asio::deferred  );
                expired = true;
            }
            catch( const boost::system::system_error& e )
            {
                if( e.code() == boost::asio::error::operation_aborted )
                    std::cout << "Timer was canceled." << std::endl;
                else
                    std::cerr << "Unexpected error: " << e.what() << std::endl;
            }

            std::cout << "Ending deadline" << std::endl;

            // Raise the timeout only after leaving the try block. Thrown inside
            // it, the handler above would catch it, log "Unexpected error" and
            // swallow it, so the awaiting `&&` expression would never see it.
            if( expired )
                throw boost::system::system_error( std::make_error_code( std::errc::timed_out ) );
        };

        // Awaits the given awaitable, then cancels the deadline timer so that
        // schedule_deadline can finish without reporting a timeout.
        auto execute_coro = [ &deadline_timer ]( auto &&coro ) -> boost::asio::awaitable<void>
        {
            co_await std::forward<decltype(coro)>(coro);

            deadline_timer.cancel();
        };

        std::cout << "Scenario 1." << std::endl;

        auto now = std::chrono::steady_clock::now();


        // Pseudo code from here!!!
        // Collection of N co-routine tasks
        std::vector<awaitables> coros;

        // Add these all in a loop (no loop in this example)
        coros.push_back( do_work( 1, now + 1s ) );
        coros.push_back( do_work( 2, now + 2s ) );
        coros.push_back( do_work( 3, now + 3s ) );

        // Await them all and let them execute in parallel
        auto all_coros = parallel_coros_group( coros );

        co_await (
            // Wrap in a timer canceling function
            execute_coro( all_coros ) &&
            // Schedule a deadline
                schedule_deadline( now + 5s )
        );

    }
    catch( const std::exception &ex )
    {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
}

int main()
{
    boost::asio::thread_pool ioc(1);

    co_spawn(
        ioc,
        test(),
        boost::asio::detached
    );

    ioc.join();
}

Is something like this possible and how can it be done?


Solution

  • You can use the ranged make_parallel_group (see also Coordinating coros)

    /// Runs `n` do_work tasks concurrently and races them against a 5-second timer.
    ///
    /// Task i (counting from 1) is started with a delay of i seconds. Prints
    /// "All tasks finished!" when the whole group completes before the timer and
    /// "Timeout!" otherwise. A failure captured from any task is rethrown and,
    /// like every other std::exception raised in the body, is reported by the
    /// handler at the bottom.
    asio::awaitable<void> scenario(unsigned n) try {
        auto ex = co_await asio::this_coro::executor;
    
        using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
        std::vector<Task> tasks;
        tasks.reserve(n); // final size is known up front
        for (unsigned i = 1; i <= n; ++i)
            tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
    
        // Await them all and let them execute in parallel
        auto grp = asio::experimental::make_parallel_group(std::move(tasks));
    
        // Index 0: the group completed first; index 1: the timer did.
        auto rr = co_await ( //
            grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
            asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
        );
    
        if (rr.index() == 0) {
            // The group hands back each task's exception_ptr instead of throwing,
            // so a failed task must be surfaced explicitly before claiming success.
            auto const& [order, failures] = std::get<0>(rr);
            for (auto const& failure : failures)
                if (failure)
                    std::rethrow_exception(failure);
    
            std::cout << "All tasks finished!" << std::endl;
        } else {
            std::cout << "Timeout!" << std::endl;
        }
    
    } catch (std::exception const& ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
    

    See a live demo with two scenarios:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/experimental/awaitable_operators.hpp>
    #include <exception>
    #include <iostream>
    #include <variant>
    
    namespace asio = boost::asio;
    using namespace std::chrono_literals;
    using namespace asio::experimental::awaitable_operators;
    
    /// Simulates a unit of work lasting `d`: prints a start line, suspends on a
    /// timer bound to the awaiting coroutine's executor, then prints a completion
    /// line tagged with `index`.
    asio::awaitable<void> do_work(int index, std::chrono::steady_clock::duration d) {
        std::cout << "Starting work." << std::endl;
    
        // Named timer instead of a temporary; it expires `d` after construction.
        asio::steady_timer delay(co_await asio::this_coro::executor, d);
        co_await delay.async_wait(asio::deferred);
    
        std::cout << "Work " << index << " finished!" << std::endl;
    }
    
    /// Runs `n` do_work tasks concurrently and races them against a 5-second timer.
    ///
    /// Task i (counting from 1) is started with a delay of i seconds. Prints
    /// "All tasks finished!" when the whole group completes before the timer and
    /// "Timeout!" otherwise. A failure captured from any task is rethrown and,
    /// like every other std::exception raised in the body, is reported by the
    /// handler at the bottom.
    asio::awaitable<void> scenario(unsigned n) try {
        auto ex = co_await asio::this_coro::executor;
    
        using Task = decltype(co_spawn(ex, do_work(1, 1s), asio::deferred));
        std::vector<Task> tasks;
        tasks.reserve(n); // final size is known up front
        for (unsigned i = 1; i <= n; ++i)
            tasks.push_back(co_spawn(ex, do_work(i, i * 1s), asio::deferred));
    
        // Await them all and let them execute in parallel
        auto grp = asio::experimental::make_parallel_group(std::move(tasks));
    
        // Index 0: the group completed first; index 1: the timer did.
        auto rr = co_await ( //
            grp.async_wait(asio::experimental::wait_for_all(), asio::use_awaitable) ||
            asio::steady_timer(ex, 5s).async_wait(asio::use_awaitable) //
        );
    
        if (rr.index() == 0) {
            // The group hands back each task's exception_ptr instead of throwing,
            // so a failed task must be surfaced explicitly before claiming success.
            auto const& [order, failures] = std::get<0>(rr);
            for (auto const& failure : failures)
                if (failure)
                    std::rethrow_exception(failure);
    
            std::cout << "All tasks finished!" << std::endl;
        } else {
            std::cout << "Timeout!" << std::endl;
        }
    
    } catch (std::exception const& ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
    
    /// Runs the two demo scenarios one after the other.
    asio::awaitable<void> test() {
        // Three tasks: the longest delay (3 s) is below scenario()'s 5 s timer.
        std::cout << "Scenario 1." << std::endl;
        co_await scenario(3u);
    
        // Six tasks: the longest delay (6 s) exceeds scenario()'s 5 s timer.
        std::cout << "Scenario 2." << std::endl;
        co_await scenario(6u);
    }
    
    int main() {
        asio::thread_pool ioc(1);
    
        co_spawn(ioc, test(), asio::detached);
    
        ioc.join();
    }
    

    Printing:

    Scenario 1.
    Starting work.
    Starting work.
    Starting work.
    Work 1 finished!
    Work 2 finished!
    Work 3 finished!
    All tasks finished!
    Scenario 2.
    Starting work.
    Starting work.
    Starting work.
    Starting work.
    Starting work.
    Starting work.
    Work 1 finished!
    Work 2 finished!
    Work 3 finished!
    Work 4 finished!
    Work 5 finished!
    Timeout!