c++c++20boost-asioipccoroutine

Why can't I read from the stdout/stderr of a process spawned with forkpty in the C++20 coroutines + Asio single-threaded multi-coroutine model?


I am new to corotine and asio. I am working on a local process management tool where a client is responsible for launching a program. This client sends commands to a backend daemon that manages the process. I am using a single-threaded, multi-coroutine model. My daemon needs to interact with the child process to obtain logs for IPC transmission to the client.

My question is: why does my daemon get stuck at code 1 async_read_some and not trigger reading stdout, while code 2 works fine?

Here is a simple demo:

client code:

#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>

void receive_output(int sockfd)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0)
            break;
        buffer[n] = '\0';
        std::cout << buffer << std::flush;
    }
}

void send_input(int sockfd)
{
    std::string input;
    while (std::getline(std::cin, input))
    {
        input += '\n';
        send(sockfd, input.c_str(), input.size(), 0);
    }
}

int main()
{
    int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    struct sockaddr_un addr;
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/tmp/socket");

    connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));

    std::string command = "test";
    send(sockfd, command.c_str(), command.size(), 0);

    std::thread output_thread(receive_output, sockfd);
    std::thread input_thread(send_input, sockfd);

    output_thread.join();
    input_thread.join();

    close(sockfd);
    return 0;
}

server code:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>

using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;

boost::asio::io_context io_context;

void set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl(F_GETFL)");
        return;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1)
    {
        perror("fcntl(F_SETFL)");
    }
}

awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
    char buffer[256];
    std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
    buffer[len] = '\0';
    co_return std::string(buffer);
}

awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
                                   int client_fd)
{
    int master_fd;
    pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
    //   set_nonblocking(master_fd);
    if (pid == 0)
    {
        // Child process: Set user and change directory
        setvbuf(stdout, nullptr, _IONBF, 0);

        // execute echo just for test
        // execl(command.c_str(), "", NULL);
        execl("/bin/echo", "echo", "Testing PTY output!", NULL);

        std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
        exit(EXIT_FAILURE);
    }
    else
    {
        // Parent process: Handle I/O between client and pty
        asio::posix::stream_descriptor pty_stream(io_context, master_fd);

        // handle output
        co_spawn(
            io_context,
            [&client_stream, &pty_stream]() -> awaitable<void> {
                char buffer[1024];
                try
                {
                    while (true)
                    {
                        // code 1
                        // std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);

                        // code 2
                        ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));

                        std::cout << buffer << std::endl;
                        if (n == 0)
                            break; // End of stream

                        //   co_await asio::async_write(
                        //   client_stream, boost::asio::buffer(buffer, n),
                        //   use_awaitable);
                    }
                }
                catch (std::exception &e)
                {
                    std::cerr << strerror(errno) << std::endl;
                }
                co_return;
            },
            asio::detached);

        // handle input



        waitpid(pid, nullptr, 0);

        co_return;
    }
}

awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
    std::string command = co_await receive_command(client_stream);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client_stream, client_fd);
    client_stream.close();
}

awaitable<int> async_accept(int server_fd)
{
    int client_fd;
    co_await asio::post(io_context, use_awaitable);
    client_fd = accept(server_fd, nullptr, nullptr);
    if (client_fd < 0)
    {
        std::cerr << "Failed to accept client connection" << std::endl;
        co_return - 1;
    }
    std::cout << "clientfd, " << client_fd << std::endl;
    set_nonblocking(client_fd);
    co_return client_fd;
}

