I'm building a WebSocket server using Boost.Asio and Boost.Beast with SSL. The server accepts connections, but I'm encountering an issue where the client disconnects shortly after connecting. The error message I get is:
Read error: The I/O operation has been aborted because of either a thread exit or an application request (Code: 995)
Here’s a breakdown of my setup:
- The server uses `async_accept` to accept new client connections, where each connection is wrapped in an `SslWebSocket` instance.
- A `ConnectionManager` keeps track of connected clients.
- Each connection uses `async_read` to read incoming messages and route them appropriately.

The server logs the following messages in sequence:
Code Sample:
Here’s an excerpt from the `do_accept`, `start_read`, and `main` methods:
// Program entry point: creates the io_context, the SSL context, the logger and the
// WebSocketServer, calls server->start() from a helper thread and runs the event
// loop on the main thread.
// NOTE(review): argc/argv are not used in this excerpt.
int main(int argc, char** argv) {
// Create io_context and SSL context
boost::asio::io_context io_context;
asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
// NOTE(review): no certificate or private key is loaded into ssl_ctx anywhere in this excerpt.
ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing
// Define the server endpoint (IP and port)
tcp::endpoint endpoint(tcp::v4(), 8080);
auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class
// Create the WebSocket server
auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);
// Start the server in a separate thread
// NOTE(review): start() only calls listen() and do_accept(), and do_accept() merely
// initiates an asynchronous accept, so this thread finishes almost immediately.
// Nothing orders it against io_context.run() below; if run() is entered before the
// accept has been initiated it presumably finds no pending work and returns at once.
std::thread server_thread([server]() {
server->start(); // Assuming you have a start method in WebSocketServer
});
// Run the io_context in the main thread
io_context.run();
// Optionally join the server thread if you want to wait for it
server_thread.join();
return 0; // Exit the program when the server is stopped (if ever)
}
/// Announces startup, puts the acceptor into the listening state and initiates the
/// first asynchronous accept. A std::exception raised by either step is reported
/// on stderr and not propagated to the caller.
void WebSocketServer::start() {
    std::cout << "Starting WebSocket server..." << std::endl;

    try {
        acceptor_.listen(); // start listening for incoming connections
        do_accept();        // begin the accept loop
    } catch (std::exception const& failure) {
        std::cerr << "Error starting server: " << failure.what() << std::endl;
    }
}
// Initiates one asynchronous accept. On success the new socket is wrapped in an
// SslWebSocket, registered with the connection manager and, via a task posted to
// strand_, has its ping timer and read loop started. Whether or not the accept
// succeeded, the handler calls do_accept() again to wait for the next client.
void WebSocketServer::do_accept() {
std::cout << "Waiting for connections..." << std::endl;
// NOTE(review): listen() is also called in start(), and is repeated on every do_accept().
acceptor_.listen();
// Use a shared pointer to manage the resolver's lifetime
// NOTE(review): the resolver is captured by the handler below but never used in it.
auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());
acceptor_.async_accept(asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
if (!ec) {
// NOTE(review): neither an SSL handshake nor a WebSocket accept handshake is
// performed on ws anywhere in this excerpt before it is read from.
auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
std::cout << "Connection accepted." << std::endl;
// Attempting to add client immediately
std::cout << "Attempting to add client..." << std::endl;
// Add the connection to the manager
connection_manager_->add_connection(ws);
// Use a separate post to ensure thread safety
asio::post(strand_, [this, ws]() {
// Check if the connection was added successfully
if (connection_manager_->is_connected(ws)) {
std::cout << "Client added successfully." << std::endl;
// Start the ping timer
// NOTE(review): ping_timer is a local of this posted task and is destroyed when the
// task returns. start_ping is not shown in this excerpt; if it starts an
// asynchronous wait on the timer, destroying the timer cancels that wait.
asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
start_ping(ws, ping_timer, std::chrono::seconds(30));
// Start reading messages from the client
start_read(ws); // Start reading messages from the client
} else {
std::cout << "Failed to add client." << std::endl;
// Close the WebSocket if it was not added
ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
if (ec) {
std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
}
});
}
});
} else {
std::cerr << "Error on accept: " << ec.message() << std::endl;
}
// Continue accepting new connections
// NOTE(review): this runs on the error path too, including after the acceptor is closed.
do_accept(); // This should be called after handling the current connection
}));
}
void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Process the received message
std::string received_message(beast::buffers_to_string(buffer->data()));
buffer->consume(bytes_transferred); // Clear the buffer after reading
// Here, you can handle the received message (e.g., route it)
Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
route_message(msg, ws); // Route the message to the appropriate handler
// Continue reading if the connection is still valid
if (connection_manager_->is_connected(ws)) {
start_read(ws); // Continue reading if connection is still valid
}
else {
std::cout << "Client has disconnected. Stopping reads." << std::endl;
}
});
}
Questions:
- What could cause the `995` error code in a Boost.Beast WebSocket server?
- […] `ConnectionManager` successfully adds and tracks each client.

