Tags: c++, multithreading, boost, boost-asio, boost-beast

Boost.Beast WebSocket Server - "Read error: The I/O operation has been aborted" on Client Connection


I'm building a WebSocket server using Boost.Asio and Boost.Beast with SSL. The server accepts connections, but I'm encountering an issue where the client disconnects shortly after connecting. The error message I get is:

Read error: The I/O operation has been aborted because of either a thread exit or an application request (Code: 995)

Here’s a breakdown of my setup:

  1. The server uses async_accept to accept new client connections, where each connection is wrapped in an SslWebSocket instance.
  2. A ConnectionManager keeps track of connected clients.
  3. I use async_read to read incoming messages and route them appropriately.
  4. There is also a ping mechanism to keep connections alive.

The server logs the following messages in sequence:

Code Sample:

Here’s an excerpt from the do_accept, start_read, and main methods:

// Program entry point: builds the io_context, the SSL context and the WebSocketServer,
// calls server->start() on a helper thread and runs the event loop on the main thread.
// argc/argv are accepted but not used.
int main(int argc, char** argv) {
    // Create io_context and SSL context
    boost::asio::io_context io_context;
    asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
    // NOTE(review): no certificate or private key is loaded into ssl_ctx in this excerpt.
    ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing

    // Define the server endpoint (IP and port)
    tcp::endpoint endpoint(tcp::v4(), 8080);
    auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class

    // Create the WebSocket server
    auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);

    // Start the server in a separate thread
    // NOTE(review): start() only calls listen() and do_accept(), and do_accept() merely
    // initiates an async_accept, so this thread finishes almost immediately. All handlers
    // run on whichever thread calls io_context.run() (the main thread here).
    std::thread server_thread([server]() {
        server->start(); // Assuming you have a start method in WebSocketServer
        });

    // Run the io_context in the main thread
    // NOTE(review): presumably run() could return straight away if it executes before the
    // helper thread has queued the first accept — confirm against the Asio documentation.
    io_context.run();

    // Optionally join the server thread if you want to wait for it
    server_thread.join();

    return 0; // Exit the program when the server is stopped (if ever)
}

// Announces startup on stdout, puts the acceptor into the listening state and kicks off the
// accept loop. Any std::exception thrown by those two calls is reported on stderr and swallowed.
void WebSocketServer::start() {
    std::cout << "Starting WebSocket server..." << std::endl;

    try {
        // Start listening for incoming connections
        // (do_accept() calls acceptor_.listen() again on every iteration.)
        acceptor_.listen();
        do_accept();  // Begin accepting connections
    }
    catch (const std::exception& e) {
        std::cerr << "Error starting server: " << e.what() << std::endl;
    }
}
// Initiates one asynchronous accept. The completion handler (bound to strand_) wraps the
// accepted socket in an SslWebSocket, registers it with the connection manager, posts a
// follow-up task that starts the ping timer and the read loop, and finally re-arms the
// accept by calling do_accept() again.
//
// NOTE(review): no SSL handshake and no WebSocket handshake is performed on `ws` anywhere
// in this function before start_read() is called.
void WebSocketServer::do_accept() {
    std::cout << "Waiting for connections..." << std::endl;
    acceptor_.listen();

    // Use a shared pointer to manage the resolver's lifetime
    // (The resolver is captured by the handler below but never used inside it.)
    auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());

    // The handler captures `this`, so the server object must outlive the pending accept.
    acceptor_.async_accept(asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
        if (!ec) {
            auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
            std::cout << "Connection accepted." << std::endl;

            // Attempting to add client immediately
            std::cout << "Attempting to add client..." << std::endl;

            // Add the connection to the manager
            connection_manager_->add_connection(ws);

            // Use a separate post to ensure thread safety
            asio::post(strand_, [this, ws]() {
                // Check if the connection was added successfully
                if (connection_manager_->is_connected(ws)) {
                    std::cout << "Client added successfully." << std::endl;

                    // Start the ping timer
                    // NOTE(review): ping_timer is a local object. It is destroyed when this
                    // lambda returns, i.e. right after start_ping()/start_read() have merely
                    // *initiated* their operations, and destroying the timer cancels its wait.
                    asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
                    start_ping(ws, ping_timer, std::chrono::seconds(30));

                    // Start reading messages from the client
                    start_read(ws); // Start reading messages from the client
                } else {
                    std::cout << "Failed to add client." << std::endl;
                    // Close the WebSocket if it was not added
                    ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
                        if (ec) {
                            std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
                        }
                    });
                }
            });
        } else {
            std::cerr << "Error on accept: " << ec.message() << std::endl;
        }

        // Continue accepting new connections
        // (Runs on success and on failure alike, including when the accept was cancelled.)
        do_accept();  // This should be called after handling the current connection
    }));
}