awaitable<void> daemon_loop()
{
    const char *socket_path = "/tmp/socket";

    if (access(socket_path, F_OK) == 0)
    {
        if (unlink(socket_path) != 0)
        {
            std::cerr << "Failed to remove existing socket file" << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd < 0)
    {
        std::cerr << "Failed to create socket" << std::endl;
        exit(EXIT_FAILURE);
    }
    //   set_nonblocking(server_fd);

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, socket_path);
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        std::cerr << "Failed to bind socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 5) < 0)
    {
        std::cerr << "Failed to listen on socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    asio::posix::stream_descriptor server_stream(io_context, server_fd);

    while (true)
    {
        int client_fd = co_await async_accept(server_fd);
        asio::posix::stream_descriptor client_stream(io_context, client_fd);
        std::cout << "Accepted connection" << std::endl;
        co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
    }
}

int main()
{
    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
    return 0;
}

Here is the different running result between code 1 and code 2

code 1: enter image description here

code 2:

enter image description here


Solution

  • The most likely problem is your use of ::waitpid which blocks the only available IO service thread.

    Your co_spawn in run_process_in_pty also plays fast and loose with object lifetimes. It captures client_stream and pty_stream by reference, but they are either locals or references from another coroutine frame.

    Regardless, all the code just looks highly overcomplicated, somehow forcing C-style network code into Asio's coroutine framework. Why not use Asio? So you can get:

    awaitable<void> client_connection(Socket client) {
        std::cout << "Accepted connection" << std::endl;
        std::string command = co_await receive_command(client);
        std::cout << "Received command: " << command << std::endl;
    
        co_await run_process_in_pty(command, client, client.native_handle());
        // client.close(); // implicit due to RAII lifetime
    }
    
    awaitable<void> daemon_loop() {
        path socket_path = "/tmp/socket";
        auto ex          = co_await asio::this_coro::executor;
    
        if (exists(socket_path)) {
            if (!is_socket(socket_path))
                throw std::runtime_error("File exists and is not a socket");
            if (!remove(socket_path))
                throw std::runtime_error("Failed to remove existing socket file");
        }
    
        UNIX::acceptor acc(ex, socket_path.native());
        acc.listen(5);
    
        while (true) {
            co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
        }
    }
    
    int main() {
        boost::asio::io_context io_context;
    
        std::cout << "start" << std::endl;
        co_spawn(io_context, daemon_loop(), asio::detached);
        io_context.run();
    }
    

    BONUS DEMO

    As a bonus, let me also demonstrate how to use Boost Process for the child process instead:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/process/v2.hpp>
    #include <filesystem>
    #include <iostream>
    #include <pty.h>
    #include <sys/wait.h>
    
    namespace asio = boost::asio;
    namespace bp   = boost::process::v2;
    using UNIX     = asio::local::stream_protocol;
    using Socket   = UNIX::socket;
    using std::filesystem::path;
    
    asio::awaitable<std::string> receive_command(UNIX::socket& client_stream) {
    #if 1
        std::string buf;
        co_await async_read_until(client_stream, asio::dynamic_buffer(buf), '\n', asio::deferred);
        while (buf.ends_with('\n'))
            buf.pop_back();
        co_return buf;
    #else
        char buffer[512];
        auto n = co_await client_stream.async_read_some(asio::buffer(buffer), asio::deferred);
        co_return std::string(buffer, n);
    #endif
    }
    
    asio::awaitable<void> run_process( //
        [[maybe_unused]] std::string const& command, Socket& client_stream) try {
        asio::any_io_executor ex = co_await asio::this_coro::executor;
    
        //bp::popen child(ex, "/bin/echo", {"Testing PTY output for command [", command, "]"});
        bp::popen child(ex, "/bin/sh", {"-c", command});
    
        // int client_fd = client_stream.native_handle();
    
        for (char buffer[1024]; ;) {
            // auto [ec, n] = co_await child.async_read_some(boost::asio::buffer(buffer),
            // asio::as_tuple(asio::deferred));
            auto n = co_await child.async_read_some(boost::asio::buffer(buffer));
    
            std::cout << quoted(std::string_view(buffer, n)) << std::endl;
    
            if (n)
                co_await async_write(client_stream, boost::asio::buffer(buffer, n), asio::deferred);
    
            // if (ec) {
            // std::cout << "End of stream (" << ec.message() << ")" << std::endl;
            // break;
            //}
        }
    } catch (boost::system::system_error const& se) {
        std::cerr << "run_process: " << se.code().message() << std::endl;
    } catch (std::exception const& e) {
        std::cerr << "run_process: " << e.what() << std::endl;
    }
    
    asio::awaitable<void> client_connection(Socket client) {
        std::cout << "Accepted connection" << std::endl;
        std::string command = co_await receive_command(client);
        std::cout << "Received command: " << command << std::endl;
    
        co_await run_process(command, client);
        client.close(); // implicit due to RAII lifetime
    }
    
    asio::awaitable<void> daemon_loop() {
        path socket_path = "/tmp/socket";
        auto ex          = co_await asio::this_coro::executor;
    
        if (exists(socket_path)) {
            if (!is_socket(socket_path))
                throw std::runtime_error("File exists and is not a socket");
            if (!remove(socket_path))
                throw std::runtime_error("Failed to remove existing socket file");
        }
    
        UNIX::acceptor acc(ex, socket_path.native());
        acc.listen(5);
    
        while (true) {
            co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
        }
    }
    
    int main() {
        boost::asio::io_context io_context;
    
        std::cout << "start" << std::endl;
        co_spawn(io_context, daemon_loop(), asio::detached);
        io_context.run();
    }
    

    With a local, more functional demonstration: