c++multithreadingc++20memory-barriersstdatomic

Proper Use of Atomics for an Asynchronous Iterator with Multiple Producers and Consumers in c++20


I am trying to create an asynchronous iterator over a std::vector with a fixed allocation size (known before any thread starts execution). The goal is to have multiple producers incrementing the available data inside the vector, while multiple consumers collect the produced data.

I am unsure if my use of atomic operations is correct for this scenario. Specifically, I am concerned about whether I am correctly using memory order mechanisms to ensure proper synchronization between atomic and non-atomic data access.

Could someone help me verify if my implementation is safe and follows best practices for atomic memory ordering? Any guidance or improvements would be greatly appreciated.

[edit]

Code Structure

I have revised my code so it is fully self-contained and can be copied and compiled in any environment c++20 (online or offline). Additionally, I have added explanations for clarity. The code consists of the following components:

  1. async_iterator:

    • Manages an atomic iterator (_index) over a pre-allocated container (e.g., a non-empty vector or an array).
    • Tracks the size of the data filled by producers (_filled_size).
    • Provides a next() function to increment _index, waiting when necessary based on _filled_size.
  2. async_vector (derived from async_iterator):

    • Wraps a std::vector with an async_iterator.
    • Provides two methods for adding elements:
      • emplace_back_by_unique_thread(): A thread-safe function meant to be called by a single producer thread (I am pretty certain that this function is thread-safe. Well i hope so :D).
      • emplace_back(): Intended for multiple producers, but I am uncertain whether my implementation is thread-safe.
  3. Example 1: example_one_camera_producer_per_buffer

    • Implements a producer-consumer setup where each producer calls emplace_back_by_unique_thread() on its dedicated async_vector.
    • The number of async_vector instances matches the number of producers.
  4. Example 2: example_multi_camera_producer_one_shared_buffer

    • Implements a producer-consumer setup where multiple producers share the same async_vector.
    • Each producer calls emplace_back() on a shared buffer.
    • I am unsure whether emplace_back() is thread-safe in this case.
  5. main()

    • Runs both examples.

I am confident that emplace_back_by_unique_thread() is thread-safe, but I am uncertain about emplace_back().

Any insights or recommendations would be highly appreciated.

#include <atomic>
#include <vector>
#include <string>
#include <optional>
#include <thread>
#include <format>
#include <iostream>
#include <cassert>
#include <syncstream>

template <typename T = std::size_t>
struct async_iterator
{
    async_iterator() = default;
    async_iterator(T capacity) : _capacity(capacity) {}
    async_iterator(const async_iterator&) = delete;
    async_iterator& operator=(const async_iterator&) = delete;
    async_iterator(async_iterator&&) = delete;
    async_iterator& operator=(async_iterator&&) = delete;

    using value_type = T;
    using atomic_value_type = std::atomic<value_type>;
    using optional_value_type = std::optional<value_type>;

    static constexpr value_type ABORT_FLAG = std::numeric_limits<value_type>::max();

    value_type _capacity{0};                  // Maximum container size
    atomic_value_type _filled_size{0};        // Current container size (dynamic in function of the producer)
    atomic_value_type _index{0};              // Current iterator index (used by the consumer)

    optional_value_type next()
    {
        auto _next_index = _index.fetch_add(1, std::memory_order_relaxed);
        if (_next_index >= _capacity)
            return std::nullopt; // End of iteration

        while (true) {
            auto _current_size = _filled_size.load(std::memory_order_acquire);

            if (_current_size == ABORT_FLAG) return std::nullopt; // Abort check
            if (_next_index < _current_size) return _next_index;  // The element is available

            // Wait for the producer to fill the container
            _filled_size.wait(_current_size, std::memory_order_acquire);
        }
    }

    void notify_all() { _filled_size.notify_all(); }

    void abort()
    {
        _filled_size.store(ABORT_FLAG, std::memory_order_release);
        _index.store(_capacity, std::memory_order_release);
        _filled_size.notify_all();
    }

    // Check if the iterator is still valid
    operator bool() const { return _index.load(std::memory_order_relaxed) < _capacity; }

    auto unprocessed_latency() const { return _filled_size.load(std::memory_order_relaxed) - _index.load(std::memory_order_relaxed); }
    auto filled_size() const { return _filled_size.load(std::memory_order_relaxed); }
    auto remaining_capacity() const { return _capacity - _filled_size.load(std::memory_order_relaxed); }
    auto is_aborted() const { return _filled_size.load(std::memory_order_relaxed) == ABORT_FLAG; }
    auto is_finished() const { return !(_index.load(std::memory_order_relaxed) < _capacity); }
};


template <typename T>
struct async_vector : public async_iterator<std::size_t>
{
    std::vector<T> _data;

    async_vector() = default;
    async_vector(auto && ... args) : _data(std::forward<decltype(args)>(args)...) {
        _capacity = _data.size();
        _filled_size.store(_capacity, std::memory_order_release);
    }

    async_vector(size_t fixed_size) : _data(fixed_size) {
        _capacity = fixed_size;
    }

    void resize(size_t size)
    {
        assert((_filled_size == ABORT_FLAG) || (_filled_size >= _capacity && _index >= _capacity) && "Threads are still running on the old vector");
        _filled_size.store(0, std::memory_order_release);
        _index.store(0, std::memory_order_release);
        _data.resize(size);
        _data.shrink_to_fit();
        _capacity = size;
    }

    auto capacity() const {  assert(_data.size() == _capacity); return _data.size(); }

#ifdef _DEBUG
    std::atomic<std::thread::id> _thread_id;

    bool is_same_thread() {
        if (_thread_id == std::thread::id{}) {
            _thread_id = std::this_thread::get_id();
            return true;
        }
        return (_thread_id == std::this_thread::get_id());
    }
#endif

    // This function is only safe to call from the same thread to fill the vector
    bool emplace_back_by_unique_thread(auto&& elem)
    {
#ifdef _DEBUG
        assert(is_same_thread() && "emplace_back_by_unique_thread must be called from the same thread");
#endif

        auto next_index = _filled_size.load(std::memory_order_relaxed);
        if (next_index >= _data.size()) return false; // On abort or complete filled vector

        _data[next_index] = std::forward<decltype(elem)>(elem);
        _filled_size.fetch_add(1, std::memory_order_release);
        _filled_size.notify_all();
        return true;
    }

    // Can be called from multiple threads
    bool pop_front(T& elem)
    {
        if (auto next_index_opt = next())
        {
            elem = _data[*next_index_opt];
            return true;
        }
        return false; // Abort or empty queue
    }

    // Well, i am not sure if this is thread safe.
    // It is safe to call this function from multiple threads ?
    bool emplace_back(auto&& elem)
    {
        auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);

        // If the vector is full or aborted, revert the increment and return false
        if (next_index >= _data.size()) {
            _filled_size.fetch_sub(1, std::memory_order_release);
            return false;
        }

        _data[next_index] = std::forward<decltype(elem)>(elem);
        std::atomic_thread_fence(std::memory_order_release); // Ensure the data is written before the size is incremented
        _filled_size.notify_all(); // Notify all consumers
        return true;
    }
};


// Variables used inside examples
//--------------------------------

static constexpr int camera_count = 4;       // Number of cameras to simulate as producers
static constexpr int frame_count = 25;       // Number of frames to store per camera
static constexpr int camera_ti = 50;         // camera interval in ms, i.e. camera integration time

static constexpr int consumer_by_camera = 3; // Number of consumers per camera (the consumer processing time is 3 times the camera integration time)
static constexpr int consumer_count = camera_count * consumer_by_camera;         // Number of consumers to simulate as executors
static constexpr int consumer_processing_time = camera_ti * consumer_by_camera;  // Consumer processing time in ms (3 times the camera integration time)

using mat = std::string;  // For example to avoid OpenCV dependency
using async_buffer = async_vector<mat>;


// Example 1
//--------------------------------
// The number of producers (cameras) is the same as the number of buffers (async_vector).

