I came across this code for a simple implementation of a barrier (for code that can't use std::experimental::barrier in C++17 or std::barrier in C++20) in the book C++ Concurrency in Action.
[Edit] A barrier is a synchronization mechanism where a group of threads (the number of threads is passed to the constructor of the barrier) can arrive and wait (by calling the wait method) or arrive and drop off (by calling done_waiting). Once all the threads in the group have arrived at the barrier, the barrier is reset and the threads can proceed with the next set of actions. If some threads in the group drop off, the number of threads in the group is reduced accordingly for the next round of synchronization with the barrier. [End of Edit]
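For comparison, C++20's std::barrier expresses these same two operations as arrive_and_wait() and arrive_and_drop(); here is a minimal usage sketch (not part of the book's code):

#include <barrier>
#include <thread>

int main() {
    std::barrier b(2);        // a group of 2 threads
    std::thread t([&] {
        b.arrive_and_wait();  // like wait(): block until the whole group arrives
        b.arrive_and_drop();  // like done_waiting(): arrive, then leave the group
    });
    b.arrive_and_wait();      // completes the first phase together with t
    b.arrive_and_wait();      // second phase: completes once t has arrived (and dropped)
    t.join();                 // from here on the group size is 1
}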
Here is the code provided for a simple implementation of a barrier.
#include <atomic>
#include <thread>

struct barrier
{
    std::atomic<unsigned> count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;

    barrier(unsigned count_):count(count_),spaces(count_),generation(0)
    {}

    void wait(){
        unsigned const gen=generation.load();
        if(!--spaces){
            spaces=count.load();
            ++generation;
        }else{
            while(generation.load()==gen){
                std::this_thread::yield();
            }
        }
    }

    void done_waiting(){
        --count;
        if(!--spaces){
            spaces=count.load();
            ++generation;
        }
    }
};
The author, Anthony Williams, mentions that he chose sequentially consistent ordering to make the code easier to reason about, and says that relaxed ordering could be used to make it more efficient. This is how I changed the code to employ relaxed ordering. Please help me understand whether my code is right.
struct barrier
{
    std::atomic<unsigned> count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;

    barrier(unsigned count_):count(count_),spaces(count_),generation(0)
    {}

    void wait(){
        unsigned const gen=generation.load(std::memory_order_acquire);
        if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
            spaces=count.load(std::memory_order_relaxed);
            generation.fetch_add(1, std::memory_order_release);
        }else{
            while(generation.load(std::memory_order_relaxed)==gen){
                std::this_thread::yield();
            }
        }
    }

    void done_waiting(){
        count.fetch_sub(1, std::memory_order_relaxed);
        if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
            spaces=count.load(std::memory_order_relaxed);
            generation.fetch_add(1, std::memory_order_release);
        }
    }
};
The reasoning is this: the increment of generation is a release operation that synchronizes with the loading of generation in the wait call. This ensures that the load from count into spaces is visible to all the threads that call wait and read the new value of generation that was stored with release semantics. All the operations on spaces thereafter are RMW operations, which take part in a release sequence and hence can be relaxed operations. Is this reasoning correct, or is this code wrong? Please help me understand. Thanks in advance.
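For context, this is the kind of release-sequence pattern I have in mind, as a standalone sketch (hypothetical names, not the barrier itself): a release RMW heads the sequence, a relaxed RMW in another thread continues it, and an acquire load that reads a value from the sequence synchronizes with the head store.

#include <atomic>
#include <cassert>
#include <thread>

std::atomic<int> counter{2};
int payload = 0;  // plain, non-atomic data

void producer() {
    payload = 42;                                     // "regular work"
    counter.fetch_sub(1, std::memory_order_release);  // heads a release sequence
}

void helper() {
    counter.fetch_sub(1, std::memory_order_relaxed);  // RMW: continues the sequence
}

void consumer() {
    while (counter.load(std::memory_order_acquire) != 0)  // reads from the sequence
        std::this_thread::yield();
    assert(payload == 42);  // visible: producer's store synchronized with our load
}

int main() {
    std::thread a(producer), b(helper), c(consumer);
    a.join(); b.join(); c.join();
}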
[Edit] I tried using my barrier code like this.

#include <iostream>
#include <thread>

void fun(barrier* b){
    std::cout << "In Thread " << std::this_thread::get_id() << std::endl;
    b->wait();
    std::cout << std::this_thread::get_id() << " First wait done" << std::endl;
    b->wait();
    std::cout << std::this_thread::get_id() << " Second wait done" << std::endl;
    b->done_waiting();
}

int main(){
    barrier b{2};
    std::thread t(fun, &b);
    fun(&b);
    std::cout << std::this_thread::get_id() << " " << b.count.load() << std::endl;
    t.join();
}

I also tried testing it with many more threads, and in superficial runs it seems to do the right thing. But I would still like to understand whether my reasoning is correct or if I am missing something obvious. [End of Edit]
I believe the following version of the code has the weakest orderings that still make it correct.
struct barrier
{
    std::atomic<unsigned> count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;

    barrier(unsigned count_):count(count_),spaces(count_),generation(0)
    {}

    void wait(){
        unsigned const gen=generation.load(std::memory_order_relaxed);
        if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
            spaces.store(count.load(std::memory_order_relaxed),
                         std::memory_order_relaxed);
            generation.fetch_add(1, std::memory_order_release);
        }else{
            while(generation.load(std::memory_order_acquire)==gen){
                std::this_thread::yield();
            }
        }
    }

    void done_waiting(){
        count.fetch_sub(1, std::memory_order_relaxed);
        if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
            spaces.store(count.load(std::memory_order_relaxed),
                         std::memory_order_relaxed);
            generation.fetch_add(1, std::memory_order_release);
        }
    }
};
Throughout this discussion, when we speak of the "last" thread to reach the barrier in a particular generation, we mean the unique thread that sees spaces.fetch_sub(1) return the value 1.
In general, if a thread is making a store that promises some operation has completed, it needs to be a release store. When another thread loads that value as proof that the operation is complete and that it is safe to use the results, that load needs to be acquire.
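As a standalone illustration of that rule (hypothetical names, nothing to do with the barrier itself):

#include <atomic>
#include <cassert>
#include <thread>

int result = 0;                 // the non-atomic "results"
std::atomic<bool> done{false};

void worker() {
    result = 123;                                 // the operation
    done.store(true, std::memory_order_release);  // promise: operation complete
}

void user() {
    while (!done.load(std::memory_order_acquire))  // proof of completion
        std::this_thread::yield();
    assert(result == 123);  // safe: the release store synchronized with our load
}

int main() {
    std::thread w(worker), u(user);
    w.join(); u.join();
}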
Now in the code at hand, the way a thread indicates that it is done with its "regular work" (whatever was sequenced before the call to wait() or done_waiting()) is by decrementing spaces, i.e. storing a value that is 1 less than the previous value in the modification order. For a non-last thread, that's the only store it does. So that store, namely spaces.fetch_sub(), has to be release.
For the last thread, the way that it knows it is last is by loading the value 1 in spaces.fetch_sub(). This serves as proof that all other threads have finished their regular work, and that the last thread can safely drop the barrier. So spaces.fetch_sub() needs to be acquire as well. Hence spaces.fetch_sub(1, std::memory_order_acq_rel).
The way that the non-last threads determine that the barrier is down, and that they can safely proceed, is by loading a value from generation in the yield loop and observing that it is different from gen. So that load needs to be acquire; and the store it observes, namely generation.fetch_add(), needs to be release.
I claim that's all the barriers we need. The work done by the last thread to update spaces from count is effectively in its own little critical section, begun with the acquire load of 1 from spaces and ended with the release increment of generation. At this point, all other threads have already loaded and stored spaces, every thread that called wait() has already loaded the old value of generation, and every thread that called done_waiting() has already stored its new decremented value for count. So the last thread can safely manipulate them with relaxed ordering, knowing that no other thread will do so.
And now the initial load gen = generation.load() in wait() does not need to be acquire. It can't be pushed down past the store to spaces, because the latter is release. So it is sure to be safely loaded before the value 1 is stored to spaces, and only after that could a potential last thread update generation.
Now let's try to give a formal proof that this new version is correct. Look at two threads, A and B, which do
barrier b(N); // N >= 2

void thrA() {
    work_A();
    b.wait(); // or b.done_waiting(), doesn't matter which
}

void thrB() {
    b.wait();
    work_B();
}
There are N-2 other threads, each of which calls either b.wait() or b.done_waiting(). For simplicity, let's suppose we begin at generation 0.

We want to prove that work_A() happens before work_B(), so that they can be operating on the same data (conflicting) without a data race. Consider two cases, depending on whether or not B is the last thread to reach the barrier.
Suppose B is the last thread. That means it did an acquire load of the value 1 from spaces. Thread A must necessarily have release-stored some value >= 1, which was then successively decremented by atomic RMW operations (in other threads) until it reached 1. That is a release sequence headed by A's store, and B's load takes its value from the last side effect in that sequence, so A's store synchronizes with B's load. By sequencing, work_A() happens before A's store, and B's load happens before work_B(), hence work_A() happens before work_B() as desired.
Now suppose B is not the last thread. Then it returns from wait() only when it loads from generation a value different from gen. Let L denote the actual last thread, which could potentially be A.

I claim, as a first step, that gen in B must be 0. For the load of generation into gen happens before B's release store in spaces.fetch_sub(), and as noted above, this heads a release sequence which eventually stores the value 1 (in the second-to-last thread). The load of spaces in L takes its value from that side effect, so B's store to spaces synchronizes with L's load. B's load of generation happens before its store to spaces, and L's load of spaces happens before its store (fetch_add()) to generation. So B's load of generation happens before L's store. By read-write coherence [intro.races p17], B's load of generation must not take its value from L's store, but instead from some earlier value in the modification order. This must necessarily be 0, as there are no other modifications to generation.

(If we were working at generation G instead of 0, this would prove only that gen <= G. But as I explain below, all these loads happen after the previous increment of generation, which is where the value G was stored. So that proves the opposite inequality gen >= G.)
So B returns from wait() only when it loads 1 from generation. Thus, that final acquire load has taken its value from the release store to generation done by L, showing that L's store happens before B's return. Now L's load of 1 from spaces happens before its store to generation (by sequencing), and A's store to spaces happens before L's load of 1, as proved earlier. (If L = A, the same conclusion still holds, by sequencing.) We now have the following operations totally ordered by happens-before:

1. A: work_A();
2. A: store side of spaces.fetch_sub()
3. L: load of 1 in spaces.fetch_sub()
4. L: store side of generation.fetch_add()
5. B: acquire load of 1 from generation
6. B: work_B()

and have the desired conclusion by transitivity. (If L = A, then delete lines 2 and 3 above.)
We could similarly prove that all the decrements of count in the various calls to done_waiting() happen before L's load of count to store it into spaces, thus those can safely be relaxed. The re-initialization of spaces in L, and the increment of generation, both happen before any thread returns from wait(), so even if those are relaxed, any barrier operations sequenced afterwards will see the barrier properly reset.
I think that covers all the desired semantics.
We could actually weaken things a little more by using fences. For instance, the acq in spaces.fetch_sub() was solely for the benefit of thread L, which loads the value 1; other threads didn't need it. So we could do instead
if (1 == spaces.fetch_sub(1, std::memory_order_release)) {
    std::atomic_thread_fence(std::memory_order_acquire);
    // ...
}
and then only thread L needs to pay the cost of the acquire. Not that it matters much, since all the other threads are going to sleep anyway, so we're unlikely to care if they are slow.
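Put together, wait() in this fence-based variant might look like the following sketch (done_waiting() would change in the same way). An acquire fence placed after an atomic read that takes its value from a release sequence establishes the same synchronizes-with edge that an acquire read would.

void wait(){
    unsigned const gen=generation.load(std::memory_order_relaxed);
    if(1 == spaces.fetch_sub(1, std::memory_order_release)){
        // Only the last thread executes this fence, so only it pays for acquire.
        std::atomic_thread_fence(std::memory_order_acquire);
        spaces.store(count.load(std::memory_order_relaxed),
                     std::memory_order_relaxed);
        generation.fetch_add(1, std::memory_order_release);
    }else{
        while(generation.load(std::memory_order_acquire)==gen){
            std::this_thread::yield();
        }
    }
}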
(This section was written earlier, before I made the modified version above.)
I believe there are at least two bugs.
Let's analyze wait() by itself. Consider code that does:
int x = 0;
barrier b(2);

void thrA() {
    x = 1;
    b.wait();
}

void thrB() {
    b.wait();
    std::cout << x << std::endl;
}
We wish to prove that x = 1 in thrA happens before the evaluation of x in thrB, so that the code would have no data race and be forced to print the value 1.

But I don't think we can. Let's suppose thrB reaches the barrier first, which is to say that it observes spaces.fetch_sub returning 2. So the loads and stores performed in each thread are sequenced as follows:
thrA:
    x = 1;
    gen=generation.load(std::memory_order_acquire);     // returns 0
    spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 1, stores 0
    spaces=count.load(std::memory_order_relaxed);       // stores 2
    generation.fetch_add(1, std::memory_order_release); // returns 0, stores 1

thrB:
    gen=generation.load(std::memory_order_acquire);     // returns 0
    spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 2, stores 1
    generation.load(std::memory_order_relaxed);         // returns 0
    ... many iterations ...
    generation.load(std::memory_order_relaxed);         // returns 1
    x;                                                  // returns ??
To have any hope, we have to get some operation A in thrA to synchronize with some operation B in thrB. This is only possible if B is an acquire operation that takes its value from a side effect in the release sequence headed by A. But there is only one acquire operation in thrB, namely the initial generation.load(std::memory_order_acquire). And it does not take its value (namely 0) from any of the operations in thrA, but from the initialization of generation that happened before either thread was started. This side effect is not part of any useful release sequence, certainly not of any release sequence headed by an operation which happens after x = 1. So our attempt at a proof breaks down.
More informally, if we inspect the sequencing of thrB, we see that the evaluation of x could be reordered before any or all of the relaxed operations. The fact that it's only evaluated conditionally on generation.load(std::memory_order_relaxed) returning 1 doesn't help; we could have x loaded speculatively much earlier, and the value only used after generation.load(std::memory_order_relaxed) finally returns 1. So all we know is that x is evaluated sometime after generation.load(std::memory_order_acquire) returns 0, which gives us precisely no useful information at all about what thrA might or might not have done by then.

This particular issue could be fixed by upgrading the load of generation in the spin loop to acquire, or by placing an acquire fence after the loop exits but before wait() returns.
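Concretely, either of the following sketches of those two fixes would do:

// Option 1: make the spin-loop load acquire.
while(generation.load(std::memory_order_acquire)==gen){
    std::this_thread::yield();
}

// Option 2: keep the relaxed load, but fence after the loop exits.
while(generation.load(std::memory_order_relaxed)==gen){
    std::this_thread::yield();
}
std::atomic_thread_fence(std::memory_order_acquire);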
As for done_waiting(), it seems problematic too. If we have
void thrA() {
    x = 1;
    b.done_waiting();
}

void thrB() {
    b.wait();
    std::cout << x;
}
then presumably we again want 1 to be printed, without data races. But suppose thrA reaches the barrier first. Then all that it does is
x = 1;
count.fetch_sub(1, std::memory_order_relaxed);  // irrelevant
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
with no release stores at all, so it cannot synchronize with thrB.
Informally, there is no barrier to prevent the store x = 1 from being delayed indefinitely, so there cannot be any guarantee that thrB will observe it.
It is late here, so for the moment I leave it as an exercise how to fix this.
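(A sketch of one fix, mirroring the modified version at the top: make the decrement of spaces in done_waiting() a release operation, so that it heads a release sequence with which the last thread's acquire load can synchronize.)

void done_waiting(){
    count.fetch_sub(1, std::memory_order_relaxed);
    if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){ // was relaxed
        spaces.store(count.load(std::memory_order_relaxed),
                     std::memory_order_relaxed);
        generation.fetch_add(1, std::memory_order_release);
    }
}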
By the way, ThreadSanitizer detects data races for both cases: https://godbolt.org/z/1MdbMbYob. I probably should have tried that first, but initially it wasn't so clear to me what to actually test.
I am not sure at this point if these are the only bugs, or if there are more.