c++ multithreading c++11 threadpool

C++11 Dynamic Threadpool


Recently, I've been trying to find a library for running concurrent tasks on threads, ideally with a simple interface that calls a function on a thread. There are n threads at any time; some complete faster than others and they arrive at different times.

First I tried Rx, which is great in C++. I've also looked into Blocks and TBB, but they are platform dependent. For my prototype, I need to remain platform independent, as we don't know what it will be running on yet and that can change when decisions are made.

C++11 has a number of things for threading and concurrency and I found a number of examples like this one for thread pools.

https://github.com/bilash/threadpool

Similar projects use the same lambda expressions with std::thread and std::mutex.

This looks perfect for what I need, but there are some issues. The pools are started with a defined number of threads, and tasks are queued until a thread is free.

How can I add new threads? Remove expired threads? (.join()??)

Obviously, this is much easier for a known number of threads, as they can be initialised in the ctor and then joined in the dtor.

Any tips or pointers here from someone with experience with C++ concurrency?


Solution

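    1. Start with the number of threads the hardware can run concurrently. This value is only a hint and can be 0 when it cannot be determined, so fall back to a sensible default in that case:

      int Num_Threads = thread::hardware_concurrency();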
      
    2. For an efficient thread pool implementation, once the threads have been created according to Num_Threads, it's better not to create new ones or destroy old ones (by joining). Doing so carries a performance penalty and might even make your application slower than the serial version.

      Each C++11 thread should run a function containing an infinite loop, constantly waiting for new tasks to grab and run.

      Here is how to attach such a function to the thread pool:

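      // Inside a The_Pool member function such as the constructor.
      // Infinite_loop_function is a member, so the thread needs `this` as well.
      int Num_Threads = thread::hardware_concurrency();
      vector<thread> Pool; // normally a class member, so it outlives this scope
      for(int ii = 0; ii < Num_Threads; ii++)
      {  Pool.push_back(thread(&The_Pool::Infinite_loop_function, this));}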
      
    3. The Infinite_loop_function

      This is a "while(true)" loop waiting for the task queue

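      void The_Pool::Infinite_loop_function()
      {
          while(true)
          {
              function<void()> Job;
              {
                  unique_lock<mutex> lock(Queue_Mutex);

                  // Sleep until a job is available; the predicate also guards against spurious wakeups
                  condition.wait(lock, [this]{ return !Queue.empty(); });
                  Job = Queue.front();
                  Queue.pop();
              } // the lock is released here, before the job runs
              Job(); // function<void()> type
          }
      }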
      
    4. Make a function to add a job to your Queue

      void The_Pool:: Add_Job(function<void()> New_Job)
      {
          {
              unique_lock<mutex> lock(Queue_Mutex);
              Queue.push(New_Job);
          }
          condition.notify_one();
      }
      
    5. Bind an arbitrary function to your Queue

      Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
      

    Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for jobs to do.
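    The steps above leave out how to stop the workers and join them, which the question also asks about. Here is a minimal sketch of how the pieces might fit together with a clean shutdown. The Stop flag, the destructor logic, and the Run_Demo helper are my own assumptions for illustration and are not part of the steps above; the other names follow the snippets.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class The_Pool {
public:
    explicit The_Pool(std::size_t num_threads) {
        // hardware_concurrency() may report 0, so keep at least one worker.
        if (num_threads == 0) num_threads = 1;
        for (std::size_t i = 0; i < num_threads; ++i)
            Pool.emplace_back(&The_Pool::Infinite_loop_function, this);
    }

    ~The_Pool() {
        {
            std::unique_lock<std::mutex> lock(Queue_Mutex);
            Stop = true; // assumption: a simple shutdown flag
        }
        condition.notify_all(); // wake every worker so it can see Stop
        for (std::thread& t : Pool) t.join();
    }

    The_Pool(const The_Pool&) = delete;
    The_Pool& operator=(const The_Pool&) = delete;

    void Add_Job(std::function<void()> New_Job) {
        {
            std::unique_lock<std::mutex> lock(Queue_Mutex);
            Queue.push(std::move(New_Job));
        }
        condition.notify_one();
    }

private:
    void Infinite_loop_function() {
        while (true) {
            std::function<void()> Job;
            {
                std::unique_lock<std::mutex> lock(Queue_Mutex);
                condition.wait(lock, [this] { return Stop || !Queue.empty(); });
                // Exit only once the queue is drained, so no job is dropped.
                if (Stop && Queue.empty()) return;
                Job = std::move(Queue.front());
                Queue.pop();
            }
            Job(); // run outside the lock
        }
    }

    std::mutex Queue_Mutex;
    std::condition_variable condition;
    std::queue<std::function<void()>> Queue;
    bool Stop = false;
    std::vector<std::thread> Pool;
};

// Hypothetical usage helper: queues `jobs` increments and returns the total.
int Run_Demo(int jobs) {
    std::atomic<int> counter(0);
    {
        The_Pool pool(std::thread::hardware_concurrency());
        for (int i = 0; i < jobs; ++i)
            pool.Add_Job([&counter] { ++counter; });
    } // the destructor finishes the queued jobs and joins the workers
    return counter.load();
}
```

    In this sketch the destructor lets the workers finish whatever is still queued before they exit. If you would rather discard pending jobs on shutdown, the worker could return as soon as it sees Stop.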