void example_one_camera_producer_per_buffer()  
{
    std::vector<async_buffer> vFrameBuffer(camera_count);           // Async vector of buffers, one per camera
    for(auto & buffer : vFrameBuffer) buffer.resize(frame_count);   // Resize each buffer with the frame count

    // Producer function to simulate camera frame acquisition and store them in the async vector buffer
    auto producer = [&](int buffer_id, int camera_id)
    {
        for(int i = 0; i < frame_count; ++i)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
            auto frame = std::format("Hello from camera {} - Frame {}", camera_id, i);
            if (!vFrameBuffer[buffer_id].emplace_back_by_unique_thread(std::move(frame))) // Always the same thread for the same camera
               break; // Queue is full, or abort the producer
        }
    };

    std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers

    // Consumer function to simulate frame processing
    auto consumer = [&](int consumer_id, int buffer_id)
    {
        auto & async_buffer = vFrameBuffer[buffer_id];
        mat frame;
        while(async_buffer.pop_front(frame)) {
            ++processing_count;
            std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
            std::osyncstream(std::cout) << std::format("Executor {} - Buffer {} - Frame: {}", consumer_id, buffer_id, frame) << std::endl;
        };
    };

    std::cout << "Example using emplace_back_by_unique_thread" << std::endl;
    {
        {
            std::vector<std::jthread> vProducer;
            for(int i = 0; i < camera_count; ++i)
                vProducer.emplace_back(producer, i, i);    // One producer per camera

            std::vector<std::jthread> vExecutor;
            for (int i = 0; i < consumer_count; ++i)
                vExecutor.emplace_back(consumer, i, i % camera_count); // Multiple consumers per camera
        }  // Join all threads

        assert(processing_count == frame_count * camera_count);
        std::cout << std::format("Processed {} frames", processing_count.load()) << std::endl;
    }
}

// Example 2
//--------------------------------
// Producers (cameras) share some buffers (async_vector)

void example_multi_camera_producer_one_shared_buffer()  
{
    const int shared_buffer_count = 2; // Number of cameras per shared buffer
    assert(camera_count % shared_buffer_count == 0); // Ensure camera_count is a multiple of shared_buffer_count)
    std::vector<async_buffer> vFrameBuffer(shared_buffer_count);       // Async vector of buffers, one buffer for 2 cameras
    for(auto & buffer : vFrameBuffer) buffer.resize(frame_count * camera_count / shared_buffer_count);   // Resize each buffer

    // Producer function to simulate camera frame acquisition and store them in the async vector buffer
    auto producer = [&](int buffer_id, int camera_id)
    {
        for(int i = 0; i < frame_count; ++i)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(camera_ti));
            auto frame = std::format("Hello from Camera {} - Frame {}", camera_id, i);
            if (!vFrameBuffer[buffer_id].emplace_back(std::move(frame))) // Fill a shared buffer
               break; // Queue is full, or abort the producer
        }
    };

    std::atomic_int processing_count = 0; // To count the number of frames processed by the consumers

    // Consumer function to simulate frame processing
    auto consumer = [&](int consumer_id, int buffer_id)
    {
        auto & async_buffer = vFrameBuffer[buffer_id];
        mat frame;
        while(async_buffer.pop_front(frame)) {
            ++processing_count;
            std::this_thread::sleep_for(std::chrono::milliseconds(consumer_processing_time));
            std::osyncstream(std::cout) << std::format("Executor {} - buffer {} - Frame: {}", consumer_id, buffer_id, frame) << std::endl;
        };
    };

    std::cout << "Example using emplace_back (filling a shared buffer)" << std::endl;
    {
        {
            int buffer_count = static_cast<int>(vFrameBuffer.size());
            std::vector<std::jthread> vProducer;
            for(int i = 0; i < camera_count; ++i)
                vProducer.emplace_back(producer, i % buffer_count, i);    // One producer per camera

            std::vector<std::jthread> vExecutor;
            for (int i = 0; i < consumer_count; ++i)
                vExecutor.emplace_back(consumer, i, i % buffer_count); // Multiple consumers per camera
        }  // Join all threads

        assert(processing_count == frame_count * camera_count);
        std::cout << std::format("Processed {} frames", processing_count.load()) << std::endl;
    }
}

