c++ multithreading boost-asio boost-beast boost-beast-websocket

IO Context is not running in separate threads


I am trying to run a Kraken websocket client for two symbols, AUD/USD and AUD/JPY, each on its own io_context in its own thread, using C++ and the Boost.Beast library. I have 6 cores available and want to run each symbol in a separate thread. However, when I run the code, the program terminates immediately without any errors and does not print any of the data that the message handler should write to the console.

http://coliru.stacked-crooked.com/a/51f248c085656de7

kraken_config.json

{
    "AUD/USD": 0,
    "AUD/JPY": 1
}

The overall idea of these two functions is to create a multi-threaded architecture that runs WebSocket subscriptions for a large number of symbols in separate threads, while limiting the number of threads to the number of available cores:

void run_event_loop(const std::vector<std::string>& symbols, net::io_context& ioc)
{
    ssl::context ctx{ssl::context::tlsv12_client};
    ctx.set_verify_mode(ssl::verify_peer);
    ctx.set_default_verify_paths();

    for (const auto& symbol : symbols) {
        std::cout << symbol << std::endl;
        auto krakenws = std::make_shared<krakenWS>(ioc.get_executor(), ctx);  
        krakenws->subscribe_orderbook(symbol, 10);
    }
    
    ioc.run(); // this will block until all asynchronous operations have completed
}

void run_threads_in_cores(){
    const std::size_t num_cores = std::thread::hardware_concurrency(); 

    std::vector<std::string> symbols;
    std::map<std::string, int> partition_map = load_symbols_partition_map();
    
    for (const auto& pair : partition_map) {
        symbols.push_back(pair.first);
    }
    
    std::vector<std::thread> threads;
    // partition symbols into groups based on the number of available cores
    std::vector<std::vector<std::string>> symbol_groups(num_cores);

    std::size_t i = 0;
    for (const auto& symbol : symbols) {
        symbol_groups[i++ % num_cores].push_back(symbol);
    }

    for (const auto& symbol_group : symbol_groups) {
        if(symbol_group.empty()){ // with fewer symbols than cores, some groups are empty and need no thread
            continue;
        }
        net::io_context ioc;
        threads.emplace_back([&symbol_group, &ioc]() { run_event_loop(symbol_group, ioc); });
    }

    std::for_each(threads.begin(), threads.end(), [](std::thread& t) { t.join(); });
}

But when I call run_threads_in_cores(), it launches two threads running run_event_loop(), one per symbol (in this case we have two symbols and num_cores = 6; since there are more cores than symbols, each symbol gets its own thread). ioc.run() does not seem to run inside run_event_loop(), because the whole program returns immediately after the symbols are printed to the console.

But when I manually run each symbol's websocket io_context in a separate thread, it works fine:

http://coliru.stacked-crooked.com/a/2cc33f6037a3b01f

Output :

using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":176,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/USD","status":"subscribed","subscription":{"depth":10,"name":"book"}}
Kraken Orderbook snapshot : {"channelID":144,"channelName":"book-10","event":"subscriptionStatus","pair":"AUD/JPY","status":"subscribed","subscription":{"depth":10,"name":"book"}}

I tried sleeping for 1 second before ioc.run(): http://coliru.stacked-crooked.com/a/c6e0d562ab156154

Output :

AUD/JPY
AUD/USD
using host_: ws.kraken.com
using host_: ws.kraken.com
Sending : {"event":"subscribe","pair":["AUD/JPY"],"subscription":{"depth":10,"name":"book"}}
read: Operation canceled
Sending : {"event":"subscribe","pair":["AUD/USD"],"subscription":{"depth":10,"name":"book"}}
Segmentation fault (core dumped)

I suspect that the issue may be related to how I defined those two functions (run_threads_in_cores, run_event_loop), but I am not sure. Can someone please help me identify the issue and suggest a solution? Thank you.


Solution

  • I read through the whole thing. There are multiple things raising concerns.

    Your actual problem is here:

    threads.emplace_back([&symbol_group, &ioc]() { run_event_loop(symbol_group, ioc); });
    

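    Don't capture by reference. That is undefined behaviour, since nothing guarantees that the referenced objects outlive the thread: symbol_group is a loop variable, and in the code shown ioc is declared inside the loop body, so it is destroyed at the end of each iteration while the thread may still be using it. Capture the group by value, and make sure the io_context outlives the thread as well (for example, by declaring it where it lives until the threads are joined). E.g. use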

    threads.emplace_back([=, &ioc]() { run_event_loop(symbol_group, ioc); });
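
The lifetime rule can be illustrated with a minimal sketch that uses only the standard library. The names partition_symbols and run_groups are hypothetical, and the loop inside the thread merely stands in for the question's run_event_loop; a real version would presumably construct its own net::io_context inside the thread function and call run() on it there, so that the context lives exactly as long as the thread needs it.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Round-robin partition of the symbols into num_groups groups
// (same scheme as in the question; the name is hypothetical).
std::vector<std::vector<std::string>> partition_symbols(
    const std::vector<std::string>& symbols, std::size_t num_groups)
{
    if (num_groups == 0) num_groups = 1; // hardware_concurrency() may return 0
    std::vector<std::vector<std::string>> groups(num_groups);
    std::size_t i = 0;
    for (const auto& symbol : symbols)
        groups[i++ % num_groups].push_back(symbol);
    return groups;
}

// Starts one thread per non-empty group and returns every symbol that a
// thread handled. Stand-in for the question's thread launcher.
std::vector<std::string> run_groups(
    const std::vector<std::vector<std::string>>& groups)
{
    std::mutex m;
    std::vector<std::string> handled;
    std::vector<std::thread> threads;

    for (const auto& group : groups) {
        if (group.empty()) continue;
        // The group is captured BY VALUE, so the thread owns its copy.
        // m and handled are captured by reference, which is safe only
        // because every thread is joined before this function returns.
        threads.emplace_back([group, &m, &handled] {
            // Assumption: real code would create a net::io_context here,
            // set up the subscriptions, and block in ioc.run().
            for (const auto& symbol : group) {
                std::lock_guard<std::mutex> lock(m);
                handled.push_back(symbol);
            }
        });
    }

    for (auto& t : threads) t.join();
    return handled;
}
```

Calling run_groups(partition_symbols(symbols, std::thread::hardware_concurrency())) with the two symbols from the question would start two threads, one per non-empty group.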
    

    Out Of The Box

    The whole premise seems to be that "using separate threads is going to be faster". Unless this is true due to reduced lock contention in code not shown, that is a very tentative assumption. Usually, network IO benefits from multiplexing on a single thread.