c++linuxmultithreadingboostc++20

Boost thread crash


Can I use boost thread + atomic built with c++20 flag. I didn't find anything mentioning this possibility in boost documentation of those libraries.

I had an application that works fine with gcc 7.1 c++17 boost 1.75 but when upgrading to gcc 11.1 c++20 I got crash in boost thread

Sanitizer does not report any issue.

The program uses boost condition_variable.

Live On Compiler Explorer

#include "boost/thread/thread.hpp"
#include "boost/shared_ptr.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/utility.hpp"
#include "boost/thread/condition_variable.hpp"
#include <boost/thread/thread.hpp>
#include <thread>

#include <algorithm>
#include <cassert>
#include <atomic>


#include <vector>

class Dispatcher;

class Task
{
public:
   virtual void run() = 0;

   virtual ~Task() {};
};


class TaskPool : boost::noncopyable
{
public:
   typedef boost::shared_ptr<Task>            task_ptr_type;
   typedef boost::shared_ptr<Dispatcher>  dispatcher_ptr_type;
   typedef std::vector<dispatcher_ptr_type>         thread_pool_type;
   typedef boost::posix_time::time_duration         time_duration_type;
   typedef std::size_t                              size_type;

   TaskPool(const size_type Size);

   ~TaskPool();

   size_type maxSize() const { return max_size_; }

   size_type watermark() const { return watermark_; }

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   const time_duration_type& getInactivTm() const { return inactivity_time_out_; }

   void execute(const task_ptr_type& Task);

private:
   typedef boost::mutex            mutex_type;
   typedef mutex_type::scoped_lock lock_type;

   mutable mutex_type mutex_;

   size_type          max_size_;
   thread_pool_type * thread_pool_;

   time_duration_type inactivity_time_out_;

   size_type          watermark_;
   size_type          invocations_;
   size_type          executions_;
   size_type          loops_;
};


class Dispatcher : boost::noncopyable
{
public:
   typedef TaskPool::task_ptr_type      task_ptr_type;
   typedef TaskPool::time_duration_type time_duration_type;
   typedef TaskPool::size_type          size_type;

   Dispatcher();

   ~Dispatcher();

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   bool waitReady(const time_duration_type& Time_Out);

   void execute(const task_ptr_type& Task);

   void terminate();

   static time_duration_type defaultActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

   static time_duration_type minActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

private:
   typedef boost::mutex              mutex_type;
   typedef boost::condition_variable condition_variable_type;
   typedef mutex_type::scoped_lock   lock_type;

   friend class Runner;

   bool Queued_() const volatile;
   bool NotQueued_() const volatile;

   void execute_();

   boost::thread thread_;
   task_ptr_type task_;

   mutable mutex_type      mutex_;
   condition_variable_type task_busy_cond_;
   condition_variable_type task_available_cond_;

   volatile bool is_terminated_;

   time_duration_type inactivity_time_out_;

   size_type invocations_;
   size_type executions_;
   size_type thread_created_;
   size_type thread_terminated_;
};


class Runner
{
public:
   explicit Runner(Dispatcher * Disp)
    : disp_( Disp )
   { }

   void operator()()
   {
      disp_->execute_();
   }

private:
   Dispatcher * const disp_;
};


Dispatcher::Dispatcher()
 : is_terminated_( false ),
   inactivity_time_out_( defaultActivTm() ),
   invocations_( 0 ),
   executions_( 0 ),
   thread_created_( 0 ),
   thread_terminated_( 0 )
{ }


Dispatcher::~Dispatcher()
{
   terminate();
}


void Dispatcher::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= minActivTm() );
}


bool Dispatcher::waitReady(const time_duration_type& Time_Out)
{
   lock_type lock( mutex_ );

   if ( !is_terminated_ &&
        (thread_.get_id() == boost::thread::id()) )
   {
      return true;
   }
   while ( Queued_() )
   {
      if ( !task_busy_cond_.timed_wait(lock,
                                       Time_Out) )
      {
         return false;
      }
   }
   return !is_terminated_;
}


void Dispatcher::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   if ( thread_.get_id() == boost::thread::id() )
   {
      //std::cout << "new thread\n";
      thread_created_ += 1;
      thread_ = boost::thread( Runner(this) );
   }
   while ( Queued_() )
   {
      task_busy_cond_.wait(lock);
   }
   if ( !is_terminated_ )
   {
      task_ = Task;
      task_available_cond_.notify_one();
   }
   invocations_ += 1;
}