I made it self-contained from the code in the question. Adding a `Tracing` wrapper shows where the websocket is destructed:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace beast = boost::beast;
using asio::ip::tcp;
// Diagnostic wrapper: derives from T, inherits T's constructors and assignment
// operators, and prints the destructor's full signature when an object is
// destroyed, making the point of destruction visible in the program output.
template <typename T>
struct Tracing : T {
using T::T;
using T::operator=;
// __PRETTY_FUNCTION__ is a compiler-provided identifier (not standard C++).
~Tracing() { std::cout << __PRETTY_FUNCTION__ << std::endl; }
};
// The websocket-over-SSL-over-TCP stream used by the server, wrapped in Tracing.
using SslWebSocket = Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<tcp::socket>>>;
/// Minimal message stub: a type tag and a payload.
struct Message {
    std::string type, data;

    /// Stub parser: ignores its input and always yields {"type", "data"}.
    static Message fromJson(std::string const& /*json*/) {
        Message parsed{"type", "data"};
        return parsed;
    }
};
/// Stub connection registry: nothing is stored, and every connection is
/// reported as connected.
struct ConnectionManager {
    /// Would add the connection to the manager; does nothing here.
    void add_connection(std::shared_ptr<SslWebSocket> /*ws*/) {}

    /// Would remove the connection from the manager; does nothing here.
    void remove_connection(std::shared_ptr<SslWebSocket> /*ws*/) {}

    /// Would check whether the connection is still valid; always true here.
    bool is_connected(std::shared_ptr<SslWebSocket> /*ws*/) { return true; }
};
/// Stub logger. The constructor accepts an io_context and a string and ignores
/// both (a real implementation would open the log file there).
struct FileLogger {
    FileLogger(asio::io_context& /*io_context*/, std::string const& /*path*/) {}

