c++producer-consumerdouble-buffering

Producer/consumer of type <int *>, how to avoid segmentation fault?


I found a great producer/consumer double buffer example in this question. However, instead of setting T to "int", I want to use "int *". Unfortunately, it doesn't work, and I keep getting segmentation faults when writing to the buffer. The code below is what I've tried. Would anyone mind telling me how to solve that problem?

void processing(DoubleBuffer<int *> *const buf, int num_elts)
{
    std::thread producer([&]() {
       for (int i = 0; i != num_elts; ++i) {
          int **item = buf->start_writing();

          if (item != nullptr) {      // Always true
            **item = i;
          }
          buf->end_writing();
      }
   });

   /*
   std::thread consumer([&]() {
      int prev = -1;
      for (int i = 0; i != 10; ++i) {
         int* item = *buf->start_reading();
         std::cout << "Consumer: " << item << std::endl;
         if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
            std::cout << "prev: " << prev << std::endl;
         }
         buf->end_reading();
       }
    });
    */
   producer.join();
  //consumer.join();
}


int main(void) {

    int* buffer_a;
    int* buffer_b;

    int num_elts = 10;

    ProducerConsumerDoubleBuffer<int *>  buf;
    buf.m_buf[0] = buffer_a + num_elts;
    buf.m_buf[1] = buffer_b + num_elts;
    processing(&buf, num_elts);
}     

Solution

  • I modified the class from the post you linked to work better with pointer types. Playing around with the member variables seemed wrong, so I added a malloc to the constructor to avoid segfaults.

    #include <atomic>
    #include <cstdint>
    #include <thread>
    #include <cassert>
    #include <iostream>
    
    template <typename T>
    class ProducerConsumerDoubleBuffer {
    public:
        ProducerConsumerDoubleBuffer() : m_state(0) { }
        ProducerConsumerDoubleBuffer(const int buf_size) : m_state(0) {
            m_buf[0] = new typename std::remove_pointer<T>::type[buf_size];
            m_buf[1] = new typename std::remove_pointer<T>::type[buf_size];
        }
        ~ProducerConsumerDoubleBuffer() { }
    
        // Never returns nullptr
        T* start_writing() {
            // Increment active users; once we do this, no one
            // can swap the active cell on us until we're done
            auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
            return &m_buf[state & 1];
        }
    
        void end_writing() {
            // We want to swap the active cell, but only if we were the last
            // ones concurrently accessing the data (otherwise the consumer
            // will do it for us when *it's* done accessing the data)
    
            auto state = m_state.load(std::memory_order_relaxed);
            std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
            state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
            if ((state & 0x6) == 0) {
                // The consumer wasn't in the middle of a read, we should
                // swap (unless the consumer has since started a read or
                // already swapped or read a value and is about to swap).
                // If we swap, we also want to clear the full flag on what
                // will become the active cell, otherwise the consumer could
                // eventually read two values out of order (it reads a new
                // value, then swaps and reads the old value while the
                // producer is idle).
                m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
            }
        }
    
        // Returns nullptr if there appears to be no more data to read yet
        T* start_reading() {
            m_readState = m_state.load(std::memory_order_relaxed);
            if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
                // Nothing to read here!
                return nullptr;
            }
    
            // At this point, there is guaranteed to be something to
            // read, because the full flag is never turned off by the
            // producer thread once it's on; the only thing that could
            // happen is that the active cell changes, but that can
            // only happen after the producer wrote a value into it,
            // in which case there's still a value to read, just in a
            // different cell.
    
            m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
    
            // Now that we've incremented the user count, nobody can swap until
            // we decrement it
            return &m_buf[(m_readState & 1) ^ 1];
        }
    
        void end_reading() {
            if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
                // There was nothing to read; shame to repeat this
                // check, but if these functions are inlined it might
                // not matter. Otherwise the API could be changed.
                // Or just don't call this method if start_reading()
                // returns nullptr -- then you could also get rid
                // of m_readState.
                return;
            }
    
            // Alright, at this point the active cell cannot change on
            // us, but the active cell's flag could change and the user
            // count could change. We want to release our user count
            // and remove the flag on the value we read.
    
            auto state = m_state.load(std::memory_order_relaxed);
            std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
            state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
            if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
                // Oi, we were the last ones accessing the data when we released our cell.
                // That means we should swap, but only if the producer isn't in the middle
                // of producing something, and hasn't already swapped, and hasn't already
                // set the flag we just reset (which would mean they swapped an even number
                // of times).  Note that we don't bother swapping if there's nothing to read
                // in the other cell.
                m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
            }
        }
    private:
        T m_buf[2];
    
        // The bottom (lowest) bit will be the active cell (the one for writing).
        // The active cell can only be switched if there's at most one concurrent
        // user. The next two bits of state will be the number of concurrent users.
        // The fourth bit indicates if there's a value available for reading
        // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
        std::atomic<std::uint32_t> m_state;
    
        std::uint32_t m_readState;
    };
    
    
    
    void processing(ProducerConsumerDoubleBuffer<int *> *const buf, int num_elts)
    {
        std::thread producer([&]() {
           for (int i = 0; i != num_elts; ++i) {
              int **item = buf->start_writing();
    
              if (item != nullptr) {      // Always true
                **item = i;
              }
              buf->end_writing();
          }
       });
    
       std::thread consumer([&]() {
          int prev = -1;
          for (int i = 0; i != num_elts; ++i) {
             int** item = buf->start_reading();
             //std::cout << "Consumer: " << item << std::endl;
             if (item != nullptr) {
                assert(**item > prev);
                prev = **item;
                std::cout << "item: " << **item << std::endl;
             }
             buf->end_reading();
           }
        });
       producer.join();
       consumer.join();
    }
    
    
    int main(void) {
        int num_elts = 500000;
    
        ProducerConsumerDoubleBuffer<int *>  buf(num_elts);
        processing(&buf, num_elts);
    }