Tags: c++, boost, boost-asio, boost-process

Using a single io_context to run multiple processes with timeout in parallel


I've tried to modify the example from here to make the processes run in parallel. In my use case the processes can be idle for a large portion of their lifecycle, so CPU resources can be better utilized by running them in parallel.

  1. First, I tried running each task from a boost::thread_group object. While running it, I sometimes got unexpected behavior that led to a crash, with the following output:
a.out(17512,0x1dac25c40) malloc: *** error for object 0x600003d60000: pointer being freed was not allocated
a.out(17512,0x1dac25c40) malloc: *** set a breakpoint in malloc_error_break to debug
libc++abi: terminating due to uncaught exception of type std::__1::future_error: The associated promise has been destructed prior to the associated state becoming ready.
  2. Then I tried to replace the thread_group and run everything on the io_context using boost::asio::post(ioc, [&]() { ... }), but that didn't go well either. I got a deadlock: the std::future blocks the thread and prevents the process from running. I guess it's possible to add more threads to run the io_context, but I'd prefer to have some std::future that can yield.
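Both failure messages can be reproduced in isolation with the standard library alone. This is a minimal sketch, not a diagnosis of the program below. The helper names `broken_promise_is_reported` and `single_thread_stall` are hypothetical, and the FIFO queue drained by one thread stands in for a single thread running `io_context::run()`.

```cpp
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <system_error>
#include <thread>
#include <utility>

// 1. If a promise is destroyed before it produces a value, waiting on its future
//    throws std::future_error with code broken_promise. That is the libc++ message
//    "The associated promise has been destructed prior to ... becoming ready".
bool broken_promise_is_reported() {
    std::future<int> f;
    {
        std::promise<int> p;
        f = p.get_future();
    } // p dies here without set_value()
    try {
        f.get();
    } catch (std::future_error const& e) {
        return e.code() == std::make_error_code(std::future_errc::broken_promise);
    }
    return false;
}

// 2. One worker thread drains a FIFO queue. Task A blocks on a future that only
//    task B, queued behind A, can fulfil, so B cannot run while A waits.
//    A bounded wait reports the stall instead of hanging forever.
std::future_status single_thread_stall(std::chrono::milliseconds limit) {
    std::deque<std::function<void()>> queue;
    std::promise<void> p;
    std::future<void> f = p.get_future();
    std::future_status status = std::future_status::deferred; // overwritten by task A

    queue.push_back([&] { status = f.wait_for(limit); }); // task A: blocks the only thread
    queue.push_back([&] { p.set_value(); });             // task B: would have unblocked A

    std::thread worker([&] {
        while (!queue.empty()) {
            auto task = std::move(queue.front());
            queue.pop_front();
            task();
        }
    });
    worker.join();
    return status; // timeout: B only ran after A gave up
}
```

With an unbounded `get()` in place of `wait_for`, task A would never return. That is the deadlock described in point 2.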

I'd be happy to hear suggestions for fixing my code:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <iostream>
#include <boost/thread.hpp>
#include <future>
#include <thread>


using duration = std::chrono::system_clock::duration;
namespace asio = boost::asio;
using namespace std::chrono_literals;

std::string ExecuteProcess(boost::filesystem::path  exe,
                           std::vector<std::string> args, //
                           duration                 time, //
                           std::error_code&         ec,   //
                           asio::io_context&        ioc) {
    namespace bp = boost::process;
    std::future<std::string> data, err_output;

    auto const deadline = std::chrono::steady_clock::now() + time;

    bp::group  g;
    ec.clear();
    bp::child child(exe, args, ioc, g, bp::error(ec), bp::std_in.null(), bp::std_out > data,
                    bp::std_err > err_output);

    if (ec) {
        return {};
    }

    if (data.wait_until(deadline) == std::future_status::ready) {
        return data.get();
    }

    if (std::error_code ignore; child.running(ignore)) {
        g.terminate(ignore);
    }

    ec = make_error_code(asio::error::timed_out); // TODO FIXME
    return {};
}

int main() {
    constexpr duration              timeout = 20s;
    [[maybe_unused]] constexpr auto script1 = "/usr/bin/curl http://httpbin.org/ip -m 5";
    [[maybe_unused]] constexpr auto script2 = R"(delay="0.5"; sleep "$delay"; echo -n "sanity restored after $delay")";

    asio::io_context ioc;

    auto        work = make_work_guard(ioc); // prevent running out of work
    std::thread io_thread([&ioc] { ioc.run(); });


// Option 1 : use thread group

//    boost::thread_group worker_threads;
//    for (int i = 0; i < 20; i++) {
//      worker_threads.create_thread([&]() {
//          std::error_code ec;
//          auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
//          std::cout << "got " << ec.message() << ": " << s << std::endl;
//      });
//    }
//
//    work.reset(); // allow running out of work
//    io_thread.join();
//    worker_threads.join_all();



// Option 2 : use post-io_context
    for (int i = 0; i < 20; i++) {
        boost::asio::post(ioc, [&]() {
            std::error_code ec;
            auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
            std::cout << "got " << ec.message() << ": " << s << std::endl;
        });
    }
    work.reset(); // allow running out of work
    io_thread.join();

}

The code can be compiled for testing with the following command:

g++ -std=c++20 -g -O3 -Wall -pedantic -pthread ~/main.cpp -I<path_to_boost_headers> -L<path_to_boost_libs> -lboost_{thread,coroutine,context}

Solution

  • Yeah. It's a bit annoying that boost::process::child doesn't expose a regular async interface compatible with completion tokens, despite the fact that it tightly integrates with Asio and naturally deals with 100% asynchronous processes.

    This interface does exist: https://beta.boost.org/doc/libs/1_82_0/doc/html/boost/process/async_system.html. However, it cannot work with C++14-only completion tokens. It also has the very old anti-pattern of passing a reference to asio::io_context around.

    This makes it hard to use intelligently, e.g. with a strand or even with asio::thread_pool. However, we can roll our own initiation function that gets around those downsides. Sadly, we will always need an io_context instance, though you might choose to use an internal ("global") instance hidden from the interface.

    With some convenience typedefs:

    using boost::filesystem::path;
    using Signature = void(std::error_code, int, std::string);
    using Duration  = std::chrono::system_clock::duration;
    using Args      = std::vector<std::string>;
    

    Let's define our completion-token interface as:

    template <typename Token>
    auto asyncExecuteProcess(                                       //
        path exe, Args args, Duration limit, asio::io_context& ioc, //
        Token&& token);
    

    Before we dive in deep, this is how we expect to use it to run many processes in parallel under a time constraint, handling each result as soon as it is ready:

    int main() {
        using ProcessExecution::asyncExecuteProcess;
    
        constexpr auto timeout = 50ms;
        constexpr auto script  = R"(
             number=$((1 + $RANDOM % 9));
             sleep "0.0$number";
             echo -n "sanity restored after 0.0$number";
             exit $number
        )";
    
        asio::io_context ioc(1);
        auto work = make_work_guard(ioc);
    
        for (int i = 0; i < 10; i++)
            asyncExecuteProcess(                           //
                "/bin/bash", {"-c", script}, timeout, ioc, //
                [i](std::error_code ec, int exit_code, std::string data) {
                    std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                              << quoted(data) << std::endl;
                });
    
        std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;
    
        work.reset();
        ioc.run();
    }
    

    The embedded bash script sleeps for a random 1..9 * 10ms ($RANDOM % 9 yields 0..8) and exits with the corresponding code 1..9. Since we start them all "immediately", we expect the results to come in ascending order of delay, but not necessarily in order of job number (#i).

    Since all delays > 50ms should time out, we expect the last few entries to show errors.
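    To make that expectation concrete, here is an idealised model of the completion order, using only the standard library. It assumes every job starts at the same instant and ignores process start-up time. It treats a delay equal to the timeout as a timeout, which is an assumption: in practice start-up latency decides that tie. `Outcome` and `expected_order` are hypothetical names.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

using namespace std::chrono_literals;

struct Outcome {
    int                       job;       // the #i printed by the demo
    std::chrono::milliseconds delay;     // what `sleep "0.0$number"` amounts to
    bool                      timed_out; // killed by the timer instead of exiting
};

// numbers[i] is the value the script picked for job i (1..9).
std::vector<Outcome> expected_order(std::vector<int> const& numbers,
                                    std::chrono::milliseconds limit) {
    std::vector<Outcome> out;
    for (int i = 0; i < static_cast<int>(numbers.size()); ++i) {
        auto delay = numbers[i] * 10ms;
        out.push_back({i, delay, delay >= limit}); // model assumption: a tie times out
    }
    // A job completes after its delay, or at the deadline if it gets killed.
    // stable_sort keeps job order only among equal completion times.
    std::stable_sort(out.begin(), out.end(), [&](Outcome const& a, Outcome const& b) {
        return std::min(a.delay, limit) < std::min(b.delay, limit);
    });
    return out;
}
```

    For the picks {7, 2, 9, 4}, jobs #1 and #3 complete normally in that order. Jobs #0 and #2 are both killed at the 50ms deadline.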

    The Magic

    In the implementation details we need to deduce an associated executor (in case the completion token is e.g. bind_executor(my_strand, asio::use_awaitable)).

    auto ex = asio::get_associated_executor(token, ioc.get_executor());
    

    Next up, some resources need to be allocated so they remain stable for the duration of our async operation:

    auto a  = asio::get_associated_allocator(token);
    auto st = std::allocate_shared<stable_state>(a, ex, deadline);
    

    Stable state contains all the things that cannot be (cheaply) moved:

    struct stable_state : std::enable_shared_from_this<stable_state> {
        stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}
        asio::steady_timer       timer;
        bp::group                process_group;
        std::future<std::string> out, err;
    
        void arm_timer() {
            timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
                if (!ec)
                    self->process_group.terminate(ec);
            });
        }
    };
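    The ownership idea behind stable_state can be sketched with the standard library alone. The initiating function allocates the state through the token's allocator, hands the only long-lived shared_ptr to the completion callback, and returns at once. The callback releases the state before calling the user's handler. `CountingAlloc`, `StableState` and `initiate` are hypothetical stand-ins with no Asio involved, and the returned callable merely plays the role of the on_exit handler.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical allocator counting live allocations; stands in for the
// allocator associated with the completion token.
template <typename T>
struct CountingAlloc {
    using value_type = T;
    int* live;
    explicit CountingAlloc(int* counter) noexcept : live(counter) {}
    template <typename U>
    CountingAlloc(CountingAlloc<U> const& other) noexcept : live(other.live) {}
    T* allocate(std::size_t n) { ++*live; return std::allocator<T>{}.allocate(n); }
    void deallocate(T* p, std::size_t n) noexcept { --*live; std::allocator<T>{}.deallocate(p, n); }
};
template <typename T, typename U>
bool operator==(CountingAlloc<T> const& a, CountingAlloc<U> const& b) noexcept { return a.live == b.live; }
template <typename T, typename U>
bool operator!=(CountingAlloc<T> const& a, CountingAlloc<U> const& b) noexcept { return !(a == b); }

struct StableState {
    std::string out; // stands in for the timer, process group and output futures
};

// Returns the single-shot "on_exit" callable that owns the state.
std::function<void()> initiate(CountingAlloc<StableState> alloc,
                               std::function<void(std::string)> handler,
                               std::weak_ptr<StableState>& observer) {
    auto st  = std::allocate_shared<StableState>(alloc);
    observer = st; // only for the test; does not keep the object alive
    return [st, handler = std::move(handler)]() mutable {
        std::string data = std::move(st->out);
        st.reset();               // destroy the state first...
        handler(std::move(data)); // ...then invoke the user's handler
    };
}
```

    Note that allocate_shared places the object and the control block in one allocation, so the memory itself is returned only once the last weak_ptr is gone too.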
    

    The remaining bits of the initiation function may look like this:

    auto init = [&ioc, ex, st] //
        (auto&& handler, path exe, Args args) {
            auto wrapped_handler = [h = std::move(handler), ex, st] //
                (int exit_code, std::error_code ec) mutable {
                    bool terminated = !st->timer.expires_at(time_point::min());
                    bool ok         = !terminated && !ec;
    
                    std::string data;
                    if (ok) {
                        if (st->out.wait_for(0s) == std::future_status::ready)
                            data = st->out.get();
                        else
                            ec = make_error_code(asio::error::interrupted); // TODO
                    }
    
                    if (terminated) {
                        data = "Killed";
                        if (!ec)
                            ec = make_error_code(asio::error::operation_aborted);
                    }
    
                    assert(st.use_count() == 1); // shared_ptr::unique() was removed in C++20
                    st.reset(); // deallocate before invoking the handler
    
                    asio::dispatch(                                                //
                        ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                            std::move(h)(ec, exit_code, std::move(d));
                        });
                };
    
            bp::child(ioc, exe, args,        //
                      st->process_group,     //
                      bp::std_in.null(),     //
                      bp::std_out > st->out, //
                      bp::std_err > st->err, //
                      bp::on_exit(std::move(wrapped_handler)))
                .detach();
    
            st->arm_timer();
        };
    return asio::async_initiate<Token, Signature>( //
        init, token, std::move(exe), std::move(args));
    

    Note that I elected to "soft-detect" termination indirectly, by checking whether the timer had already completed. On POSIX, a much nicer way would be to keep the bp::child in the stable state and query its native_exit_code, as described here: https://stackoverflow.com/a/57733210/85371
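    On POSIX, that nicer check comes down to decoding the raw wait status. The sketch below uses plain fork/waitpid instead of Boost.Process, so it only illustrates the decoding step. It assumes native_exit_code() gives you the same raw status that waitpid() fills in. Termination, decode and run_child are hypothetical names, and SIGKILL is used simply because it cannot be caught or ignored, so the demo cannot hang.

```cpp
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// How a child ended, decoded from a raw waitpid() status.
struct Termination {
    bool killed; // true if a signal ended the process (e.g. our timeout)
    int  code;   // exit code if !killed, otherwise the signal number
};

Termination decode(int raw_status) {
    if (WIFSIGNALED(raw_status))
        return {true, WTERMSIG(raw_status)};
    return {false, WEXITSTATUS(raw_status)};
}

// Fork a child that either exits with `exit_code` or idles until it is killed.
Termination run_child(bool hang, int exit_code) {
    pid_t pid = fork();
    if (pid < 0)
        return {false, -1}; // fork failed; never call kill() with -1
    if (pid == 0) {         // child
        while (hang)
            pause();        // SIGKILL ends the child here
        _exit(exit_code);
    }
    if (hang)
        kill(pid, SIGKILL); // what a timeout handler would do
    int status = 0;
    if (waitpid(pid, &status, 0) != pid)
        return {false, -1};
    return decode(status);
}
```

    With the status in hand there is no need to infer termination from the timer's state.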

    Otherwise, most of the code makes sure that the right executor is used¹, that the allocations are freed at the required moments, and that various asserts guard against programming mistakes.

    DEMO TIME

    Of course, where would we be without proof of the pudding:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/process.hpp>
    #include <cassert>
    #include <future>
    #include <iomanip> // std::quoted
    #include <iostream>
    using namespace std::chrono_literals;
    namespace asio = boost::asio;
    
    namespace ProcessExecution {
        using boost::filesystem::path;
        using Signature = void(std::error_code, int, std::string);
        using Duration  = std::chrono::system_clock::duration;
        using Args      = std::vector<std::string>;
    
        template <typename Token>
        auto asyncExecuteProcess(                                       //
            path exe, Args args, Duration limit, asio::io_context& ioc, //
            Token&& token)                                              //
        {
            namespace bp = boost::process;
    
            using time_point    = std::chrono::steady_clock::time_point;
            time_point deadline = std::chrono::steady_clock::now() + limit;
    
            struct stable_state : std::enable_shared_from_this<stable_state> {
                stable_state(asio::any_io_executor ex, time_point t) : timer(ex, t) {}
                asio::steady_timer       timer;
                bp::group                process_group;
                std::future<std::string> out, err;
    
                void arm_timer() {
                    timer.async_wait([self = this /*->shared_from_this()*/](boost::system::error_code ec) {
                        if (!ec)
                            self->process_group.terminate(ec);
                    });
                }
            };
    
            auto ex = asio::get_associated_executor(token, ioc.get_executor());
            auto a  = asio::get_associated_allocator(token);
            auto st = std::allocate_shared<stable_state>(a, ex, deadline);
    
            auto init = [&ioc, ex, st] //
                (auto&& handler, path exe, Args args) {
                    auto wrapped_handler = [h = std::move(handler), ex, st] //
                        (int exit_code, std::error_code ec) mutable {
                            bool terminated = !st->timer.expires_at(time_point::min());
                            bool ok         = !terminated && !ec;
    
                            std::string data;
                            if (ok) {
                                if (st->out.wait_for(0s) == std::future_status::ready)
                                    data = st->out.get();
                                else
                                    ec = make_error_code(asio::error::interrupted); // TODO
                            }
    
                            if (terminated) {
                                data = "Killed";
                                if (!ec)
                                    ec = make_error_code(asio::error::operation_aborted);
                            }
    
                            assert(st.use_count() == 1); // shared_ptr::unique() was removed in C++20
                            st.reset(); // deallocate before invoking the handler
    
                            asio::dispatch(                                                //
                                ex, [=, h = std::move(h), d = std::move(data)]() mutable { //
                                    std::move(h)(ec, exit_code, std::move(d));
                                });
                        };
    
                    bp::child(ioc, exe, args,        //
                              st->process_group,     //
                              bp::std_in.null(),     //
                              bp::std_out > st->out, //
                              bp::std_err > st->err, //
                              bp::on_exit(std::move(wrapped_handler)))
                        .detach();
    
                    st->arm_timer();
                };
            return asio::async_initiate<Token, Signature>( //
                init, token, std::move(exe), std::move(args));
        }
    } // namespace ProcessExecution
    
    int main() {
        using ProcessExecution::asyncExecuteProcess;
    
        constexpr auto timeout = 50ms;
        constexpr auto script  = R"(
             number=$((1 + $RANDOM % 9));
             sleep "0.0$number";
             echo -n "sanity restored after 0.0$number";
             exit $number
        )";
    
        asio::io_context ioc(1);
        auto work = make_work_guard(ioc);
    
        for (int i = 0; i < 10; i++)
            asyncExecuteProcess(                           //
                "/bin/bash", {"-c", script}, timeout, ioc, //
                [i](std::error_code ec, int exit_code, std::string data) {
                    std::cout << "Completed #" << i << " " << ec.message() << " " << exit_code << " "
                              << quoted(data) << std::endl;
                });
    
        std::cout << "Everything initiated, single-threaded multiplexed completion" << std::endl;
    
        work.reset();
        ioc.run();
    }
    



    ¹ There's still a subtle race here: even if terminated == false, we still have to check that the out future is ready. If we don't, we can get very unlucky and hit a race where the future is never ready but the process has been killed. I suspect this indicates that Boost Process doesn't actually perform all the work on a service thread (as required). I'll probably look into this later.
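    The readiness check the footnote relies on is plain standard C++. A small helper like the hypothetical take_if_ready below keeps a completion handler from ever blocking the io_context thread. When it returns nothing, the caller can report an error (the answer uses asio::error::interrupted as a placeholder) instead of waiting.

```cpp
#include <chrono>
#include <future>
#include <optional>
#include <string>

// Non-blocking take: yields the output only if it is already available.
// Mirrors the `wait_for(0s) == ready` check in wrapped_handler.
std::optional<std::string> take_if_ready(std::future<std::string>& f) {
    if (f.valid() && f.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
        return f.get(); // may rethrow a stored exception (e.g. broken_promise)
    return std::nullopt;
}
```

    Because get() invalidates the future, a second call simply returns nothing rather than throwing.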