c++ websocket boost-asio

Boost Beast WebSocket + SSL: Operation cancelled error when attempting controlled timer reconnection?


I can't figure out a problem. I am connecting to an exchange over WebSocket and everything works well, with one exception. The exchange enforces a maximum connection lifetime, after which it resets the connection and I have to reconnect. I know this timeout and I don't want to wait for it: I want to reconnect myself, in advance, at a time convenient for me, so that I have a more controlled communication channel.

For the test, I decided to reconnect, say, every 60 seconds, in order to always have an up-to-date token and a working channel.

However, after a period of stable operation, when the timer fires to close the connection and reopen it with a refreshed token, I get these errors and cannot establish a new connection:

[WSCO]: [04/22/2025 23:08:12] Reconnection timer triggered. Reconnecting...
[WSCO]: [04/22/2025 23:08:18] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:24] Exception during close: Operation canceled
[WSCO]: [04/22/2025 23:08:30] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:36] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:42] Read error: Operation canceled. Reconnecting...
[WSCO]: [04/22/2025 23:08:48] Read error: Operation canceled. Reconnecting...

I'm most likely doing something wrong with the WS levels in boost or SSL (perhaps something related to synchronous events), but I don't understand what exactly is wrong.

My goal is to close the connection gracefully after RECONNECTION_TIMER seconds, or, if the network connection has dropped, to reconnect after RECONNECTION_TIMEOUT seconds.

What I have now: the channel works stably until a reconnection is triggered, either by an error or by the reconnection timer.

Can someone help me understand what I'm doing wrong?

src/WebSocketClientOrder.h

#ifndef WEBSOCKET_CLIENT_ORDER_H
#define WEBSOCKET_CLIENT_ORDER_H

#include <string>
#include <queue>
#include <mutex>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <nlohmann/json.hpp>
#include <boost/asio/steady_timer.hpp>

#define RECONNECTION_TIMEOUT 5 // number of seconds before reconnecting in case of disconnection
#define RECONNECTION_TIMER 60 // a timer in seconds at which the client himself breaks the connection and reconnects
                               // to update the timer on the exchange

namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;

// Client for a TLS-secured WebSocket endpoint.
//
// Holds one websocket stream at a time (recreated on every connect()), a read
// buffer, an outgoing message queue guarded by a mutex, and two timers: one for
// the periodic application-level ping and one for the planned reconnect.
class WebSocketClientOrder {
public:
    // Keeps references to io_context and ssl_context (both must outlive this
    // object) and copies the API credentials.
    WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, const std::string& api_key, const std::string& api_secret);

    // Remembers host/port, connects and handshakes, then starts the read loop,
    // the heartbeat and the reconnection timer.
    void connect(const std::string& host, const std::string& port);
    // NOTE(review): declared here, but no definition is visible in this file.
    void place_order(const std::string &symbol_first, const std::string &symbol_second, const std::string &side_first,
                     const std::string &side_second, double qty);

private:
    void do_read();                               // arms one async read; re-arms itself on success
    void reconnect();                             // close(), wait, connect() again
    void close();                                 // websocket close + TLS shutdown + socket close
    void send_async(const std::string& message);  // enqueue a message for the write chain
    void process_next_message();                  // writes the front of message_queue_
    std::string generate_signature(long expires); // hex HMAC-SHA256 keyed with api_secret_
    void authenticate();                          // queues the {"op": "auth"} request

    void heartbeat();                             // arms the 20 s ping timer

    // Non-static members are constructed in the order declared below,
    // independent of the order used in the constructor's initializer list.
    asio::ip::tcp::resolver resolver_;
    std::unique_ptr<websocket::stream<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>> ws_;
    beast::flat_buffer buffer_;        // receive buffer shared by all reads
    asio::io_context& io_context_;
    asio::ssl::context& ssl_context_;
    std::string host_, port_, api_key_, api_secret_;

    std::queue<std::string> message_queue_;   // pending outgoing messages (front = in flight)
    std::mutex message_queue_mutex_;          // guards message_queue_
    std::unique_ptr<asio::steady_timer> ping_timer_;
    std::unique_ptr<asio::steady_timer> reconnection_timer_;

    static const std::string DEFAULT_URI;     // request target used in the websocket handshake
};

#endif // WEBSOCKET_CLIENT_ORDER_H

src/WebSocketClientOrder.cpp

