As a freshmen to binance-websocket api and std::future using. I write a program to test the time difference between localhost and the binance server, and to get the net delay.
I push my code to my github repository ws_binance_time_diff_delay
I have 2 problems.
It can not run completely.
It shows:
{"id":1,"status":200,"result":{"serverTime":1718697917878},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718697921760; 1718697921956; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
1718697921763; 1718697921957; 1718697918073;
And it is neither responding nor terminated then.
In the file ws_diff_delay.cpp
, on the line 84 and 85.
If I use the former, it shows:
{"id":1,"status":200,"result":{"serverTime":1718698030116},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"M
INUTE","intervalNum":1,"limit":6000,"count":3}]}
type of future_example is class std::future<class std::vector<__int64,class std::allocator<__int64> > >
test group num 10; remain num 90
1718698034095; 1718698034251; 1718698030291;
1718698034095; 1718698034314; 1718698030342;
1718698034095; 1718698034314; 1718698030342;
Process finished with exit code -1073740791 (0xC0000409)
It terminates by itself and shows a message of stackoverflow "Process finished with exit code -1073740791 (0xC0000409)".
I debugged and traced it to line 97.
It stop on the assemble file debug_assamble at line 24
subl $0xd8, %esp
movq 0x2e3942(%rip), %rax
xorq %rsp, %rax
movq %rax, 0xc0(%rsp)
andq $0x0, 0x28(%rsp)
leaq -0x26(%rip), %rax ; RaiseException
andl $0x1, %edx
movl %ecx, 0x20(%rsp)
movl %edx, 0x24(%rsp)
movq %rax, 0x30(%rsp)
testq %r9, %r9
je 0x18004468a
movl $0xf, %eax
leaq 0x40(%rsp), %rcx
cmpl %eax, %r8d
movq %r9, %rdx
cmovbel %r8d, %eax
movl %eax, %r8d
movl %r8d, 0x38(%rsp)
shlq $0x3, %r8
callq 0x1800b7c77
leaq 0x20(%rsp), %rcx
callq *0x1f864c(%rip)
nopl (%rax,%rax)
movq 0xc0(%rsp), %rcx
xorq %rsp, %rcx
callq 0x1800af760
addq $0xd8, %rsp
retq
int3
andl $0x0, 0x38(%rsp)
jmp 0x180044660
int3
int3
int3
int3
int3
int3
int3
jno 0x18004465c
popq %rbx
The scene before exception is Scene before exception
What is the difference between line 84 and line 85.
I print the type on line 72 to make sure the type is correct.
./CMakeLists.txt
cmake_minimum_required(VERSION 3.24)
project(ws_binance_time_diff_delay)
set(CMAKE_CXX_STANDARD 17)
add_definitions(
-DBOOST_THREAD_PROVIDES_FUTURE
-DBOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
-DBOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
)
add_executable(ws_binance_time_diff_delay
ws_diff_delay.cpp
)
target_include_directories(ws_binance_time_diff_delay PRIVATE ${CMAKE_SOURCE_DIR})
target_include_directories(ws_binance_time_diff_delay PRIVATE "C:/Users/Mike_Wei/CLionProjects/commonUtils")
find_package(RapidJSON CONFIG REQUIRED)
target_link_libraries(ws_binance_time_diff_delay PRIVATE rapidjson)
find_package(OpenSSL REQUIRED)
find_package(ZLIB REQUIRED)
find_package(Boost REQUIRED COMPONENTS system thread date_time log log_setup program_options)
target_include_directories(ws_binance_time_diff_delay PRIVATE ${Boost_INCLUDE_DIRS})
target_link_libraries(ws_binance_time_diff_delay PRIVATE ${Boost_LIBRARIES})
find_package(Threads REQUIRED)
target_include_directories(ws_binance_time_diff_delay PRIVATE ${OPENSSL_INCLUDE_DIR})
target_link_libraries(ws_binance_time_diff_delay PRIVATE
Boost::system
Boost::log
OpenSSL::SSL
OpenSSL::Crypto
Threads::Threads
)
find_package(jsoncpp CONFIG REQUIRED)
target_link_libraries(ws_binance_time_diff_delay PRIVATE JsonCpp::JsonCpp)
find_package(fmt CONFIG REQUIRED)
target_link_libraries(ws_binance_time_diff_delay PRIVATE fmt::fmt) ```
ws_diff_delay.cpp
#include <cstdlib>
#include <stdint.h>
#include <iostream>
#include <string>
#include <optional>
#include <future>
#include <chrono>
#include <functional>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/log/trivial.hpp>
#include <rapidjson/document.h>
#include <commonUtils.h>
class mini_ws_client {
using tcp = boost::asio::ip::tcp;
const std::string host = "testnet.binance.vision";
const std::string port = "443";
const std::string target = "/ws-api/v3";
boost::asio::ssl::context ctx{boost::asio::ssl::context::sslv23_client};
boost::asio::io_context ioc;
std::shared_ptr<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > > ws_sp;
public:
explicit mini_ws_client(
const std::string host = "testnet.binance.vision",
const std::string port = "443",
const std::string target = "/ws-api/v3"
): host(host), port(port), target(target) {
tcp::resolver resolver(ioc);
ws_sp = std::make_shared<boost::beast::websocket::stream<boost::asio::ssl::stream<tcp::socket> > >(ioc, ctx);
if (!SSL_set_tlsext_host_name(ws_sp->next_layer().native_handle(), host.c_str())) {
boost::system::error_code ec{static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category()};
throw boost::system::system_error{ec};
}
auto const results = resolver.resolve(host, port);
auto ep = boost::asio::connect(ws_sp->next_layer().next_layer(), results);
ws_sp->next_layer().handshake(boost::asio::ssl::stream_base::client);
ws_sp->set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::request_type &req) {
req.set(boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
ws_sp->handshake(host, target);
}
~mini_ws_client() {
ws_sp->close(boost::beast::websocket::close_code::normal);
ws_sp.reset();
}
std::pair<double, double> get_diff_delay(
int test_times = 100,
int one_group_num = 10,
const std::string msg = "{\"id\":1,\"method\":\"time\"}"
) const {
auto future_example = std::async(
std::launch::async,
&mini_ws_client::get_one_timestamps,
this,
msg
);
std::cout << "type of future_example is " << typeid(future_example).name() << std::endl;
std::vector<int64_t> timeStamps{};
int cnt{test_times};
while (cnt) {
int cur_group_num{one_group_num};
if (cnt > one_group_num) {
cnt -= one_group_num;
} else {
cur_group_num = test_times;
cnt = 0;
}
std::cout << "test group num " << cur_group_num << "; " << "remain num " << cnt << std::endl;
std::vector<std::future<std::vector<int64_t> > > time_stamps_vec{};
// std::vector<decltype(future_example) > time_stamps_vec{};
for (int i{0}; i < cur_group_num; ++i) {
time_stamps_vec.emplace_back(
std::async(
std::launch::async,
&mini_ws_client::get_one_timestamps,
this,
msg
)
);
}
for (auto ×tamps_future: time_stamps_vec) {
auto one_timestamp = timestamps_future.get();
printVector(one_timestamp);
timeStamps.insert(timeStamps.end(), one_timestamp.begin(), one_timestamp.end());
}
}
auto diff_delay = calculateTimeDiffDelaySub(timeStamps);
return diff_delay;
}
std::vector<int64_t> get_one_timestamps(const std::string &msg) const {
std::vector<int64_t> timeStamps_one_case{};
auto presend_time = getTimeStamp();
auto response_str = request_to_response(msg);
auto postsend_time = getTimeStamp();
auto server_time = get_server_time(response_str);
if (server_time) {
timeStamps_one_case.push_back(presend_time);
timeStamps_one_case.push_back(postsend_time);
timeStamps_one_case.push_back(server_time.value());
}
printVector(timeStamps_one_case);
return timeStamps_one_case;
}
std::pair<double, double> calculateTimeDiffDelaySub(const std::vector<int64_t> &timeStamps) const {
double diff{0.};
double delay{0.};
for (int i{0}; i < timeStamps.size(); i += 3) {
diff += (
timeStamps[i]
+ timeStamps[i + 1]
- (timeStamps[i + 2] << 1)
) * 0.5;
delay += (timeStamps[i + 1] - timeStamps[i]) >> 1;
}
diff /= timeStamps.size() / 3;
delay /= timeStamps.size() / 3;
auto ret = std::make_pair(diff, delay);
return ret;
}
int64_t getTimeStamp(int64_t diff = 0LL) const {
// 获取当前时间点
auto now = std::chrono::system_clock::now();
// 将时间点转换为毫秒
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
// 输出 13 位时间戳
return millis - diff;
}
std::string request_to_response(const std::string &msg) const {
ws_sp->write(boost::asio::buffer(msg));
boost::beast::flat_buffer buffer;
ws_sp->read(buffer);
const char *bufferData = reinterpret_cast<const char *>(buffer.data().data());
std::size_t bufferSize = buffer.data().size();
std::string response(bufferData, bufferSize);
return response;
}
std::optional<int64_t> get_server_time(const std::string &msg) const {
rapidjson::Document msg_doc;
msg_doc.Parse(msg.c_str());
if (msg_doc.HasParseError() || !msg_doc.IsObject()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " << "JSON parse error or not an object" <<
std::endl;
return std::nullopt;
}
if (!msg_doc.HasMember("result") || !msg_doc["result"].IsObject()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
"No 'result' field or 'result' is not an object" << std::endl;
return std::nullopt;
}
const rapidjson::Value &result = msg_doc["result"];
if (!result.HasMember("serverTime") || !result["serverTime"].IsInt64()) {
BOOST_LOG_TRIVIAL(fatal) << __FILE__ << ": " << __LINE__ << "# " <<
"'serverTime' field missing or not an int64"
<< std::endl;
return std::nullopt;
}
int64_t serverTime = result["serverTime"].GetInt64();
return serverTime;
}
};
int main() {
auto ws_client = std::make_shared<mini_ws_client>("ws-api.binance.com", "443", "/ws-api/v3");
std::cout << ws_client->request_to_response("{\"id\":1,\"method\":\"time\"}") << std::endl;;
auto &[diff,delay] = ws_client->get_diff_delay();
std::cout << "diff = " << diff << std::endl;
std::cout << "delay = " << delay << std::endl;
}
0xc0000409 means STATUS_STACK_BUFFER_OVERRUN
Running your code with sanitizers:
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address,undefined ")
Quickly diagnoses some issues:
Clearly some buffer isn't valid for the duration of the write operation.
I refactored your code to be readable: https://coliru.stacked-crooked.com/a/079b4b92556cd365
// using Sample = std::array<int64_t, 3>;
struct Sample { int64_t pre, post, server; };
using Samples = std::vector<Sample>;
std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) const {
double diff{0.};
double delay{0.};
for (auto& [pre, post, server] : samples) {
diff += (pre + post - (server * 2)) * 0.5;
delay += (post - pre) / 2.0;
}
diff /= samples.size();
delay /= samples.size();
return std::pair(diff, delay);
}
Regardless, you're creating wild threads that all use the same websocket without any synchronization. That's both bad and useless.
Did you mean to repeatedly request on the same thread or did you mean to test with several client connections in parallel?
Where each batch is necessary sequential because to run on a single client:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <iostream>
#include <rapidjson/document.h>
#include <fmt/chrono.h>
#include <fmt/ranges.h>
static inline void printVector(auto const& v) { fmt::print("printVector: {}\n", v); }
// using Sample = std::array<int64_t, 3>;
struct Sample { int64_t pre, post, server; };
using Samples = std::vector<Sample>;
using Message = std::shared_ptr<std::string const>;
static Message default_message() {
static auto instance = std::make_shared<std::string>(R"({"id":1,"method":"time"})");
return instance;
}
namespace asio = boost::asio;
namespace ssl = asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = asio::ip::tcp;
using error_code = boost::system::error_code;
using namespace std::chrono_literals;
struct Args {
std::string host = "testnet.binance.vision";
std::string port = "443";
std::string target = "/ws-api/v3";
};
class Client {
using tcp = asio::ip::tcp;
Args const args;
ssl::context ctx{ssl::context::sslv23_client};
asio::io_context ioc;
websocket::stream<ssl::stream<tcp::socket>> ws{ioc, ctx};
public:
Client(Args args = {}) : args(std::move(args)) { connect(); }
~Client() {
error_code ec;
ws.close(websocket::close_code::normal, ec);
if (ec.failed())
std::cerr << "~Client: " << ec.message() << std::endl;
}
std::string request(Message msg) {
ws.write(asio::buffer(*msg));
std::string response;
auto buf = asio::dynamic_buffer(response);
ws.read(buf);
return response;
}
private:
void connect() {
tcp::resolver resolver(ioc);
if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), args.host.c_str()))
throw boost::system::system_error{
error_code{static_cast<int>(::ERR_get_error()), asio::error::get_ssl_category()}};
auto results = resolver.resolve(args.host, args.port);
asio::connect(ws.next_layer().next_layer(), results);
ws.next_layer().handshake(ssl::stream_base::client);
ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
req.set(boost::beast::http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro");
}));
ws.handshake(args.host, args.target);
}
};
namespace Benchmark {
using clock = std::chrono::system_clock;
using time_point = clock::time_point;
int64_t getTimeStamp() { return clock::now().time_since_epoch() / 1ms; }
std::optional<int64_t> get_server_time(std::string const& msg) {
rapidjson::Document doc;
doc.Parse(msg.c_str());
if (!doc.HasParseError() && doc.IsObject() && doc.HasMember("result"))
if (auto const& result = doc["result"];
result.IsObject() && result.HasMember("serverTime") && result["serverTime"].IsInt64())
return result["serverTime"].GetInt64();
std::cerr << "Unexpected or invalid response" << std::endl;
return {};
}
std::pair<double, double> calculateTimeDiffDelaySub(Samples const& samples) {
double diff{0.};
double delay{0.};
if (!samples.empty()) {
for (auto& [pre, post, server] : samples) {
diff += (pre + post - (server * 2)) * 0.5;
delay += (post - pre) / 2.0;
}
diff /= samples.size();
delay /= samples.size();
}
return std::pair(diff, delay);
}
Samples perform_batch(Args const& args, unsigned n, Message msg) {
Samples samples;
try {
Client client(args);
for (Client c(args); n--;) {
int64_t pre = getTimeStamp();
std::string res = client.request(msg);
int64_t post = getTimeStamp();
if (std::optional<int64_t> server_time = get_server_time(res))
samples.push_back({pre, post, server_time.value()});
else
throw std::runtime_error("No server time in response");
}
} catch (boost::system::system_error const& se) {
std::cerr << "Error in perform_batch: " << se.code().message() << std::endl;
}
return samples;
}
std::pair<double, double> run(Args const& args, unsigned total = 100, unsigned pergroup = 10,
Message msg = default_message()) {
std::vector<std::future<Samples>> futs{};
for (unsigned remain{total}; unsigned batch = std::min(remain, pergroup); remain -= batch) {
std::cout << "batch " << batch << "; remain " << remain << std::endl;
for (unsigned i = 0; i < batch; ++i)
futs.emplace_back(std::async(std::launch::async, perform_batch, args, batch, msg));
}
Samples merged;
for (auto& fut : futs) {
auto one = fut.get();
merged.insert(merged.end(), one.begin(), one.end());
}
return calculateTimeDiffDelaySub(merged);
}
}
int main() {
Args args {"ws-api.binance.com", "443", "/ws-api/v3"};
std::cout << Client(args).request(default_message()) << std::endl;
auto const& [diff, delay] = Benchmark::run(args, 20, 4);
std::cout << std::fixed;
std::cout << "diff=" << diff << " delay=" << delay << std::endl;
}
Spurious errors during Client's destructor indicate that we might run into rate limiting. I'll leave that up to you to diagnose: