I'm trying to understand the new c++20 concurrency features. I use a std::barrier
in a threadpool implementation but it deadlocks. What could be the problem with the following threadpool implementation?
The main thread is blocked when the destructor of the threadpool is called. I have a guess why that might be but I am not certain and wonder whether I could rewrite the program slightly and still rely on a std::barrier
A reproducible example is as follows:
#include <stop_token>
#include <atomic>
#include <barrier>
#include <functional>
#include <stop_token>
#include <thread>
#include <vector>
class ThreadPool {
public:
friend void ThreadMain(std::stop_token, ThreadPool*);
using function = void(void* context, size_t idx);
explicit ThreadPool(size_t num_threads);
~ThreadPool();
void QueueTask(function*, void* context, int range);
private:
std::barrier<std::function<void()>> sync_point;
std::atomic_flag start{false};
std::atomic<int> idx{0};
function* task{nullptr};
void* ctx{nullptr};
std::vector<std::jthread> threads;
};
void ThreadMain(std::stop_token stoken, ThreadPool* pool) {
while (!stoken.stop_requested()) {
pool->start.wait(false);
if (stoken.stop_requested()) {
return;
}
int i{0};
while ((i = pool->idx.fetch_sub(1) - 1) > -1) {
(*(pool->task))(pool->ctx, i);
}
pool->sync_point.arrive_and_wait();
}
}
ThreadPool::ThreadPool(size_t num_threads)
: sync_point(num_threads, [&]() noexcept { this->start.clear(); }),
threads(num_threads - 1) {
for (uint32_t ii = 1; ii < num_threads; ++ii) {
threads[ii - 1] = std::jthread(ThreadMain, this);
}
}
void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
int r) {
task = function;
ctx = context;
idx.store(r);
start.test_and_set();
start.notify_all();
int i{0};
while ((i = idx.fetch_sub(1) - 1) > -1) {
(*(function))(context, i);
}
sync_point.arrive_and_wait();
}
ThreadPool::~ThreadPool() {
start.test_and_set();
start.notify_all();
for (auto& t : threads) {
t.request_stop();
t.join();
}
}
void function(void* context, size_t idx) {
return;
}
int main() {
ThreadPool pool(2);
for (size_t i = 0; i < 10; ++i) {
pool.QueueTask(function, nullptr, 100);
}
}
The example can be compiled with ggc 13.2.0 with the c++20 flag, as follows: g++ main.cpp -O0 -g --std=c++20
.
Tasks in the threadpool are added with the QueueTask
function. The main thread sets the work and context info for all threads and notifies them of new work. All threads (including the main thread) share the work by picking elements of the task's arguments. When all arguments have been sliced up, they're using a std::barrier
for coordination.
When I debug the threadpool with gdb, I see that the program hangs at the threadpool's destructor. With two threads, the main thread waits joining the other thread, while the other thread waits at the barrier:
pool->sync_point.arrive_and_wait();
c++reference says about the arrive_and_wait
function:
atomically decrements the expected count by 1, then blocks at the synchronization point for the current phase until the phase completion step of the current phase is run. [emphasis mine].
Notably, cppreference does not say that the threads are waiting until the completion function was run I use the completion function to set the start flag again to false. Without that, some threads will continue to pick up work and then wait on the barrier again.
Can I somehow salvage the example above while still using std::barrier
and only using minimal modifications?
There was an error in the destructor with a wrong ordering of events. The desctructor should be written as:
ThreadPool::~ThreadPool() {
for (auto& t : threads) {
t.request_stop();
}
start.test_and_set();
start.notify_all();
}
All in all, I've been pretty satisfied with both the conciseness and the performance brought by std::barrier
.