#include <iostream>
#include <iomanip>
#include <sstream>
#include <queue>
#include <mutex>
#include <boost/asio/connect.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <openssl/hmac.h>
#include "WebSocketClientOrder.h"

// Request target passed to the WebSocket handshake in connect().
const std::string WebSocketClientOrder::DEFAULT_URI = "/v5/trade";

// Binds the client to the caller-owned io_context and TLS context and stores
// the API credentials. No network activity happens here; connect() does that.
//
// The initializers are written in member declaration order, which is the order
// in which they actually run (and avoids a -Wreorder warning).
WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, asio::ssl::context& ssl_context, const std::string& api_key, const std::string& api_secret)
    : resolver_(io_context),
      io_context_(io_context),
      ssl_context_(ssl_context),
      api_key_(api_key),
      api_secret_(api_secret) {}

// Opens a fresh TLS WebSocket connection to host:port and starts the session.
//
// Resolve, TCP connect, TLS handshake and WebSocket handshake are performed
// synchronously. On success the auth request is queued, the read loop and the
// heartbeat are started, and the planned-reconnect timer is armed for
// RECONNECTION_TIMER seconds. Any exception thrown on the way is logged and
// answered with reconnect().
void WebSocketClientOrder::connect(const std::string &host, const std::string &port) {
    host_ = host;
    port_ = port;

    try {
        ws_ = std::make_unique<websocket::stream<asio::ssl::stream<asio::ip::tcp::socket>>>(
            io_context_, ssl_context_);

        auto results = resolver_.resolve(host, port);
        asio::connect(ws_->next_layer().lowest_layer(), results);

        // Send the host name in the TLS ClientHello (SNI).
        if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
            throw beast::system_error(
                beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
                "[WSCO]: Failed to set SNI"
            );
        }

        ws_->next_layer().handshake(asio::ssl::stream_base::client);
        ws_->handshake(host + ":" + port, DEFAULT_URI);

        // No control callback is installed on purpose. Beast replies to incoming
        // ping frames with a pong by itself while a read is in progress, so a
        // manual pong is redundant. The previous callback was also unsafe: it
        // called the throwing, synchronous pong() from inside another stream
        // operation, and it passed string_view::data(), which is not guaranteed
        // to be NUL-terminated.

        std::cout << "[WSCO]: Connected and authenticated\n";
        authenticate();
        do_read();
        heartbeat();

        // Planned reconnect: fires once, RECONNECTION_TIMER seconds from now.
        // Replacing the unique_ptr destroys (and thereby cancels) the previous timer.
        reconnection_timer_ = std::make_unique<asio::steady_timer>(io_context_, std::chrono::seconds(RECONNECTION_TIMER));
        reconnection_timer_->async_wait([this](beast::error_code ec) {
            if (!ec) {
                std::cout << "[WSCO]: Reconnection timer triggered. Reconnecting...\n";
                reconnect();
            }
        });

    } catch (const std::exception &e) {
        std::cerr << "[WSCO]: Connection failed: " << e.what() << ". Retrying...\n";
        reconnect();
    }
}

// Arms a single asynchronous read into buffer_. A successful read prints the
// received message, drops it from the buffer and re-arms the read; any error
// is logged and handed to reconnect().
void WebSocketClientOrder::do_read() {
    auto on_read = [this](beast::error_code ec, std::size_t bytes_transferred) {
        if (ec) {
            // A close frame from the peer gets its own log line; everything
            // else is reported with the error text.
            if (ec == websocket::error::closed) {
                std::cerr << "[WSCO]: Websocket closed by server. Attempting reconnect...\n";
            } else {
                std::cerr << "[WSCO]: Read error: " << ec.message() << ". Reconnecting...\n";
            }
            reconnect();
            return;
        }

        const std::string payload = beast::buffers_to_string(buffer_.data());
        std::cout << "[WSCO]: Server Response: " << payload << "\n";
        buffer_.consume(bytes_transferred);

        do_read();
    };

    ws_->async_read(buffer_, on_read);
}

// Closes the current connection, waits RECONNECTION_TIMEOUT seconds, then
// connects again to the stored host_/port_.
//
// NOTE(review): the wait is a blocking sleep. When this runs inside an
// io_context handler, no other handler can run during the sleep, so the
// completion of the read cancelled by close() is only delivered after
// connect() has already built the next connection.
void WebSocketClientOrder::reconnect() {
    close();
    std::this_thread::sleep_for(std::chrono::seconds(RECONNECTION_TIMEOUT));
    connect(host_, port_);
}

