I can't figure out a problem. I am connecting to an exchange over WebSocket (WS) and everything works well, with one exception. The exchange enforces a maximum lifetime on the connection, after which it resets the connection and I have to reconnect. I know this timeout and I don't want to wait for it; I want to reconnect myself, in advance, at a time convenient for me, so that I get a more controlled communication channel.
For the test, I decided to reconnect every 60 seconds, say, so that I always have an up-to-date token and a working channel.
However, after a period of stable operation, when the timer fires to close the connection and reopen it with a refreshed token, I get these errors and cannot establish a new connection:
[WSCO]: [04/22/2025 23:08:12] Reconnection timer triggered. Reconnecting...
[WSCO]: [04/22/2025 23:08:18] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:24] Exception during close: Operation canceled
[WSCO]: [04/22/2025 23:08:30] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:36] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:42] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:48] Read error: Operation canceled. Reconnecting...
I'm most likely doing something wrong with the WS layers in Boost or with SSL (perhaps something related to synchronous operations), but I don't understand what exactly is wrong.
My goal is to close the connection gracefully when RECONNECTION_TIMER expires, or, if the network connection has been lost, to reconnect after RECONNECTION_TIMEOUT.
What I have now: the channel works stably until a reconnect is invoked or the reconnection timer is triggered.
Can someone help me and tell me what I'm doing wrong?
src/WebSocketClientOrder.h
#ifndef WEBSOCKET_CLIENT_ORDER_H
#define WEBSOCKET_CLIENT_ORDER_H
#include <string>
#include <queue>
#include <mutex>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <nlohmann/json.hpp>
#include <boost/asio/steady_timer.hpp>
#define RECONNECTION_TIMEOUT 5 // seconds to wait before reconnecting after a disconnection
#define RECONNECTION_TIMER 60 // seconds after which the client itself drops the connection and reconnects,
// so that the session timer on the exchange is refreshed
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
// WebSocket-over-TLS client for the exchange's order channel.
// Connection setup in connect() uses blocking calls; reads, writes and the timers are asynchronous
// and run on the io_context passed to the constructor.
class WebSocketClientOrder {
public:
// Stores the io_context / SSL context references and the API credentials.
WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, const std::string& api_key, const std::string& api_secret);
// Connects to host:port, queues the auth request, and starts the read loop, heartbeat and reconnection timer.
void connect(const std::string& host, const std::string& port);
// NOTE(review): declared here, but no definition is visible in this listing.
void place_order(const std::string &symbol_first, const std::string &symbol_second, const std::string &side_first,
const std::string &side_second, double qty);
private:
// Starts one asynchronous read; the completion handler logs the message and starts the next read.
void do_read();
// Calls close(), sleeps for RECONNECTION_TIMEOUT seconds, then calls connect() again.
void reconnect();
// Closes the WebSocket, TLS and TCP layers of ws_.
void close();
// Enqueues a message and starts writing if it is the only queued message.
void send_async(const std::string& message);
// Asynchronously writes the message at the front of the queue.
void process_next_message();
// Returns the lowercase hex HMAC-SHA256 of "GET/realtime" + expires, keyed with api_secret_.
std::string generate_signature(long expires);
// Queues the "auth" request built from the API key, expiry time and signature.
void authenticate();
// Re-arms a 20 second timer that sends {"op": "ping"} while the stream is open.
void heartbeat();
asio::ip::tcp::resolver resolver_;
// Recreated by every connect() call.
std::unique_ptr<websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>> ws_;
// Receive buffer shared by all reads.
beast::flat_buffer buffer_;
asio::io_context& io_context_;
asio::ssl::context& ssl_context_;
std::string host_, port_, api_key_, api_secret_;
// Outgoing messages; the front element is the one currently being written.
std::queue<std::string> message_queue_;
std::mutex message_queue_mutex_;
std::unique_ptr<asio::steady_timer> ping_timer_;
std::unique_ptr<asio::steady_timer> reconnection_timer_;
// Request target used for the WebSocket handshake.
static const std::string DEFAULT_URI;
};
#endif // WEBSOCKET_CLIENT_ORDER_H
src/WebSocketClientOrder.cpp
#include <iostream>
#include <iomanip>
#include <sstream>
#include <queue>
#include <mutex>
#include <boost/asio/connect.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <openssl/hmac.h>
#include "WebSocketClientOrder.h"
// Request target passed to the WebSocket handshake in connect().
const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
// Initializes the resolver, keeps references to the io_context and SSL context,
// and copies the API credentials. Initializers are listed in member declaration order.
WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context,
                                           asio::ssl::context& ssl_context,
                                           const std::string& api_key,
                                           const std::string& api_secret)
    : resolver_(io_context)
    , io_context_(io_context)
    , ssl_context_(ssl_context)
    , api_key_(api_key)
    , api_secret_(api_secret) {
}
// Establishes a new session with host:port.
// Resolve, TCP connect, TLS handshake and WebSocket handshake are all blocking calls.
// Afterwards the auth request is queued and the asynchronous read loop, heartbeat and
// reconnection timer are started. Any exception leads to reconnect().
void WebSocketClientOrder::connect(const std::string &host, const std::string &port) {
// Remembered so that reconnect() can call connect() with the same endpoint.
host_ = host;
port_ = port;
try {
// Replaces (and thereby destroys) any previous stream object.
ws_ = std::make_unique<websocket::stream<asio::ssl::stream<asio::ip::tcp::socket>>>(
io_context_, ssl_context_);
auto results = resolver_.resolve(host, port);
asio::connect(ws_->next_layer().lowest_layer(), results);
// Set the SNI host name on the TLS session before the TLS handshake.
if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
throw beast::system_error(
beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
"[WSCO]: Failed to set SNI"
);
}
ws_->next_layer().handshake(asio::ssl::stream_base::client);
ws_->handshake(host + ":" + port, DEFAULT_URI);
// Answers incoming ping frames with a pong carrying the same payload.
ws_->control_callback([this](websocket::frame_type kind, boost::beast::string_view payload) {
if (kind == websocket::frame_type::ping) {
ws_->pong(payload.data());
}
});
// Printed before authenticate() runs; authenticate() only queues the auth request.
std::cout << "[WSCO]: Connected and authenticated\n";
authenticate();
do_read();
heartbeat();
// One-shot timer: after RECONNECTION_TIMER seconds the client reconnects on its own.
reconnection_timer_ = std::make_unique<asio::steady_timer>(io_context_, std::chrono::seconds(RECONNECTION_TIMER));
reconnection_timer_->async_wait([this](beast::error_code ec) {
if (!ec) {
std::cout << "[WSCO]: Reconnection timer triggered. Reconnecting...\n";
reconnect();
}
});
} catch (const std::exception &e) {
std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying...\n";
// reconnect() calls connect() again, so repeated failures nest these calls.
reconnect();
}
}
// Starts one asynchronous read into buffer_.
// On success the received message is logged, consumed from the buffer, and the next read is started.
// On any error (including a close by the server) the error is logged and reconnect() is called.
void WebSocketClientOrder::do_read() {
    auto on_read = [this](beast::error_code ec, std::size_t bytes_transferred) {
        if (!ec) {
            const std::string response = beast::buffers_to_string(buffer_.data());
            std::cout << "[WSCO]: Server Response: " << response << "\n";
            buffer_.consume(bytes_transferred);
            do_read();
            return;
        }

        // Failure path: pick the log line, then reconnect in either case.
        if (ec == websocket::error::closed) {
            std::cerr << "[WSCO]: Websocket closed by server. Attempting reconnect...\n";
        } else {
            std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting...\n";
        }
        reconnect();
    };
    ws_->async_read(buffer_, on_read);
}
// Tears down the current connection and establishes a new one to host_:port_.
void WebSocketClientOrder::reconnect() {
close();
// Blocks the calling thread for RECONNECTION_TIMEOUT seconds. The callers visible in this file
// are completion handlers and connect(), so no other handler runs during the sleep.
std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
// connect() replaces ws_ with a fresh stream object.
connect(host_, port_);
}
// Closes the current stream: WebSocket close, then TLS shutdown, then the TCP socket.
// All calls here are the blocking (synchronous) variants. Errors are logged, not propagated.
void WebSocketClientOrder::close() {
try {
if (ws_ && ws_->is_open()) {
beast::error_code ec;
// WebSocket close with the "normal" close code.
ws_->close(websocket::close_code::normal, ec);
// A not_connected error is not reported.
if (ec && ec != beast::errc::not_connected) {
std::cerr << "[WSCO]: Close failed: " << ec.message() << "\n";
}
}
// Errors from the TLS shutdown and socket close below are stored in ec and not inspected.
beast::error_code ec;
if (ws_) {
ws_->next_layer().shutdown(ec);
ws_->next_layer().lowest_layer().close(ec);
}
} catch (const std::exception &e) {
std::cerr << "[WSCO]: Exception during close: " << e.what() << "\n";
}
}
// Appends `message` to the outgoing queue and, if it is the only queued message,
// starts the write chain via process_next_message().
void WebSocketClientOrder::send_async(const std::string& message) {
    {
        std::lock_guard<std::mutex> guard(message_queue_mutex_);
        message_queue_.push(message);
    }

    // The size is read after the lock has been released, exactly as before.
    const bool is_only_message = (message_queue_.size() == 1);
    if (!is_only_message) {
        return;
    }
    process_next_message();
}
// Starts an asynchronous write of the message at the front of the queue, if there is one.
// The completion handler pops that message and calls this function again.
// A failed write is logged and leads to reconnect() without popping the message.
void WebSocketClientOrder::process_next_message() {
std::lock_guard<std::mutex> lock(message_queue_mutex_);
if (!message_queue_.empty()) {
// Local copy of the front element; the element itself stays queued until the write completes.
std::string message = message_queue_.front();
// NOTE(review): asio::buffer(message) refers to the local `message` above, while the lambda
// captures its own separate copy. The local goes out of scope when this function returns — confirm lifetime.
ws_->async_write(asio::buffer(message),
[this, message](beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cerr << "[WSCO]: Async send failed: " << ec.message() << "\n";
reconnect();
return;
}
std::cout << "[WSCO]: Async message sent: " << message << "\n";
{
std::lock_guard<std::mutex> lock(message_queue_mutex_);
message_queue_.pop();
}
process_next_message();
});
}
}
std::string WebSocketClientOrder::generate_signature(long expires) {
std::string data = "GET/realtime" + std::to_string(expires);
unsigned char hmac_result[EVP_MAX_MD_SIZE];
unsigned int len = 0;
HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
std::ostringstream oss;
for (unsigned int i = 0; i < len; i++) {
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
}
return oss.str();
}
void WebSocketClientOrder::authenticate() {
long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
std::string signature = generate_signature(expires);
nlohmann::json auth_message = {
{"op", "auth"},
{"args", {api_key_, std::to_string(expires), signature}}
};
send_async(auth_message.dump());
}
// Arms a fresh 20 second timer. When it fires without error, a {"op": "ping"} message is queued
// (only while the stream exists and is open) and the timer is armed again.
// A timer error ends the heartbeat chain.
void WebSocketClientOrder::heartbeat() {
    ping_timer_ = std::make_unique<asio::steady_timer>(io_context_, std::chrono::seconds(20));

    auto on_tick = [this](beast::error_code ec) {
        if (!ec) {
            const bool stream_open = ws_ && ws_->is_open();
            if (stream_open) {
                send_async(R"({"op": "ping"})");
            }
            heartbeat();
        }
    };
    ping_timer_->async_wait(on_tick);
}
main.cpp
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "src/WebSocketClientOrder.h"
// Creates the io_context and TLS context, connects the client to the exchange,
// and runs the event loop. Exceptions are reported on stderr; the exit code is always 0.
int main() {
    const std::string host{"stream.bybit.com"};
    const std::string port{"443"};
    const std::string api_key{""};
    const std::string api_secret{""};

    try {
        asio::io_context io_context;

        asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);
        ssl_context.set_default_verify_paths();

        WebSocketClientOrder client(io_context, ssl_context, api_key, api_secret);
        client.connect(host, port);

        // auto work_guard = asio::make_work_guard(io_context);
        io_context.run();
    } catch (const std::exception& e) {
        std::cerr << "[MAIN]: Connection error: " << e.what() << std::endl;
    }

    return 0;
}
CMakeLists.txt
# Build configuration for the FST executable.
cmake_minimum_required(VERSION 3.20)
project(FST)
# Require C++20 (no fallback to an older standard).
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
add_executable(FST
main.cpp
src/WebSocketClientOrder.cpp
src/WebSocketClientOrder.h
)
# Lets sources include headers from src/ without a path prefix.
target_include_directories(FST PRIVATE src)
# Find Boost, OpenSSL, and simdjson
# NOTE(review): the sources listed above include <nlohmann/json.hpp>; no simdjson include is
# visible in them — confirm that the simdjson dependency is needed.
find_package(Boost REQUIRED COMPONENTS system)
find_package(OpenSSL REQUIRED)
find_package(simdjson REQUIRED)
# Include directories
include_directories(${Boost_INCLUDE_DIRS})
# Link libraries
target_link_libraries(FST PRIVATE Boost::boost Boost::system OpenSSL::SSL OpenSSL::Crypto simdjson::simdjson)
Makefile
# Makefile for building the project using CMake with maximum optimization

