I am currently studying Anthony Williams' C++ concurrency in action (2E) and I have a lot of technical question but so far this one has been the weirdest even when it works.
#pragma once
template<typename T>
class CLockFreeQueue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr; //This points to the actual nodes.
};
std::atomic<counted_node_ptr> mHead;
std::atomic<counted_node_ptr> mTail;
struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2;
};
//This is the node that holds make up the list
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
node()
{
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr dummy = { 0 };
next.store(dummy);
data.store(nullptr);
}
void release_ref()
{
node_counter old_counter =
count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.internal_count;
} while (!count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
//If both values are zero, we can delete
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete this;
}
}
};
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
} while (!counter.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
static void free_external_counter(counted_node_ptr& old_node_ptr)
{
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter =
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
} while (!ptr->count.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete ptr;
}
}
void set_new_tail(counted_node_ptr& old_tail,
counted_node_ptr const& new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!mTail.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
CLockFreeQueue() {
counted_node_ptr h;
h.ptr = new node;
h.external_count = 1;
mHead.store(h);
mTail.store(mHead.load());
}
CLockFreeQueue(const CLockFreeQueue&) = delete;
CLockFreeQueue& operator=(const CLockFreeQueue&) = delete;
~CLockFreeQueue() {
while (pop());
}
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = mTail.load();
for (;;)
{
increase_external_count(mTail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(
old_data, new_data.get())) //if successfully changed the old_tail.ptr->data value atomically
{
counted_node_ptr old_next = { 0 };
if (!old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = { 0 };
if (old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
std::unique_ptr<T> pop()
{
counted_node_ptr old_head = mHead.load(std::memory_order_relaxed);
for (;;)
{
increase_external_count(mHead, old_head);
node* const ptr = old_head.ptr;
if (ptr == mTail.load().ptr) //If the list is empty
{
ptr->release_ref();
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load();
if (mHead.compare_exchange_strong(old_head, next))
{
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
and right now I managed to get his final lock free queue working but something puzzles me - how does it get a new counted_node_ptr in push()?
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
I mean it looks like each node is encapsulated with a counted_node_ptr but in push() he seems to be using the stack allocation in the function to create a new counted_node_ptr (new_next) before having the tail exchanged with it using
if (!old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
Wouldn't new_next be invalid(deleted) when push() exits? Does compare_exchange_strong create a new instance of it? This has me curious.
My code works, I tested (not sure if there are memory leaks) but this one has me puzzled.
I know this is probably a mundane question especially since it is working but I am just curious. I am of the mind that if you are adding something to a dynamic queue, you should allocate each new node on the dynamic heap instead of the stack.
How does the new_next (counted_node_ptr) get allocated and maintained in the queue when the push() function exits since it appears to be stacked allocated.
From the definition of counted_node_ptr
:
struct counted_node_ptr
{
int external_count;
node* ptr; //This points to the actual nodes.
};
This is just an int
and a pointer, so copying it around is cheap and easy. Importantly, destroying a counted_node_ptr
will not cause ptr
to be deleted.
When compare_exchange_strong
is called, it could cause new_next
to be copied to old_tail.ptr->next
. This would just require copying the int
and pointer members across.
I am of the mind that if you are adding something to a dynamic queue, you should allocate each new node on the dynamic heap instead of the stack.
The node is being dynamically allocated here:
new_next.ptr = new node;
new_next
just associates that dynamic allocation with a count.