void WebSocketClientOrder::close() {
    try {
        if (ws_ && ws_->is_open()) {
            beast::error_code ec;
            ws_->close(websocket::close_code::normal, ec);
            if (ec && ec != beast::errc::not_connected) {
                std::cerr << "[WSCO]: Close failed: " << ec.message() << "\n";
            }
        }

        beast::error_code ec;
        if (ws_) {
            ws_->next_layer().shutdown(ec);
            ws_->next_layer().lowest_layer().close(ec);
        }
    } catch (const std::exception &e) {
        std::cerr << "[WSCO]: Exception during close: " << e.what() << "\n";
    }
}

// Queues `message` for sending. If the queue was empty beforehand, no write is
// in flight, so the write chain is started here; otherwise the running chain
// picks the message up when it reaches it.
void WebSocketClientOrder::send_async(const std::string& message) {
    bool was_idle = false;
    {
        std::lock_guard<std::mutex> lock(message_queue_mutex_);
        // Decide while still holding the lock: reading the queue size after
        // releasing the mutex would race with concurrent push/pop.
        was_idle = message_queue_.empty();
        message_queue_.push(message);
    }

    // Called outside the lock because process_next_message() takes it itself.
    if (was_idle) {
        process_next_message();
    }
}

// Starts an asynchronous write of the message at the front of message_queue_.
// On success the handler pops that message and calls this function again, so
// at most one write is in flight. On failure the handler calls reconnect().
void WebSocketClientOrder::process_next_message() {
    // Held for the whole function, including the async_write initiation below.
    std::lock_guard<std::mutex> lock(message_queue_mutex_);

    if (!message_queue_.empty()) {
        // Local copy of the front element; the element itself stays queued
        // until the write has completed.
        std::string message = message_queue_.front();

        // NOTE(review): asio::buffer(message) refers to the local `message`
        // above, which is destroyed when this function returns, while the
        // handler captures its own separate copy. The buffer handed to
        // async_write therefore does not stay valid for the duration of the
        // write.
        ws_->async_write(asio::buffer(message),
            [this, message](beast::error_code ec, std::size_t bytes_transferred) {
                if (ec) {
                    std::cerr << "[WSCO]: Async send failed: " << ec.message() << "\n";
                    // NOTE(review): the failed message is not popped here, so
                    // it is still queued when the next connection starts.
                    reconnect();
                    return;
                }

                std::cout << "[WSCO]: Async message sent: " << message << "\n";

                {
                    std::lock_guard<std::mutex> lock(message_queue_mutex_);
                    message_queue_.pop();
                }

                // Continue with the next queued message, if any.
                process_next_message();
            });
    }
}

std::string WebSocketClientOrder::generate_signature(long expires) {
    std::string data = "GET/realtime" + std::to_string(expires);
    unsigned char hmac_result[EVP_MAX_MD_SIZE];
    unsigned int len = 0;
    HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
         reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), hmac_result, &len);
    std::ostringstream oss;
    for (unsigned int i = 0; i < len; i++) {
        oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
    }
    return oss.str();
}

void WebSocketClientOrder::authenticate() {
    long expires = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1) + 5000;
    std::string signature = generate_signature(expires);

    nlohmann::json auth_message = {
        {"op", "auth"},
        {"args", {api_key_, std::to_string(expires), signature}}
    };
    send_async(auth_message.dump());
}

// Arms a fresh 20-second timer. When it fires without error, an application
// level ping is queued (only while the stream is open) and the timer is armed
// again. A wait that completes with an error ends the chain.
void WebSocketClientOrder::heartbeat() {
    ping_timer_ = std::make_unique<asio::steady_timer>(io_context_, std::chrono::seconds(20));

    auto on_tick = [this](beast::error_code ec) {
        if (!ec) {
            const bool connected = ws_ && ws_->is_open();
            if (connected) {
                send_async(R"({"op": "ping"})");
            }

            heartbeat();
        }
    };

    ping_timer_->async_wait(on_tick);
}

main.cpp

#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "src/WebSocketClientOrder.h"