void Dispatcher::terminate()
{
   is_terminated_ = true;

   thread_.interrupt();
   thread_.join();
}


bool Dispatcher::Queued_() const volatile
{
   return const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


bool Dispatcher::NotQueued_() const volatile
{
   return !const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


void Dispatcher::execute_()
{
   {
      lock_type lock( mutex_ );
      is_terminated_ = false;
   }

   while ( 1 )
   {
      task_ptr_type tmp_task;

      // Critical section.
      //
      {
         lock_type lock( mutex_ );

         while ( NotQueued_() )
         {
            if ( !task_available_cond_.timed_wait(lock,
                                                  inactivity_time_out_) )
            {
               thread_terminated_ += 1;
               thread_.detach();
               return;
            }
         }
         if ( is_terminated_ )
         {
            thread_terminated_ += 1;
            return;
         }
         tmp_task.swap( task_ );
         task_busy_cond_.notify_one();
      }
      // Execute task.
      //
      executions_ += 1;

      try
      {
         ////std::cout << "execution in progress\n";
         tmp_task->run();
         ////std::cout << "execution done\n";
      }
      catch (const boost::thread_interrupted&)
      {
         thread_terminated_ += 1;
         thread_.detach();
         return;
      }
      catch (...)
      {
         // Unexpected exception, ignore...
      }
   }
}


TaskPool::TaskPool(const size_type Size)
 : max_size_( Size ),
   thread_pool_( 0 ),
   inactivity_time_out_( Dispatcher::defaultActivTm() ),
   watermark_( 0 ),
   invocations_( 0 ),
   executions_( 0 ),
   loops_( 0 )
{
   assert( max_size_ > 0 );
   thread_pool_ = new thread_pool_type( max_size_ );
}


TaskPool::~TaskPool()
{
   delete thread_pool_;
}


void TaskPool::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= Dispatcher::minActivTm() );

   for (thread_pool_type::iterator iter = thread_pool_->begin();
        thread_pool_->end() != iter;
        ++iter)
   {
      dispatcher_ptr_type& p( *iter );

      if ( p )
      {
         p->setInactivTm( inactivity_time_out_ );
      }
   }
}


void TaskPool::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   invocations_ += 1;

   const time_duration_type min_iteration_timeout( boost::posix_time::microsec( 100 ) );
   const time_duration_type max_iteration_timeout( boost::posix_time::microsec( 100000 ) );

   time_duration_type timeout( 1 == max_size_ ? time_duration_type( boost::posix_time::pos_infin )
                                              : time_duration_type( boost::posix_time::microsec(0) ) );

   while ( 1 )
   {
      for (thread_pool_type::iterator iter = thread_pool_->begin();
           thread_pool_->end() != iter;
           ++iter)
      {
         dispatcher_ptr_type& p( *iter );

         loops_ += 1;

         if ( !p )
         {
            //std::cout << "new Dispatcher instance\n";
            p.reset( new Dispatcher );
            p->setInactivTm( inactivity_time_out_ );

            watermark_ = iter - thread_pool_->begin();
         }
         if ( p->waitReady( timeout ) )
         {
            p->execute( Task );
            executions_ += 1;
            return;
         }
      }
      if ( timeout != boost::posix_time::pos_infin )
      {
         timeout *= 2;

         timeout = std::max(timeout,
                            min_iteration_timeout);
         timeout = std::min(timeout,
                            max_iteration_timeout);
      }
   }
}


static TaskPool threadPool = 10;

class Wrapper : public Task
{
public:
   Wrapper()
   {
      listener = new Listener;
   }

   virtual void run()
   {
      boost::this_thread::sleep( boost::posix_time::seconds(10) );
      listener->run();
   }

   struct Listener
   {
      std::string s;
      void run()
      {
         s = "Hello";
         //std::cout << s << '\n';
      }
   };

   Listener* listener;
};

struct Executer
{
   std::vector<std::thread> threads;