// Initiates one asynchronous read of a complete message from `ws`.
//
// A fresh flat_buffer is allocated per read and kept alive by the completion handler, which
// also holds a copy of `ws`. On error the handler logs the error and removes the connection
// from the manager; on success it converts the buffer to a string, parses and routes the
// message and, if the manager still reports the connection as connected, starts the next read.
//
// The handler captures `this`, so the server must outlive every pending read.
void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
    auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
    ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            // This is the branch that prints the "Read error: ... (Code: 995)" line from the question.
            std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
            connection_manager_->remove_connection(ws);
            return;
        }

        // Process the received message
        std::string received_message(beast::buffers_to_string(buffer->data()));
        buffer->consume(bytes_transferred); // Clear the buffer after reading

        // Here, you can handle the received message (e.g., route it)
        Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
        route_message(msg, ws); // Route the message to the appropriate handler

        // Continue reading if the connection is still valid
        if (connection_manager_->is_connected(ws)) {
            start_read(ws); // Continue reading if connection is still valid
        }
        else {
            std::cout << "Client has disconnected. Stopping reads." << std::endl;
        }
        });
}

Questions:

  1. What could cause this 995 error code in a Boost.Beast WebSocket server?
  2. Are there specific threading or strand-related configurations that might help resolve this?

Solution

  • I made the code from the question self-contained. Adding a Tracing wrapper shows where the websocket is destructed:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <iostream>
    namespace asio  = boost::asio;
    namespace beast = boost::beast;
    using asio::ip::tcp;
    
    // Diagnostic wrapper: derives from T, re-exports T's constructors and assignment
    // operators unchanged, and prints the destructor's own signature to stdout when an
    // instance is destroyed, making the moment of destruction visible in the program output.
    template <typename T>
    struct Tracing : T {
        using T::T;
        using T::operator=;
    
        // __PRETTY_FUNCTION__ is a compiler-provided identifier (GCC/Clang); the sample output
        // shows it expanding to the full "Tracing<...>::~Tracing() [T = ...]" signature.
        ~Tracing() { std::cout << __PRETTY_FUNCTION__ << std::endl; }
    };
    
    using SslWebSocket = Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<tcp::socket>>>;
    
    /// Minimal stand-in for the application's message type: a type tag plus a payload.
    struct Message {
        std::string type;
        std::string data;

        /// Stub parser. The JSON text is ignored; a fixed placeholder message is returned.
        static Message fromJson(std::string const& /*json*/) {
            Message placeholder;
            placeholder.type = "type";
            placeholder.data = "data";
            return placeholder;
        }
    };
    
    /// Stub connection registry used by the reproduction: it stores nothing and reports
    /// every connection as connected.
    struct ConnectionManager {
        /// Would register the connection; deliberately a no-op here.
        void add_connection(std::shared_ptr<SslWebSocket> /*ws*/) {
        }

        /// Would unregister the connection; deliberately a no-op here.
        void remove_connection(std::shared_ptr<SslWebSocket> /*ws*/) {
        }

        /// Answers `true` for any connection passed in.
        bool is_connected(std::shared_ptr<SslWebSocket> /*ws*/) {
            return true;
        }
    };
    
    /// Placeholder logger. The constructor ignores both arguments (no file is opened);
    /// log() is only declared here and defined out of line.
    struct FileLogger {
        FileLogger(asio::io_context& /*io_context*/, std::string const& /*path*/) {
        }

        /// Emits `message`; see the out-of-line definition.
        void log(std::string const& message);
    };
    
    /// Server skeleton from the question: owns the acceptor, a strand, the logger and the
    /// connection manager, and declares the accept / read / ping / routing steps.
    /// The io_context and SSL context are held by reference and must outlive the server.
    class WebSocketServer {
      public:
        /// Opens an acceptor on `endpoint`, creates a strand on `io_context` and a fresh
        /// ConnectionManager. Nothing is accepted until start() is called.
        WebSocketServer(asio::io_context& io_context, asio::ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
                        std::shared_ptr<FileLogger> logger)
            : io_context_(io_context)
            , ssl_ctx_(ssl_ctx)
            , acceptor_(io_context, endpoint)
            , strand_(make_strand(io_context))
            , logger_(logger)
            , connection_manager_(std::make_shared<ConnectionManager>()) {}

        /// Begins accepting connections.
        void start() {
            do_accept();
        }

        /// Stops accepting new connections by closing the acceptor.
        void stop() {
            acceptor_.close();
        }

      private:
        // Steps of the connection life cycle; defined out of line.
        void do_accept();
        void start_read(std::shared_ptr<SslWebSocket> ws);
        void start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
                        std::chrono::seconds interval);
        void route_message(Message const& msg, std::shared_ptr<SslWebSocket> ws);

        // Declaration order below is also the initialization order used by the constructor.
        asio::io_context&                             io_context_;
        asio::ssl::context&                           ssl_ctx_;
        tcp::acceptor                                 acceptor_;
        asio::strand<asio::io_context::executor_type> strand_;
        std::shared_ptr<FileLogger>                   logger_;
        std::shared_ptr<ConnectionManager>            connection_manager_;
    };
    
    // Reproduction of the question's accept loop, kept as-is so the defect stays observable.
    // Initiates one async_accept whose handler (bound to strand_) wraps the socket in an
    // SslWebSocket, registers it, posts a task that starts the ping timer and the read loop,
    // and then re-arms the accept.
    //
    // NOTE(review): no SSL or WebSocket handshake is performed before start_read().
    void WebSocketServer::do_accept() {
        std::cout << "Waiting for connections..." << std::endl;
        acceptor_.listen();
    
        // Use a shared pointer to manage the resolver's lifetime
        // (The resolver is captured by the handler below but never used inside it.)
        auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());
    
        acceptor_.async_accept(
            asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
                if (!ec) {
                    auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
                    std::cout << "Connection accepted." << std::endl;
    
                    // Attempting to add client immediately
                    std::cout << "Attempting to add client..." << std::endl;
    
                    // Add the connection to the manager
                    connection_manager_->add_connection(ws);
    
                    // Use a separate post to ensure thread safety
                    asio::post(strand_, [this, ws]() {
                        // Check if the connection was added successfully
                        if (connection_manager_->is_connected(ws)) {
                            std::cout << "Client added successfully." << std::endl;
    
                            // Start the ping timer
                            // NOTE(review): this is the defect the answer points at. ping_timer
                            // is a local; start_ping() takes it by reference and only initiates
                            // a wait, so the timer is destroyed when this lambda returns and the
                            // pending wait is cancelled.
                            asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
                            start_ping(ws, ping_timer, std::chrono::seconds(30));
    
                            // Start reading messages from the client
                            start_read(ws); // Start reading messages from the client
                        } else {
                            std::cout << "Failed to add client." << std::endl;
                            // Close the WebSocket if it was not added
                            ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
                                if (ec) {
                                    std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
                                }
                            });
                        }
                    });
                } else {
                    std::cerr << "Error on accept: " << ec.message() << std::endl;
                }
    
                // Continue accepting new connections
                // (Runs on success and on failure alike.)
                do_accept(); // This should be called after handling the current connection
            }));
    }
    
    // Initiates one asynchronous message read on `ws`.
    //
    // The completion handler keeps `ws` and a per-read flat_buffer alive. On error it prints
    // the "Read error" line seen in the sample output and removes the connection; otherwise
    // it parses and routes the message and re-arms the read while the manager still reports
    // the connection as connected. The handler captures `this`, so the server must outlive it.
    void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
        auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
        ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
            if (ec) {
                std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
                connection_manager_->remove_connection(ws);
                return;
            }
    
            // Process the received message
            std::string received_message(beast::buffers_to_string(buffer->data()));
            buffer->consume(bytes_transferred); // Clear the buffer after reading
    
            // Here, you can handle the received message (e.g., route it)
            Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
            route_message(msg, ws);                            // Route the message to the appropriate handler
    
            // Continue reading if the connection is still valid
            if (connection_manager_->is_connected(ws)) {
                start_read(ws); // Continue reading if connection is still valid
            } else {
                std::cout << "Client has disconnected. Stopping reads." << std::endl;
            }
        });
    }
    
    // Driver for the reproduction: same set-up as the question's main(), plus a marker line
    // printed by the helper thread once start() has returned.
    int main() {
        // Create io_context and SSL context
        boost::asio::io_context io_context;
        asio::ssl::context      ssl_ctx(asio::ssl::context::sslv23);
        // NOTE(review): no certificate or private key is loaded into ssl_ctx here.
        ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing
    
        // Define the server endpoint (IP and port)
        tcp::endpoint endpoint(tcp::v4(), 8080);
        auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class
    
        // Create the WebSocket server
        auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);
    
        // Start the server in a separate thread
        // start() only initiates the first accept, so the marker below is printed right away
        // (it is the second line of the sample output) and the thread then ends.
        std::thread server_thread([server]() {
            server->start(); // Assuming you have a start method in WebSocketServer
            std::cout << "server_thread already exited" << std::endl;
        });
    
        // Run the io_context in the main thread
        // All completion handlers therefore execute on the main thread.
        io_context.run();
    
        // Optionally join the server thread if you want to wait for it
        server_thread.join();
    }
    
    // more stubs:
    
    // Stub ping loop: waits on `timer`, sends a WebSocket ping on `ws`, and on success calls
    // itself again to wait for the next round. A failed wait or a failed ping removes the
    // connection from the manager.
    //
    // NOTE(review): `timer` is taken and captured BY REFERENCE, and this function never sets
    // a new expiry on it. The caller in do_accept() passes a local steady_timer, so the
    // reference dangles once that lambda returns; the timer's destruction cancels the wait,
    // which lands in the "Ping timer error" branch below (see the sample output).
    void WebSocketServer::start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
                                     std::chrono::seconds interval) {
        timer.async_wait([this, ws, &timer, interval](beast::error_code ec) {
            if (!ec) {
                ws->async_ping({}, [this, ws, &timer, interval](beast::error_code ec) {
                    if (ec) {
                        std::cerr << "Ping failed: " << ec.message() << std::endl;
                        connection_manager_->remove_connection(ws);
                        return;
                    }
    
                    // Restart the timer for the next ping
                    start_ping(ws, timer, interval);
                });
            } else {
                // Reached for a cancelled wait as well; the timer reference is not touched here.
                std::cerr << "Ping timer error: " << ec.message() << std::endl;
                connection_manager_->remove_connection(ws);
            }
        });
    }
    
    /// Stub dispatcher: inspects the message type but performs no action for any of the
    /// three cases ("type1", "type2", anything else). The connection argument is unused.
    void WebSocketServer::route_message(Message const& msg, std::shared_ptr<SslWebSocket>) {
        std::string const& kind = msg.type;

        if (kind == "type1") {
            return; // placeholder for the type1 handler
        }
        if (kind == "type2") {
            return; // placeholder for the type2 handler
        }
        // Any other type is unknown; nothing is done for it either.
    }
    
    // FileLogger stubs
    /// Writes "Logged message: <message>" to stdout, followed by a newline and a flush.
    void FileLogger::log(std::string const& message) {
        std::cout << "Logged message: ";
        std::cout << message;
        std::cout << std::endl;
    }
    

    Output of a sample run:

    Waiting for connections...
    server_thread already exited
    Connection accepted.
    Attempting to add client...
    Waiting for connections...
    Client added successfully.
    Read error: Operation canceled (Code: 125)
    Ping timer error: Operation canceled
    Waiting for connections...
    server_thread already exited
    Connection accepted.
    Attempting to add client...
    Waiting for connections...
    Client added successfully.
    Read error: Operation canceled (Code: 125)
    Ping timer error: Operation canceled
    Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<boost::asio::ip::
    tcp>>>>::~Tracing() [T = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<
    boost::asio::ip::tcp>>>]
    

    The debugger will help you see that the problem is that ping_timer is a local variable, and its destruction immediately cancels the timer. This causes the timer.async_wait completion to remove the connection.

    Other Problems

    There's a boatload of other problems.

    I might post a fixed up version if I don't have to spend my time elsewhere soon.

    UPDATE

    Refactoring to the above description leads to: https://coliru.stacked-crooked.com/a/aa8931128b1afddd

    However it revealed another GLARING error: all of the SSL handshake is missing, and so is the Websocket handshake. So adding that too:

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/beast/ssl.hpp>
    #include <iostream>
    #include <list>
    namespace asio      = boost::asio;
    namespace ssl       = asio::ssl;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using asio::ip::tcp;
    
    /// Placeholder logger: the constructor ignores its arguments (no file is opened) and
    /// log() simply echoes the message to stdout.
    struct FileLogger {
        FileLogger(asio::io_context& /*io_context*/, std::string const& /*path*/) {
        }

        /// Prints "Logged message: <message>" followed by a newline and a flush.
        void log(std::string const& message) {
            std::cout << "Logged message: ";
            std::cout << message;
            std::cout << std::endl;
        }
    };
    
    namespace WebSocket {
        /// Minimal message type: a type tag plus a payload.
        struct Message {
            std::string type, data;

            /// Stub parser: echoes the raw text (quoted) to stdout and returns a fixed
            /// message of type "type1"; the text itself is not interpreted.
            static Message fromJson(std::string const& json) {
                std::cout << "Received message: " << std::quoted(json) << std::endl;

                Message parsed;
                parsed.type = "type1";
                parsed.data = "data";
                return parsed;
            }
        };
    
        struct Session;
        using Router = std::function<bool(Message const& msg, std::shared_ptr<Session> ws)>;
    
        struct Session : std::enable_shared_from_this<Session> {
            Session(tcp::socket s, std::shared_ptr<FileLogger> logger, ssl::context& ctx, Router router)
                : stream_(std::move(s), ctx), logger_(std::move(logger)), router_(std::move(router)) {}
    
            void start() {
                std::cout << "Session accepted (" << peer_ << ")" << std::endl;
                do_ssl_handshake(); // no dispatch needed, as session creator will start this
            }
    
            ~Session() { std::cout << "Session closed (" << peer_ << ")" << std::endl; }
    
          private:
            using Stream     = websocket::stream<boost::beast::ssl_stream<tcp::socket>>;
            using close_code = websocket::close_code;
            Stream                      stream_;
            tcp::endpoint               peer_{beast::get_lowest_layer(stream_).remote_endpoint()};
            std::shared_ptr<FileLogger> logger_;
            asio::steady_timer          ping_timer{stream_.get_executor()};
            Router                      router_;
            beast::flat_buffer          buffer; // Use a unique buffer for this read
    
            void do_ssl_handshake() {
                stream_.next_layer().async_handshake( //
                    Stream::next_layer_type::handshake_type::server,
                    [this, self = shared_from_this()](beast::error_code ec) {
                        if (!ec)
                            do_ws_handshake();
                        else
                            std::cerr << "Handshake failed: " << ec.message() << std::endl;
                    });
            }
    
            void do_ws_handshake() {
                stream_.async_accept([this, self = shared_from_this()](beast::error_code ec) {
                    if (!ec) {
                        std::cout << "WebSocket handshake successful" << std::endl;
                        timer_loop(std::chrono::seconds(30));
                        read_loop();
                    } else {
                        std::cerr << "WebSocket handshake failed: " << ec.message() << std::endl;
                    }
                });
            }
    
            // all private member functions are on the strand of the stream
            void timer_loop(std::chrono::steady_clock::duration interval) {
                ping_timer.expires_after(interval);
    
                ping_timer.async_wait([=, this, self = shared_from_this()](beast::error_code ec) {
                    if (!ec) {
                        stream_.async_ping({}, [=, this](beast::error_code ec) {
                            if (ec) {
                                std::cerr << "Ping failed: " << ec.message() << std::endl;
                                stream_.async_close(close_code::going_away, asio::detached);
                            } else {
                                timer_loop(interval); // Restart the timer for the next ping
                            }
                        });
                    } else {
                        std::cerr << "Ping timer: " << ec.message() << std::endl;
                        if (ec != asio::error::operation_aborted)
                            stream_.async_close(close_code::abnormal, asio::detached);
                    }
                });
            }
    
            void read_loop() {
                stream_.async_read( //
                    buffer, [this, self = shared_from_this()](beast::error_code const ec, size_t xfr) {
                        if (!ec.failed() && router_) {
                            // Process the received message
                            std::string received_message(beast::buffers_to_string(buffer.data()).substr(0, xfr));
                            buffer.consume(xfr); // Clear the buffer after reading
    
                            Message msg = Message::fromJson(received_message);
    
                            if (router_(msg, self))
                                return read_loop(); // early exit
                        }
    
                        if (ec.failed()) {
                            std::cerr << "IO error " << ec.message() << std::endl;
                            stream_.async_close(close_code::protocol_error, asio::detached);
                        } else if (!router_) {
                            std::cerr << "No router set" << std::endl;
                            stream_.async_close(close_code::policy_error, asio::detached);
                        } else {
                            std::cerr << "Closing" << std::endl;
                            stream_.async_close(close_code::normal, asio::detached);
                        }
    
                        ping_timer.cancel(); // also cancel the ping timer
                    });
            }
        };
    
        struct SessionManager {
            SessionManager(ssl::context& ctx, Router router) : ssl_ctx_(ctx), router_(std::move(router)) {}
    
            std::shared_ptr<Session> create(tcp::socket s, std::shared_ptr<FileLogger> logger) {
                garbage_collect();
    
                auto sess = std::make_shared<Session>(std::move(s), logger, ssl_ctx_, router_);
                sessions_.push_back(sess);
                return sess;
            }
    
            size_t numberConnected() const { return sessions_.size(); }
    
          private:
            ssl::context& ssl_ctx_;
            Router        router_;
    
            void garbage_collect() {
                sessions_.remove_if([](Handle const& h) { return h.expired(); });
            }
    
            using Handle = std::weak_ptr<Session>;
            std::list<Handle> sessions_;
        };
    
        class Server {
          public:
            Server(asio::io_context& io_context, ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
                   Router router, std::shared_ptr<FileLogger> logger)
                : acceptor_(make_strand(io_context), endpoint)
                , logger_(logger)
                , session_manager_(ssl_ctx, std::move(router)) {}
    
            void start() {
                /* assuming start is called from the single thread that created the
                 * server, no need for strand dispatch
                 */
                acceptor_.listen();
                do_accept();
            }
    
            void stop() {
                dispatch(acceptor_.get_executor(), [this] { // strand dispatch
                    acceptor_.cancel();
                    acceptor_.close();
                });
            }
    
          private:
            void do_accept() {
                std::cout << "Waiting for connections..." << std::endl;
    
                acceptor_.async_accept([this](beast::error_code ec, tcp::socket s) {
                    if (!ec)
                        session_manager_.create(std::move(s), logger_)->start();
                    else
                        std::cerr << "Error on accept: " << ec.message() << std::endl;
    
                    if (ec != asio::error::operation_aborted)
                        do_accept(); // can also be done before creating the session based on `s`
    
                    std::cout << "Active sessions: " << session_manager_.numberConnected() << std::endl;
                });
            };
    
            tcp::acceptor               acceptor_; // bound to strand
            std::shared_ptr<FileLogger> logger_;
            SessionManager              session_manager_;
        };
    } // namespace WebSocket
    
    int main() {
        // Create io_context and SSL context
        asio::io_context ioc;
        ssl::context     ctx(ssl::context::sslv23);
        ctx.use_certificate_file("server.pem", ssl::context::pem);
        ctx.set_password_callback([](std::size_t, ssl::context::password_purpose) { return "test"; });
        ctx.use_private_key_file("server.pem", ssl::context::pem);
        ctx.set_verify_mode(ssl::verify_none); // Disable SSL verification for testing
    
        // Define the server endpoint (IP and port)
        tcp::endpoint endpoint(tcp::v4(), 8080);
        auto logger = std::make_shared<FileLogger>(ioc, "server.log"); // Assuming you have a logger class
    
        // Create the WebSocket server
        auto router = [](WebSocket::Message const& msg, std::shared_ptr<WebSocket::Session>) -> bool {
            // Route the message based on the message type
            if (msg.type == "type1") {
                return true;
            } else if (msg.type == "type2") {
                return true;
            }
            // Unknown message type
            return false;
        };
    
        WebSocket::Server server(ioc, ctx, endpoint, router, logger);
        server.start();
    
        ioc.run();
    }