int main() {
    const std::string host = "stream.bybit.com";
    const std::string api_key = "";
    const std::string api_secret = "";
    const std::string port = "443";

    try {
        asio::io_context io_context;
        asio::ssl::context ssl_context(asio::ssl::context::tlsv12_client);
        ssl_context.set_default_verify_paths();

        WebSocketClientOrder client(io_context, ssl_context, api_key, api_secret);
        client.connect(host, port);

        // auto work_guard = asio::make_work_guard(io_context);
        io_context.run();
    } catch (const std::exception& e) {
        std::cerr << "[MAIN]: Connection error: " << e.what() << std::endl;
    }

    return 0;
}

CMakeLists.txt

# Build description for the FST executable (C++20, Boost + OpenSSL).
cmake_minimum_required(VERSION 3.20)
project(FST)

# C++20 is mandatory: no fallback to an older standard.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)

add_executable(FST
        main.cpp
        src/WebSocketClientOrder.cpp
        src/WebSocketClientOrder.h
)

# Lets sources include the headers in src/ without a path prefix.
target_include_directories(FST PRIVATE src)

# Find Boost, OpenSSL, and simdjson
# NOTE(review): the sources listed above include <nlohmann/json.hpp>, but no
# nlohmann_json package is located here, and none of them includes simdjson.
# Confirm which JSON library is actually meant to be required.
find_package(Boost REQUIRED COMPONENTS system)
find_package(OpenSSL REQUIRED)
find_package(simdjson REQUIRED)

# Include directories
# NOTE(review): directory-scoped; probably redundant with the Boost::boost
# target linked below - confirm before removing.
include_directories(${Boost_INCLUDE_DIRS})

# Link libraries
target_link_libraries(FST PRIVATE Boost::boost Boost::system OpenSSL::SSL OpenSSL::Crypto simdjson::simdjson)

Makefile

# Makefile for building the project using CMake with maximum optimization
#
# Recipe lines must start with a TAB character, not spaces.

# Variables
BUILD_DIR := build
CMAKE_FLAGS := -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_FLAGS_RELEASE="-O3 -march=native -mtune=native"

# None of these targets produces a file of the same name. `build` in particular
# shares its name with the $(BUILD_DIR) directory, so without .PHONY make would
# consider it up to date as soon as that directory exists.
.PHONY: all build clean rebuild

# Default target
all: build

# Generate build system and build the project
# `$$(nproc)` is expanded by the shell; a plain `$(nproc)` would be an (empty)
# make variable and turn the flag into an unbounded `-j`.
build:
	mkdir -p $(BUILD_DIR)
	cmake -B $(BUILD_DIR) $(CMAKE_FLAGS) -S .
	cmake --build $(BUILD_DIR) -- -j$$(nproc)

# Clean build directory
clean:
	rm -rf $(BUILD_DIR)

# Rebuild the project from scratch
# Run as two sequential sub-makes so that `make -j rebuild` cannot execute
# clean and build at the same time.
rebuild:
	$(MAKE) clean
	$(MAKE) build

