c++ sockets boost-asio asio

ASIO continuously send requests to and get responses from the server over TCP using asio::async_write, asio::async_read, and asio::async_read_until


I am working on a C++ client/server application with standalone Asio (without Boost). I want the client application to continuously receive responses from the server, and to send requests to the server whenever the client has one to send. However, I do not know where exactly I should call asio::io_context::run so that it keeps running for as long as the application runs. My application also uses the Dear ImGui library for the GUI, which runs in a separate thread; Asio runs in the main thread. I have noticed that some people run the io_context in another thread right after calling asio::async_connect, whose completion starts the first async_read operation, priming Asio with some work so that it does not stop prematurely. I tried the same technique in my code, but it still exits prematurely. I am including the part of my code that contains my main function and the client class. In this program, the client first sends the size of the request followed by a specified delimiter, and then sends the request itself; the server reads the request size using asio::async_read_until up to the delimiter, prepares a request buffer of the given size, and then reads the request itself. The server then sends the response size and the response itself in the same way.

// Client side of a length-prefixed TCP protocol: each message is sent as
// "<size><delimeter>" followed by the payload itself.
// Note: non-static members are initialized in the order they are declared
// here, regardless of the order used in the constructor's initializer list.
class TCPClient 
{
public:
    // Resolves IP/PORT into `endpoints`; no connection is attempted yet.
    TCPClient(std::string& IP, std::string& PORT);
    // Stops `context` and joins `io_thread`.
    ~TCPClient();
    
    // Queues a request; schedules process_write() if the queue was empty.
    void push_request(std::string& request);
    // Starts the asynchronous connect, the first read, and the I/O thread.
    void connect();

private:
    // Reads one "<size><delimeter><payload>" response and re-arms itself on success.
    void start_read();
    // Posts a task that sends the request at the front of the queue.
    void process_write();

    asio::io_context context;
    asio::io_context::work work;             // constructed from `context` in the constructor
    std::shared_ptr<tcp::socket> socket;
    
    std::shared_ptr<asio::streambuf> buffer; // receive buffer used by start_read()
    std::queue<std::string> requests;        // pending outgoing requests
    
    const std::string delimeter = "\n";      // terminates the size header

    std::thread io_thread;                   // runs context.run(); started in connect()

    tcp::resolver resolver;
    tcp::resolver::results_type endpoints;
};

// Entry point: creates the client, runs the GUI flow on a separate thread,
// starts the connection, and finally joins the GUI thread.
int main()
{
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();

    std::thread gui_thread;

    try
    {
        std::string IP = "IP";
        std::string PORT = "PORT";

        // `client` is local to this try block, so it is destroyed when the block
        // ends, while gui_thread still refers to it by reference.
        TCPClient client(IP, PORT);

        gui_thread = std::thread([&client] {
            // Request the key only when public.pem is not already on disk.
            if (!std::filesystem::exists("public.pem"))
            {
                std::string request = "key";
                client.push_request(request);
            }

            // Block until public.pem exists. `lock` is never released explicitly,
            // so options.mutex stays locked for the rest of this lambda.
            std::unique_lock<std::mutex> lock(options.mutex);
            options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });

            login_register_window(client);

            if (options.is_logged && options.is_retrieved)
            {
                main_window(client);
            }
            else
            {
                std::cout << "Application should stop\n";
                // Explicit destructor call on an object that also has automatic storage.
                client.~TCPClient();
            }
            }
        );

        // Returns immediately after launching the asynchronous operations.
        client.connect();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << '\n';
    }
    
    gui_thread.join();

    return 0;
}

// Creates the socket and receive buffer, and synchronously resolves IP/PORT
// into `endpoints`. Members are initialized in declaration order, not in the
// order they are listed here.
TCPClient::TCPClient(std::string& IP, std::string& PORT)
    : socket(std::make_shared<tcp::socket>(context)),
    buffer(std::make_shared<asio::streambuf>()), resolver(context),
    endpoints(resolver.resolve(IP, PORT)), work(context)
{ }

// Stops the io_context and waits for the I/O thread to finish.
// io_thread is only started in connect(), and join() is called unconditionally here.
TCPClient::~TCPClient()
{
    context.stop();
    io_thread.join();
}

// Initiates the asynchronous connect, issues the first read, and starts the
// thread that runs the io_context.
void TCPClient::connect()
{
    // Returns immediately; the handler only reports a connection error.
    asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (ec)
        {
            std::cerr << "Error connecting: " << ec.message() << '\n';
        }
        });

    // Issued right after initiating the connect, not from its completion handler.
    start_read();

    io_thread = std::thread([this]() { context.run(); });
}

