I have a situation where N threads are working on a data structure simultaneously in small incremental steps. However, sometimes a synchronized action needs to take place.
So all threads need to halt, wait for one of the threads to perform this operation, and continue. I'm looking for a method that has as little as possible impact on the threads when the synchronized action is NOT required.
A simple option is using a shared_mutex
, but I think an option with lower overhead is possible. I've attempted my own solution using a barrier and an atomic below.
So my questions are: Is this an effective solution for the problem? Is there a better solution?
#include <vector>
#include <thread>
#include <atomic>
#include <barrier>
int main()
{
const size_t nr_threads = 10;
std::vector<std::thread> threads;
std::barrier barrier { nr_threads };
std::atomic_bool sync_required { false };
auto rare_condition = []() { return std::rand() == 42; };
for (int i = 0; i < nr_threads; ++i)
{
threads.emplace_back([&, i]()
{
while (true)
{
if (sync_required)
{
if (i == 0)
{
barrier.arrive_and_wait();
sync_required = false;
// solo synchronized work
barrier.arrive_and_wait();
}
else
{
barrier.arrive_and_wait();
barrier.arrive_and_wait();
}
}
// standard loop body ...
// sometimes a global synchronized action is required
if (rare_condition()) sync_required = true;
}
});
}
// eventually ... treads quit
for (auto& thread : threads)
{
thread.join();
}
}
Another solution with the shared_mutex needs a condition variable?
#include <array>
#include <atomic>
#include <thread>
#include <vector>
#include <barrier>
#include <shared_mutex>
int main()
{
const size_t nr_threads = 10;
std::vector<std::thread> threads;
std::shared_mutex sync_mtx;
std::atomic_bool sync_required { false };
auto rare_condition = []() { return std::rand() == 42; };
for (int i = 0; i < nr_threads; ++i)
{
threads.emplace_back([&, i]()
{
std::shared_lock shared_lock { sync_mtx };
while (true)
{
// very rarely another thread requires all the others to stop for a bit
if (sync_required)
{
if (i == 0)
{
// unlock shared, but lock unique, seems a little odd but neccesary
shared_lock.unlock();
{
std::unique_lock unique_lock{ shared_lock };
sync_required = false;
// solo sync work
}
shared_lock.lock();
}
else
{
shared_lock.unlock();
// todo: need condition variable, which adds more complexity to this solution?
shared_lock.lock();
}
}
// sometimes a global syncronized action is required
sync_required = sync_required || rare_condition();
}
});
}
// eventually ... treads quit
for (auto& thread : threads)
{
thread.join();
}
}
The major optimization I'll propose to your code is to replace the following
// Thread: if (sync_required) {
if (i == 0) { // An arbitrary thread is choosen to run sync().
// It has to wait until other threads aknowledges having seen sync_required
barrier.arrive_and_wait();
sync();
By something like this
// In main:
std::atomic<int> unpaused_count{ nr_threads }; // number of threads that haven't noticed sync_required
// Thread: if (sync_required) {
int old_unpaused_count = unpaused_count.fetch_sub(1); // This thread just noticed sync_required, update the count
if (old_unpaused_count == 1) { // The thread noticing last is choosen to run sync()
// It can run sync() immediately, since it knows that all other
// threads have seen sync_required, and have decided to wait.
sync();
// TODO: restore unpaused_count & resume other threads.
} else { // This thread isn't the last to notice: wait.
If the targeted platform has a wait-free std::atomic<int>::fetch_sub
(hopefully the case with all standard libraries on x64), this code can now choose a thread to run sync()
in a wait-free manner, and the chosen thread starts running sync()
immediately. This should be much better than a lock/barrier,
if I didn't mess that bit of lock-free code (that's a big IF).
My second suggestion is to use std::counting_semaphore
to let the sync()
thread notify the waiting threads that the sync period is over. The use case is well described on cppreference:
Semaphores are also often used for the semantics of signaling/notifying rather than mutual exclusion, by initializing the semaphore with 0 and thus blocking the receiver(s) that try to acquire(), until the notifier "signals" by invoking release(n). In this respect semaphores can be considered alternatives to std::condition_variables, often with better performance.
With the two proposed optimizations, the thread chosen to run sync()
never needs to acquire a lock, or wait on some barrier/condition variable. This is highly desirable: the faster it finishes its sync()
section, the faster the whole system can restart.
Finally, full code (godbolt):
#include <atomic>
#include <limits>
#include <semaphore>
#include <thread>
#include <vector>
using ThreadCount = int;
static constexpr auto maxThreadCount = std::numeric_limits<ThreadCount>::max();
int main() {
const ThreadCount nr_threads = 10;
std::vector<std::thread> threads;
struct SharedVars {
std::counting_semaphore<maxThreadCount> end_of_sync{ 0 };
std::atomic<bool> sync_required{ false };
std::atomic<ThreadCount> unpaused_count{ nr_threads };
};
SharedVars shared;
auto rare_condition = []() { return std::rand() == 42; };
for (ThreadCount i = 0; i < nr_threads; ++i) {
threads.emplace_back([&shared, rare_condition]() {
while (true) {
if (shared.sync_required) {
ThreadCount old_unpaused_count = shared.unpaused_count.fetch_sub(1);
if (old_unpaused_count == 1) {
// SYNC section here
shared.unpaused_count.store(nr_threads);
shared.sync_required.store(false);
shared.end_of_sync.release(nr_threads - 1);
} else {
shared.end_of_sync.acquire();
}
}
// standard loop body ...
if (rare_condition()) {
shared.sync_required = true;
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
}
It is certainly possible to use a weaker memory order than SEQ_CST
on some of the atomic operations. Reasoning about weaker memory orders is far above my skill, so I'm leaving it this way.