Tags: c++, boost, shared-memory, lock-free, boost-interprocess

Shared memory SPSC queue for strings without allocations in C++


I am looking for something similar to the shared memory (SHM) SPSC queue setup offered by boost::lockfree::spsc_queue and boost::interprocess, but without allocating the strings: they should be stored flat, i.e. next to each other, for maximum efficiency.

If I understand correctly, that setup stores a string's offset in the queue and allocates the memory for the string itself somewhere else in the SHM.
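
Roughly like this, I believe (an illustrative sketch of that two-step setup; the names, capacity, and length prefix are made up):

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <cstdint>
#include <cstring>
#include <string_view>

namespace bip = boost::interprocess;

// the queue itself only transports fixed-size handles (offsets into the segment);
// each string body is a separate allocation inside the managed segment
using shm_queue = boost::lockfree::spsc_queue<
    bip::managed_shared_memory::handle_t,
    boost::lockfree::capacity<1024>>;

bool produce(bip::managed_shared_memory& segment, shm_queue& queue, std::string_view s) {
    // the string (plus a small length prefix) is allocated "somewhere else" in the SHM...
    auto* mem = static_cast<char*>(segment.allocate(sizeof(std::uint32_t) + s.size()));
    const auto len = static_cast<std::uint32_t>(s.size());
    std::memcpy(mem, &len, sizeof(len));
    std::memcpy(mem + sizeof(len), s.data(), s.size());

    // ...and only its offset/handle goes through the queue
    return queue.push(segment.get_handle_from_address(mem));
}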

Queue design can be:

| size | string 1 | size | string 2 | size | string 3 | ...
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                    SHM segment

in a circular buffer fashion. Idea:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string_view>

struct Writer {
    std::byte* shm;

    void write(std::string_view str) {
        // write size
        const auto sz = static_cast<std::uint32_t>(str.size());
        std::memcpy(shm, &sz, sizeof(sz));
        shm += sizeof(sz);

        // write string
        std::memcpy(shm, str.data(), sz);
        shm += sz;
    }
};

Solution

  • tl;dr

    It is possible, although you'll have to deal with a bunch of extra edge-cases.


    1. Assumptions

    Given the information you provided I'm going to assume you want the following properties for the single-producer, single-consumer (spsc) queue:

    • variable-length strings, stored flat (contiguously) within the shared memory segment
    • no dynamic allocations after the initial setup
    • lock-free (ideally wait-free) progress, so it keeps working across process boundaries
    • FIFO ordering


    2. Potential alternatives

    Depending on your requirements, a few alternatives might already do the job: e.g. fixed-size slots in a plain boost::lockfree::spsc_queue (simple, but caps the string length and wastes space on short strings), or boost::interprocess::message_queue if you can live without lock-freedom.
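
    For illustration, the fixed-size-slot variant could look roughly like this (a minimal sketch; fixed_msg, push_string, and the slot/capacity sizes are arbitrary):

    #include <boost/lockfree/spsc_queue.hpp>
    #include <algorithm>
    #include <cstdint>
    #include <string_view>

    // fixed-size slot: wastes space on short strings and caps long ones,
    // but keeps every element contiguous and trivially copyable
    struct fixed_msg {
        std::uint16_t size = 0;
        char data[254] = {};
    };

    boost::lockfree::spsc_queue<fixed_msg, boost::lockfree::capacity<1024>> queue;

    bool push_string(std::string_view s) {
        if(s.size() > sizeof(fixed_msg::data)) return false; // doesn't fit into a slot
        fixed_msg msg;
        msg.size = static_cast<std::uint16_t>(s.size());
        std::copy_n(s.data(), s.size(), msg.data);
        return queue.push(msg);
    }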


    3. Design Considerations

    3.1 Wrapped-around writes

    A ringbuffer with fixed-size elements essentially splits the raw buffer into element-sized slots that are contiguous within the buffer, e.g.:

     /---------------------- buffer ----------------------\
    /                                                      \
    +----------+----------+----------+----------+----------+
    | Object 1 | Object 2 | Object 3 |    ...   | Object N |
    +----------+----------+----------+----------+----------+
    

    This automatically ensures that all objects within the buffer are contiguous, i.e. you never have to deal with a wrapped-around object:

     /---------------------- buffer ----------------------\
    /                                                      \
    +-----+----------+----------+----------+----------+-----+
    |ct 1 |          |          |          |          | Obje|
    +-----+----------+----------+----------+----------+-----+
    

    If the element size is not fixed, however, we do have to handle this case somehow.
    Assume, for example, an empty 8-byte ring buffer with the read and write pointers both on the 7th byte (index 6 - read == write means the buffer is currently empty):

                                     /--------- read pointer
                                     v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    |    |    |    |    |    |    |    |    |
    +----+----+----+----+----+----+----+----+
                                     ^
                                     \---------- write pointer
    

    If we now attempt to write "bar" into the buffer (prefixed by its length), we get a wrapped-around string:

                                     /--------- read pointer
                                     v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    |  a |  r |    |    |    |    | 03 |  b |
    +----+----+----+----+----+----+----+----+
                 ^
                 \----------------------------- write pointer
    

    There are 2 ways to deal with this:

    3.1.1 Making it part of the interface

    The first option would be the easiest to implement, but it would be quite cumbersome for the consumer of the queue, because it has to deal with 2 separate pointers + sizes that in combination represent the string. A potential interface for this could be:

    class spsc_queue {
      // ...
    
      bool push(const char* str, std::size_t size);
    
      bool read(
         const char*& str1, std::size_t& size1,
         const char*& str2, std::size_t& size2
      );
      bool pop();
    
      // ...
    };
    
    // Usage example (assuming the state from "bar" example above)
    const char* str1;
    std::size_t size1;
    const char* str2;
    std::size_t size2;
    if(queue.read(str1, size1, str2, size2)) {
      // str1 would point to buffer[7] ("b")
      // size1 would be 1
      // str2 would point to buffer[0] ("ar")
      // size2 would be 2
      queue.pop();
    }
    

    Having to deal with 2 pointers and 2 sizes all the time, just for the odd case of a wrapped-around write, is not the best solution imho, so I went with option 2:

    3.1.2 Prevent wrap-arounds

    The alternative option is to prevent wrap-arounds from occurring in the first place.

    A simple way to achieve this is to add a special "wrap-around" marker that tells the reader to immediately jump back to the beginning of the buffer (and therefore skip all bytes after the wrap-around marker).

    Example writing "bar": (WA represents a wrap-around marker)

                                     /--------- read pointer
                                     v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 03 |  b |  a |  r |    |    | WA |    |
    +----+----+----+----+----+----+----+----+
                           ^
                           \------------------- write pointer
    

    So once the reader tries to read the next element it'll encounter the wrap-around marker. This instructs it to directly go back to index 0, where the next element is located:

       /--------------------------------------- read pointer
       v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 03 |  b |  a |  r |    |    | WA |    |
    +----+----+----+----+----+----+----+----+
                           ^
                           \------------------- write pointer
    

    This technique allows all strings to be stored contiguously within the ring buffer - with the small trade-off that the end of the buffer might not be fully utilized, plus a couple of extra branches in the code.

    For this answer I chose the wrap-around marker approach.
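
    In code, the reader-side handling of the marker boils down to something like this (a simplified, self-contained sketch of the logic the full implementation in section 4 uses; read_length and WA_MARKER are illustrative names):

    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <limits>

    using size_type = std::uint8_t; // 1-byte lengths, as in the diagrams
    constexpr size_type WA_MARKER = std::numeric_limits<size_type>::max();

    // reads the length prefix at read_off, following a wrap-around marker if present
    size_type read_length(const std::byte* buffer, std::size_t& read_off) {
        size_type size;
        std::memcpy(&size, &buffer[read_off], sizeof(size_type));
        if(size == WA_MARKER) {
            // the writer could not fit the next element at the end of the buffer:
            // skip the unused tail and continue reading from the start
            read_off = 0;
            std::memcpy(&size, &buffer[read_off], sizeof(size_type));
        }
        return size;
    }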

    3.2 What if there's no space for a wrap-around marker?

    Another problem comes up once you want string sizes above 255 - at that point the length prefix needs to be larger than 1 byte.

    Assume we use a 2-byte length and write "foo12" (length 5) into the ring buffer:

       /--------------------------------------- read pointer
       v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 05 | 00 |  f |  o |  o |  1 |  2 |    |
    +----+----+----+----+----+----+----+----+
                                          ^
                                          \---- write pointer
    

    So far so good, but as soon as the read pointer catches up we have a problem:
    only a single byte remains at the end of the buffer, which is not enough to fit a 2-byte length!

    So on the next write (writing "foo" (length 3) into the ring buffer) the length itself would have to wrap around:

                                          /---- read pointer
                                          v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 00 |  f |  o |  o |    |    |    | 03 |
    +----+----+----+----+----+----+----+----+
                           ^
                           \------------------- write pointer
    

    There are a few potential ways this could be resolved - e.g. splitting the length across the boundary as shown above, or padding every entry so that a length prefix always fits. The implementation in section 4 instead wraps around implicitly: whenever fewer than sizeof(size_type) bytes remain at the end of the buffer, both reader and writer silently skip to offset 0, without any marker (there would be no room to write one anyway).
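
    A minimal sketch of that check (mirroring what read_element and try_align_for_push do in the implementation below; skip_unusable_tail is an illustrative name):

    #include <cstddef>

    // applied by both reader and writer before touching a length prefix:
    // if the tail of the buffer can't even hold a size_type, both sides
    // implicitly skip to the start - no marker is written (there is no room for one)
    template<class size_type>
    std::size_t skip_unusable_tail(std::size_t offset, std::size_t usable_buffer_size) {
        if(usable_buffer_size - offset < sizeof(size_type)) {
            return 0;
        }
        return offset;
    }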

    3.3 Destructive interference

    Depending on how fast your reader is in comparison to your writer, you might get a bit of destructive interference within the ring buffer.

    If, for example, your L1 cache line size is 128 bytes, you use 1-byte lengths for the strings in the ring buffer, and you only push strings of length 1, e.g.:

       /--------------------------------------- read pointer
       v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 01 |  a | 01 |  b | 01 |  c |    |    | ...
    +----+----+----+----+----+----+----+----+
                                     ^
                                     \--------- write pointer
    

    Then roughly 64 string entries would end up on the same cache line, continuously written by the producer while being read by the consumer => a lot of potential interference.

    This can be prevented by padding the strings within the ring buffer to a multiple of your cache line size (in C++ available as std::hardware_destructive_interference_size).

    e.g. entries padded to 4 bytes:

       /--------------------------------------- read pointer
       v
    +----+----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |
    +----+----+----+----+----+----+----+----+----+
    | 01 |  a |    |    | 01 |  b |    |    |    | ...
    +----+----+----+----+----+----+----+----+----+
                                               ^
                                               \--------- write pointer
    

    The trade-off here is of course that this will potentially waste a lot of space within the ring buffer.

    So you'll have to profile how much padding you want for your string values.

    The padding value N you choose should satisfy:
    1 <= N <= std::hardware_destructive_interference_size
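
    For reference, the rounding itself is a one-liner (the same computation the pad_size helper in the implementation performs):

    #include <cstddef>

    // rounds an entry size up to the next multiple of `padding`
    constexpr std::size_t pad_size(std::size_t size, std::size_t padding) {
        return (size + padding - 1) / padding * padding;
    }

    static_assert(pad_size(2, 4) == 4); // "01 a" padded to 4 bytes, as in the diagram above
    static_assert(pad_size(5, 4) == 8);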


    4. Implementation

    This is a full implementation of a wait-free, string-only, spsc fifo queue, based on the design considerations listed above.

    I've only implemented the bare minimum interface required, but you can easily create all the utility functions boost::lockfree::spsc_queue provides around those 3 core functions:

    godbolt

    template<std::unsigned_integral size_type, std::size_t padding = 1>
    class lockfree_spsc_string_queue {
    public:
        lockfree_spsc_string_queue(void* buffer, std::size_t buffer_size, bool init = false);
    
        // producer:
    
        // tries to push the given string into the queue
    // returns true if adding the string was successful
        // (contents will be copied into the ringbuffer)
        bool push(std::string_view str);
    
        // consumer:
    
        // reads the next element in the queue.
        // returns an empty optional if the queue is empty,
        // otherwise a string_view that points to the string
        // within the ringbuffer.
        // Does NOT remove the element from the queue.
        std::optional<std::string_view> front();
    
        // Removes the next element from the queue.
        // Returns true if an element has been removed.
        bool pop();
    };
    

    Usage example: godbolt

    void producer() {
        lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE, true);
        while(true) {
            // retry until successful
            while(!queue.push("foobar"));
        }
    }
    
    void consumer() {
        lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE);
    
        while(true) {
            std::optional<std::string_view> result;
            // retry until successful
            while(!result) result = queue.front();
    
            std::cout << *result << std::endl;
    
            bool pop_result = queue.pop();
            assert(pop_result);
        }
    }
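
    BUFFER_POINTER and BUFFER_SIZE are placeholders. Since the question mentions boost::interprocess, one way to obtain such a region could be (a sketch; segment name and size are arbitrary):

    #include <boost/interprocess/shared_memory_object.hpp>
    #include <boost/interprocess/mapped_region.hpp>

    namespace bip = boost::interprocess;

    void producer() {
        // create (or open) a named shared memory segment and map it into this process
        // (mapped regions are page-aligned, which satisfies the header's alignment assert)
        bip::shared_memory_object shm(bip::open_or_create, "spsc_string_queue", bip::read_write);
        shm.truncate(1 << 16); // 64 KiB backing buffer
        bip::mapped_region region(shm, bip::read_write);

        // exactly one process passes init = true; all others pass init = false
        lockfree_spsc_string_queue<unsigned short> queue(region.get_address(), region.get_size(), true);
        while(!queue.push("foobar"));
    }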
    

    boost::lockfree::spsc_queue::consume_all, for example, could be implemented like this (in terms of the 3 functions provided by this minimal implementation):

        template<class Functor>
        std::size_t consume_all(Functor&& f) {
            std::size_t cnt = 0;
            for(auto el = front(); el; el = front()) {
                std::forward<Functor>(f)(*el);
                bool pop_result = pop();
                assert(pop_result);
                ++cnt;
            }
            return cnt;
        }
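
    Similarly, a copying pop in the style of boost's pop(T&) could be built on top of front() and pop() (pop_into is an illustrative name; note that std::string - and therefore <string> - re-introduces an allocation on the consumer side):

        // copies the front element into `out` and removes it from the queue
        bool pop_into(std::string& out) {
            std::optional<std::string_view> el = front();
            if(!el) return false;
            out.assign(el->data(), el->size());
            return pop();
        }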
    

    Full implementation: godbolt

    #include <atomic>
    #include <cassert>
    #include <concepts>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <limits>
    #include <new>
    #include <optional>
    #include <string_view>
    #include <type_traits>

    // wait-free single-producer, single-consumer fifo queue
    //
    // The constructor must be called once with `init = true` for a specific region.
    // After the construction of the queue with `init = true` has succeeded, additional
    // instances can be created for the region by passing `init = false`.
    template<
            std::unsigned_integral size_type,
            std::size_t padding = 1>
    class lockfree_spsc_string_queue {
        // we use the max value of size_type as a special marker
        // to indicate that the writer needed to wrap around early to accommodate a string value.
        // this means the maximum size a string entry can have is `size_type_max - 1`.
        static constexpr size_type size_type_max = std::numeric_limits<size_type>::max();
    
        // calculates the padding required after
        // a T member to align the next member onto the next cache line.
        template<class T>
        static constexpr std::size_t padding_to_next_cache_line =
            std::hardware_destructive_interference_size -
            sizeof(T) % std::hardware_destructive_interference_size;
    
    public:
        // internal struct that will be placed in the shared memory region
        struct spsc_shm_block {
            using atomic_size = std::atomic<std::size_t>;
    
            // read head
            atomic_size read_offset;
            char pad1[padding_to_next_cache_line<atomic_size>];
    
            // write head
            atomic_size write_offset;
            char pad2[padding_to_next_cache_line<atomic_size>];
    
            std::size_t usable_buffer_size;
            char pad3[padding_to_next_cache_line<std::size_t>];
    
            // actual data
            // (flexible array member - a widely supported compiler extension, not standard C++)
            std::byte buffer[];
    
            [[nodiscard]] static inline spsc_shm_block* init_shm(void* ptr, std::size_t size) {
                spsc_shm_block* block = open_shm(ptr, size);
    
                // atomics *must* be lock-free, otherwise they won't work across process boundaries.
                assert(block->read_offset.is_lock_free());
                assert(block->write_offset.is_lock_free());
    
                block->read_offset = 0;
                block->write_offset = 0;
                block->usable_buffer_size = size - offsetof(spsc_shm_block, buffer);
                return block;
            }
    
            [[nodiscard]] static inline spsc_shm_block* open_shm(void* ptr, std::size_t size) {
                // this type must be trivially copyable, otherwise we can't implicitly start its lifetime.
                // It also needs to have a standard layout for offsetof.
                static_assert(std::is_trivially_copyable_v<spsc_shm_block>);
                static_assert(std::is_standard_layout_v<spsc_shm_block>);
    
                // size must be at least as large as the header
                assert(size >= sizeof(spsc_shm_block));
                // ptr must be properly aligned for the header
                assert(reinterpret_cast<std::uintptr_t>(ptr) % alignof(spsc_shm_block) == 0);
            
                // implicitly start lifetime of spsc_shm_block
                return std::launder(reinterpret_cast<spsc_shm_block*>(ptr));
            }
        };
    
    public:
        inline lockfree_spsc_string_queue(void* ptr, std::size_t size, bool init = false)
            : block(init ? spsc_shm_block::init_shm(ptr, size) : spsc_shm_block::open_shm(ptr, size))
        {
            // requires a buffer at least 1 byte larger than size_type
            assert(block->usable_buffer_size > sizeof(size_type));
        }
    
        // prevent copying / moving
        lockfree_spsc_string_queue(lockfree_spsc_string_queue const&) = delete;
        lockfree_spsc_string_queue(lockfree_spsc_string_queue&&) = delete;
        lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue const&) = delete;
        lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue&&) = delete;
    
        // producer: tries to add `str` to the queue.
        // returns true if the string has been added to the queue.
        [[nodiscard]] inline bool push(std::string_view str) {
            std::size_t write_size = pad_size(sizeof(size_type) + str.size());
    
            // impossible to satisfy write (not enough space / insufficient size_type)
            if(write_size > max_possible_write_size() || str.size() >= size_type_max) [[unlikely]] {
                assert(write_size < max_possible_write_size());
                assert(str.size() < size_type_max);
                return false;
            }
    
            std::size_t write_off = block->write_offset.load(std::memory_order_relaxed);
            std::size_t read_off = block->read_offset.load(std::memory_order_acquire);
            
            std::size_t new_write_off = write_off;
            if(try_align_for_push(read_off, new_write_off, write_size)) {
                new_write_off = push_element(new_write_off, write_size, str);
                block->write_offset.store(new_write_off, std::memory_order_release);
                return true;
            }
    
            if(new_write_off != write_off) {
                block->write_offset.store(new_write_off, std::memory_order_release);
            }
    
            return false;
        }
    
        // consumer: discards the current element to be read (if there is one)
        // returns true if an element has been removed, false otherwise.
        [[nodiscard]] inline bool pop() {
            std::size_t read_off;
            std::size_t str_size;
            if(!read_element(read_off, str_size)) {
                return false;
            }
    
            std::size_t read_size = pad_size(sizeof(size_type) + str_size);
            std::size_t new_read_off = advance_offset(read_off, read_size);
            block->read_offset.store(new_read_off, std::memory_order_release);
    
            return true;
        }
    
        // consumer: returns the current element to be read (if there is one)
        // this does not remove the element from the queue.
        [[nodiscard]] inline std::optional<std::string_view> front() {
            std::size_t read_off;
            std::size_t str_size;
            if(!read_element(read_off, str_size)) {
                return std::nullopt;
            }
    
            // return string_view into buffer
            return std::string_view{
                reinterpret_cast<std::string_view::value_type*>(&block->buffer[read_off + sizeof(size_type)]),
                str_size
            };
        }
    
    private:
        // handles implicit and explicit wrap-around for the writer
        [[nodiscard]] inline bool try_align_for_push(
                std::size_t read_off,
                std::size_t& write_off,
                std::size_t write_size) {
            std::size_t cont_avail = max_avail_contiguous_write_size(write_off, read_off);
    
            // there is enough contiguous space in the buffer to push the string in one go
            if(write_size <= cont_avail) {
               return true;
            }
    
            // not enough contiguous space in the buffer.
            // check if the element could fit contiguously into
            // the buffer at the current write_offset.
            std::size_t write_off_to_end = block->usable_buffer_size - write_off;
            if(write_size <= write_off_to_end) {
                // element could fit at current position, but the reader would need
                // to consume more elements first
                // -> do nothing
                return false;
            }
    
            // element can NOT fit contiguously at current write_offset
            // -> we need a wrap-around
            std::size_t avail = max_avail_write_size(write_off, read_off);
    
            // not enough space for a wrap-around marker
            // -> implicit wrap-around
            if(write_off_to_end < sizeof(size_type)) {
                // the read marker has advanced far enough
                // that we can perform a wrap-around and try again.
                if(avail >= write_off_to_end) {
                    write_off = 0;
                    return try_align_for_push(read_off, write_off, write_size);
                }
    
                // reader must first read more elements
                return false;
            }
    
            // explicit wrap-around
            if(avail >= write_off_to_end) {
                std::memcpy(&block->buffer[write_off], &size_type_max, sizeof(size_type));
                write_off = 0;
                return try_align_for_push(read_off, write_off, write_size);
            }
    
            // explicit wrap-around not possible
            // (reader must advance first)
            return false;
        }
    
        // writes the element into the buffer at the provided offset
        // and calculates new write_offset
        [[nodiscard]] inline std::size_t push_element(
                std::size_t write_off,
                std::size_t write_size,
                std::string_view str) {
            // write size + string into buffer
            size_type size = static_cast<size_type>(str.size());
            std::memcpy(&block->buffer[write_off], &size, sizeof(size_type));
            std::memcpy(&block->buffer[write_off + sizeof(size_type)], str.data(), str.size());
    
            // calculate new write_offset
            return advance_offset(write_off, write_size);
        }
    
        // returns true if there is an element that can be read (and sets read_off & str_size)
        // returns false otherwise.
        // internally handles implicit and explicit wrap-around. 
        [[nodiscard]] inline bool read_element(std::size_t& read_off, std::size_t& str_size) {
            std::size_t write_off = block->write_offset.load(std::memory_order_acquire);
            std::size_t orig_read_off = block->read_offset.load(std::memory_order_relaxed);
            read_off = orig_read_off;
            str_size = 0;
    
            if(read_off == write_off) {
                return false;
            }
    
            // remaining space would be insufficient for a size_type
            // -> implicit wrap-around
            if(block->usable_buffer_size - read_off < sizeof(size_type)) {
                read_off = 0;
                if(read_off == write_off) {
                    block->read_offset.store(read_off, std::memory_order_release);
                    return false;
                }
            }
    
            size_type size;
            std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));
    
            // wrap-around marker
            // -> explicit wrap-around
            if(size == size_type_max) {
                read_off = 0;
                if(read_off == write_off) {
                    block->read_offset.store(read_off, std::memory_order_release);
                    return false;
                }
                
                std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));
            }
    
            // modified read_off -> store
            if(read_off != orig_read_off) {
                block->read_offset.store(read_off, std::memory_order_release);
            }
    
            str_size = size;
            return true;
        }
    
        // the maximum number of contiguous bytes we are currently able
        // to fit within the memory block (without wrapping around)
        [[nodiscard]] inline std::size_t max_avail_contiguous_write_size(
                std::size_t write_off,
                std::size_t read_off) {
            if(write_off >= read_off) {
                std::size_t ret = block->usable_buffer_size - write_off;
                ret -= read_off == 0 ?  1 : 0;
                return ret;
            }
    
            // write_off < read_off
            return read_off - write_off - 1;
        }
    
        // the maximum number of bytes we are currently able
        // to fit within the memory block (might include a wrap-around)
        [[nodiscard]] inline std::size_t max_avail_write_size(std::size_t write_off, std::size_t read_off) {
            // note: if write_off >= read_off this subtraction wraps around (unsigned),
            // but adding usable_buffer_size below yields the correct count again
            std::size_t avail = read_off - write_off - 1;
            if (write_off >= read_off)
                avail += block->usable_buffer_size;
            return avail;
        }
    
        // the largest possible size an element could be and still
        // fit within the memory block.
        [[nodiscard]] inline std::size_t max_possible_write_size() {
            return block->usable_buffer_size - 1;
        }
    
        // pads a given size to be a multiple of the template parameter padding
        [[nodiscard]] inline std::size_t pad_size(std::size_t size) {
            if(size % padding != 0) {
                size += padding - size % padding;
            }
            return size;
        }
    
        // advances offset and wraps around if required
        [[nodiscard]] inline std::size_t advance_offset(std::size_t offset, std::size_t element_size) {
            std::size_t new_offset = offset + element_size;
           
            // wrap-around
            if(new_offset >= block->usable_buffer_size) {
                new_offset -= block->usable_buffer_size;
            }
    
            return new_offset;
        }
    
    private:
        spsc_shm_block* block;
    };