    /// Declared here, defined out of line.
    void log(std::string const& message);
};
// Accepts TCP connections on one endpoint and serves each as an SslWebSocket.
// Holds references to the caller's io_context and SSL context; both must outlive
// the server. The data members are initialized in declaration order, which the
// constructor's initializer list follows.
class WebSocketServer {
public:
// Opens the acceptor on `endpoint`, creates a strand on `io_context` for the
// server's handlers and allocates the ConnectionManager.
WebSocketServer(asio::io_context& io_context, asio::ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
std::shared_ptr<FileLogger> logger)
: io_context_(io_context)
, ssl_ctx_(ssl_ctx)
, acceptor_(io_context, endpoint)
, strand_(make_strand(io_context))
, logger_(logger) //
{
connection_manager_ = std::make_shared<ConnectionManager>();
}
void start() { do_accept(); } // Start accepting connections
void stop() { acceptor_.close(); } // Stop accepting new connections
private:
// Initiates one asynchronous accept; its handler re-arms itself.
void do_accept();
// Initiates one asynchronous read on ws and re-arms while the client is connected.
void start_read(std::shared_ptr<SslWebSocket> ws);
// Waits on `timer`, then pings ws. The timer is taken by reference, so the
// caller has to keep it alive for as long as the wait is pending.
void start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
std::chrono::seconds interval);
// Dispatches on msg.type.
void route_message(Message const& msg, std::shared_ptr<SslWebSocket> ws);
asio::io_context& io_context_;
asio::ssl::context& ssl_ctx_;
tcp::acceptor acceptor_;
asio::strand<asio::io_context::executor_type> strand_;
std::shared_ptr<FileLogger> logger_;
std::shared_ptr<ConnectionManager> connection_manager_;
};
// Initiates one asynchronous accept. On success the new socket is wrapped in an
// SslWebSocket, registered with the connection manager and, via a task posted to
// strand_, has its ping timer and read loop started. Whether or not the accept
// succeeded, the handler calls do_accept() again to wait for the next client.
void WebSocketServer::do_accept() {
std::cout << "Waiting for connections..." << std::endl;
// NOTE(review): listen() is repeated on every do_accept().
acceptor_.listen();
// Use a shared pointer to manage the resolver's lifetime
// NOTE(review): the resolver is captured by the handler below but never used in it.
auto resolver = std::make_shared<tcp::resolver>(acceptor_.get_executor());
acceptor_.async_accept(
asio::bind_executor(strand_, [this, resolver](beast::error_code ec, tcp::socket socket) {
if (!ec) {
// NOTE(review): no SSL handshake and no WebSocket accept handshake is performed
// on ws anywhere in this listing before it is read from.
auto ws = std::make_shared<SslWebSocket>(std::move(socket), ssl_ctx_);
std::cout << "Connection accepted." << std::endl;
// Attempting to add client immediately
std::cout << "Attempting to add client..." << std::endl;
// Add the connection to the manager
connection_manager_->add_connection(ws);
// Use a separate post to ensure thread safety
asio::post(strand_, [this, ws]() {
// Check if the connection was added successfully
if (connection_manager_->is_connected(ws)) {
std::cout << "Client added successfully." << std::endl;
// Start the ping timer
// NOTE(review): ping_timer is a local of this posted task. start_ping starts an
// async_wait on it and its handlers capture it by reference; the timer is
// destroyed when this task returns, which cancels that wait.
asio::steady_timer ping_timer(ws->get_executor(), std::chrono::seconds(30));
start_ping(ws, ping_timer, std::chrono::seconds(30));
// Start reading messages from the client
start_read(ws); // Start reading messages from the client
} else {
std::cout << "Failed to add client." << std::endl;
// Close the WebSocket if it was not added
ws->async_close(beast::websocket::close_code::normal, [](beast::error_code ec) {
if (ec) {
std::cerr << "Error closing WebSocket: " << ec.message() << std::endl;
}
});
}
});
} else {
std::cerr << "Error on accept: " << ec.message() << std::endl;
}
// Continue accepting new connections
// NOTE(review): this runs on the error path too, including after stop() closed the acceptor.
do_accept(); // This should be called after handling the current connection
}));
}
void WebSocketServer::start_read(std::shared_ptr<SslWebSocket> ws) {
auto buffer = std::make_shared<beast::flat_buffer>(); // Use a unique buffer for this read
ws->async_read(*buffer, [this, ws, buffer](beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cout << "Read error: " << ec.message() << " (Code: " << ec.value() << ")" << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Process the received message
std::string received_message(beast::buffers_to_string(buffer->data()));
buffer->consume(bytes_transferred); // Clear the buffer after reading
// Here, you can handle the received message (e.g., route it)
Message msg = Message::fromJson(received_message); // Assuming the message is a JSON string
route_message(msg, ws); // Route the message to the appropriate handler
// Continue reading if the connection is still valid
if (connection_manager_->is_connected(ws)) {
start_read(ws); // Continue reading if connection is still valid
} else {
std::cout << "Client has disconnected. Stopping reads." << std::endl;
}
});
}
// Repro entry point: creates the io_context, the SSL context, the logger and the
// WebSocketServer, calls server->start() from a helper thread and runs the event
// loop on the main thread.
int main() {
// Create io_context and SSL context
boost::asio::io_context io_context;
asio::ssl::context ssl_ctx(asio::ssl::context::sslv23);
// NOTE(review): no certificate or private key is loaded into ssl_ctx in this listing.
ssl_ctx.set_verify_mode(asio::ssl::verify_none); // Disable SSL verification for testing
// Define the server endpoint (IP and port)
tcp::endpoint endpoint(tcp::v4(), 8080);
auto logger = std::make_shared<FileLogger>(io_context, "server.log"); // Assuming you have a logger class
// Create the WebSocket server
auto server = std::make_shared<WebSocketServer>(io_context, ssl_ctx, endpoint, logger);
// Start the server in a separate thread
// start() only initiates an asynchronous accept and returns, so this thread is
// done right away; the message printed below makes that visible in the output.
// NOTE(review): nothing orders this thread against io_context.run() below.
std::thread server_thread([server]() {
server->start(); // Assuming you have a start method in WebSocketServer
std::cout << "server_thread already exited" << std::endl;
});
// Run the io_context in the main thread
io_context.run();
// Optionally join the server thread if you want to wait for it
server_thread.join();
}
// more stubs:
// Waits for `timer` to expire, then sends a WebSocket ping on `ws`; after a
// successful ping it calls itself again. A failed wait or a failed ping removes
// the connection from the connection manager.
//
// `timer` is captured BY REFERENCE in both handlers, so the caller must keep the
// timer object alive for as long as any of these operations is pending.
// NOTE(review): the caller in do_accept passes a local timer that goes out of
// scope right after this call, so the wait is cancelled and the reference dangles.
// NOTE(review): no new expiry is set before the timer is waited on again, so
// after the first expiry the next wait presumably completes immediately.
void WebSocketServer::start_ping(std::shared_ptr<SslWebSocket> ws, asio::steady_timer& timer,
std::chrono::seconds interval) {
timer.async_wait([this, ws, &timer, interval](beast::error_code ec) {
if (!ec) {
ws->async_ping({}, [this, ws, &timer, interval](beast::error_code ec) {
if (ec) {
std::cerr << "Ping failed: " << ec.message() << std::endl;
connection_manager_->remove_connection(ws);
return;
}
// Restart the timer for the next ping
start_ping(ws, timer, interval);
});
} else {
// Reached on cancellation as well (e.g. when the timer object is destroyed).
std::cerr << "Ping timer error: " << ec.message() << std::endl;
connection_manager_->remove_connection(ws);
}
});
}
/// Stub router: inspects msg.type and does nothing for any value. The session
/// argument is accepted but unused.
void WebSocketServer::route_message(Message const& msg, std::shared_ptr<SslWebSocket>) {
    std::string const& kind = msg.type;

    if (kind == "type1") {
        return; // Handle type1 message
    }
    if (kind == "type2") {
        return; // Handle type2 message
    }
    // Unknown message type
}
// FileLogger stubs
/// Stub: writes the message to stdout, prefixed with "Logged message: ".
void FileLogger::log(std::string const& message) {
    std::cout << "Logged message: " << message << std::endl;
}
Output of a sample run:
Waiting for connections...
server_thread already exited
Connection accepted.
Attempting to add client...
Waiting for connections...
Client added successfully.
Read error: Operation canceled (Code: 125)
Ping timer error: Operation canceled
Waiting for connections...
server_thread already exited
Connection accepted.
Attempting to add client...
Waiting for connections...
Client added successfully.
Read error: Operation canceled (Code: 125)
Ping timer error: Operation canceled
Tracing<boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<boost::asio::ip::
tcp>>>>::~Tracing() [T = boost::beast::websocket::stream<boost::beast::ssl_stream<boost::asio::basic_stream_socket<
boost::asio::ip::tcp>>>]
The debugger will help you see that the problem is that `ping_timer` is a local variable, and its destruction immediately cancels the timer. This causes the `timer.async_wait` completion to remove the connection.
There's a boatload of other problems.