int main()
{

    example_one_camera_producer_per_buffer();
    example_multi_camera_producer_one_shared_buffer();
    return 0;
}


Solution

  • this container is trying to be a vector and a queue at the same time, this won't work ... pick one.

    let's assume this is a queue, to be used with a single producer and consumer.

    bool emplace_back(auto&& elem)
    {
        auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);
    
        // thread suspended here by the OS
        // reader sees next_index is ready, but we didn't write it yet!
        
        _data[next_index] = std::forward<decltype(elem)>(elem);
        std::atomic_thread_fence(std::memory_order_release);
        _filled_size.notify_all(); // Notify all consumers
        return true;
    }
    

    this function is not thread-safe, even if the producer didn't get suspended in the middle, it is still a race whether the reader reads the old or new value, you must first write the data then increment _filled_size.

    auto next_index = _filled_size.load(std::memory_order_acquire);
    _data[next_index] = std::forward<decltype(elem)>(elem);
    _filled_size.store(next_index + 1, std::memory_order_release);
    _filled_size.notify_all();
    

    now you have a race between any two producers, this can only be solved by doing atomic compare_exchange on _data[next_index] , and most platforms can only do 1 pointer size CAS, which means if the object is bigger than a pointer then you need to allocate the object on the heap then store only a raw pointer in the vector. some platforms can do 2 pointers size CAS.

    Or you can have a mutex on the write side, allowing only 1 thread inside emplace_back, then you can easily write the value then increment next_index without having to allocate the object on the heap.

    this is why lock-free containers are a lot slower than lock-based ones.


    A queue also usually wraps around as a circular buffer, and the same problems at the producer will happen at the consumer too once you add the wrap around, the consumer will also need to first consume the item then increment the _index, otherwise a wrapped-around producer will overwrite the values we haven't read yet.

    auto _next_index = _index.load(std::memory_order_acquire);
    // wait for next_index < filled_size here
    elem = std::move(_data[_next_index]);
    _index.store(1 + _next_index, std::memory_order_release);
    _filled_size.notify_all();
    

    which also makes it not thread-safe for multiple consumers and you will need to do a CAS on _data[_next_index], or add a readers mutex.


    now let's assume it is a vector, not a queue, we will disallow iteration for now, and only allow emplace_back, but emplace_back will have to return the emplaced item's index.

    size_t emplace_back(auto&& elem)
    {
        auto next_index = _filled_size.fetch_add(1, std::memory_order_acquire);
        // error handling: if (next_index) > max_size, throw exception or wrap around?
        _data[next_index] = std::forward<decltype(elem)>(elem);
        std::atomic_thread_fence(std::memory_order_release);
        return next_index;
    }
    

    we know that the index returned by emplace_back is safe to use and we didn't have to do a heap allocation. we can put this index in another queue for our consumer to read (size_t is the size of a pointer so no heap allocation here either), the consumer is not going to iterate the vector in order, it is only going to access elements that we tell it to access when they are ready.

    If the consumer got index 5 out of the queue this means index 5 is safe, index 4 may still be invalid because its writer didn't finish writing it yet.

    This brings us back to the queue that allocates the objects on the heap, except now we are using this fixed size vector as a heap, which is maybe faster because the global allocator may be slow, but this vector has a fixed size. maybe we can reuse the indices after the consumer consumes them so we don't run out of memory, but reclaiming consumed indices is going to waste more time, making our lock-free implementation even more slower than a lock-based one, but it can support higher concurrency .... trade-offs are to be made.


    Unless you really absolutely must implement your own lock-free queue, i'd recommend using an off the shelf one like tbb concurrent_bounded_queue for multi-producer multi-consumer or a ring-buffer based queue for single producer single consumer. there are a lot of optimizations that you didn't take into account like reusing hot cache and padding to reduce false sharing.

    Lastly the presented queue although doesn't have locks, it is not lock-free, if a producer got suspended after writing the value but before incrementing the atomic then all progress is halted, off the shelf queues get around this issue.