I am looking for something similar to the shared-memory (SHM) SPSC queue setup offered by boost::lockfree::spsc_queue and boost::interprocess, but without allocating the strings separately: they should be stored flat, i.e. next to each other, for maximum efficiency.
If I understand correctly, that setup stores string offsets in the queue and allocates the memory for each string somewhere else in the SHM.
Queue design can be:
| size | string 1 | size | string 2 | size | string 3 | ...
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SHM segment
in a circular buffer fashion. Idea:
struct Writer {
std::byte *shm;
void write(std::string_view str) {
// write size
const uint32_t sz = str.size();
std::memcpy(shm, &sz, sizeof(sz));
shm += sizeof(sz);
// write string
std::memcpy(shm, str.data(), sz);
shm += sz;
}
};
It is possible, although you'll have to deal with a bunch of extra edge-cases.
Given the information you provided I'm going to assume you want the following properties for the single-producer, single-consumer (spsc) queue:
[...] (like boost::lockfree::spsc_queue)
[...] (like boost::lockfree::spsc_queue)
[...] (like boost::lockfree::spsc_queue, but not like most stuff in boost::interprocess (e.g. boost::interprocess::message_queue utilizes mutexes))
Depending on your requirements there could be a few alternative options:
Fixed-length strings:
Like @John Zwinck suggested in his answer you could use a fixed-length string buffer.
The trade-off would be that your maximum string length is bounded by this size, and - depending on your expected variation of possible string sizes - might result in a lot of unused buffer space.
If you go this route I'd recommend boost::static_string - it's essentially a std::string with the dynamic allocation removed, relying solely on its internal buffer,
i.e. boost::lockfree::spsc_queue<boost::static_string<N>>, where N is the maximum size for the string values.
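To illustrate the fixed-length trade-off without pulling in Boost, here is a minimal sketch of a fixed-capacity slot type. fixed_string is a hypothetical stand-in for boost::static_string<N> (it is not the Boost type and is far less complete); the point is only that every slot has the same size and contains no pointers:

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string_view>
#include <type_traits>

// Hypothetical stand-in for boost::static_string<N>: the characters live
// inside the object itself, so it can be placed in shared memory as-is.
template <std::size_t N>
struct fixed_string {
    std::size_t size = 0;
    std::array<char, N> data{};

    // Copies `str` into the inline buffer; strings longer than N are rejected.
    bool assign(std::string_view str) {
        if (str.size() > N) return false;
        if (!str.empty()) std::memcpy(data.data(), str.data(), str.size());
        size = str.size();
        return true;
    }

    std::string_view view() const {
        assert(size <= N);
        return {data.data(), size};
    }
};

// No pointers inside, and every slot occupies at least N bytes,
// no matter how short the stored string is.
static_assert(std::is_trivially_copyable_v<fixed_string<16>>);
static_assert(sizeof(fixed_string<16>) >= 16);
```

With fixed_string<16>, a 1-character string still occupies the full slot, which is the unused buffer space mentioned above.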
Only store pointers in the queue and allocate the strings separately:
If you're already using boost::interprocess you could use a boost::interprocess::basic_string with a boost::interprocess::allocator that allocates the string separately in the same shared memory region.
Here's an answer that contains a full example of this approach (it even uses boost::lockfree::spsc_queue) (direct link to code example)
The trade-off in this case is that the strings will be stored somewhere outside the spsc queue (but still within the same shared memory region).
If your strings are relatively long this might even be faster than storing the strings directly within the queue (the ring buffer can be a lot smaller if it only needs to store pointers, and therefore would have much better cache locality).
(cache locality won't help in this case - see this excellent comment from @Peter Cordes)
A ringbuffer with fixed-size elements essentially splits the raw buffer into element-sized slots that are contiguous within the buffer, e.g.:
/---------------------- buffer ----------------------\
/ \
+----------+----------+----------+----------+----------+
| Object 1 | Object 2 | Object 3 | ... | Object N |
+----------+----------+----------+----------+----------+
This automatically ensures that all objects within the buffer are contiguous. i.e. you never have to deal with a wrapped-around object:
/---------------------- buffer ----------------------\
/ \
+-----+----------+----------+----------+----------+-----+
|ct 1 | | | | | Obje|
+-----+----------+----------+----------+----------+-----+
If the element-size is not fixed however, we do have to handle this case somehow.
Assume for example an empty 8-byte ringbuffer with both the read and the write pointer on the 7th byte (index 6):
/--------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| | | | | | | | |
+----+----+----+----+----+----+----+----+
^
\---------- write pointer
If we now attempt to write "bar" into the buffer (prefixed by its length), we would get a wrapped-around string:
/--------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| a | r | | | | | 03 | b |
+----+----+----+----+----+----+----+----+
^
\----------------------------- write pointer
There are 2 ways to deal with this: either allow strings to wrap around and let the reader handle the two parts, or prevent wrap-arounds from occurring in the first place.
The 1st option would be the easiest to implement, but it would be quite cumbersome to use for the consumer of the queue, because it needs to deal with 2 separate pointers + sizes that in combination represent the string, i.e. a potential interface for this could be:
class spsc_queue {
// ...
bool push(const char* str, std::size_t size);
bool read(
const char*& str1, std::size_t& size1,
const char*& str2, std::size_t& size2
);
bool pop();
// ...
};
// Usage example (assuming the state from "bar" example above)
const char* str1;
std::size_t size1;
const char* str2;
std::size_t size2;
if(queue.read(str1, size1, str2, size2)) {
// str1 would point to buffer[7] ("b")
// size1 would be 1
// str2 would point to buffer[0] ("ar")
// size2 would be 2
queue.pop();
}
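How the two parts could be computed is not shown above, so here is a minimal sketch of that step. split_wrapped is a hypothetical helper (not part of any library) that takes the payload position and size and returns both parts as string_views:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string_view>
#include <utility>

// Splits a payload of `size` bytes starting at `offset` in a ring buffer of
// `capacity` bytes into the part before the end of the buffer and the part
// that wrapped around to the beginning (empty if nothing wrapped).
inline std::pair<std::string_view, std::string_view> split_wrapped(
    const char* buffer, std::size_t capacity, std::size_t offset, std::size_t size) {
    assert(offset < capacity && size <= capacity);
    const std::size_t first = std::min(size, capacity - offset);
    return {std::string_view(buffer + offset, first),
            std::string_view(buffer, size - first)};
}
```

For the "bar" state above (payload starting at index 7, size 3) this yields "b" and "ar", which the consumer then has to stitch together or process separately.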
Having to deal with 2 pointers and 2 sizes all the time for the odd case of a wrapped-around write is not the best solution imho, so I went with option 2:
The alternative option would be to prevent wrap-arounds from occurring in the first place.
A simple way to achieve this is to add a special "wrap-around" marker that tells the reader to immediately jump back to the beginning of the buffer (and therefore skip over all bytes after the wrap-around marker).
Example writing "bar" (WA represents a wrap-around marker):
/--------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 03 | b | a | r | | | WA | |
+----+----+----+----+----+----+----+----+
^
\------------------- write pointer
So once the reader tries to read the next element it'll encounter the wrap-around marker. This instructs it to directly go back to index 0, where the next element is located:
/--------------------------------------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 03 | b | a | r | | | WA | |
+----+----+----+----+----+----+----+----+
^
\------------------- write pointer
This technique allows all strings to be stored contiguously within the ring buffer - with the small trade-off that the end of the buffer might not be fully utilized and a couple extra branches in the code.
For this answer i chose the wrap-around marker approach.
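The marker approach can be sketched as a single-threaded toy (no atomics, so it is not usable across threads or processes as-is). marker_ring, its 8-byte capacity and the 0xFF marker value are assumptions made for this illustration; started at index 6 it reproduces the "bar" example above:

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <optional>
#include <string_view>

// Toy ring buffer with 1-byte lengths and a wrap-around marker.
// Limitation of this sketch: a push fails if neither the tail nor the
// front of the buffer is large enough on its own, even if the queue is empty.
class marker_ring {
public:
    static constexpr std::size_t capacity = 8;
    static constexpr unsigned char wrap_marker = 0xFF;  // lengths must be < 0xFF

    // `start` lets the example begin at an arbitrary offset.
    explicit marker_ring(std::size_t start = 0) : rd_(start), wr_(start) {
        assert(start < capacity);
    }

    bool push(std::string_view s) {
        if (s.size() >= wrap_marker) return false;
        const std::size_t need = 1 + s.size();  // length byte + payload
        std::size_t at = wr_;
        if (wr_ >= rd_) {
            // Free space is [wr_, capacity) plus [0, rd_). One byte always
            // stays unused so that rd_ == wr_ can only mean "empty".
            const std::size_t tail = capacity - wr_ - (rd_ == 0 ? 1 : 0);
            if (need > tail) {
                if (need >= rd_) return false;  // does not fit at the front either
                buf_[wr_] = wrap_marker;        // tell the reader to jump to index 0
                at = 0;
            }
        } else if (need >= rd_ - wr_) {
            return false;  // free space is [wr_, rd_) only
        }
        buf_[at] = static_cast<unsigned char>(s.size());
        if (!s.empty()) std::memcpy(buf_ + at + 1, s.data(), s.size());
        wr_ = (at + need) % capacity;
        return true;
    }

    // Returns the next string without removing it (follows a marker if present).
    std::optional<std::string_view> front() {
        if (rd_ == wr_) return std::nullopt;
        if (buf_[rd_] == wrap_marker) rd_ = 0;  // skip the unused tail
        if (rd_ == wr_) return std::nullopt;
        return std::string_view(reinterpret_cast<const char*>(buf_ + rd_ + 1), buf_[rd_]);
    }

    bool pop() {
        const auto s = front();
        if (!s) return false;
        rd_ = (rd_ + 1 + s->size()) % capacity;
        return true;
    }

    std::size_t read_offset() const { return rd_; }
    std::size_t write_offset() const { return wr_; }

private:
    unsigned char buf_[capacity] = {};
    std::size_t rd_;
    std::size_t wr_;
};
```

Because every string is written in one piece, front() can hand out a single contiguous string_view; the price is the unused bytes after the marker.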
Another problem comes up once you want a string-size that's above 255 - at that point the size needs to be larger than 1 byte.
Assume we use a 2 byte-length and write "foo12" (length 5) into the ring buffer:
/--------------------------------------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 05 | 00 | f | o | o | 1 | 2 | |
+----+----+----+----+----+----+----+----+
^
\---- write pointer
So far so good, but as soon as the read pointer catches up we have a problem:
there is only a single byte left before the end of the buffer, which is not enough to fit a 2-byte length!
So the length itself would wrap around on the next write (writing "foo" (length 3) into the ringbuffer):
/---- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 00 | f | o | o | | | | 03 |
+----+----+----+----+----+----+----+----+
^
\------------------- write pointer
There are three potential ways this could be resolved:
/---- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 03 | 00 | f | o | o | | | WA |
+----+----+----+----+----+----+----+----+
^
\-------------- write pointer
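The downside with this approach is that it's rather difficult to differentiate between a wrap-around marker and an actual string size. We would also need to probe the first byte of each length to check if it's a wrap-around marker before reading the full length integer.
Alternatively, if fewer bytes than a full length remain at the end of the buffer, reader and writer can both wrap around implicitly, without any marker (this is what the implementation below does):
/---- read pointer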
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 03 | 00 | f | o | o | | | |
+----+----+----+----+----+----+----+----+
^
\-------------- write pointer
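The last diagram leaves the final byte unused without writing a marker. One way to express that rule (the full implementation further down calls it an implicit wrap-around in its comments) is a small helper that decides where the next length prefix starts. next_length_offset is a hypothetical name used only for this sketch:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Returns the offset at which the next length prefix starts. If fewer than
// sizeof(Length) bytes remain before the end of the buffer, neither a length
// nor a marker fits there, so reader and writer both continue at index 0.
template <class Length>
constexpr std::size_t next_length_offset(std::size_t offset, std::size_t capacity) {
    assert(offset < capacity);
    return (capacity - offset < sizeof(Length)) ? 0 : offset;
}
```

Since reader and writer apply the same rule to the same offset, they agree on the jump without exchanging any extra information.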
Depending on how fast your reader is in comparison to your writer you might have a bit of destructive interference within the ring buffer.
If for example your L1 cache line size is 128 bytes, using 1-byte lengths for the strings in the ring-buffer, and only pushing length 1 strings, e.g.:
/--------------------------------------- read pointer
v
+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+----+----+----+----+----+----+----+----+
| 01 | a | 01 | b | 01 | c | | | ...
+----+----+----+----+----+----+----+----+
^
\--------- write pointer
Then this would result in up to 64 string entries being stored on the same cache line, which are continuously written by the producer while they get read by the consumer => a lot of potential interference.
This can be prevented by padding the strings within the ring buffer to a multiple of your cache line size (in C++ available as std::hardware_destructive_interference_size),
e.g. strings padded to 4 bytes:
/--------------------------------------- read pointer
v
+----+----+----+----+----+----+----+----+----+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
+----+----+----+----+----+----+----+----+----+
| 01 | a | | | 01 | c | | | | ...
+----+----+----+----+----+----+----+----+----+
^
\--------- write pointer
The trade-off here is of course that this will potentially waste a lot of space within the ring buffer.
So you'll have to profile how much padding you want for your string values.
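To get a feeling for the numbers, the arithmetic can be sketched as follows. pad_to and entries_per_line are hypothetical helpers, and the 128-byte line size used in the usage note is just the example value from the text, not a measured one:

```cpp
#include <cassert>
#include <cstddef>

// Rounds `size` up to the next multiple of `padding` (padding must be > 0).
constexpr std::size_t pad_to(std::size_t size, std::size_t padding) {
    assert(padding > 0);
    const std::size_t rem = size % padding;
    return rem == 0 ? size : size + (padding - rem);
}

// How many padded entries of `entry_size` bytes can share one cache line.
constexpr std::size_t entries_per_line(std::size_t entry_size,
                                       std::size_t padding,
                                       std::size_t line_size) {
    const std::size_t padded = pad_to(entry_size, padding);
    return padded >= line_size ? 1 : line_size / padded;
}
```

For 2-byte entries (1-byte length + 1 character) and a 128-byte line, entries_per_line gives 64 without padding, 32 with 4-byte padding and 1 with 128-byte padding, at 2, 4 and 128 bytes per entry respectively.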
The padding value N you choose should be between these two values: 1 <= N <= std::hardware_destructive_interference_size
N = 1 => best space utilization, most potential interference
N = std::hardware_destructive_interference_size => worst space utilization, no potential interference
This is a full implementation of a wait-free, string-only, SPSC FIFO queue, based on the design considerations listed above.
I've only implemented the bare minimum interface required, but you can easily build all the utility functions boost::lockfree::spsc_queue provides around these 3 core functions:
template<std::unsigned_integral size_type, std::size_t padding = 1>
class lockfree_spsc_string_queue {
public:
lockfree_spsc_string_queue(void* buffer, std::size_t buffer_size, bool init = false);
// producer:
// tries to push the given string into the queue
// returns true if adding the string was successful
// (contents will be copied into the ringbuffer)
bool push(std::string_view str);
// consumer:
// reads the next element in the queue.
// returns an empty optional if the queue is empty,
// otherwise a string_view that points to the string
// within the ringbuffer.
// Does NOT remove the element from the queue.
std::optional<std::string_view> front();
// Removes the next element from the queue.
// Returns true if an element has been removed.
bool pop();
};
size_type is the integral type that is used to store the length of the strings within the buffer, i.e. if you use unsigned char each string can be at most 254 bytes long (with unsigned short (assuming 2 bytes) it would be 65534, etc.), because the maximum value is used as a wrap-around marker.
padding is the alignment that's used for the string values. If it is set to 1 then strings will be packed as tightly as possible into the ring buffer (best space utilization). If you set it to std::hardware_destructive_interference_size then there will be no interference between different string values in the ring buffer, at the cost of space utilization.
Usage example: godbolt
void producer() {
lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE, true);
while(true) {
// retry until successful
while(!queue.push("foobar"));
}
}
void consumer() {
lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE);
while(true) {
std::optional<std::string_view> result;
// retry until successful
while(!result) result = queue.front();
std::cout << *result << std::endl;
bool pop_result = queue.pop();
assert(pop_result);
}
}
boost::lockfree::spsc_queue::consume_all, for example, could be implemented like this (in terms of the 3 functions provided by this minimal implementation):
template<class Functor>
std::size_t consume_all(Functor&& f) {
std::size_t cnt = 0;
for(auto el = front(); el; el = front()) {
// call f as an lvalue: it is invoked once per element, so it must not be forwarded (moved-from) repeatedly
f(*el);
bool pop_result = pop();
assert(pop_result);
++cnt;
}
return cnt;
}
Full implementation: godbolt
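#include <atomic>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <new>
#include <optional>
#include <string_view>
#include <type_traits>

// wait-free single-producer, single consumer fifo queue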
//
// The constructor must be called once with `init = true` for a specific region.
// After the construction of the queue with `init = true` has succeeded additional instances
// can be created for the region by passing `init = false`.
template<
std::unsigned_integral size_type,
std::size_t padding = 1>
class lockfree_spsc_string_queue {
// we use the max value of size_type as a special marker
// to indicate that the writer needed to wrap-around early to accommodate a string value.
// this means the maximum size a string entry can be is `size_type_max - 1`.
static constexpr size_type size_type_max = std::numeric_limits<size_type>::max();
// calculates the padding required after a T member (that itself starts on a
// cache line boundary) so that the next member starts on the next cache line.
template<class T>
static constexpr std::size_t padding_to_next_cache_line =
std::hardware_destructive_interference_size -
sizeof(T) % std::hardware_destructive_interference_size;
public:
// internal struct that will be placed in the shared memory region
struct spsc_shm_block {
using atomic_size = std::atomic<std::size_t>;
// read head
atomic_size read_offset;
char pad1[padding_to_next_cache_line<atomic_size>];
// write head
atomic_size write_offset;
char pad2[padding_to_next_cache_line<atomic_size>];
std::size_t usable_buffer_size;
char pad3[padding_to_next_cache_line<std::size_t>];
// actual data
std::byte buffer[];
[[nodiscard]] static inline spsc_shm_block* init_shm(void* ptr, std::size_t size) {
spsc_shm_block* block = open_shm(ptr, size);
// atomics *must* be lock-free, otherwise they won't work across process boundaries.
assert(block->read_offset.is_lock_free());
assert(block->write_offset.is_lock_free());
block->read_offset = 0;
block->write_offset = 0;
block->usable_buffer_size = size - offsetof(spsc_shm_block, buffer);
return block;
}
[[nodiscard]] static inline spsc_shm_block* open_shm(void* ptr, std::size_t size) {
// this type must be trivially copyable, otherwise we can't implicitly start its lifetime.
// It also needs to have a standard layout for offsetof.
static_assert(std::is_trivially_copyable_v<spsc_shm_block>);
static_assert(std::is_standard_layout_v<spsc_shm_block>);
// size must be at least as large as the header
assert(size >= sizeof(spsc_shm_block));
// ptr must be properly aligned for the header
assert(reinterpret_cast<std::uintptr_t>(ptr) % alignof(spsc_shm_block) == 0);
// implicitly start lifetime of spsc_shm_block
return std::launder(reinterpret_cast<spsc_shm_block*>(ptr));
}
};
public:
inline lockfree_spsc_string_queue(void* ptr, std::size_t size, bool init = false)
: block(init ? spsc_shm_block::init_shm(ptr, size) : spsc_shm_block::open_shm(ptr, size))
{
// requires a buffer at least 1 byte larger than size_type
assert(block->usable_buffer_size > sizeof(size_type));
}
// prevent copying / moving
lockfree_spsc_string_queue(lockfree_spsc_string_queue const&) = delete;
lockfree_spsc_string_queue(lockfree_spsc_string_queue&&) = delete;
lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue const&) = delete;
lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue&&) = delete;
// producer: tries to add `str` to the queue.
// returns true if the string has been added to the queue.
[[nodiscard]] inline bool push(std::string_view str) {
std::size_t write_size = pad_size(sizeof(size_type) + str.size());
// impossible to satisfy write (not enough space / insufficient size_type)
if(write_size > max_possible_write_size() || str.size() >= size_type_max) [[unlikely]] {
assert(write_size < max_possible_write_size());
assert(str.size() < size_type_max);
return false;
}
std::size_t write_off = block->write_offset.load(std::memory_order_relaxed);
std::size_t read_off = block->read_offset.load(std::memory_order_acquire);
std::size_t new_write_off = write_off;
if(try_align_for_push(read_off, new_write_off, write_size)) {
new_write_off = push_element(new_write_off, write_size, str);
block->write_offset.store(new_write_off, std::memory_order_release);
return true;
}
if(new_write_off != write_off) {
block->write_offset.store(new_write_off, std::memory_order_release);
}
return false;
}
// consumer: discards the current element to be read (if there is one)
// returns true if an element has been removed, false otherwise.
[[nodiscard]] inline bool pop() {
std::size_t read_off;
std::size_t str_size;
if(!read_element(read_off, str_size)) {
return false;
}
std::size_t read_size = pad_size(sizeof(size_type) + str_size);
std::size_t new_read_off = advance_offset(read_off, read_size);
block->read_offset.store(new_read_off, std::memory_order_release);
return true;
}
// consumer: returns the current element to be read (if there is one)
// this does not remove the element from the queue.
[[nodiscard]] inline std::optional<std::string_view> front() {
std::size_t read_off;
std::size_t str_size;
if(!read_element(read_off, str_size)) {
return std::nullopt;
}
// return string_view into buffer
return std::string_view{
reinterpret_cast<std::string_view::value_type*>(&block->buffer[read_off + sizeof(size_type)]),
str_size
};
}
private:
// handles implicit and explicit wrap-around for the writer
[[nodiscard]] inline bool try_align_for_push(
std::size_t read_off,
std::size_t& write_off,
std::size_t write_size) {
std::size_t cont_avail = max_avail_contiguous_write_size(write_off, read_off);
// there is enough contiguous space in the buffer to push the string in one go
if(write_size <= cont_avail) {
return true;
}
// not enough contiguous space in the buffer.
// check if the element could fit contiguously into
// the buffer at the current write_offset.
std::size_t write_off_to_end = block->usable_buffer_size - write_off;
if(write_size <= write_off_to_end) {
// element could fit at current position, but the reader would need
// to consume more elements first
// -> do nothing
return false;
}
// element can NOT fit contiguously at current write_offset
// -> we need a wrap-around
std::size_t avail = max_avail_write_size(write_off, read_off);
// not enough space for a wrap-around marker
// -> implicit wrap-around
if(write_off_to_end < sizeof(size_type)) {
// the read marker has advanced far enough
// that we can perform a wrap-around and try again.
if(avail >= write_off_to_end) {
write_off = 0;
return try_align_for_push(read_off, write_off, write_size);
}
// reader must first read more elements
return false;
}
// explicit wrap-around
if(avail >= write_off_to_end) {
std::memcpy(&block->buffer[write_off], &size_type_max, sizeof(size_type));
write_off = 0;
return try_align_for_push(read_off, write_off, write_size);
}
// explicit wrap-around not possible
// (reader must advance first)
return false;
}
// writes the element into the buffer at the provided offset
// and calculates new write_offset
[[nodiscard]] inline std::size_t push_element(
std::size_t write_off,
std::size_t write_size,
std::string_view str) {
// write size + string into buffer
size_type size = static_cast<size_type>(str.size());
std::memcpy(&block->buffer[write_off], &size, sizeof(size_type));
std::memcpy(&block->buffer[write_off + sizeof(size_type)], str.data(), str.size());
// calculate new write_offset
return advance_offset(write_off, write_size);
}
// returns true if there is an element that can be read (and sets read_off & str_size)
// returns false otherwise.
// internally handles implicit and explicit wrap-around.
[[nodiscard]] inline bool read_element(std::size_t& read_off, std::size_t& str_size) {
std::size_t write_off = block->write_offset.load(std::memory_order_acquire);
std::size_t orig_read_off = block->read_offset.load(std::memory_order_relaxed);
read_off = orig_read_off;
str_size = 0;
if(read_off == write_off) {
return false;
}
// remaining space would be insufficient for a size_type
// -> implicit wrap-around
if(block->usable_buffer_size - read_off < sizeof(size_type)) {
read_off = 0;
if(read_off == write_off) {
block->read_offset.store(read_off, std::memory_order_release);
return false;
}
}
size_type size;
std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));
// wrap-around marker
// -> explicit wrap-around
if(size == size_type_max) {
read_off = 0;
if(read_off == write_off) {
block->read_offset.store(read_off, std::memory_order_release);
return false;
}
std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));
}
// modified read_off -> store
if(read_off != orig_read_off) {
block->read_offset.store(read_off, std::memory_order_release);
}
str_size = size;
return true;
}
// the maximum number of contiguous bytes we are currently able
// to fit within the memory block (without wrapping around)
[[nodiscard]] inline std::size_t max_avail_contiguous_write_size(
std::size_t write_off,
std::size_t read_off) {
if(write_off >= read_off) {
std::size_t ret = block->usable_buffer_size - write_off;
ret -= read_off == 0 ? 1 : 0;
return ret;
}
// write_off < read_off
return read_off - write_off - 1;
}
// the maximum number of bytes we are currently able
// to fit within the memory block (might include a wrap-around)
[[nodiscard]] inline std::size_t max_avail_write_size(std::size_t write_off, std::size_t read_off) {
std::size_t avail = read_off - write_off - 1;
if (write_off >= read_off)
avail += block->usable_buffer_size;
return avail;
}
// the largest possible size an element could be and still
// fit within the memory block.
[[nodiscard]] inline std::size_t max_possible_write_size() {
return block->usable_buffer_size - 1;
}
// pads a given size to be a multiple of the template parameter padding
[[nodiscard]] inline std::size_t pad_size(std::size_t size) {
if(size % padding != 0) {
size += padding - size % padding;
}
return size;
}
// advances offset and wraps around if required
[[nodiscard]] inline std::size_t advance_offset(std::size_t offset, std::size_t element_size) {
std::size_t new_offset = offset + element_size;
// wrap-around
if(new_offset >= block->usable_buffer_size) {
new_offset -= block->usable_buffer_size;
}
return new_offset;
}
private:
spsc_shm_block* block;
};