
Understanding Boost.Asio Strand Behavior with Multiple Threads


Win12, Boost.Asio 1.82.0, MSVC 2022 (I tried different compilers, with C++14 and C++20)

I am trying to understand io_context step by step. Now I want to use several threads (but not a thread pool yet; that will be the next step). So I have the following code:

#include <iostream>
#include <random>

#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>

#include <sstream>
#include <string>
#include <thread>
#include <vector>

namespace asio = boost::asio;

static std::vector<std::string> parseLine(std::string input) {
    std::vector<std::string> result;
    std::istringstream iss(std::move(input));

    for (std::string token; getline(iss, token, ';');)
        result.push_back(token);

    return result;
}

int main() try {
    asio::io_context ioc;
    auto work = asio::make_work_guard(ioc);
    boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
    std::thread thread1([&ioc]() {ioc.run(); });
    std::thread thread2([&ioc]() {ioc.run(); });
    std::thread thread3([&ioc]() {ioc.run(); });

    for (std::string input; getline(std::cin, input);) {
        if (input.empty())
            break;

        for (std::string& token : parseLine(std::move(input))) {
            auto handler = [t = std::move(token)] {
                std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
                };
            asio::post(ioc, handler);
        }
    }

    work.reset();
    thread1.join();
    thread2.join();
    thread3.join();
}
catch (std::exception const& exc) {
    std::cerr << exc.what() << std::endl;
}
catch (...) {
    std::cerr << "An unexpected error has happened" << std::endl;
}

And for input like

aaa;bbb;ccc;ddd;eee;fff;ggg

I get output like this:

ThreadID = 61740; aaaThreadID = 42060; ccc
ThreadID = 42060; ddd
ThreadID = 55384; bbb
ThreadID =
ThreadID = 42060; eee
ThreadID = 61740; ggg
55384; fff

All three threads are involved, but the messages are interleaved and I want each one on its own line. So I can use a mutex like this:

#include <iostream>
#include <random>

#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>

#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <mutex>

std::mutex gMutex;

namespace asio = boost::asio;

static std::vector<std::string> parseLine(std::string input) {
    std::vector<std::string> result;
    std::istringstream iss(std::move(input));

    for (std::string token; getline(iss, token, ';');)
        result.push_back(token);

    return result;
}

int main() try {
    asio::io_context ioc;
    auto work = asio::make_work_guard(ioc);
    boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
    std::thread thread1([&ioc]() {ioc.run(); });
    std::thread thread2([&ioc]() {ioc.run(); });
    std::thread thread3([&ioc]() {ioc.run(); });

    for (std::string input; getline(std::cin, input);) {
        if (input.empty())
            break;

        for (std::string& token : parseLine(std::move(input))) {
            auto handler = [t = std::move(token)] {
                const std::lock_guard<std::mutex> lock(gMutex);
                std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
                };
            asio::post(ioc, handler);
        }
    }

    work.reset();
    thread1.join();
    thread2.join();
    thread3.join();
}
catch (std::exception const& exc) {
    std::cerr << exc.what() << std::endl;
}
catch (...) {
    std::cerr << "An unexpected error has happened" << std::endl;
}

In this case the output looks like this:

ThreadID = 60980; bbb
ThreadID = 36852; aaa
ThreadID = 63436; ccc
ThreadID = 63436; fff
ThreadID = 36852; eee
ThreadID = 60980; ddd
ThreadID = 63436; ggg

That works, but I want to use only Asio objects. So I tried to use a strand:

#include <iostream>
#include <random>

#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>

#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <mutex>

namespace asio = boost::asio;

static std::vector<std::string> parseLine(std::string input) {
    std::vector<std::string> result;
    std::istringstream iss(std::move(input));

    for (std::string token; getline(iss, token, ';');)
        result.push_back(token);

    return result;
}

int main() try {
    asio::io_context ioc;
    auto work = asio::make_work_guard(ioc);
    boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
    std::thread thread1([&ioc]() {ioc.run(); });
    std::thread thread2([&ioc]() {ioc.run(); });
    std::thread thread3([&ioc]() {ioc.run(); });

    for (std::string input; getline(std::cin, input);) {
        if (input.empty())
            break;

        for (std::string& token : parseLine(std::move(input))) {
            auto handler = [t = std::move(token)] {
                std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
                };
            asio::post(strand, handler);
        }
    }

    work.reset();
    thread1.join();
    thread2.join();
    thread3.join();
}
catch (std::exception const& exc) {
    std::cerr << exc.what() << std::endl;
}
catch (...) {
    std::cerr << "An unexpected error has happened" << std::endl;
}

But now the output is

ThreadID = 63552; aaa
ThreadID = 50452; bbb
ThreadID = 50452; ccc
ThreadID = 50452; ddd
ThreadID = 50452; eee
ThreadID = 50452; fff
ThreadID = 50452; ggg

So the question is: why does the strand use only two threads (not all three)? And why is the first thread used only once? Is it possible to understand this from the Asio documentation?


Solution

  • I looked at coliru.stacked-crooked.com/a/8ee87f50f47703e5 and coliru.stacked-crooked.com/a/318a748e13b267a6, but it seems that the strand does not work as I expected even with thread_pool: the output is the same. So, is it worth using a strand (instead of a mutex, for instance) if I want to distribute all work uniformly? Maybe I am using the strand in the wrong way? Valentyn Vovk

    Using a strand specifically avoids distributing all work uniformly: it causes all work to run sequentially on unspecified threads. In simple cases like this it will often keep using the same thread as an optimization (see e.g. "boost.asio spawn call handler directly from current stack" and "`strand::running_in_this_thread()` false positive").

    Your version with the gMutex is pretty much the same as my version with osyncstream and no strand.

    To write the same with only Asio synchronization, you'd have to have an "output strand":


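    #include <boost/asio.hpp>
    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <iomanip>
    #include <iostream>
    #include <iterator>
    #include <random>
    #include <sstream>
    #include <string>
    #include <string_view>
    #include <thread>
    #include <vector>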
    
    using namespace std::chrono_literals;
    namespace asio = boost::asio;
    
    static constexpr auto now        = std::chrono::steady_clock::now;
    static auto const     start_time = now();
    
    inline static std::atomic_int tidgen    = 0;
    thread_local static int const thread_id = tidgen++;
    
    static void fake_work() {
        static thread_local auto dist = bind(      //
            std::uniform_int_distribution(15, 50), //
            std::mt19937{std::random_device{}()});
    
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(dist() * 1ms);
    }
    
    static std::vector<std::string> parseLine(std::string input) {
        std::vector<std::string> result;
        std::istringstream iss(std::move(input));
    
        for (std::string token; getline(iss, token, ';');)
            result.push_back(token);
    
        return result;
    }
    
    static void driver(auto ex, std::istream& is) {
        auto output_strand = make_strand(ex);
        for (std::string input; getline(is, input);) {
            if (input.empty())
                break;
    
            for (std::string& token : parseLine(std::move(input))) {
                auto handler = [output_strand, t = std::move(token)]() mutable {
                    fake_work();
                    post(output_strand, [work_tid = thread_id, t = std::move(t)] {
                        std::cout << std::setw(8) << (now() - start_time) / 1ms << "ms" //
                                  << " ThreadID = " << work_tid << "; " << t            //
                                  << " (output from thread " << thread_id << ")" << std::endl;
                    });
                };
                post(ex, handler);
            }
        }
    }
    
    int main(int argc, char** argv) try {
        asio::thread_pool ioc;
        auto ex = ioc.get_executor();
    
        if (argc > 1) {
            std::stringstream test;
            copy(argv + 1, argv + argc, std::ostream_iterator<std::string_view>(test, "\n"));
            driver(ex, test);
        } else {
            driver(ex, std::cin);
        }
    
        ioc.join();
    }
    catch (std::exception const& exc) {
        std::cerr << exc.what() << std::endl;
    }
    

    Printing

      27ms ThreadID = 0; b (output from thread 0)
      30ms ThreadID = 1; f (output from thread 1)
      32ms ThreadID = 2; e (output from thread 2)
      32ms ThreadID = 3; g (output from thread 2)
      46ms ThreadID = 4; d (output from thread 4)
      47ms ThreadID = 5; a (output from thread 5)
      49ms ThreadID = 6; c (output from thread 6)
    

    HOWEVER

    There are problems with that approach:


    When we post enough jobs to saturate the pool, the output strand is starved! The second command posts 7+26*26 jobs, and all the output appears at the end:

    1374ms ThreadID = 0; g (output from thread 14)
    1374ms ThreadID = 1; e (output from thread 14)
    1374ms ThreadID = 2; b (output from thread 14)
    1374ms ThreadID = 3; d (output from thread 14)
    1374ms ThreadID = 4; ah (output from thread 14)
    1374ms ThreadID = 5; ag (output from thread 14)
    1374ms ThreadID = 6; aa (output from thread 14)
    1374ms ThreadID = 7; ad (output from thread 14)
    1374ms ThreadID = 8; a (output from thread 14)
    ...
    1400ms ThreadID = 7; zv (output from thread 7)
    1401ms ThreadID = 9; zt (output from thread 9)
    1404ms ThreadID = 8; zz (output from thread 8)
    1418ms ThreadID = 13; zx (output from thread 13)
    

    Basically, at 1374ms thread 14 finally gets the green light to run the first output handler that was posted, but only after all jobs have run. Thread 14 then continues to invoke all the queued-up output handlers. Only the very last jobs get their output distributed normally, because the pool was no longer saturated for the last ~15 jobs.

    It's clear that the only way this will work as you'd like is with a dedicated thread for the output:

    static void driver(auto ex, auto output, std::istream& is) {
        for (std::string input; getline(is, input);) {
            if (input.empty())
                break;
    
            for (std::string& token : parseLine(std::move(input))) {
                auto handler = [output, t = std::move(token)]() mutable {
                    fake_work();
                    post(output, [work_tid = thread_id, t = std::move(t)] {
                        std::cout << std::setw(8) << (now() - start_time) / 1ms << "ms" //
                                  << " ThreadID = " << work_tid << "; " << t            //
                                  << " (output from thread " << thread_id << ")" << std::endl;
                    });
                };
                post(ex, handler);
            }
        }
    }
    
    int main(int argc, char** argv) try {
        asio::thread_pool ioc, console_ioc(1);
    
        auto ex     = ioc.get_executor();
        auto output = console_ioc.get_executor();
    
        if (argc > 1) {
            std::stringstream test;
            copy(argv + 1, argv + argc, std::ostream_iterator<std::string_view>(test, "\n"));
            driver(ex, output, test);
        } else {
            driver(ex, output, std::cin);
        }
    
        ioc.join();
        console_ioc.join(); // drain queued output; ~thread_pool() stops first and could drop it
    } catch (std::exception const& exc) { std::cerr << exc.what() << std::endl; }
    

    Indeed, now we see fair output, always from the same thread:

      17ms ThreadID = 0; c (output from thread 1)
      20ms ThreadID = 2; d (output from thread 1)
      22ms ThreadID = 3; e (output from thread 1)
    ...
    1433ms ThreadID = 13; zz (output from thread 1)
    1435ms ThreadID = 0; zx (output from thread 1)
    

    Summary

    Yes, locking has overhead. Also, it doesn't compose well with asynchronous operations. However, you will agree that in this particular case the overhead of locking is likely (much) lower than full-blown thread switching and task scheduling.

    As long as the code under lock is "instant" (never blocking) and there are no interdependencies with other async tasks, mutual exclusion, rather than a strand, is the natural solution.