c++ boost boost-asio boost-process

Is it possible to use process output while the process is running?


Boost.Process allows the use of Boost.Asio to perform asynchronous reads.

From what I understand, this is useful for reading output while the process is running, without having to wait for the process to terminate.

But to access this output, is it necessary to wait for the process to terminate, or is it possible to access it while the process is running, and if so, how?

What I actually need is to read the beginning of a process's output (to check that it started as expected) while keeping it running.

To detail the context, I launch a process that I want to keep running until the end of the execution:

boost::asio::io_service ios;
std::vector<char> buf;

bp::child c("process_that_needs_to_keep_running", args, 
bp::std_out > boost::asio::buffer(buf), ios);

ios.run();
// I DON'T WANT TO WAIT FOR c TO TERMINATE,
// but I want to check that buf contains some text that assures me it started correctly.
// The issue here is that I don't know how to read from buf, since its size and content might not be consistent.
// Is it possible to take a snapshot, for instance?
check_started_correctly(buf);

Here the issue is that the producer creates output that I don't control; it just emits output.


Solution

  • If you use bp::std_out > some_kind_of_buffer_or_future you will usually get the result only at exit.

    However, you can use an async_pipe:

    bp::async_pipe pipe(io);
    
    bp::child c( //
        "/bin/bash",
        std::vector<std::string>{
            "-c",
            "for a in {1..20}; do sleep 1; echo message $a; done",
        },                    //
        bp::std_out > pipe,   //
        bp::on_exit(on_exit), //
        io);
    

    Now, you have to explicitly do the IO on that pipe:

    boost::asio::streambuf sb;
    async_read_until(                //
        pipe, sb, "message 5\n",     //
        [&](error_code ec, size_t) { //
            std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
        });
    

    This works:

    #include <boost/process.hpp>
    #include <boost/process/async.hpp>
    #include <boost/asio.hpp>
    #include <iostream>
    
    namespace bp = boost::process;
    using boost::system::error_code;
    
    namespace /*file-static*/ {
        using namespace std::chrono_literals;
        static auto       now = std::chrono::steady_clock::now;
        static const auto t0  = now();
    
        static auto timestamp() {
            return std::to_string((now() - t0) / 1.s) + "s ";
        }
    } // namespace
    
    int main() {
        boost::asio::io_context io;
        bp::async_pipe pipe(io);
    
        auto on_exit = [](int code, std::error_code ec) {
            std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                      << code << std::endl;
        };
    
        bp::child c( //
            "/bin/bash",
            std::vector<std::string>{
                "-c",
                "for a in {1..20}; do sleep 1; echo message $a; done",
            },                    //
            bp::std_out > pipe,   //
            bp::on_exit(on_exit), //
            io);
    
        boost::asio::streambuf sb;
        async_read_until(                //
            pipe, sb, "message 5\n",     //
            [&](error_code ec, size_t) { //
                std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
                          << std::endl;
            });
    
        io.run();
    }
    

    Prints

    5.025400s Got message 5 (Success)
    20.100547s on_exit: Success code 0
    

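    So you can respond to the content you're looking for as soon as it arrives. Keep in mind that output buffering on the producer side determines when data reaches the pipe. Here each echo is written immediately, so every line arrives as soon as it is printed; programs that use C stdio, however, typically switch from line buffering to full buffering when stdout is a pipe rather than a terminal, so their output may show up only when the buffer fills or is flushed.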
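
async_read_until completes once the accumulated data contains the delimiter, regardless of how the child's output happens to be split across reads. As a rough illustration of that idea in plain standard C++ (no Boost involved), here is a minimal sketch; `StartupWatcher`, `feed` and `seen` are hypothetical names invented for this example, and the chunks would come from whatever read mechanism you use.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical helper (not part of Boost): reports whether a marker has
// appeared in a stream that arrives in arbitrary chunks.
class StartupWatcher {
public:
    explicit StartupWatcher(std::string marker) : marker_(std::move(marker)) {}

    // Feed the next chunk of output; returns true once the marker was seen.
    bool feed(std::string_view chunk) {
        if (seen_) return true;
        pending_.append(chunk);
        if (pending_.find(marker_) != std::string::npos) {
            seen_ = true;
            pending_.clear();
        } else if (pending_.size() >= marker_.size()) {
            // Keep only the last marker_.size() - 1 bytes: enough to complete
            // a marker that straddles two chunks, and nothing more.
            pending_.erase(0, pending_.size() - (marker_.size() - 1));
        }
        return seen_;
    }

    bool seen() const { return seen_; }

private:
    std::string marker_;
    std::string pending_;
    bool seen_ = false;
};
```

Feeding it "message 4\nmess", then "age 5", then "\nmessage 6\n" with the marker "message 5\n" reports the marker only on the third call, even though the marker is split across all three chunks.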

    Large Buffers?

    The above kinda assumes that you can buffer the entire output up to the interesting message. What if that is gigabytes? As long as your pattern isn't gigabytes, you can keep reading until the criterion is matched.

    Let's morph our example into an async grep that looks for the regex class\s*\w+_heap in all of the Boost headers. Of course, this is many megabytes of data, but we use only a 10 KiB buffer:

    std::string text;
    auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
    
    size_t total_received = 0;
    boost::regex const re(R"(class\s*\w+_heap)");
    

    Now we make a read loop that reads until a match is found or the buffer is full:

    std::function<void()> wait_for_message;
    wait_for_message = [&] {
        async_read_until(                         //
            pipe, buf, re,                        //
            [&](error_code ec, size_t received) { //
                std::cerr << '\x0d' << timestamp() << "Checking for message ("
                          << ec.message() << ", total " << total_received
                          << ")                ";
    
                if (received || ec != boost::asio::error::not_found) {
                    total_received += received;
                    buf.consume(received);
    
                    boost::smatch m;
                    if (regex_search(text, m, re)) {
                        std::cout << "\n" << timestamp()
                                  << "Found: " << std::quoted(m.str()) << " at "
                                  << (total_received - m.size()) << " bytes"
                                  << std::endl;
                    }
                } else {
                    // discard 90% of buffer capacity
                    auto discard =
                        std::min(buf.max_size() / 10 * 9, buf.size());
                    total_received += discard;
                    buf.consume(discard);
                }
    
                if (!ec || (ec == boost::asio::error::not_found))
                    wait_for_message();
                else
                    std::cout << "\n" << timestamp() << ec.message() << std::endl;
            });
    };
    

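    Of course, this system might miss a match that is longer than 10% of the buffer size, because we keep only 10% of the previous buffer contents to allow for matches overlapping read boundaries. Also note that the printed offsets are approximate: they are derived from the running byte count rather than from the position of the match itself.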
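
The "keep a tail of the previous buffer" idea can be looked at in isolation from the async I/O. The following is a minimal sketch in standard C++ only (std::regex instead of Boost.Regex, no Asio); `WindowedSearcher`, `Match`, `capacity` and `keep` are hypothetical names for this illustration. It assumes `keep < capacity` and a pattern that never matches the empty string, and, like the loop above, it can report a match early if more input would have produced a longer one.

```cpp
#include <algorithm>
#include <cstddef>
#include <regex>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

struct Match {
    std::size_t offset; // position of the match within the whole stream
    std::string text;
};

// Hypothetical helper: searches a stream fed in chunks using a bounded window.
class WindowedSearcher {
public:
    // Assumes keep < capacity and a pattern that never matches "".
    WindowedSearcher(std::regex re, std::size_t capacity, std::size_t keep)
        : re_(std::move(re)), capacity_(capacity), keep_(keep) {}

    void feed(std::string_view chunk) {
        while (!chunk.empty()) {
            // Never let the window grow beyond its capacity.
            std::size_t n = std::min(capacity_ - window_.size(), chunk.size());
            window_.append(chunk.substr(0, n));
            chunk.remove_prefix(n);
            scan();
        }
    }

    const std::vector<Match>& matches() const { return matches_; }

private:
    void scan() {
        std::smatch m;
        while (std::regex_search(window_, m, re_) && m.length(0) > 0) {
            auto pos = static_cast<std::size_t>(m.position(0));
            auto len = static_cast<std::size_t>(m.length(0));
            matches_.push_back({base_ + pos, m.str(0)});
            discard(pos + len); // m refers to erased data after this call
        }
        // Window full without a match: drop everything except the tail, so a
        // match straddling the boundary (up to keep_ bytes long) survives.
        if (window_.size() >= capacity_) discard(window_.size() - keep_);
    }

    void discard(std::size_t n) {
        window_.erase(0, n);
        base_ += n; // stream offset of window_[0]
    }

    std::regex re_;
    std::size_t capacity_;
    std::size_t keep_;
    std::string window_;
    std::size_t base_ = 0;
    std::vector<Match> matches_;
};
```

Because the offset is computed as the stream position of the window start plus the match position, it stays exact no matter how much has been discarded. A match longer than `keep` bytes can still be lost when it straddles a discard, which is the limitation described above.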

    Again, here is the complete program:

    #include <boost/process.hpp>
    #include <boost/process/async.hpp>
    #include <boost/asio.hpp>
    #include <boost/regex.hpp>
    #include <iostream>
    #include <iomanip>
    
    namespace bp = boost::process;
    using boost::system::error_code;
    
    namespace /*file-static*/ {
        using namespace std::chrono_literals;
        static auto       now = std::chrono::steady_clock::now;
        static const auto t0  = now();
    
        static auto timestamp() {
            return std::to_string((now() - t0) / 1.s) + "s ";
        }
    } // namespace
    
    int main() {
        boost::asio::io_context io;
        bp::async_pipe pipe(io);
    
        auto on_exit = [](int code, std::error_code ec) {
            std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                      << code << std::endl;
        };
    
        bp::child c( //
            "/usr/bin/find",
            std::vector<std::string>{"/usr/local/include/boost", "-name",
                                     "*.hpp", "-exec", "cat", "{}", "+"},
            bp::std_out > pipe,   //
            bp::on_exit(on_exit), //
            io);
    
        std::string text;
        auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
    
        size_t total_received = 0;
        boost::regex const re(R"(class\s*\w+_heap)");
    
        std::function<void()> wait_for_message;
        wait_for_message = [&] {
            async_read_until(                         //
                pipe, buf, re,                        //
                [&](error_code ec, size_t received) { //
                    std::cerr << '\x0d' << timestamp() << "Checking for message ("
                              << ec.message() << ", total " << total_received
                              << ")                ";
    
                    if (received || ec != boost::asio::error::not_found) {
                        total_received += received;
                        buf.consume(received);
    
                        boost::smatch m;
                        if (regex_search(text, m, re)) {
                            std::cout << "\n" << timestamp()
                                      << "Found: " << std::quoted(m.str()) << " at "
                                      << (total_received - m.size()) << " bytes"
                                      << std::endl;
                        }
                    } else {
                        // discard 90% of buffer capacity
                        auto discard =
                            std::min(buf.max_size() / 10 * 9, buf.size());
                        total_received += discard;
                        buf.consume(discard);
                    }
    
                    if (!ec || (ec == boost::asio::error::not_found))
                        wait_for_message();
                    else
                        std::cout << "\n" << timestamp() << ec.message() << std::endl;
                });
        };
    
        wait_for_message();
        io.run();
    
        std::cout << timestamp() << " - Done, total_received: " << total_received << "\n";
    }
    

    Which prints

    2.033324s Found: "class d_ary_heap" at 6747512 bytes
    2.065290s Found: "class pairing_heap" at 6831390 bytes
    2.071888s Found: "class binomial_heap" at 6860833 bytes
    2.072715s Found: "class skew_heap" at 6895677 bytes
    2.073348s Found: "class fibonacci_heap" at 6921559 bytes
    34.729355s End of file
    34.730515s on_exit: Success code 0
    34.730593s  - Done, total_received: 154746011
    
