Tags: c++, multithreading, c++11, packaged-task

Add a std::packaged_task to an existing thread?


Is there a standard way to add a std::packaged_task to an existing thread? There's a nontrivial amount of setup overhead before a task can run, so I want to pay that cost once, then keep the thread running and waiting for tasks to execute. I want to use futures so I can optionally get the result of a task and catch its exceptions.

My pre-C++11 implementation requires tasks to inherit from an abstract base class with a Run() method (a bit of a pain, and it rules out lambdas). I keep a std::deque of those tasks, which the main thread adds to and the worker thread dequeues from. I have to protect that collection from simultaneous access and signal the worker thread when there's something to do, so it isn't spinning or sleeping. Enqueuing something returns a "result" object that holds a synchronization object to wait on until the task completes, plus a result value. It all works well, but it's time for an upgrade if there's something better.


Solution

  • Here is a toy thread pool:

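    #include <atomic>
    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <future>
    #include <mutex>
    #include <type_traits>
    #include <utility>
    #include <vector>
    #include <boost/optional.hpp>

    template<class T>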
    struct threaded_queue {
      using lock = std::unique_lock<std::mutex>;
      void push_back( T t ) {
        {
          lock l(m);
          data.push_back(std::move(t));
        }
        cv.notify_one();
      }
      boost::optional<T> pop_front() {
        lock l(m);
        cv.wait(l, [this]{ return abort || !data.empty(); } );
        if (abort) return {};
        // take the oldest element so tasks run in FIFO order
        auto r = std::move(data.front());
        data.pop_front();
        return std::move(r);
      }
      void terminate() {
        {
          lock l(m);
          abort = true;
          data.clear();
        }
        cv.notify_all();
      }
      ~threaded_queue()
      {
        terminate();
      }
    private:
      std::mutex m;
      std::deque<T> data;
      std::condition_variable cv;
      bool abort = false;
    };
    struct thread_pool {
      thread_pool( std::size_t n = 1 ) { start_thread(n); }
      thread_pool( thread_pool&& ) = delete;
      thread_pool& operator=( thread_pool&& ) = delete;
      ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
      template<class F, class R=std::result_of_t<F&()>>
      std::future<R> queue_task( F task ) {
        std::packaged_task<R()> p(std::move(task));
        auto r = p.get_future();
        tasks.push_back( std::move(p) );
        return r;
      }
      template<class F, class R=std::result_of_t<F&()>>
      std::future<R> run_task( F task ) {
        if (threads_active() >= total_threads()) {
          start_thread();
        }
        return queue_task( std::move(task) );
      }
      void terminate() {
        tasks.terminate();
      }
      std::size_t threads_active() const {
        return active;
      }
      std::size_t total_threads() const {
        return threads.size();
      }
      void clear_threads() {
        terminate();
        threads.clear();
      }
      void start_thread( std::size_t n = 1 ) {
        while(n-->0) {
          threads.push_back(
            std::async( std::launch::async,
              [this]{
                while(auto task = tasks.pop_front()) {
                  ++active;
                  try{
                    (*task)();
                  } catch(...) {
                    --active;
                    throw;
                  }
                  --active;
                }
              }
            )
          );
        }
      }
    private:
      std::vector<std::future<void>> threads;
      threaded_queue<std::packaged_task<void()>> tasks;
      std::atomic<std::size_t> active{0}; // a default-constructed atomic is uninitialized before C++20
    };
    

    Copied from another answer of mine.

    A thread_pool with 1 thread matches your description pretty much.

    The above is only a toy. In a real thread pool I'd replace the std::packaged_task<void()> with a move_only_function<void()>, since type-erased invocation is all I use it for. (Amusingly, a packaged_task<void()> can hold a packaged_task<R()>, if inefficiently.)
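    The nesting mentioned above can be seen in isolation with a small sketch. The function name run_nested is made up for illustration. The outer task allocates its own shared state, which nobody ever reads, and that is where the inefficiency comes from.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical helper: runs a packaged_task<int()> through a
// packaged_task<void()> wrapper and returns the value seen through
// the inner task's future.
int run_nested() {
  std::packaged_task<int()> inner([] { return 7; });
  std::future<int> result = inner.get_future();  // must be taken before the move
  assert(result.valid());

  // packaged_task accepts move-only callables, and packaged_task<int()>
  // has a void-returning operator(), so it fits the void() signature.
  std::packaged_task<void()> outer(std::move(inner));

  outer();  // invokes the moved-in inner task, which stores 7 in its shared state

  return result.get();
}
```

    This is the same conversion queue_task relies on when it pushes a packaged_task<R()> into a queue of packaged_task<void()>.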

    You will have to reason about shutdown and make a plan. The above code locks up if you try to shut it down without first clearing the threads.
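    As one possible shutdown plan, here is a minimal single-thread sketch that assumes queued tasks should all finish before the worker exits. It is not the pool above: the names single_worker, submit, demo_value and demo_exception are invented for this example, and it uses only the standard library.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical class: one long-lived thread that runs queued tasks in FIFO order.
class single_worker {
public:
  single_worker() : worker_([this] { run(); }) {}
  single_worker(const single_worker&) = delete;
  single_worker& operator=(const single_worker&) = delete;

  // Assumed policy: drain whatever is already queued, then join.
  ~single_worker() {
    {
      std::lock_guard<std::mutex> l(m_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Wraps any nullary callable (lambdas included) and returns its future.
  template <class F, class R = decltype(std::declval<F&>()())>
  std::future<R> submit(F f) {
    std::packaged_task<R()> task(std::move(f));
    std::future<R> result = task.get_future();
    {
      std::lock_guard<std::mutex> l(m_);
      tasks_.emplace_back(std::move(task));  // stored as packaged_task<void()>
    }
    cv_.notify_one();
    return result;
  }

private:
  void run() {
    // Expensive one-time setup for the thread would go here.
    for (;;) {
      std::packaged_task<void()> task;
      {
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // only reachable once done_ is set
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();  // run outside the lock; a thrown exception lands in the future
    }
  }

  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::packaged_task<void()>> tasks_;
  bool done_ = false;
  std::thread worker_;  // declared last so the members above exist before it starts
};

// Returns the value computed on the worker thread.
int demo_value() {
  single_worker w;
  std::future<int> f = w.submit([] { return 6 * 7; });
  assert(f.valid());
  return f.get();
}

// Returns true if the task's exception reaches the caller through the future.
bool demo_exception() {
  single_worker w;
  std::future<void> f = w.submit([] { throw std::runtime_error("boom"); });
  try { f.get(); } catch (const std::runtime_error&) { return true; }
  return false;
}
```

    Because the destructor only sets a flag and joins, futures handed out earlier stay usable after the worker is gone. If you would rather abandon pending work, clear the deque under the lock before notifying; the futures of the dropped tasks then report a broken promise.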