   void dispatch()
   {
      //std::cout << "dispatch\n";
      for (auto i=0; i<2; ++i)
      {
         threads.push_back(std::move(std::thread([&]()
         {
            int index = 0;
            while (true)
            {
               {
                  //std::cout << "begin\n";
                  boost::shared_ptr<Wrapper> task( new Wrapper );
                  threadPool.execute( task );
                  //std::cout << "end\n";
               }

               if (index % 1000 == 0) boost::this_thread::sleep( boost::posix_time::seconds(5) );
            }
         })));
      }
   }

   ~Executer()
   {
      for (auto i=0; i<2; ++i) threads[i].join();
   }
};

int main()
{
  std::thread t1([](){Executer a; a.dispatch();});
  std::thread t2([](){Executer a; a.dispatch();});
  t1.join();
  t2.join();
}

The shared ptr has use_count_ = 1 weak_count_ = 1. I do not know why weak count is not 0.

Any help how to find the root cause ?


Solution

  • We discussed this likely being down to infinite recursion with Boost Operators in C++20. Then I remarked:

    I'm very surprised about the code in combination with the compiler being c++20... The code smells "legacy" - almost Java-esque - and is just rife with raw pointer abuse and boost back-ports for stuff that has been standardized in c++11? here

    and

    Would you like to see a standard-library only version of your code? That way you can completely forget about Boost compatibility issues. here

    Here is that version: Coliru in 243 lines of code. That's 210 lines fewer than the original, and with fewer smells¹ and without Boost :)

    Note I changed the Task::run interface to take a std::stop_token, because the original code used Boost's non-standard thread-interruption. If you want to emulate the old behavior, you might add throw boost::thread_interrupted from inside the two interruptible_XXX helpers. Of course you would have to handle them at the top-level in your thread as well.

    If the interrupt was only ever used to shutdown the Dispatcher loop, not intended to actually interact with user-supplied Task implementations, then simply remove the stop_token argument :)

    Adding some fancy tracing and limiting run-length (#define SHORT_DEMO), we get

    Live On Coliru

    #include <algorithm>
    #include <atomic>
    #include <cassert>
    #include <condition_variable>
    #include <memory>
    #include <thread>
    #include <utility>
    #include <vector>
    
    #include <iomanip>
    #include <iostream>
    using namespace std::chrono_literals;
    
    namespace { // diagnostics tracing helpers
        auto now = std::chrono::high_resolution_clock::now;
    
        static auto timestamp() {
            static auto start = now();
            return (now() - start) / 1.ms;
        }
    
        static std::atomic_int tid_gen = 0;
        thread_local int       tid     = tid_gen++;
        std::mutex             console_mx;
    
        void trace(auto const&... args) {
            std::lock_guard lk(console_mx);
            std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
            (std::cout << ... << args) << std::endl;
        }
    
        template <typename> struct CtxTracer {
            const std::string_view _ctx;
            int const              id = [] {
                static std::atomic_int idgen = 0;
                return ++idgen;
            }();
            void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
        };
    
    #define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};
    
    } // namespace
    
    namespace {
        // helpers to help replace boost::thread_interrupted with std::stop_token
        template <typename Lockable, typename Duration, typename Predicate>
        bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                    Duration const& dur, std::stop_token stoken, Predicate pred) {
    
            // see https://stackoverflow.com/a/66309629/85371
            std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
                if (std::exchange(initial, false)) // while constructing the callback
                    return;                        // avoid dead-lock
                mx.lock();
                mx.unlock();
                cv.notify_all();
            });
    
            cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
            return pred();
        }
    
        template <typename Duration> // returns true if stop requested
        static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
            std::mutex       mutex_;
            std::unique_lock lk{mutex_};
    #if 1
            std::condition_variable cv;
            interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});
    #else
            // cleaner, but trips up threadsan in many versions
            std::condition_variable_any cv;
            cv.wait_for(lk, stoken, dur, std::false_type{});
    #endif
            return stoken.stop_requested();
        }
    } // namespace
    
    struct Task {
        virtual ~Task()                   = default;
        virtual void run(std::stop_token) = 0;
    };
    
    using mutex_type    = std::mutex;
    using cond_var_type = std::condition_variable;
    using lock_type     = std::unique_lock<mutex_type>;
    using duration_type = std::chrono::steady_clock::duration;
    using task_ptr_type = std::shared_ptr<Task>;
    
    /*
     * Conceptually a single thread that services a queue of tasks, until no task is available for a given idle timeout.
     * The queue depth is 1. That is, at most one task can be queued while at most one task is running on the thread.
     * The idle timeout can be modified during execution
     */
    class Dispatcher {
        TRACE_CTX(Dispatcher)
        Dispatcher(Dispatcher const&)            = delete;
        Dispatcher& operator=(Dispatcher const&) = delete;
    
      public:
        Dispatcher(duration_type t = default_idle_tm) : idle_timeout_(t) {}
    
        void idle_timeout(duration_type t) { idle_timeout_ = min(min_idle_tm, t); }
    
        // fails if queue slot taken and thread busy > timeout
        bool enqueue(duration_type timeout, task_ptr_type Task);
    
        static constexpr duration_type default_idle_tm = 1ms;
        static constexpr duration_type min_idle_tm     = 1ms;
    
      private:
        task_ptr_type pop(duration_type timeout) noexcept;
        void          worker_impl(std::stop_token stoken) noexcept;
    
        //////
        mutable mutex_type mutex_;
        cond_var_type      producers_, consumer_; // SEHE combine and `notify_all`?
        task_ptr_type      queued_;
        std::jthread       worker_; // the consumer thread
    
        //////
        std::atomic<duration_type> idle_timeout_;
        struct { std::atomic<size_t> queued, executed, created, terminated; } disp_stats;
    };
    
    bool Dispatcher::enqueue(duration_type timeout, task_ptr_type aTask) {
        lock_type lock(mutex_);
    
        if (!worker_.joinable()) {
            trace("new thread");
            disp_stats.created += 1;
            worker_ = std::jthread([this](std::stop_token stoken) { worker_impl(stoken); });
        }
    
        if (interruptible_wait_for(producers_, lock, timeout, worker_.get_stop_token(),
                                   [this] { return !queued_; })) {
            queued_.swap(aTask);
            consumer_.notify_one();
            disp_stats.queued += 1;
            return true;
        } else {
            return false;
        }
    }
    
    task_ptr_type Dispatcher::pop(duration_type timeout) noexcept {
        task_ptr_type task;
    
        lock_type lock(mutex_);
        if (interruptible_wait_for(consumer_, lock, timeout, worker_.get_stop_token(), [this] { return !!queued_; })) {
            task.swap(queued_);
            producers_.notify_one();
        }
        return task;
    }
    
    void Dispatcher::worker_impl(std::stop_token stoken) noexcept {
        duration_type cur_timeout;
        while (auto task = pop((cur_timeout = idle_timeout_))) {
            try {
                disp_stats.executed += 1;
                task->run(stoken);
            } catch (...) {
                trace("unhandled exception ignored");
            }
        }
    
        disp_stats.terminated += 1;
        trace("stopped idle thread (after ", cur_timeout / 1ms, "ms)");
    }
    
    class TaskPool {
        TRACE_CTX(TaskPool)
        TaskPool(TaskPool const&)            = delete; // noncopyable
        TaskPool& operator=(TaskPool const&) = delete; // noncopyable
    
      public:
        using dispatcher_t  = std::shared_ptr<Dispatcher>;
        using dispatchers_t = std::vector<dispatcher_t>;
    
        TaskPool(size_t capacity);
    
        size_t        maxSize() const;
        size_t        watermark() const { return tp_stats.watermark; }
        duration_type idle_timeout() const { return idle_timeout_; }
        void          idle_timeout(duration_type t);
    
        void execute(task_ptr_type const& Task);
    
      private:
        mutable mutex_type mutex_;
        dispatchers_t      dispatchers_;
        duration_type      peak_backoff_;
    
        std::atomic<duration_type> idle_timeout_ = Dispatcher::default_idle_tm;
        struct { std::atomic<size_t> watermark, invocations, executions, scans; } tp_stats;
    };
    
    TaskPool::TaskPool(size_t capacity) : dispatchers_(capacity) { assert(capacity); }
    
    void TaskPool::idle_timeout(duration_type t) {
        assert(t >= Dispatcher::min_idle_tm);
        idle_timeout_ = t;
    
        for (dispatcher_t const& p : dispatchers_)
            if (p)
                p->idle_timeout(t);
    }
    
    void TaskPool::execute(task_ptr_type const& Task) {
        lock_type lock(mutex_);
    
        bool const single = dispatchers_.size() == 1;
        tp_stats.invocations += 1;
    
        constexpr duration_type min = 100ms, max = 100s;
        for (duration_type w = !single ? 0s : 100s; /*true*/; w = clamp(w * 2, min, max)) {
            if (w > peak_backoff_) {
                trace("new peak backoff interval ", w / 1.0s);
                peak_backoff_ = w;
            }
    
            for (dispatcher_t& p : dispatchers_) {
                tp_stats.scans += 1;
    
                if (!p) {
                    p = std::make_shared<Dispatcher>(idle_timeout_);
                    tp_stats.watermark = &p - dispatchers_.data();
                    trace("new Dispatcher (watermark ", tp_stats.watermark, ")");
                }
    
                if (p->enqueue(w, Task)) {
                    tp_stats.executions += 1;
                    return;
                }
            }
        }
    }
    
    size_t TaskPool::maxSize() const {
        lock_type lock(mutex_);
        return dispatchers_.size();
    }
    
    struct Wrapper : Task {
        virtual void run(std::stop_token stoken) override {
            if (!interruptible_sleep_for(10s, stoken))
                listener.run();
        }
    
        struct Listener {
            TRACE_CTX(Listener)
            void run() { trace("Hello"); }
        };
    
        Listener listener;
    };
    
    static void Demo(TaskPool& pool) {
        TRACE_CTX(Demo)
    
        std::stop_source stop;
    
        // emulated application logic that produces tasks
        auto app_logic = [&pool, stoken = stop.get_token()] {
            TRACE_CTX(app_logic)
            for (unsigned index = 0; !stoken.stop_requested(); ++index) {
                auto s = now();
                pool.execute(std::make_shared<Wrapper>());
                trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");
    
                if (index % 20 == 0) {
                    trace("taking a break from producing tasks");
                    std::this_thread::sleep_for(5s);
                }
            }
            trace("exit app_logic");
        };
    
        trace("start");
        std::vector<std::thread> threads;
        threads.emplace_back(app_logic);
        threads.emplace_back(app_logic);
    
    #ifdef SHORT_DEMO
        std::this_thread::sleep_for(10s); // (2.5min);
        trace("Requesting shutdown for SHORT_DEMO");
        stop.request_stop();
    #endif
    
        trace("joining app_logic threads");
        for (auto& th : threads)
            th.join();
        trace("joined app_logic threads");
    }
    
    int main() {
        TRACE_CTX(Main);
    
        std::cout << std::setprecision(2) << std::fixed;
        trace("main");
    
        {
            TaskPool threadPool{10};
    
            std::thread t1(Demo, std::ref(threadPool));
            std::thread t2(Demo, std::ref(threadPool));
    
            trace("joining t1..."); t1.join();
            trace("joining t2..."); t2.join();
            trace("awaiting task pool");
        }
    
        trace("bye");
    }
    

    With output like

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -DSHORT_DEMO
    ./a.out
    T: 0      0.00ms Main #1    main
    T: 0      0.17ms Main #1    joining t1...
    T: 1      0.22ms Demo #1    start
    T: 2      0.27ms Demo #2    start
    T: 3      0.48ms TaskPool #1    new Dispatcher (watermark 0)
    T: 3      0.50ms Dispatcher #1  new thread
    T: 3      0.67ms app_logic #1   index:0 enqueued in 0.00s
    T: 3      0.69ms app_logic #1   taking a break from producing tasks
    T: 4      0.72ms app_logic #2   index:0 enqueued in 0.00s
    T: 4      0.73ms app_logic #2   taking a break from producing tasks
    T: 5      0.88ms TaskPool #1    new Dispatcher (watermark 1)
    T: 5      0.90ms Dispatcher #2  new thread
    T: 5      0.97ms app_logic #3   index:0 enqueued in 0.00s
    T: 5      0.99ms app_logic #3   taking a break from producing tasks
    T: 6      1.17ms app_logic #4   index:0 enqueued in 0.00s
    T: 6      1.19ms app_logic #4   taking a break from producing tasks
    T: 4   5001.26ms TaskPool #1    new Dispatcher (watermark 2)
    T: 4   5001.33ms Dispatcher #3  new thread
    T: 4   5001.47ms app_logic #2   index:1 enqueued in 0.00s
    T: 3   5001.83ms app_logic #1   index:1 enqueued in 0.00s
    T: 5   5002.37ms TaskPool #1    new Dispatcher (watermark 3)
    T: 5   5002.42ms Dispatcher #4  new thread
    T: 5   5002.54ms app_logic #3   index:1 enqueued in 0.00s
    T: 5   5003.07ms app_logic #3   index:2 enqueued in 0.00s
    T: 4   5003.76ms TaskPool #1    new Dispatcher (watermark 4)
    T: 4   5003.77ms Dispatcher #5  new thread
    T: 4   5003.84ms app_logic #2   index:2 enqueued in 0.00s
    T: 3   5004.55ms app_logic #1   index:2 enqueued in 0.00s
    T: 6   5005.41ms TaskPool #1    new Dispatcher (watermark 5)
    T: 6   5005.43ms Dispatcher #6  new thread
    T: 6   5005.51ms app_logic #4   index:1 enqueued in 0.00s
    T: 6   5006.37ms app_logic #4   index:2 enqueued in 0.00s
    T: 4   5007.44ms TaskPool #1    new Dispatcher (watermark 6)
    T: 4   5007.46ms Dispatcher #7  new thread
    T: 4   5007.56ms app_logic #2   index:3 enqueued in 0.00s
    T: 3   5008.58ms app_logic #1   index:3 enqueued in 0.00s
    T: 5   5009.75ms TaskPool #1    new Dispatcher (watermark 7)
    T: 5   5009.77ms Dispatcher #8  new thread
    T: 5   5009.86ms app_logic #3   index:3 enqueued in 0.01s
    T: 6   5011.04ms app_logic #4   index:3 enqueued in 0.00s
    T: 4   5012.41ms TaskPool #1    new Dispatcher (watermark 8)
    T: 4   5012.43ms Dispatcher #9  new thread
    T: 4   5012.51ms app_logic #2   index:4 enqueued in 0.00s
    T: 3   5013.85ms app_logic #1   index:4 enqueued in 0.01s
    T: 5   5015.36ms TaskPool #1    new Dispatcher (watermark 9)
    T: 5   5015.38ms Dispatcher #10 new thread
    T: 5   5015.46ms app_logic #3   index:4 enqueued in 0.01s
    T: 6   5016.97ms app_logic #4   index:4 enqueued in 0.01s
    T: 6   5018.64ms TaskPool #1    new peak backoff interval 0.10
    T: 6   6020.28ms TaskPool #1    new peak backoff interval 0.20
    T: 6   8022.03ms TaskPool #1    new peak backoff interval 0.40
    T: 1  10000.67ms Demo #1    Requesting shutdown for SHORT_DEMO
    T: 1  10000.76ms Demo #1    joining app_logic threads
    T: 2  10000.81ms Demo #2    Requesting shutdown for SHORT_DEMO
    T: 2  10000.84ms Demo #2    joining app_logic threads
    T: 7  10000.87ms Listener #1    Hello
    T: 8  10001.11ms Listener #3    Hello
    T: 6  12023.81ms TaskPool #1    new peak backoff interval 0.80
    T: 6  12023.89ms app_logic #4   index:5 enqueued in 7.01s
    T: 6  12023.91ms app_logic #4   exit app_logic
    T: 3  12024.14ms app_logic #1   index:5 enqueued in 7.01s
    T: 3  12024.19ms app_logic #1   exit app_logic
    T: 9  15001.65ms Listener #6    Hello
    T:10  15002.69ms Listener #7    Hello
    T:11  15015.13ms Listener #9    Hello
    T:12  15015.17ms Listener #8    Hello
    T:13  15015.24ms Listener #13   Hello
    T:14  15015.29ms Listener #12   Hello
    T:15  15015.33ms Listener #17   Hello
    T:16  15015.59ms Listener #19   Hello
    T: 5  15015.65ms app_logic #3   index:5 enqueued in 10.00s
    T: 5  15015.67ms app_logic #3   exit app_logic
    T: 1  15015.73ms Demo #1    joined app_logic threads
    T: 0  15015.80ms Main #1    joining t2...
    T: 4  15016.00ms app_logic #2   index:5 enqueued in 10.00s
    T: 4  15016.02ms app_logic #2   exit app_logic
    T: 2  15016.11ms Demo #2    joined app_logic threads
    T: 0  15016.20ms Main #1    awaiting task pool
    T: 7  20001.13ms Dispatcher #1  stopped idle thread (after 1ms)
    T: 8  20001.31ms Listener #4    Hello
    T: 8  20013.48ms Dispatcher #2  stopped idle thread (after 1ms)
    T: 9  25001.90ms Dispatcher #3  stopped idle thread (after 1ms)
    T:10  25015.25ms Dispatcher #4  stopped idle thread (after 1ms)
    T:11  25017.66ms Listener #10   Hello
    T:12  25017.71ms Listener #15   Hello
    T:13  25017.76ms Listener #14   Hello
    T:14  25017.79ms Listener #16   Hello
    T:15  25017.84ms Listener #18   Hello
    T:16  25017.89ms Listener #20   Hello
    T:11  25018.81ms Dispatcher #5  stopped idle thread (after 1ms)
    T:13  25018.84ms Dispatcher #7  stopped idle thread (after 1ms)
    T:12  25018.88ms Dispatcher #6  stopped idle thread (after 1ms)
    T:14  25018.94ms Dispatcher #8  stopped idle thread (after 1ms)
    T:15  25019.06ms Dispatcher #9  stopped idle thread (after 1ms)
    T:16  35018.10ms Dispatcher #10 stopped idle thread (after 1ms)
    T: 0  35018.30ms Main #1    bye
    

    Design Questions

    I see a number of issues with the design even after the improvements

    Given all this, I'm struggling in what scenario this approach could be better in any respect than a more classical task-queue shared with identical worker threads (which never terminate because they don't consume resources when idle anyways):

    Alternative Design

    This is the alternative design, dimensioned to the same capacity:

    live On Coliru

    #define SHORT_DEMO
    #include <atomic>
    #include <cassert>
    #include <condition_variable>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <thread>
    #include <utility>
    using namespace std::chrono_literals;
    
    namespace { // diagnostics tracing helpers
        auto now = std::chrono::high_resolution_clock::now;
        static auto timestamp() {
            static auto start = now();
            return (now() - start) / 1.ms;
        }
    
        static std::atomic_int tid_gen = 0;
        thread_local int       tid     = tid_gen++;
        std::mutex             console_mx;
    
        void trace(auto const&... args) {
            std::lock_guard lk(console_mx);
            std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
            (std::cout << ... << args) << std::endl;
        }
    
        template <typename> struct CtxTracer {
            const std::string_view _ctx;
            int const              id = [] {
                static std::atomic_int idgen = 0;
                return ++idgen;
            }();
            void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
        };
    
        #define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};
    } // namespace
    
    namespace {
        // helpers to help replace boost::thread_interrupted with std::stop_token
        template <typename Lockable, typename Duration, typename Predicate>
        bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                    Duration const& dur, std::stop_token stoken, Predicate pred) {
    
            // see https://stackoverflow.com/a/66309629/85371
            std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
                if (std::exchange(initial, false)) // while constructing the callback
                    return;                        // avoid dead-lock
                mx.lock();
                mx.unlock();
                cv.notify_all();
            });
    
            cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
            return pred();
        }
    
        template <typename Duration> // returns true if stop requested
        static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
            std::mutex              mutex_;
            std::unique_lock lk{mutex_};
            std::condition_variable cv;
            interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});
    
            return stoken.stop_requested();
        }
    } // namespace
    
    struct Task {
        virtual ~Task()                   = default;
        virtual void run(std::stop_token) = 0;
    };
    class TaskPool {
        TRACE_CTX(TaskPool)
        static constexpr std::chrono::steady_clock::duration externity = 999'999h; // duration::max() gives overflows in some implementations
    
      public:
        using task_ptr = std::shared_ptr<Task>;
    
        TaskPool(size_t capacity);
        ~TaskPool() noexcept;
    
        size_t maxSize()   const { return capacity_;  }
        size_t watermark() const { return watermark_; }
    
        void execute(task_ptr Task);
    
      private:
        mutable std::mutex      mutex_;
        std::condition_variable producers_, consumers_; // SEHE combine and `notify_all`?
    
        size_t const            capacity_;
        std::stop_source        stop_source_;
        std::deque<std::thread> workers_; // workers
        std::deque<task_ptr>    queue_;
    
        // former Dispatcher implementation
        task_ptr pop() noexcept;
        void     worker_impl(std::stop_token stoken) noexcept;
        size_t watermark_ = 0, invocations_ = 0, executed_ = 0;
    };
    
    TaskPool::TaskPool(size_t capacity) : capacity_(capacity) {
        assert(capacity);
        while (capacity--) // assuming same number of workers as queue capacity, for comparability with old design
            workers_.emplace_back(&TaskPool::worker_impl, this, stop_source_.get_token());
    }
    
    TaskPool::~TaskPool() noexcept {
        stop_source_.request_stop();
        for (auto& w : workers_)
            if (w.joinable())
                w.join();
    }
    
    void TaskPool::execute(task_ptr task) {
        std::unique_lock lock(mutex_);
        if (interruptible_wait_for(producers_, lock, externity, stop_source_.get_token(),
                                     [this] { return queue_.size() < capacity_; })) {
            queue_.push_back(std::move(task));
            consumers_.notify_one();
    
            invocations_ += 1;
            watermark_ = std::max(watermark_, queue_.size());
        } // else: stop was requested
    }
    
    TaskPool::task_ptr TaskPool::pop() noexcept {
        task_ptr task;
        std::unique_lock lock(mutex_);
        if (interruptible_wait_for(consumers_, lock, externity, stop_source_.get_token(),
                                   [this] { return !queue_.empty(); })) {
            task.swap(queue_.front());
            queue_.pop_front();
            producers_.notify_one();
        }
        return task;
    }
    
    void TaskPool::worker_impl(std::stop_token stoken) noexcept {
        while (auto task = pop())
            try {
                executed_ += 1;
                task->run(stoken);
            } catch (...) { trace("unhandled exception ignored"); }
        trace("worker exit");
    }
    
    struct Wrapper : Task {
        virtual void run(std::stop_token stoken) override {
            if (!interruptible_sleep_for(10s, stoken))
                listener.run();
        }
    
        struct Listener {
            TRACE_CTX(Listener)
            void run() { trace("Hello"); }
        };
        Listener listener;
    };
    
    static void Demo(TaskPool& pool) {
        TRACE_CTX(Demo)
        std::stop_source stop;
    
        // emulated application logic that produces tasks
        auto app_logic = [&pool, stoken = stop.get_token()] {
            TRACE_CTX(app_logic)
            for (unsigned index = 0; !stoken.stop_requested(); ++index) {
                auto s = now();
                pool.execute(std::make_shared<Wrapper>());
                trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");
    
                if (index % 20 == 0) {
                    trace("taking a break from producing tasks");
                    std::this_thread::sleep_for(5s);
                }
            }
            trace("exit app_logic");
        };
    
        trace("start");
        std::deque<std::thread> threads;
        threads.emplace_back(app_logic);
        threads.emplace_back(app_logic);
    
    #ifdef SHORT_DEMO
        std::this_thread::sleep_for(10s); // (2.5min);
        trace("Requesting shutdown for SHORT_DEMO");
        stop.request_stop();
    #endif
    
        trace("joining app_logic threads");
        for (auto& th : threads)
            th.join();
        trace("joined app_logic threads");
    }
    
    int main() {
        TRACE_CTX(Main);
    
        std::cout << std::setprecision(2) << std::fixed;
        trace("main");
    
        {
            TaskPool threadPool{10};
    
            std::thread t1(Demo, std::ref(threadPool));
            std::thread t2(Demo, std::ref(threadPool));
    
            trace("joining t1..."); t1.join();
            trace("joining t2..."); t2.join();
            trace("awaiting task pool");
        }
    
        trace("bye");
    }
    

    Note that is completes a full 10s earlier, despite generating the same amount of work with the same spacing, and having identical number of workers and queue capacity. We lost an entire type (Dispatcher) and a lot of complexity.

    Conclusion / Summary

    I may have suffered from a lack of imagination when thinking of loads that benefit from the specific queuing semantics exhibited by the original design. However, I listed a fair number of objective problems. Also, if the design was intentional, I feel there was at least a lack of clear naming and (self)documentation.

    Regardless I hope the two approaches help you along. Compare the behaviors and choose what's best for you.


    ¹ (too many classes not pulling their weight, conflated classes (Runner and Dispatcher are conjoined twins), unnecessary use of raw pointers, volatile and const_cast abuse...).