// Appends `request` to the outgoing queue under options.mutex, notifies one
// waiter on options.condition, and starts a write if the queue was empty
// before the push.
void TCPClient::push_request(std::string& request)
{
    bool is_queue_empty = false;

    {
        std::unique_lock<std::mutex> lock(options.mutex);
        
        // Record the state before pushing: an empty queue means no write chain is pending.
        is_queue_empty = requests.empty();
        requests.push(request);
    }

    options.condition.notify_one();

    if (is_queue_empty)
        process_write();
}

// Posts a task to the io_context that pops the front request and sends it as
// two writes: first "<size><delimeter>", then the payload. If more requests
// were queued at pop time, the final handler calls process_write() again.
void TCPClient::process_write()
{
    asio::post(context, [this]() {
        std::unique_lock<std::mutex> lock(options.mutex);

        if (!requests.empty()) {
            std::string request = requests.front();
            requests.pop();

            // Snapshot taken while holding the lock; requests pushed later are not reflected.
            bool is_queue_empty = requests.empty();

            lock.unlock();

            // NOTE(review): request_size is a local of this posted task, yet its
            // storage is handed to async_write, which completes after the task returns.
            std::string request_size = std::to_string(request.size()) + delimeter;

            asio::async_write(*socket, asio::buffer(request_size),
                [this, request = std::move(request), is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                    if (!ec) {
                        std::cout << "Request size (" << bytes << " bytes) was sent.\n";

                        // NOTE(review): `request` is owned by this handler's closure, while
                        // the nested write completes after this handler has returned.
                        asio::async_write(*socket, asio::buffer(request),
                            [this, is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                                if (!ec) {
                                    std::cout << "Request (" << bytes << " bytes) was sent.\n";

                                    if (!is_queue_empty)
                                        process_write();  // Initiate the next write operation
                                }
                                else {
                                    std::cerr << "Error sending request: " << ec.message() << '\n';
                                }
                                });
                    }
                    else {
                        std::cerr << "Error sending request size: " << ec.message() << '\n';
                    }
                    });
        }
        });
}

// Reads one response: first the size header up to `delimeter`, then
// `response_size` bytes of payload. The payload is dispatched to
// handle_response() or handle_login() depending on options.is_logged, after
// which the next read is started. Any error ends the read chain.
void TCPClient::start_read()
{
    asio::async_read_until(*socket, *buffer, delimeter,
        [this](const std::error_code& ec, std::size_t bytes) {
            if (!ec) {
                std::istream input_stream(buffer.get());
                std::string response_size_str;
                std::getline(input_stream, response_size_str);
                
                // NOTE(review): std::getline above already extracted the header from
                // the streambuf; this consume() removes a further `bytes` bytes.
                buffer->consume(bytes);

                response_size_str = response_size_str.substr(0, response_size_str.find(delimeter));

                // std::stoi throws if the header is not a valid integer.
                int response_size = std::stoi(response_size_str);

                asio::async_read(*socket, asio::buffer(buffer->prepare(response_size), response_size),
                    [this](const std::error_code& ec, std::size_t bytes) {
                        if (!ec)
                        {
                            // NOTE(review): nothing commits the prepared area to the
                            // streambuf's readable sequence before it is read from here.
                            std::istream input_stream(buffer.get());
                            std::string response;
                            std::getline(input_stream, response);

                            buffer->consume(bytes);

                            if (options.is_logged)
                            {
                                handle_response(response);
                            }
                            else
                            {
                                std::cout << "Response received: " << response << '\n';
                                handle_login(response);
                            }

                            // Re-arm for the next response.
                            start_read();
                        }
                        else
                        {
                            std::cerr << "Error getting response: " << ec.message() << '\n';
                        }
                        });
            }
            else {
                std::cerr << "Error getting response size: " << ec.message() << '\n';
            }
            });
}

I checked the examples provided by boost::asio, but they did not work for me. I also searched some forums online, but that still did not solve my issue.


