c++multithreadingc++11packaged-taskstd-future

Not all std::packaged_tasks executed when inside std::async calls


I have a rather complex code with std::async calls and std::packaged_task, that fails to execute to the very end. I simplified it to the minimal reproducable example.

Two async functions are called one after another, inside of which there are packaged_tasks executed asynchronously using std::async. Then both we wait for both async functions to finish, using corresponding future.wait() method. The execution stops at futY.wait(); and second packaged_task is never executed (No second Inside handler func log).

#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <exception>
#include <vector>
#include <list>
#include <memory>
#include <functional>

std::list<std::function<bool(const std::vector<int> &data)>> handlers_;

std::future<std::vector<int>> countup(int from, int to) {
    std::function<std::vector<int>(std::exception_ptr, std::vector<int>)> func = [=] (std::exception_ptr ex, std::vector<int> data) {
        std::cout << "Inside handler func " << from << " " << to << std::endl;
        if (ex != nullptr) {
            std::rethrow_exception(ex);
        }
        return data;
    };
    auto packageP = std::make_shared<std::packaged_task<std::vector<int>(std::exception_ptr, std::vector<int>)>>(func);

    auto fut = packageP->get_future();
    handlers_.push_back([packageP] (const std::vector<int> &data) mutable -> bool {
            std::cout << "Calling handler with data, size: " << data.size() << std::endl;

            (*packageP)(nullptr, data);
            return data.size();
        }
    );

    auto fut2 = std::async(std::launch::async, [=, &handlers_] {
            std::cout << "Before handler called " << from << to << std::endl;

            std::vector<int> vec ( to, from );

            auto res = (*handlers_.begin())(vec);

            std::cout << "Handler result " << res << " for " << from << " " << to << std::endl;
        });

    std::cout << "Called async in countup for " << from << " " << to << std::endl;

    return fut;
}

int main ()
{
  auto futX = std::async(std::launch::async, [] {
      auto fut1 = std::async(std::launch::async, [] {
        auto fut2 = countup(0, 2);

        std::cout << "Called X countup and waiting to finish" << std::endl;

        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The X countup returned" << std::endl;
      });
      std::cout << "Called X async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 2;
  });

  std::cout << "Called async X and waiting to finish" << std::endl;

  auto futY = std::async(std::launch::async, [] {
      auto fut1 = std::async(std::launch::async, [] {
        auto fut2 = countup(0, 3);

        std::cout << "Called Y countup and waiting to finish" << std::endl;
        fut2.wait();

        auto vec = fut2.get();
        std::cout << "The Y countup returned " << std::endl;
      });
      std::cout << "Called Y async internal and waiting to finish" << std::endl;
      fut1.wait();

      return 3;
  });

  std::cout << "Called async Y and waiting to finish" << std::endl;

  futX.wait();
  std::cout << "After async X and waiting to finish" << std::endl;

  futY.wait();
  std::cout << "After async Y and waiting to finish" << std::endl;

  int valueX = futX.get();                  // wait for the task to finish and get result
  int valueY = futY.get();                  // wait for the task to finish and get result

  std::cout << "The countdown lasted for " << valueX  << " " << valueY << " seconds" << std::endl;

  return 0;
}

The log is the following:

Called async X and waiting to finish
Called async Y and waiting to finish
Called X async internal and waiting to finish
Called Y async internal and waiting to finish
Called async in countup for Before handler called 02
0 2
Calling handler with data, size: 2
Inside handler func 0Called async in countup for  2
Handler result 01 for 0 2
 Before handler called 03
3Calling handler with data, size: 
Called X countup and waiting to finish
The X countup returned
3
After async X and waiting to finish
Called Y countup and waiting to finish

Could you please clarify why the code is never executed till the end?


Solution

  • You are re-using the same packaged task when you call (*handlers_.begin()), and I think you have a data race when you handlers_.push_back.

    You don't need the global handlers_, you can just capture a local handler.

    #include <iostream>     // std::cout
    #include <future>       // std::packaged_task, std::future
    #include <exception>
    #include <vector>
    #include <list>
    #include <memory>
    #include <functional>
    
    std::future<std::vector<int>> countup(int from, int to) {
        auto func = [=] (std::exception_ptr ex, std::vector<int> data) {
            std::cout << "Inside handler func " << from << " " << to << std::endl;
            if (ex != nullptr) {
                std::rethrow_exception(ex);
            }
            return data;
        };
        auto packageP = std::make_shared<std::packaged_task<std::vector<int>(std::exception_ptr, std::vector<int>)>>(func);
    
        auto fut = packageP->get_future();
        auto handler = [packageP] (const std::vector<int> &data) mutable -> bool {
            std::cout << "Calling handler with data, size: " << data.size() << std::endl;
    
            (*packageP)(nullptr, data);
            return data.size();
        };
    
        auto fut2 = std::async(std::launch::async, [=]() mutable {
                std::cout << "Before handler called " << from << to << std::endl;
    
                std::vector<int> vec ( to, from );
    
                auto res = handler(vec);
    
                std::cout << "Handler result " << res << " for " << from << " " << to << std::endl;
            });
    
        std::cout << "Called async in countup for " << from << " " << to << std::endl;
    
        return fut;
    }
    
    int main ()
    {
      auto futX = std::async(std::launch::async, [] {
          auto fut1 = std::async(std::launch::async, [] {
            auto fut2 = countup(0, 2);
    
            std::cout << "Called X countup and waiting to finish" << std::endl;
    
            fut2.wait();
    
            auto vec = fut2.get();
            std::cout << "The X countup returned" << std::endl;
          });
          std::cout << "Called X async internal and waiting to finish" << std::endl;
          fut1.wait();
    
          return 2;
      });
    
      std::cout << "Called async X and waiting to finish" << std::endl;
    
      auto futY = std::async(std::launch::async, [] {
          auto fut1 = std::async(std::launch::async, [] {
            auto fut2 = countup(0, 3);
    
            std::cout << "Called Y countup and waiting to finish" << std::endl;
            fut2.wait();
    
            auto vec = fut2.get();
            std::cout << "The Y countup returned " << std::endl;
          });
          std::cout << "Called Y async internal and waiting to finish" << std::endl;
          fut1.wait();
    
          return 3;
      });
    
      std::cout << "Called async Y and waiting to finish" << std::endl;
    
      futX.wait();
      std::cout << "After async X and waiting to finish" << std::endl;
    
      futY.wait();
      std::cout << "After async Y and waiting to finish" << std::endl;
    
      int valueX = futX.get();                  // wait for the task to finish and get result
      int valueY = futY.get();                  // wait for the task to finish and get result
    
      std::cout << "The countdown lasted for " << valueX  << " " << valueY << " seconds" << std::endl;
    
      return 0;
    }
    

    See it on coliru