- Use a session class (with `std::enable_shared_from_this`) instead of shuffling around many shared-pointer arguments.
- Have the connection manager use `std::weak_ptr` to hold on to the session. Then `ConnectionManager::is_connected` can use `weak_ptr::expired()` to check whether the connection is still valid.
- Since `ConnectionManager` is simply owned by `WebSocketServer` and not passed in as a dependency, the shared/dynamic allocation adds no value.

I might post a fixed-up version if I don't have to spend my time elsewhere soon.
Refactoring to the above description leads to: https://coliru.stacked-crooked.com/a/aa8931128b1afddd
However it revealed another GLARING error: all of the SSL handshake is missing, and so is the Websocket handshake. So adding that too:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <algorithm>
#include <exception>
#include <functional>
#include <iomanip>
#include <iostream>
#include <list>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using asio::ip::tcp;
/// Stub logger. The constructor accepts an io_context and a string and ignores
/// both (a real implementation would open the log file there).
struct FileLogger {
    FileLogger(asio::io_context& /*io_context*/, std::string const& /*path*/) {}

    /// Writes the message to stdout, prefixed with "Logged message: ".
    void log(std::string const& message) {
        std::cout << "Logged message: " << message << std::endl;
    }
};
namespace WebSocket {
/// Minimal message stub: a type tag and a payload.
struct Message {
    std::string type;
    std::string data;