Solution

  • You're using many un-necessary pointers and re-using them before you free the first.

    Next up, you're mixing blocking sleep (this_thread::sleep_for) with async operations.

    The combination of these makes it really hard to see what's going on. In essence, you will start the new connect() call before the read operation can be canceled. This means that by the time your read operation IS canceled, you think you were already running the new connection. This would cause you to force reconnect again and again and again and...

    Fixing it the "naive" way:

    void WebSocketClientOrder::reconnect() {
        if (auto tmp = std::exchange(ws_, nullptr)) {
            TRACE("[WSCO]: Closing connection...");
            close(std::move(tmp));
    
            ping_timer_.reset();
            reconnection_timer_.reset();
    
            auto timer = std::make_shared<Timer>(io_context_, RECONNECTION_TIMEOUT);
            timer->async_wait([this, timer](beast::error_code ec) {
                if (!ec) {
                    TRACE("[WSCO]: Reconnecting to ", host_, ":", port_);
                    connect(host_, port_);
                }
            });
        }
    }
    

    This works:

    #include <boost/asio.hpp>
    #include <boost/asio/ssl.hpp>
    #include <boost/beast.hpp>
    #include <boost/core/demangle.hpp>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <nlohmann/json.hpp>
    #include <syncstream>
    
    namespace asio      = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    namespace ssl       = asio::ssl;
    using namespace std::chrono_literals;
    static constexpr auto RECONNECTION_TIMEOUT = 2s; // before reconnecting
    static constexpr auto RECONNECTION_TIMER =
        10s; // max session time before closing the connection and reconnecting
    
    static auto s_start = std::chrono::steady_clock::now();
    // Writes one trace line to `os`: the time elapsed since s_start in seconds,
    // right-aligned in 8 columns, followed by every argument in `msg`.
    // std::osyncstream emits the whole line at once, so lines written from
    // different threads do not interleave.
    inline void do_trace(std::ostream& os, auto const&... msg) {
        // Dividing by 1.s yields (fractional) seconds, so the unit label is "s".
        ((std::osyncstream(os)                                                                   //
          << "[" << std::setw(8) << (std::chrono::steady_clock::now() - s_start) / 1.s << "s] ") //
         << ... << msg)
            << std::endl;
    }
    
    #define TRACE(...) do_trace(std::cout, "I: ", __FUNCTION__, ':', __LINE__, '\t', __VA_ARGS__)
    #define ERRTRACE(...) do_trace(std::cerr, "E: ", __FUNCTION__, ':', __LINE__, '\t', __VA_ARGS__)
    
    // Client for a TLS-secured WebSocket endpoint that reconnects itself: on
    // read/write errors and on a fixed schedule (RECONNECTION_TIMER).
    //
    // Holds one stream at a time (recreated by every connect()), a receive
    // buffer, a queue of outgoing messages and the heartbeat/reconnect timers.
    class WebSocketClientOrder {
      public:
        // Keeps references to io_context and ssl_context (both must outlive
        // this object) and copies the API credentials.
        WebSocketClientOrder(asio::io_context& io_context, ssl::context& ssl_context, std::string const& api_key,
                             std::string const& api_secret);
    
        // Remembers host/port, connects and handshakes, then starts the read
        // loop, the heartbeat and the reconnection timer.
        void connect(std::string const& host, std::string const& port);
        // NOTE(review): declared here, but no definition is visible in this listing.
        void place_order(std::string const& symbol_first, std::string const& symbol_second,
                         std::string const& side_first, std::string const& side_second, double qty);
    
      private:
        using Timer = asio::steady_timer;
        using WS    = websocket::stream<ssl::stream<boost::asio::ip::tcp::socket>>;
    
        void        reconnect();                    // close now, connect() again after a delay
        void        close(std::unique_ptr<WS> ws);  // takes ownership of the stream it closes
        void        send_async(std::string message); // posts the message to the stream's executor
        void        do_read_loop();
        void        do_write_loop(); // on strand
        std::string generate_signature(long expires) const; // hex HMAC-SHA256 keyed with api_secret_
        void        authenticate();                 // queues the {"op": "auth"} request
        void        do_heartbeat();                 // (re)arms the ping timer
    
        // Non-static members are constructed in the order declared below.
        asio::io_context&       io_context_;
        asio::ip::tcp::resolver resolver_;
        std::unique_ptr<WS>     ws_;      // empty between reconnect() and the next connect()
        beast::flat_buffer      buffer_;  // receive buffer shared by all reads
        ssl::context&           ssl_context_;
        std::string             host_, port_, api_key_, api_secret_;
    
        std::deque<std::string> message_queue_; // outgoing messages; the front one is in flight
        std::unique_ptr<Timer>  ping_timer_;
        std::unique_ptr<Timer>  reconnection_timer_;
    
        static std::string const DEFAULT_URI; // request target used in the websocket handshake
    };
    
    #include <boost/asio/connect.hpp>
    #include <boost/beast/core.hpp>
    #include <boost/beast/websocket.hpp>
    #include <boost/beast/websocket/ssl.hpp>
    #include <iomanip>
    #include <iostream>
    #include <openssl/hmac.h>
    #include <sstream>
    // #include "WebSocketClientOrder.h"
    
    // Request target passed to the WebSocket handshake in connect().
    std::string const WebSocketClientOrder::DEFAULT_URI = "/v5/trade";
    
    WebSocketClientOrder::WebSocketClientOrder(asio::io_context& io_context, ssl::context& ssl_context,
                                               std::string const& api_key, std::string const& api_secret)
        : io_context_(io_context)
        , resolver_(io_context)
        , ssl_context_(ssl_context)
        , api_key_(api_key)
        , api_secret_(api_secret) {}
    
    // Opens a fresh TLS WebSocket connection to host:port and starts the session.
    //
    // Resolve, TCP connect, TLS handshake and WebSocket handshake are performed
    // synchronously. On success the auth request is queued, the read loop and
    // the heartbeat are started, and the planned-reconnect timer is armed for
    // RECONNECTION_TIMER. Any exception thrown on the way is logged and
    // answered with reconnect().
    void WebSocketClientOrder::connect(std::string const& host, std::string const& port) {
        host_ = host;
        port_ = port;

        try {
            // The stream gets its own strand as executor.
            ws_ = std::make_unique<WS>(make_strand(io_context_), ssl_context_);

            auto results = resolver_.resolve(host, port);
            asio::connect(ws_->next_layer().lowest_layer(), results);

            // Send the host name in the TLS ClientHello (SNI).
            if (!SSL_set_tlsext_host_name(ws_->next_layer().native_handle(), host.c_str())) {
                throw beast::system_error(
                    beast::error_code(static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()),
                    "[WSCO]: Failed to set SNI");
            }

            ws_->next_layer().handshake(ssl::stream_base::client);
            ws_->handshake(host + ":" + port, DEFAULT_URI);

            // No control callback is installed on purpose. Beast replies to
            // incoming ping frames with a pong by itself while a read is in
            // progress, so a manual pong is redundant. The previous callback was
            // also unsafe: it called the throwing, synchronous pong() from
            // inside another stream operation, and it passed
            // string_view::data(), which is not guaranteed to be NUL-terminated.

            TRACE("[WSCO]: Connected to ", host, ":", port);
            authenticate();
            TRACE("[WSCO]: Authenticated to ", host, ":", port);
            do_read_loop();
            do_heartbeat();

            // Planned reconnect: fires once, RECONNECTION_TIMER from now.
            reconnection_timer_ = std::make_unique<Timer>(io_context_, RECONNECTION_TIMER);
            reconnection_timer_->async_wait([this](beast::error_code ec) {
                if (!ec) {
                    TRACE("[WSCO]: Reconnection timer triggered");
                    reconnect();
                }
            });

        } catch (std::exception const& e) {
            ERRTRACE("[WSCO]: Connection failed: ", e.what(), ". Retrying...");
            reconnect();
        }
    }
    
    // Arms a single asynchronous read into buffer_. A successful read traces
    // the received message, drops it from the buffer and re-arms the read; any
    // error is traced and handed to reconnect().
    void WebSocketClientOrder::do_read_loop() {
        auto on_read = [this](beast::error_code ec, size_t bytes_transferred) {
            if (ec) {
                // A close frame from the peer gets its own trace line.
                if (ec == websocket::error::closed) {
                    ERRTRACE("[WSCO]: Websocket closed by server");
                } else {
                    ERRTRACE("[WSCO]: Read error: ", ec.message(), ".");
                }
                reconnect();
                return;
            }

            std::string const payload = beast::buffers_to_string(buffer_.data());
            TRACE("[WSCO]: Server Response: ", payload);
            buffer_.consume(bytes_transferred);

            do_read_loop();
        };

        ws_->async_read(buffer_, on_read);
    }
    
    void WebSocketClientOrder::reconnect() {
        if (auto tmp = std::exchange(ws_, nullptr)) {
            TRACE("[WSCO]: Closing connection...");
            close(std::move(tmp));
    
            ping_timer_.reset();
            reconnection_timer_.reset();
    
            auto timer = std::make_shared<Timer>(io_context_, RECONNECTION_TIMEOUT);
            timer->async_wait([this, timer](beast::error_code ec) {
                if (!ec) {
                    TRACE("[WSCO]: Reconnecting to ", host_, ":", port_);
                    connect(host_, port_);
                }
            });
        }
    }
    
    void WebSocketClientOrder::close(std::unique_ptr<WS> ws) {
        try {
            if (ws && ws->is_open()) {
                beast::error_code ec;
                ws->close(websocket::close_code::normal, ec);
                if (ec && ec != beast::errc::not_connected) {
                    ERRTRACE("[WSCO]: Close failed: ", ec.message());
                }
            }
    
            beast::error_code ec;
            if (ws) {
                ws->next_layer().shutdown(ec);
                ws->next_layer().lowest_layer().close(ec);
            }
        } catch (std::exception const& e) {
            ERRTRACE("[WSCO]: Exception during close: ", e.what());
        }
    }
    
    void WebSocketClientOrder::send_async(std::string message) {
        post(ws_->get_executor(), [this, message] mutable {
            message_queue_.push_back(std::move(message));
            if (message_queue_.size() == 1)
                do_write_loop();
        });
    }
    
    // Writes the message at the front of message_queue_ and, once that write
    // has completed, pops it and continues with the next one. At most one write
    // is in flight. A failed write is traced and handed to reconnect().
    void WebSocketClientOrder::do_write_loop() {
        // ws_ can be empty here: send_async() only posts the enqueue, and
        // reconnect() may have taken the stream away before the posted handler
        // runs. Writing through a null stream would be undefined behaviour.
        if (!ws_ || message_queue_.empty())
            return;

        // The buffer refers to the queued element itself, which stays in the
        // queue until the completion handler pops it.
        ws_->async_write( //
            asio::buffer(message_queue_.front()), [this](beast::error_code ec, size_t /*bytes_transferred*/) {
                if (ec) {
                    ERRTRACE("[WSCO]: Async send failed: ", ec.message());
                    reconnect();
                    return;
                }

                TRACE("[WSCO]: Async message sent: ", message_queue_.front());

                message_queue_.pop_front();

                do_write_loop();
            });
    }
    
    std::string WebSocketClientOrder::generate_signature(long expires) const {
        std::string   data = "GET/realtime" + std::to_string(expires);
        unsigned char hmac_result[EVP_MAX_MD_SIZE];
        unsigned int  len = 0;
        HMAC(EVP_sha256(), api_secret_.c_str(), api_secret_.length(),
             reinterpret_cast<unsigned char const*>(data.c_str()), data.length(), hmac_result, &len);
        std::ostringstream oss;
        for (unsigned int i = 0; i < len; i++) {
            oss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(hmac_result[i]);
        }
        return oss.str();
    }
    
    void WebSocketClientOrder::authenticate() {
        long        expires   = (std::chrono::system_clock::now() + 5s).time_since_epoch() / 1ms;
        std::string signature = generate_signature(expires);
    
        nlohmann::json auth_message = {{"op", "auth"}, {"args", {api_key_, std::to_string(expires), signature}}};
        send_async(auth_message.dump());
    }
    
    // (Re)arms the ping timer, creating it on first use. When the timer fires
    // without error, an application-level ping is queued (only while the stream
    // is open) and the timer is armed again. A wait that completes with an
    // error ends the chain.
    void WebSocketClientOrder::do_heartbeat() {
        static constexpr auto interval = 6s; // 20s
        if (!ping_timer_)
            ping_timer_ = std::make_unique<Timer>(io_context_, interval);
        ping_timer_->expires_after(interval);

        auto on_tick = [this](beast::error_code ec) {
            if (!ec) {
                bool const connected = ws_ && ws_->is_open();
                if (connected) {
                    send_async(R"({"op": "ping"})");
                }

                do_heartbeat();
            }
        };

        ping_timer_->async_wait(on_tick);
    }
    
    #include <boost/asio.hpp>
    #include <iostream>
    // #include "src/WebSocketClientOrder.h"
    
    // Entry point: builds the io_context and the TLS client context, connects
    // the client to the exchange and runs the event loop until it has no work
    // left. Exceptions escaping from that are traced by the function-try-block
    // handler.
    int main() try {
        std::string const host       = "stream.bybit.com";
        // Credentials are intentionally blank: supply your own, and never put
        // real API keys or secrets into source code or a public post.
        std::string const api_key    = "";
        std::string const api_secret = "";
        std::string const port       = "443";

        asio::io_context ioc;
        ssl::context     ctx(ssl::context::tlsv12_client);
        ctx.set_default_verify_paths();

        WebSocketClientOrder client(ioc, ctx, api_key, api_secret);
        client.connect(host, port);

        // auto work_guard = asio::make_work_guard(ioc);
        ioc.run();
    } catch (std::exception const& e) {
        ERRTRACE("[MAIN]: Connection error: ", e.what());
    }
    

    Live demo:

    Better Idea

    Better ideas:

    Also make sure you use modern Asio examples (io_context::work is deprecated) and stop using mutexes in favour of strands.

    Oh, and I fixed the dangling buffer (your message argument to async_write was a local variable. OOPS). There's possibly more things I fixed inadvertently so give the changes a good, close, look!