c++ multithreading asynchronous

What is the best way to execute C++ functions asynchronously when the functions don't return a value?


I am working on a project to which I need to add some asynchronous methods, but I am not sure of the best way to go about it. I have read up on std::async, std::thread, std::packaged_task, and some of the capabilities of Boost, but all of the examples I found seemed dependent on some sort of return value from the function being run, whereas I do not need that capability.

For some context: we have several listener classes that read data being broadcast by a server. At application start we currently initialize these listeners serially. That works, but the initialization functions are blocking and may not return quickly, which slows down application startup. We would like the init calls to all run asynchronously, so that the order in which they finish doesn't matter, and then have the application continue once they are all connected (or X seconds have passed without successfully initializing).

I am unable to provide actual code, but I can provide something similar to attempt to help clarify.

Important context: there are 9 listeners that all inherit from the same base class (call it ServerListener for simplicity). Some of these listeners are different instances of the same class (e.g., we have 3 PositionListeners that all do different things but are the same class, which inherits from ServerListener).

The actual structure of the listeners is not important, nor do I believe it is relevant to the question, as the listeners themselves work perfectly fine.

On to the example code for my question:

//Create the listeners
PositionListener listen1;
PositionListener listen2;

//Init listeners
//This is how we currently have things working, where one Init() runs and finishes, then the next starts.
//This behavior is blocking and is what we wish to change, or at least speed up.
listen1.Init();
listen2.Init();

What we would like to happen is rather than listen1.Init() running, finishing, then starting listen2.Init(), we would like both Init() functions to be started at the same time, whether it be in threads, through tasks, etc., and then once all have been finished, the program may continue to the actual data processing phase.

To try to sum this up (though I feel I am repeating myself a lot, but better safe than sorry), rather than the Init() functions being run sequentially, in some sort of order, we just want to tell them all to Init(), not care about the order they do it in, to try to speed this section up.

The server does not always output the data these listeners need for Init() in the same order, and it blasts out a lot of data, sometimes taking several seconds before sending what any one listener wants. That makes application startup very slow if, for example, listen1 needs the 50th message sent, whereas listen2 needs the 2nd message, which won't be sent again for several seconds, possibly not until, say, the 230th message. Rather than waiting for listen1 to get its data and Init(), then waiting for the data listen2 wants, we want to start them both at the same time so that listen2 can finish its Init() off that hypothetical 2nd message, and listen1 can Init() whenever it sees the data it wants.

As mentioned earlier, the actual operation of the listeners is irrelevant to the question, I am not asking to be given the exact code to be injected to make it work, but I would simply like to know the best methods or libraries to try to use to do what we are wanting, so that I can research myself how to get that method to work for our needs. Ideally, the method used would be non-blocking (at least as it pertains to the listeners, we are absolutely okay with it blocking the rest of the application, we just want the Init() calls to not be blocked by each other). We do have some limitations on libraries we can use, as we are developing on organization machines, but that is a problem I can solve if I can get some suggestions on how to best get the desired behavior. Explanations with code examples (as with my example, I don't need exact full source code, even pseudocode would be acceptable, so long as it conveys how to use the suggested method) would be most appreciated, as I learn best from that.

Thanks in advance for any useful tips. And please, as I have had it happen before, if the question needs clarification, leave a comment asking for more detail rather than just closing the question out with arbitrary "missing information" remarks, as I am NOT able to provide the actual code or even a working example with the same concepts, both because replicating the code is not relevant to the question being asked, and because this is organizational code that I am not at liberty to share. I am the only one tasked with solving this, but I am not very well versed in multi-threading or asynchronous operation and need some assistance, and I am the only programmer on my team, so I cannot ask my coworkers for help here. I am not asking for the solution to be handed to me, just suggestions as to which libraries may make this task easier, and how those libraries may be used to accomplish what I am trying to do.


Solution

  • If each task takes a few seconds, you can afford to spawn a thread per task using std::thread or std::jthread, so long as the tasks don't depend on one another. Spawning a thread costs a few tens to hundreds of microseconds, which is negligible next to a task of that length.

    #include <thread>
    #include <chrono>
    #include <iostream>
    #include <syncstream>
    #include <vector>
    
    struct PositionListener
    {
        void Init()
        {
            std::osyncstream{ std::cout } << "task started!\n";
            std::this_thread::sleep_for(std::chrono::seconds{ 1 });
            std::osyncstream{ std::cout } << "task ended!\n";
        }
    };
    
    int main()
    {
        std::vector<std::thread> threads;
        PositionListener listen1;
        PositionListener listen2;
    
        threads.push_back(std::thread{ [&]() {listen1.Init(); } });
        threads.push_back(std::thread{ [&]() {listen2.Init(); } });
    
        for (auto&& thread : threads)
        {
            thread.join();
        }
    }
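
    The same fan-out can be written with std::jthread (C++20), which joins automatically in its destructor, so the explicit join loop goes away. This is a minimal sketch, assuming a stand-in PositionListener whose Init() merely sleeps; the InitAll helper and the initialized flag are hypothetical names used only for illustration.

    ```cpp
    #include <cassert>
    #include <chrono>
    #include <thread>
    #include <vector>

    // Stand-in for the real listener; the sleep simulates a blocking Init().
    struct PositionListener
    {
        bool initialized = false;

        void Init()
        {
            std::this_thread::sleep_for(std::chrono::milliseconds{ 50 });
            initialized = true;
        }
    };

    // Hypothetical helper: starts one std::jthread per listener and returns
    // only after every Init() has finished.
    void InitAll(std::vector<PositionListener>& listeners)
    {
        std::vector<std::jthread> threads;
        threads.reserve(listeners.size());

        for (auto& listener : listeners)
        {
            // Each thread touches only its own listener, so no locking is needed.
            threads.emplace_back([&listener]() { listener.Init(); });
        }
        // `threads` is destroyed here; every std::jthread destructor joins.
    }
    ```

    After InitAll(listeners) returns, every listener has finished Init(), and the joins make the results safe to read from the calling thread.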
    

    If there are dependencies among them, std::packaged_task<void()> can be used to "wait" for another function to finish. It also establishes the correct synchronization, so you are free to use whatever the other task has produced without race conditions.

    #include <thread>
    #include <chrono>
    #include <future>
    #include <iostream>
    #include <syncstream>
    #include <vector>
    
    struct PositionListener
    {
        void Init(std::future<void>* dependency)
        {
            if (dependency)
            {
                dependency->get(); // wait for dependency
            }
    
            std::osyncstream{ std::cout } << "task started!\n";
            std::this_thread::sleep_for(std::chrono::seconds{1});
            std::osyncstream{ std::cout } << "task ended!\n";
        }
    };
    
    int main()
    {
        std::vector<std::thread> threads;
        PositionListener listen1;
        PositionListener listen2;
    
        std::packaged_task<void()> work1{ [&]() {listen1.Init(nullptr); } };
        auto work1_future = work1.get_future();
        threads.push_back(std::thread{ [&]() {work1(); } });
    
        // task 2 depends on task 1
        threads.push_back(std::thread{ [&]() {listen2.Init(&work1_future); } });
        for (auto&& thread : threads)
        {
            thread.join();
        }
    }
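
    The future from a packaged_task can also be waited on from the main thread with a deadline, which covers the "continue once all are connected or X seconds have passed" part of the question. This is a rough sketch, not a definitive implementation: the InitAllWithTimeout helper and the delay member are hypothetical, and the stand-in Init() always returns.

    ```cpp
    #include <cassert>
    #include <chrono>
    #include <cstddef>
    #include <future>
    #include <thread>
    #include <utility>
    #include <vector>

    // Stand-in listener; `delay` simulates how long the blocking Init() takes.
    struct PositionListener
    {
        std::chrono::milliseconds delay{ 0 };

        void Init() { std::this_thread::sleep_for(delay); }
    };

    // Hypothetical helper: runs every Init() on its own thread and returns how
    // many of them finished before the shared deadline.
    std::size_t InitAllWithTimeout(std::vector<PositionListener>& listeners,
                                   std::chrono::milliseconds timeout)
    {
        std::vector<std::future<void>> done;
        std::vector<std::thread> threads;

        for (auto& listener : listeners)
        {
            std::packaged_task<void()> task{ [&listener]() { listener.Init(); } };
            done.push_back(task.get_future());
            threads.emplace_back(std::move(task)); // the thread now owns the task
        }

        // One deadline for all listeners, so the total wait is bounded by `timeout`.
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        std::size_t ready = 0;
        for (auto& f : done)
        {
            if (f.wait_until(deadline) == std::future_status::ready)
            {
                ++ready;
            }
        }

        // The demo Init() always returns, so joining is safe here. A real Init()
        // that can block forever needs its own way to be told to give up.
        for (auto& t : threads)
        {
            t.join();
        }
        return ready;
    }
    ```

    Note that this sketch still joins every thread before returning, so the call itself lasts as long as the slowest Init(); only the count reflects the deadline.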
    

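    For IO tasks, you shouldn't rely on std::async. It suits computational tasks better than IO ones: some implementations (MSVC's, for example) run std::async tasks on a bounded thread pool, so only a finite number of tasks can execute concurrently. Such a pool is sized to saturate the cores with computational work, not to host many blocking IO waits. Other implementations, such as libstdc++ and libc++, start a new thread for each std::launch::async call instead.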

    However, if the tasks take only a few microseconds, std::async is the better fit where the implementation re-uses threads, since you then avoid paying a few hundred microseconds of thread start-up on each task.
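
    For reference, std::async works with functions that return nothing: the result is simply a std::future<void>. This is a minimal sketch, assuming hypothetical ShortTask and RunShortTasks names; whether threads are actually re-used between calls depends on the standard library implementation.

    ```cpp
    #include <atomic>
    #include <cassert>
    #include <functional>
    #include <future>
    #include <vector>

    // Hypothetical stand-in for a short task that returns nothing.
    void ShortTask(std::atomic<int>& counter)
    {
        ++counter;
    }

    // Launches `count` tasks through std::async and waits for all of them.
    void RunShortTasks(std::atomic<int>& counter, int count)
    {
        std::vector<std::future<void>> pending;
        for (int i = 0; i < count; ++i)
        {
            // std::launch::async requests asynchronous execution; without it the
            // implementation may defer the call until get() or wait().
            pending.push_back(
                std::async(std::launch::async, ShortTask, std::ref(counter)));
        }
        for (auto& f : pending)
        {
            f.get(); // blocks until the task is done; rethrows if it threw
        }
    }
    ```

    Keeping the futures in a vector matters: a future returned by std::async blocks in its destructor, so discarding it immediately would make the calls run one after another.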


    Lastly, if you are using a package for asynchronous IO such as asio, you can launch them all at once on the main thread, which will actually be faster (no thread spawning overhead), using either coroutines or callbacks that run when the "events" happen. The tasks will be concurrent, but not parallel.