Tags: c++, atomic, memory-barriers, stdatomic, barrier

Changing code from Sequential Consistency to a less stringent ordering in a barrier implementation


I came across this code for a simple implementation of a barrier (for code that can't use std::experimental::barrier from the Concurrency TS or std::barrier from C++20) in the book C++ Concurrency in Action.

[Edit] A barrier is a synchronization mechanism where a group of threads (the number of threads is passed to the constructor of the barrier) can arrive and wait (by calling the wait method) or arrive and drop out (by calling done_waiting). Once all the threads in the group have arrived at the barrier, the barrier is reset and the threads can proceed with the next set of actions. If some threads in the group drop out, the number of threads in the group is reduced accordingly for the next round of synchronization with the barrier. [End of Edit]
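
For concreteness, here is a rough sketch of how I intend such a barrier to be used (the phase comments are just placeholders, and barrier is the struct shown below):

#include <functional>
#include <thread>
#include <vector>

void participant(barrier& b){
    // ... phase 1 work ...
    b.wait();          // arrive and wait until the whole group has arrived
    // ... phase 2 work ...
    b.done_waiting();  // arrive and drop out of the group for later rounds
}

int main(){
    barrier b{3};
    std::vector<std::thread> ts;
    for(int i = 0; i < 3; ++i)
        ts.emplace_back(participant, std::ref(b));
    for(auto& t : ts)
        t.join();
}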

Here is the code provided for a simple implementation of a barrier.

struct barrier
{
   std::atomic<unsigned> count;
   std::atomic<unsigned> spaces;
   std::atomic<unsigned> generation;
   barrier(unsigned count_):count(count_),spaces(count_),generation(0)
   {}
   void wait(){
      unsigned const gen=generation.load();
      if(!--spaces){
         spaces=count.load();
         ++generation;
      }else{
         while(generation.load()==gen){
            std::this_thread::yield();
         }
      }
   }
   void done_waiting(){
      --count;
      if(!--spaces){
         spaces=count.load();
         ++generation;
      }
   }
};

The author, Anthony Williams, mentions that he chose sequentially consistent ordering to make the code easier to reason about, and that more relaxed orderings could make it more efficient. This is how I changed the code to use relaxed orderings. Please help me understand whether my code is right.

struct barrier
{
   std::atomic<unsigned> count;
   std::atomic<unsigned> spaces;
   std::atomic<unsigned> generation;
   barrier(unsigned count_):count(count_),spaces(count_),generation(0)
   {}
   void wait(){
      unsigned const gen=generation.load(std::memory_order_acquire);
      if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
         spaces=count.load(std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }else{
         while(generation.load(std::memory_order_relaxed)==gen){
            std::this_thread::yield();
         }
      }
   }
   void done_waiting(){
      count.fetch_sub(1, std::memory_order_relaxed);
      if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
         spaces=count.load(std::memory_order_relaxed);
         generation.fetch_add(1, std::memory_order_release);
      }
   }
};

My reasoning is this. The increment of generation is a release operation that synchronizes with the load of generation in the wait call. This ensures that the store to spaces (of the value loaded from count) is visible to all the threads that call wait and read the new value of generation that was stored with release semantics.

All the operations on spaces thereafter are RMW operations, which take part in a release sequence and hence can be relaxed. Is this reasoning correct, or is the code wrong? Please help me understand. Thanks in advance.

[Edit] I tried using my barrier code like this.

void fun(barrier* b){
        std::cout << "In Thread " << std::this_thread::get_id() << std::endl;
        b->wait();
        std::cout << std::this_thread::get_id() << " First wait done" << std::endl;
        b->wait();
        std::cout << std::this_thread::get_id() << " Second wait done" << std::endl;
        b->done_waiting();
}

int main(){
        barrier b{2};
        std::thread t(fun, &b);
        fun(&b);
        std::cout << std::this_thread::get_id() << " " << b.count.load() << std::endl;
        t.join();
}

I also tried testing it with many more threads, and in superficial runs it seems to be doing the right thing. But I would still like to understand whether my reasoning is correct or whether I am missing something obvious. [End of Edit]


Solution

  • Modified version of the code

    I believe the following version of the code has the weakest orderings that still make it correct.

    struct barrier
    {
       std::atomic<unsigned> count;
       std::atomic<unsigned> spaces;
       std::atomic<unsigned> generation;
       barrier(unsigned count_):count(count_),spaces(count_),generation(0)
       {}
       void wait(){
          unsigned const gen=generation.load(std::memory_order_relaxed);
          if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
             spaces.store(count.load(std::memory_order_relaxed),
                          std::memory_order_relaxed);
             generation.fetch_add(1, std::memory_order_release);
          }else{
             while(generation.load(std::memory_order_acquire)==gen){
                std::this_thread::yield();
             }
          }
       }
       void done_waiting(){
          count.fetch_sub(1, std::memory_order_relaxed);
          if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
             spaces.store(count.load(std::memory_order_relaxed), 
                          std::memory_order_relaxed);
             generation.fetch_add(1, std::memory_order_release);
          }
       }
    };
    

    Discussion of new version

    Throughout this discussion, when we speak of the "last" thread to reach the barrier in a particular generation, we mean the unique thread that sees spaces.fetch_sub(1) return the value 1.

    In general, if a thread is making a store that promises some operation has completed, it needs to be a release store. When another thread loads that value to get proof that the operation is complete, and it is safe to use the results, then that load needs to be acquire.
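
    As a minimal illustration of this rule, outside the barrier (a generic sketch, not the book's code):

    #include <atomic>
    #include <cassert>
    #include <thread>

    std::atomic<bool> done{false};
    int payload = 0;

    void producer(){
       payload = 42;                                 // the "operation"
       done.store(true, std::memory_order_release);  // promise: payload is complete
    }

    void consumer(){
       while(!done.load(std::memory_order_acquire))  // proof that payload is complete
          std::this_thread::yield();
       assert(payload == 42);                        // safe to use the result, no data race
    }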

    Now in the code at hand, the way a thread indicates that it is done with its "regular work" (whatever was sequenced before the call to wait() or done_waiting()) is by decrementing spaces, i.e. storing a value that is 1 less than the previous value in the modification order. For a non-last thread, that's the only store that it does. So that store, namely spaces.fetch_sub(), has to be release.

    For the last thread, the way that it knows it is last is by loading the value 1 in spaces.fetch_sub(). This serves as proof that all other threads have finished their regular work, and that the last thread can safely drop the barrier. So spaces.fetch_sub() needs to be acquire as well. Hence spaces.fetch_sub(1, std::memory_order_acq_rel).

    The way that the non-last threads determine that the barrier is down and they can safely proceed is by loading a value from generation in the yield loop, and observing that it is different from gen. So that load needs to be acquire; and the store it observes, namely generation.fetch_add(), needs to be release.

    I claim that's all the barriers we need. The work done by the last thread to update spaces from count is effectively in its own little critical section, begun with the acquire load of 1 from spaces and ended with the release increment of generation. At this point, all other threads have already loaded and stored spaces, every thread that called wait has already loaded the old value of generation, and every thread that called done_waiting() has already stored its new decremented value for count. So the last thread can safely manipulate them with relaxed ordering, knowing that no other thread will do so.

    And now the initial load gen = generation.load() in wait() does not need to be acquire. It can't be pushed down past the store to spaces, because the latter is release. So it is sure to be safely loaded before the value 1 is stored to spaces, and only after that could a potential last thread update generation.
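
    To summarize the reasoning above, here is the wait() path of the modified version again, with the choice of each ordering attached as comments (the code itself is unchanged):

    void wait(){
       // Relaxed is enough: this load cannot sink below the release half
       // of the acq_rel RMW on spaces below.
       unsigned const gen=generation.load(std::memory_order_relaxed);
       // Release: publishes this thread's regular work to the last thread.
       // Acquire: if we are last, gives us proof that everyone else is done.
       if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
          // The last thread's private "critical section": no other thread
          // touches spaces or count until generation is bumped, so relaxed is fine.
          spaces.store(count.load(std::memory_order_relaxed),
                       std::memory_order_relaxed);
          // Release: publishes the reset barrier (and everything before it)
          // to the threads spinning below.
          generation.fetch_add(1, std::memory_order_release);
       }else{
          // Acquire: synchronizes with the fetch_add above, so the whole
          // group's work is visible before wait() returns.
          while(generation.load(std::memory_order_acquire)==gen){
             std::this_thread::yield();
          }
       }
    }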

    A proof

    Now let's try to give a formal proof that this new version is correct. Look at two threads, A and B, which do

    barrier b(N); // N >= 2
    void thrA() {
        work_A();
        b.wait(); // or b.done_waiting(), doesn't matter which 
    }
    
    void thrB() {
        b.wait();
        work_B();
    }
    

    There are N-2 other threads, each of which calls either b.wait() or b.done_waiting(). For simplicity, let's suppose we begin at generation 0.

    We want to prove that work_A() happens before work_B(), so that they can be operating on the same data (conflicting) without a data race. Consider two cases depending on whether or not B is the last thread to reach the barrier.

    Suppose B is the last thread. That means it did an acquire load of the value 1 from spaces. Thread A must necessarily have release-stored some value >= 1, which was then successively decremented by atomic RMW operations (in other threads) until it reached 1. That is a release sequence headed by A's store, and B's load takes its value from the last side effect in that sequence, so A's store synchronizes with B's load. By sequencing, work_A() happens before A's store, and B's load happens before work_B(), hence work_A() happens before work_B() as desired.

    Now suppose B is not the last thread. Then it returns from wait() only when it loads from generation a value different from gen. Let L denote the actual last thread, which could potentially be A.

    I claim, as a first step, that gen in B must be 0. For the load of generation into gen happens before B's release store in spaces.fetch_sub(), and as noted above, this heads a release sequence which eventually stores the value 1 (in the second-to-last thread). The load of spaces in L takes its value from that side effect, so B's store to spaces synchronizes with L's load. B's load of generation happens before its store to spaces, and L's load of spaces happens before its store (fetch_add()) to generation. So B's load of generation happens before L's store. By read-write coherence [intro.races p17], B's load of generation must not take its value from L's store, but instead from some earlier value in the modification order. This must necessarily be 0 as there are no other modifications to generation.

    (If we were working at generation G instead of 0, this would prove only that gen <= G. But as I explain below, all these loads happen after the previous increment of generation, which is where the value G was stored. So that proves the opposite inequality gen >= G.)

    So B returns from wait() only when it loads 1 from generation. Thus, that final acquire load has taken its value from the release store to generation done by L, showing that L's store happens before B's return. Now L's load of 1 from spaces happens before its store to generation (by sequencing), and A's store to spaces happens before L's load of 1 as proved earlier. (If L = A then the same conclusion still holds, by sequencing.) We now have the following operations totally ordered by happens-before:

    A: work_A();
    A: store side of spaces.fetch_sub()
    L: load of 1 in spaces.fetch_sub()
    L: store side of generation.fetch_add()
    B: acquire load of 1 from generation
    B: work_B()
    

    and have the desired conclusion by transitivity. (If L=A then delete lines 2 and 3 above.)

    We could similarly prove that all the decrements of count in the various calls to done_waiting() happen before L's load of count to store it into spaces, thus those can safely be relaxed. The re-initialization of spaces in L, and the increment of generation, both happen before any thread returns from wait(), so even if those are relaxed, any barrier operations sequenced afterwards will see the barrier properly reset.

    I think that covers all the desired semantics.

    Fences

    We could actually weaken things a little more by using fences. For instance, the acq in spaces.fetch_sub() was solely for the benefit of thread L which loads the value 1; other threads didn't need it. So we could do instead

    if (1 == spaces.fetch_sub(1, std::memory_order_release)){
        std::atomic_thread_fence(std::memory_order_acquire);
        // ...
    }
    

    and then only thread L needs to pay the cost of the acquire. Not that it really matters much, since all other threads are going to sleep anyway and so we're unlikely to care if they are slow.
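
    For completeness, here is a sketch of wait() written with the fence (done_waiting() would change in the same way):

    void wait(){
       unsigned const gen=generation.load(std::memory_order_relaxed);
       if(1 == spaces.fetch_sub(1, std::memory_order_release)){
          // Only the last thread pays for the acquire.
          std::atomic_thread_fence(std::memory_order_acquire);
          spaces.store(count.load(std::memory_order_relaxed),
                       std::memory_order_relaxed);
          generation.fetch_add(1, std::memory_order_release);
       }else{
          while(generation.load(std::memory_order_acquire)==gen){
             std::this_thread::yield();
          }
       }
    }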

    Data races in OP's original version

    (This section was written earlier, before I made the modified version above.)

    I believe there are at least two bugs.

    Let's analyze wait() by itself. Consider code that does:

    int x = 0;
    barrier b(2);
    
    void thrA() {
        x = 1;
        b.wait();
    }
    
    void thrB() {
        b.wait();
        std::cout << x << std::endl;
    }
    

    We wish to prove that x = 1 in thrA happens before the evaluation of x in thrB, so that the code would have no data race and be forced to print the value 1.

    But I don't think we can. Let's suppose thrB reaches the barrier first, which is to say that it observes spaces.fetch_sub returning 2. So the loads and stores performed in each thread are sequenced as follows:

    thrA:
    x = 1;
    gen=generation.load(std::memory_order_acquire);     // returns 0
    spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 1, stores 0
    spaces=count.load(std::memory_order_relaxed);       // stores 2
    generation.fetch_add(1, std::memory_order_release); // returns 0, stores 1
    
    thrB:
    gen=generation.load(std::memory_order_acquire);     // returns 0
    spaces.fetch_sub(1, std::memory_order_relaxed);     // returns 2, stores 1
    generation.load(std::memory_order_relaxed);         // returns 0
    ... many iterations
    generation.load(std::memory_order_relaxed);         // returns 1
    x;                                                  // returns ??
    

    To have any hope, we have to get some operation A in thrA to synchronize with some operation B in thrB. This is only possible if B is an acquire operation that takes its value from a side effect in the release sequence headed by A. But there is only one acquire operation in thrB, namely the initial generation.load(std::memory_order_acquire). And it does not take its value (namely 0) from any of the operations in thrB, but from the initialization of generation that happened before either thread was started. This side effect is not part of any useful release sequence, certainly not of any release sequence headed by an operation which happens after x=1. So our attempt at a proof breaks down.

    More informally, if we inspect the sequencing of thrB, we see that the evaluation of x could be reordered before any or all of the relaxed operations. The fact that it's only evaluated conditionally on generation.load(std::memory_order_relaxed) returning 1 doesn't help; we could have x loaded speculatively much earlier, and the value only used after generation.load(std::memory_order_relaxed) finally returns 1. So all we know is that x is evaluated sometime after generation.load(std::memory_order_acquire) returns 0, which gives us precisely no useful information at all about what thrA might or might not have done by then.

    This particular issue could be fixed by upgrading the load of generation in the spin loop to acquire, or by placing an acquire fence after the loop exits but before wait() returns.
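
    The fence variant would look roughly like this (only the spin-loop branch of the OP's wait() shown):

    // ... inside wait(), after the decrement of spaces ...
    }else{
       while(generation.load(std::memory_order_relaxed)==gen){
          std::this_thread::yield();
       }
       // Synchronizes with the release increment of generation done by the
       // last thread, so its prior stores are visible before wait() returns.
       std::atomic_thread_fence(std::memory_order_acquire);
    }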


    As for done_waiting, it seems problematic too. If we have

    void thrA() {
        x = 1;
        b.done_waiting();
    }
    
    void thrB() {
        b.wait();
        std::cout << x;
    }
    

    then presumably we again want 1 to be printed, without data races. But suppose thrA reaches the barrier first. Then all that it does is

    x = 1;
    count.fetch_sub(1, std::memory_order_relaxed); // irrelevant
    spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
    

    with no release stores at all, so it cannot synchronize with thrB.

    Informally, there is no barrier to prevent the store x=1 from being delayed indefinitely, so there cannot be any guarantee that thrB will observe it.

    It is late here, so for the moment I leave it as an exercise how to fix this.
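
    (For reference, the modified version at the top of this answer already contains one fix: the decrement of spaces in done_waiting() is a release (here acq_rel) operation, so a non-last thread's prior store such as x = 1 is published to whichever thread ends up dropping the barrier:)

    void done_waiting(){
       count.fetch_sub(1, std::memory_order_relaxed);
       // Release half publishes this thread's prior work (e.g. x = 1) to the
       // last thread; the acquire half matters only if this thread is last.
       if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
          spaces.store(count.load(std::memory_order_relaxed),
                       std::memory_order_relaxed);
          generation.fetch_add(1, std::memory_order_release);
       }
    }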


    By the way, ThreadSanitizer detects data races for both cases: https://godbolt.org/z/1MdbMbYob. I probably should have tried that first, but initially it wasn't so clear to me what to actually test.


    I am not sure at this point if these are the only bugs, or if there are more.