# Variables
BUILD_DIR := build
CMAKE_FLAGS := -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_FLAGS_RELEASE="-O3 -march=native -mtune=native"
# Number of parallel build jobs. `$(nproc)` would expand an (unset) make variable and leave a
# bare `-j`, i.e. an unlimited number of jobs; `$(shell nproc)` runs the command instead.
# Override with e.g. `make JOBS=4`.
JOBS ?= $(shell nproc)

# None of the targets is a file. Without .PHONY, `make build` would be reported as up to date
# as soon as the build/ directory exists and nothing would be rebuilt.
.PHONY: all build clean rebuild

# Default target
all: build

# Generate build system and build the project
build:
	mkdir -p $(BUILD_DIR)
	cmake -B $(BUILD_DIR) $(CMAKE_FLAGS) -S .
	cmake --build $(BUILD_DIR) -- -j$(JOBS)

# Clean build directory
clean:
	rm -rf $(BUILD_DIR)

# Rebuild the project from scratch (sequentially, so that `make -j rebuild` cannot run
# clean and build at the same time)
rebuild:
	$(MAKE) clean
	$(MAKE) build
You're using many unnecessary pointers, and re-using them before you free the first.
Next up, you're mixing blocking sleep (this_thread::sleep_for) with async operations.
The combination of these makes it really hard to see what's going on. In essence, you will start the new connect() call before the read operation can be canceled. This means that by the time your read operation IS canceled, you think you were already running the new connection. This would cause you to force reconnect again and again and again and...
Fixing it the "naive" way:
// Closes the current connection (if any) and schedules a new connect() after RECONNECTION_TIMEOUT.
void WebSocketClientOrder::reconnect() {
// Take ownership of the stream and leave ws_ null. A repeated call while no stream is
// installed finds nullptr here and does nothing.
if (auto tmp = std::exchange(ws_, nullptr)) {
TRACE("[WSCO]: Closing connection...");
close(std::move(tmp));
// Destroying the timers cancels their pending waits.
ping_timer_.reset();
reconnection_timer_.reset();
// The lambda holds a shared_ptr copy so the timer stays alive until its handler has run.
auto timer = std::make_shared<Timer>(io_context_, RECONNECTION_TIMEOUT);
timer->async_wait([this, timer](beast::error_code ec) {
if (!ec) {
TRACE("[WSCO]: Reconnecting to ", host_, ":", port_);
connect(host_, port_);
}
});
}
}
This works:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/core/demangle.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <nlohmann/json.hpp>
#include <syncstream>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
namespace ssl = asio::ssl;
using namespace std::chrono_literals;
/// Delay before a reconnect attempt is started.
static constexpr auto RECONNECTION_TIMEOUT = std::chrono::seconds{2};
/// Maximum session time before the client closes the connection itself and reconnects.
static constexpr auto RECONNECTION_TIMER = std::chrono::seconds{10};
/// Reference time point, taken during static initialization.
static auto s_start = std::chrono::steady_clock::now();
void inline do_trace(std::ostream& os, auto const&... msg) {
((std::osyncstream(os) //
<< "[" << std::setw(8) << (std::chrono::steady_clock::now() - s_start) / 1.s << "ms] ") //
<< ... << msg)
<< std::endl;
}
// Trace to std::cout, prefixed with "I: ", the enclosing function name and the source line.
#define TRACE(...) do_trace(std::cout, "I: ", __FUNCTION__, ':', __LINE__, '\t', __VA_ARGS__)
// Same as TRACE, but writes to std::cerr with an "E: " prefix.
#define ERRTRACE(...) do_trace(std::cerr, "E: ", __FUNCTION__, ':', __LINE__, '\t', __VA_ARGS__)
// WebSocket-over-TLS client for the exchange's order channel.
// connect() performs the connection setup with blocking calls; reads, writes and timers are asynchronous.
class WebSocketClientOrder {
public:
// Stores the io_context / SSL context references and the API credentials.
WebSocketClientOrder(asio::io_context& io_context, ssl::context& ssl_context, std::string const& api_key,
std::string const& api_secret);
// Connects to host:port, queues the auth request, and starts the read loop, heartbeat and session timer.
void connect(std::string const& host, std::string const& port);
// NOTE(review): declared here, but no definition is visible in this listing.
void place_order(std::string const& symbol_first, std::string const& symbol_second,
std::string const& side_first, std::string const& side_second, double qty);
private:
using Timer = asio::steady_timer;
using WS = websocket::stream<ssl::stream<boost::asio::ip::tcp::socket>>;
// Detaches the current stream, closes it, and schedules connect() after RECONNECTION_TIMEOUT.
void reconnect();
// Closes the WebSocket, TLS and TCP layers of the given stream; the stream is destroyed afterwards.
void close(std::unique_ptr<WS> ws);
// Posts the message to the stream's executor, where it is queued for writing.
void send_async(std::string message);
// Reads messages one after another; any read error leads to reconnect().
void do_read_loop();
void do_write_loop(); // on strand
// Returns the lowercase hex HMAC-SHA256 of "GET/realtime" + expires, keyed with api_secret_.
std::string generate_signature(long expires) const;
// Queues the "auth" request built from the API key, expiry time and signature.
void authenticate();
// Re-arms the ping timer; each expiry queues {"op": "ping"} while the stream is open.
void do_heartbeat();
asio::io_context& io_context_;
asio::ip::tcp::resolver resolver_;
// Null between reconnect() and the next connect().
std::unique_ptr<WS> ws_;
// Receive buffer shared by all reads.
beast::flat_buffer buffer_;
ssl::context& ssl_context_;
std::string host_, port_, api_key_, api_secret_;
// Outgoing messages; the front element is the one currently being written.
std::deque<std::string> message_queue_;
std::unique_ptr<Timer> ping_timer_;
std::unique_ptr<Timer> reconnection_timer_;
// Request target used for the WebSocket handshake.
static std::string const DEFAULT_URI;
};
#include <boost/asio/connect.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iomanip>
#include <iostream>
#include <openssl/hmac.h>
#include <sstream>
// #include "WebSocketClientOrder.h"
// Request target passed to the WebSocket handshake in connect().
std::string const WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
/// Keeps references to the io_context and SSL context, creates the resolver on that io_context,
/// and copies the API credentials.
WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context,
                                           ssl::context& ssl_context,
                                           std::string const& api_key,
                                           std::string const& api_secret)
    : io_context_(io_context),
      resolver_(io_context),
      ssl_context_(ssl_context),
      api_key_(api_key),
      api_secret_(api_secret) {
}
void WebSocketClientOrder::connect(std::string const& host, std::string const& port) {
host_ = host;
port_ = port;
try {
ws_ = std::make_unique<WS>(make_strand(io_context_), ssl_context_);
auto results = resolver_.resolve(host, port);
asio::connect(ws_->next_layer().lowest_layer(), results);
if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
throw beast::system_error(
beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
"[WSCO]: Failed to set SNI");
}
ws_->next_layer().handshake(ssl::stream_base::client);
ws_->handshake(host + ":" + port, DEFAULT_URI);
ws_->control_callback([this](websocket::frame_type kind, boost::beast::string_view payload) {
if (kind == websocket::frame_type::ping) {
ws_->pong(payload.data());
}
});
TRACE("[WSCO]: Connected to ", host, ":", port);
authenticate();
TRACE("[WSCO]: Authenticated to ", host, ":", port);
do_read_loop();
do_heartbeat();
reconnection_timer_ = std::make_unique<Timer>(io_context_, RECONNECTION_TIMER);
reconnection_timer_->async_wait([this](beast::error_code ec) {
if (!ec) {
TRACE("[WSCO]: Reconnection timer triggered");
reconnect();
}
});
} catch (std::exception const& e) {
ERRTRACE("[WSCO]: Connection failed: ", e.what(), ". Retrying...");
reconnect();
}
}
/// Starts one asynchronous read into buffer_.
/// On success the message is traced, consumed from the buffer, and the next read is started.
/// On any error (including a close by the server) the error is traced and reconnect() is called.
void WebSocketClientOrder::do_read_loop() {
    auto on_read = [this](beast::error_code ec, size_t bytes_transferred) {
        if (!ec) {
            std::string const response = beast::buffers_to_string(buffer_.data());
            TRACE("[WSCO]: Server Response: ", response);
            buffer_.consume(bytes_transferred);
            do_read_loop();
            return;
        }

        // Failure path: pick the log line, then reconnect in either case.
        if (ec == websocket::error::closed) {
            ERRTRACE("[WSCO]: Websocket closed by server");
        } else {
            ERRTRACE("[WSCO]: Read error: ", ec.message(), ".");
        }
        reconnect();
    };
    ws_->async_read(buffer_, on_read);
}
void WebSocketClientOrder::reconnect() {
if (auto tmp = std::exchange(ws_, nullptr)) {
TRACE("[WSCO]: Closing connection...");
close(std::move(tmp));
ping_timer_.reset();
reconnection_timer_.reset();
auto timer = std::make_shared<Timer>(io_context_, RECONNECTION_TIMEOUT);
timer->async_wait([this, timer](beast::error_code ec) {
if (!ec) {
TRACE("[WSCO]: Reconnecting to ", host_, ":", port_);
connect(host_, port_);
}
});
}
}
void WebSocketClientOrder::close(std::unique_ptr<WS> ws) {
try {
if (ws && ws->is_open()) {
beast::error_code ec;
ws->close(websocket::close_code::normal, ec);
if (ec && ec != beast::errc::not_connected) {
ERRTRACE("[WSCO]: Close failed: ", ec.message());
}
}
beast::error_code ec;
if (ws) {
ws->next_layer().shutdown(ec);
ws->next_layer().lowest_layer().close(ec);
}
} catch (std::exception const& e) {
ERRTRACE("[WSCO]: Exception during close: ", e.what());
}
}
void WebSocketClientOrder::send_async(std::string message) {
post(ws_->get_executor(), [this, message] mutable {
message_queue_.push_back(std::move(message));
if (message_queue_.size() == 1)
do_write_loop();
});
}
/// Writes the queued messages one after another (runs on the stream's executor).
///
/// The front element stays in the queue while it is being written, so the buffer handed to
/// async_write remains valid. It is popped once the write has completed.
///
/// When the write fails, or the connection is gone, the whole queue is discarded. Those messages
/// belonged to the dead session. Leaving them queued would keep the queue non-empty, so the
/// `size() == 1` check in send_async() would never start a write chain on the next session.
void WebSocketClientOrder::do_write_loop() {
    if (message_queue_.empty())
        return;
    if (!ws_) {
        // reconnect() ran between queueing and this call; nothing can be written.
        message_queue_.clear();
        return;
    }
    ws_->async_write( //
        asio::buffer(message_queue_.front()), [this](beast::error_code ec, size_t /*bytes_transferred*/) {
            if (ec) {
                ERRTRACE("[WSCO]: Async send failed: ", ec.message());
                message_queue_.clear();
                reconnect();
                return;
            }
            TRACE("[WSCO]: Async message sent: ", message_queue_.front());
            message_queue_.pop_front();
            do_write_loop();
        });
}
std::string WebSocketClientOrder::generate_signature(long expires) const {
std::string data = "GET/realtime" + std::to_string(expires);
unsigned char hmac_result[EVP_MAX_MD_SIZE];
unsigned int len = 0;
HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
reinterpret_cast<unsigned char const*>(data.c_str()), data.length(), hmac_result, &len);
std::ostringstream oss;
for (unsigned int i = 0; i < len; i++) {
oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
}
return oss.str();
}
/// Builds the "auth" request (API key, expiry timestamp in milliseconds, signature) and hands it
/// to send_async(). The expiry is the current system time plus five seconds.
void WebSocketClientOrder::authenticate() {
    auto const deadline = std::chrono::system_clock::now() + 5s;
    long const expires = deadline.time_since_epoch() / 1ms;

    std::string const signature = generate_signature(expires);
    nlohmann::json const args = {api_key_, std::to_string(expires), signature};
    nlohmann::json const auth_message = {{"op", "auth"}, {"args", args}};
    send_async(auth_message.dump());
}
/// Arms the ping timer (creating it on first use). When it fires without error, a
/// {"op": "ping"} message is queued while the stream exists and is open, and the timer is armed
/// again. A timer error, such as cancellation, ends the heartbeat chain.
void WebSocketClientOrder::do_heartbeat() {
    static constexpr auto interval = 6s; // 20s

    if (ping_timer_ == nullptr) {
        ping_timer_ = std::make_unique<Timer>(io_context_, interval);
    }
    ping_timer_->expires_after(interval);

    auto on_tick = [this](beast::error_code ec) {
        if (!ec) {
            bool const stream_open = ws_ && ws_->is_open();
            if (stream_open) {
                send_async(R"({"op": "ping"})");
            }
            do_heartbeat();
        }
    };
    ping_timer_->async_wait(on_tick);
}
#include <boost/asio.hpp>
#include <iostream>
// #include "src/WebSocketClientOrder.h"
/// Entry point: creates the io_context and TLS context, connects the client to the exchange,
/// and runs the event loop. Exceptions escaping from there are traced as errors.
int main() try {
    std::string const host = "stream.bybit.com";
    std::string const port = "443";
    // Credentials are deliberately left empty. An API key and secret must never be hard-coded
    // in published or committed source; fill them in locally or load them from a protected
    // configuration.
    std::string const api_key = "";
    std::string const api_secret = "";

    asio::io_context ioc;
    ssl::context ctx(ssl::context::tlsv12_client);
    ctx.set_default_verify_paths();

    WebSocketClientOrder client(ioc, ctx, api_key, api_secret);
    client.connect(host, port);
    // auto work_guard = asio::make_work_guard(ioc);
    ioc.run();
} catch (std::exception const& e) {
    ERRTRACE("[MAIN]: Connection error: ", e.what());
}
Live demo:
Better ideas:
WebSocketClient
for the new connection.
Also make sure you use modern Asio examples (io_context::work is deprecated) and stop using mutexes in favour of strands.
Oh, and I fixed the dangling buffer (your message argument to async_write was a local variable. OOPS). There are possibly more things I fixed inadvertently, so give the changes a good, close look!