    /// Stub parser: echoes the raw text (quoted) to stdout and always yields
    /// {"type1", "data"}, regardless of the input.
    static Message fromJson(std::string const& json) {
        std::cout << "Received message: " << std::quoted(json) << std::endl;
        Message parsed{"type1", "data"};
        return parsed;
    }
};
struct Session;
using Router = std::function<bool(Message const& msg, std::shared_ptr<Session> ws)>;
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket s, std::shared_ptr<FileLogger> logger, ssl::context& ctx, Router router)
: stream_(std::move(s), ctx), logger_(std::move(logger)), router_(std::move(router)) {}
void start() {
std::cout << "Session accepted (" << peer_ << ")" << std::endl;
do_ssl_handshake(); // no dispatch needed, as session creator will start this
}
~Session() { std::cout << "Session closed (" << peer_ << ")" << std::endl; }
private:
using Stream = websocket::stream<boost::beast::ssl_stream<tcp::socket>>;
using close_code = websocket::close_code;
Stream stream_;
tcp::endpoint peer_{beast::get_lowest_layer(stream_).remote_endpoint()};
std::shared_ptr<FileLogger> logger_;
asio::steady_timer ping_timer{stream_.get_executor()};
Router router_;
beast::flat_buffer buffer; // Use a unique buffer for this read
void do_ssl_handshake() {
stream_.next_layer().async_handshake( //
Stream::next_layer_type::handshake_type::server,
[this, self = shared_from_this()](beast::error_code ec) {
if (!ec)
do_ws_handshake();
else
std::cerr << "Handshake failed: " << ec.message() << std::endl;
});
}
void do_ws_handshake() {
stream_.async_accept([this, self = shared_from_this()](beast::error_code ec) {
if (!ec) {
std::cout << "WebSocket handshake successful" << std::endl;
timer_loop(std::chrono::seconds(30));
read_loop();
} else {
std::cerr << "WebSocket handshake failed: " << ec.message() << std::endl;
}
});
}
// all private member functions are on the strand of the stream
void timer_loop(std::chrono::steady_clock::duration interval) {
ping_timer.expires_after(interval);
ping_timer.async_wait([=, this, self = shared_from_this()](beast::error_code ec) {
if (!ec) {
stream_.async_ping({}, [=, this](beast::error_code ec) {
if (ec) {
std::cerr << "Ping failed: " << ec.message() << std::endl;
stream_.async_close(close_code::going_away, asio::detached);
} else {
timer_loop(interval); // Restart the timer for the next ping
}
});
} else {
std::cerr << "Ping timer: " << ec.message() << std::endl;
if (ec != asio::error::operation_aborted)
stream_.async_close(close_code::abnormal, asio::detached);
}
});
}
void read_loop() {
stream_.async_read( //
buffer, [this, self = shared_from_this()](beast::error_code const ec, size_t xfr) {
if (!ec.failed() && router_) {
// Process the received message
std::string received_message(beast::buffers_to_string(buffer.data()).substr(0, xfr));
buffer.consume(xfr); // Clear the buffer after reading
Message msg = Message::fromJson(received_message);
if (router_(msg, self))
return read_loop(); // early exit
}
if (ec.failed()) {
std::cerr << "IO error " << ec.message() << std::endl;
stream_.async_close(close_code::protocol_error, asio::detached);
} else if (!router_) {
std::cerr << "No router set" << std::endl;
stream_.async_close(close_code::policy_error, asio::detached);
} else {
std::cerr << "Closing" << std::endl;
stream_.async_close(close_code::normal, asio::detached);
}
ping_timer.cancel(); // also cancel the ping timer
});
}
};
struct SessionManager {
SessionManager(ssl::context& ctx, Router router) : ssl_ctx_(ctx), router_(std::move(router)) {}
std::shared_ptr<Session> create(tcp::socket s, std::shared_ptr<FileLogger> logger) {
garbage_collect();
auto sess = std::make_shared<Session>(std::move(s), logger, ssl_ctx_, router_);
sessions_.push_back(sess);
return sess;
}
size_t numberConnected() const { return sessions_.size(); }
private:
ssl::context& ssl_ctx_;
Router router_;
void garbage_collect() {
sessions_.remove_if([](Handle const& h) { return h.expired(); });
}
using Handle = std::weak_ptr<Session>;
std::list<Handle> sessions_;
};
class Server {
public:
Server(asio::io_context& io_context, ssl::context& ssl_ctx, tcp::endpoint const& endpoint,
Router router, std::shared_ptr<FileLogger> logger)
: acceptor_(make_strand(io_context), endpoint)
, logger_(logger)
, session_manager_(ssl_ctx, std::move(router)) {}
void start() {
/* assuming start is called from the single thread that created the
* server, no need for strand dispatch
*/
acceptor_.listen();
do_accept();
}
void stop() {
dispatch(acceptor_.get_executor(), [this] { // strand dispatch
acceptor_.cancel();
acceptor_.close();
});
}
private:
void do_accept() {
std::cout << "Waiting for connections..." << std::endl;
acceptor_.async_accept([this](beast::error_code ec, tcp::socket s) {
if (!ec)
session_manager_.create(std::move(s), logger_)->start();
else
std::cerr << "Error on accept: " << ec.message() << std::endl;
if (ec != asio::error::operation_aborted)
do_accept(); // can also be done before creating the session based on `s`
std::cout << "Active sessions: " << session_manager_.numberConnected() << std::endl;
});
};
tcp::acceptor acceptor_; // bound to strand
std::shared_ptr<FileLogger> logger_;
SessionManager session_manager_;
};
} // namespace WebSocket
/// Sets up the TLS context and the WebSocket server on port 8080 and runs the
/// event loop on the calling thread. Returns non-zero if setup or the event
/// loop fails with an exception (e.g. server.pem is missing or the port is in use).
int main() {
    try {
        // Create io_context and SSL context
        asio::io_context ioc;
        ssl::context     ctx(ssl::context::tls_server); // generic TLS, server side

        // The password callback must be installed before the key is loaded.
        ctx.set_password_callback(
            [](std::size_t, ssl::context::password_purpose) -> std::string { return "test"; });
        ctx.use_certificate_file("server.pem", ssl::context::pem);
        ctx.use_private_key_file("server.pem", ssl::context::pem);
        ctx.set_verify_mode(ssl::verify_none); // Disable SSL verification for testing

        // Define the server endpoint (IP and port)
        tcp::endpoint endpoint(tcp::v4(), 8080);
        auto logger = std::make_shared<FileLogger>(ioc, "server.log"); // Assuming you have a logger class

        // Returning false from the router makes the session close the connection.
        auto router = [](WebSocket::Message const& msg, std::shared_ptr<WebSocket::Session>) -> bool {
            // Route the message based on the message type
            if (msg.type == "type1") {
                return true;
            } else if (msg.type == "type2") {
                return true;
            }
            // Unknown message type
            return false;
        };

        // Create the WebSocket server
        WebSocket::Server server(ioc, ctx, endpoint, router, logger);
        server.start();
        ioc.run();
    } catch (std::exception const& e) {
        std::cerr << "Fatal error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}