Solution

  • The biggest issue is that TCPClient is destructed at the end of the try block. Nobody waits for anything, and the destructor stops the io context. You could see this with a debugger, or e.g. adding some tracing:

    // Destructor with a trace line added to show when it actually runs.
    ~TCPClient() {
        // std::endl flushes, so the trace appears immediately.
        std::cout << "PROOF OF " << __FUNCTION__ << std::endl;
        context.stop();
        io_thread.join();
    }
    

    Review Remarks

    "Never" manually invoke a destructor.

    Don't unnecessarily use dynamic allocation (even when using shared or other smart pointers).

    Don't pass local variables to async operations by reference (like request_size, which is a poor name for that variable by the way).

    When you use istream on streambuf, you MUST NOT consume the bytes, because the stream extraction operations do that!

    streambuf::prepare is only good to use when using async operations that require a single, fixed buffer (like tcp::socket::async_read_some). For composed operations (taking a dynamic buffer) just pass the streambuf.

    Asynchronous operations ALWAYS return immediately, so by definition before they have completed. Therefore, this cannot work:

    // async_connect only initiates the connection and returns immediately.
    asio::async_connect(socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (ec) {
            std::cerr << "Error connecting: " << ec.message() <<std::endl;
        }
    });
    
    // Runs before the connect above has completed.
    start_read();
    

    At the very least, the start_read() needs to appear inside the completion handler for async_connect.

    // Start reading only from the completion handler, once the connect has succeeded.
    async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
        std::cout << "Connection: " << ec.message() << std::endl;
        if (!ec)
            start_read();
    });
    

    I do agree that starting the io_context::run after posting the first async operation is good. However, since you have a work guard, it is redundant, and you should probably just start the thread in the constructor for clarity.

    However, prefer a modern work_guard which you can reset, so you can actually complete in the destructor. Of course, it doesn't matter because you forcibly stop the context.

    More Problems (reading on)

    You post the write loop. But it may block (due to the locks), and besides, it only launches async operations. Perhaps you meant to use the io thread to ensure serialized access to the members? In that case, why have the mutex as well? Just post the push_request itself:

    // Takes the request by value and moves it into a task posted on the socket's
    // executor, so the queue is only touched from handlers run by that executor.
    void push_request(std::string request) {
        post(socket.get_executor(), [this, request = std::move(request)]() mutable {
            requests.push(std::move(request));
            // Size 1 means the queue was empty before this push, so no write chain is pending.
            if (requests.size() == 1)
                do_write_loop(); // renamed from "process_write"
        });
    }
    

    Also, don't move the buffers:

    // Copies the front message into a local and then removes it from the queue.
    std::string request = requests.front();
    requests.pop();
    

    This again causes the buffer to be a local variable which is not suited for the async_write. Instead, leave the message in the queue (std::deque has reference stability for insertion/removal at either end).

    Combine the writes:

    // Prefix the queued message in place with its own byte length and the delimiter.
    // The length is computed before insert() modifies the string.
    requests.front().insert(0, std::to_string(requests.front().size()) + delimiter);
    

    Now the entire write loop simplifies to

    // Sends the message at the front of the queue as "<size><delimiter><payload>"
    // in a single write, then continues with the next one. The message stays in
    // the queue until the write completes, so its storage outlives the operation.
    void do_write_loop() { // on the logical strand
        if (requests.empty())
            return;
        // The prefix is the byte length of the message itself, not the queue depth.
        requests.front().insert(0, std::to_string(requests.front().size()) + delimiter);
    
        async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
            if (!ec) {
                std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
    
                requests.pop();
                do_write_loop(); // Initiate the next write operation
            } else {
                std::cerr << "Error sending request: " << ec.message() << std::endl;
            }
        });
    }
    

    Loose Ends

    As mentioned, the consume is redundant. However, so is the istream?! Just simplify, let the stream work for you?

    // Parse the decimal size and discard the rest of the header line. A failed
    // extraction sets failbit (not badbit), so test the stream state as a whole,
    // and reject negative sizes explicitly.
    int response_size = 0;
    if (!(std::istream(&buffer) >> response_size).ignore(9999, delimiter) || response_size < 0) {
        std::cerr << "Invalid response header" << std::endl;
        return;
    }
    

    Don't take constructor arguments by mutable reference needlessly.

    It's unclear what the condition is waiting for. Apparently after sending the key request, it waits... for a file to magically appear on the filesystem. This seems like a pretty brittle choice of IPC mechanism, especially since all the synchronizing parts are in this same process, as far as I can see. For now let's just put the magic spell in do_write_loop:

    // Wakes one thread waiting on options.condition so that it re-evaluates its predicate.
    { // magic wake up spell
        std::lock_guard<std::mutex> lock(options.mutex);
        options.condition.notify_one();
    }
    

    REVIEWED LISTING

    Live On Coliru

    #include <boost/asio.hpp>
    #include <atomic>
    #include <condition_variable>
    #include <filesystem>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    using boost::system::error_code; // std::error_code; // for standalone
    
    // State shared between the GUI thread and the I/O thread.
    struct {
        // Locked around waits and notifications on `condition`.
        std::mutex mutex;
        std::condition_variable condition;
        // Both flags start out false.
        std::atomic_bool is_logged{false};
        std::atomic_bool is_retrieved{false};
    } options;
    
    struct TCPClient {
        TCPClient(std::string const& host, std::string const& service) { //
            endpoints = tcp::resolver{context}.resolve(host, service);
        }
    
        ~TCPClient() {
            context.stop();
            io_thread.join();
        }
    
        void push_request(std::string request) {
            post(socket.get_executor(), [this, request = std::move(request)]() mutable {
                requests.push(std::move(request));
                if (requests.size() == 1)
                    do_write_loop(); // renamed from "process_write"
            });
        }
    
        void connect() {
            async_connect(socket, endpoints, [this](error_code ec, tcp::endpoint) {
                std::cout << "Connection: " << ec.message() << std::endl;
                if (!ec)
                    start_read();
            });
        }
    
      private:
        // on the logical strand
        void do_write_loop() {
            { // magic wake up spell
                std::lock_guard<std::mutex> lock(options.mutex);
                options.condition.notify_one();
            }
    
            if (requests.empty())
                return;
            requests.front().insert(0, std::to_string(requests.size()) + delimiter);
    
            async_write(socket, asio::buffer(requests.front()), [this](error_code ec, size_t bytes) {
                if (!ec) {
                    std::cout << "Request (" << bytes << " bytes) was sent." << std::endl;
    
                    requests.pop();
                    do_write_loop(); // Initiate the next write operation
                } else {
                    std::cerr << "Error sending request: " << ec.message() << std::endl;
                }
            });
        }
    
        void handle_response(std::string) const {}
        void handle_login(std::string) const { options.is_logged = true; }
    
        void start_read() {
            async_read_until(socket, buffer, delimiter, [this](error_code ec, size_t /*bytes*/) {
                if (ec && ec != asio::error::eof) {
                    std::cerr << "Error getting response size: " << ec.message() << std::endl;
                    return;
                }
    
                int response_size = 0;
                if ((std::istream(&buffer) >> response_size).ignore(9999, delimiter).bad()) {
                    std::cerr << "Invalid response header" << std::endl;
                    return;
                }
    
                async_read(socket, buffer, asio::transfer_exactly(response_size),
                           [this](error_code ec, size_t bytes) {
                               std::string response(buffer_cast<char const*>(buffer.data()), bytes);
                               buffer.consume(bytes);
                               if (!ec) {
                                   if (options.is_logged) {
                                       handle_response(response);
                                   } else {
                                       std::cout << "Response received: " << response << std::endl;
                                       handle_login(response);
                                   }
    
                                   start_read();
                               } else {
                                   std::cerr << "Error getting response: " << ec.message() << std::endl;
                               }
                           });
            });
        }
    
        asio::io_context            context;
        tcp::socket                 socket{context};
        tcp::resolver::results_type endpoints;
    
        asio::streambuf         buffer;
        std::queue<std::string> requests;
    
        static constexpr char delimiter = '\n';
    
        asio::io_context::work work{context};
        std::thread            io_thread{[this] { context.run(); }};
    };
    
    // Empty stand-ins for the GUI entry points so that the listing is self-contained.
    void main_window(TCPClient&) {}
    void login_register_window(TCPClient&) {}
    
    #include <openssl/evp.h>
    #include <openssl/err.h>
    
    /// Demo entry point: creates the client, runs the GUI flow on its own thread,
    /// starts the connection and waits for the GUI thread. `client` lives until
    /// after gui_thread has been joined.
    int main() try {
        OpenSSL_add_all_algorithms();
        ERR_load_crypto_strings();
    
        TCPClient client("localhost", "8989");
    
        std::thread gui_thread([&client] {
            if (!std::filesystem::exists("public.pem"))
                client.push_request("key");
    
            {
                // Hold options.mutex only for the wait itself. do_write_loop() locks
                // the same mutex on the I/O thread, so keeping it locked while the
                // windows run would stall every write issued from the GUI.
                std::unique_lock<std::mutex> lock(options.mutex);
                options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });
            }
    
            login_register_window(client);
    
            if (options.is_logged && options.is_retrieved) {
                main_window(client);
            } else {
                std::cout << "Application should stop" << std::endl;
            }
        });
    
        client.connect();
    
        gui_thread.join();
    } catch (std::exception const& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
    

    Live Demo

    enter image description here