c++concurrencyqueuelock-free

How does a new node get allocated in push()in Anthony Williams' lock free queue (chapter 7)


I am currently studying Anthony Williams' C++ concurrency in action (2E) and I have a lot of technical question but so far this one has been the weirdest even when it works.

#pragma once


template<typename T>
class CLockFreeQueue
{
private:
    struct node; 
    struct counted_node_ptr 
    {
        int external_count;
        node* ptr; //This points to the actual nodes. 
    };

    std::atomic<counted_node_ptr> mHead; 
    std::atomic<counted_node_ptr> mTail; 

    struct node_counter
    {
        unsigned internal_count : 30; 
        unsigned external_counters : 2;
    };

    //This is the node that holds make up the list
    struct node
    {
        std::atomic<T*> data; 
        std::atomic<node_counter> count; 
        std::atomic<counted_node_ptr> next; 

        
        node()
        {
            node_counter new_count;
            new_count.internal_count = 0;
            new_count.external_counters = 2;
            count.store(new_count);

            counted_node_ptr dummy = { 0 };
            next.store(dummy);
            
            data.store(nullptr);
        }

        void release_ref()
        {
            node_counter old_counter =
                count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do
            {
                new_counter = old_counter; 
                --new_counter.internal_count;
            } while (!count.compare_exchange_strong(old_counter, new_counter,
                                                    std::memory_order_acquire, std::memory_order_relaxed));

            //If both values are zero, we can delete
            if (!new_counter.internal_count && 
                !new_counter.external_counters)
            {
                delete this;
            }
        }
    };

    static void increase_external_count(
        std::atomic<counted_node_ptr>& counter,
        counted_node_ptr& old_counter)
    {
        counted_node_ptr new_counter;
        do
        {
            new_counter = old_counter;
            ++new_counter.external_count;
        } while (!counter.compare_exchange_strong(
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }

    static void free_external_counter(counted_node_ptr& old_node_ptr)
    {
        node* const ptr = old_node_ptr.ptr;
        int const count_increase = old_node_ptr.external_count - 2;
        node_counter old_counter =
            ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do
        {
            new_counter = old_counter;
            --new_counter.external_counters;
            new_counter.internal_count += count_increase;
        } while (!ptr->count.compare_exchange_strong(
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));
        if (!new_counter.internal_count &&
            !new_counter.external_counters)
        {
            delete ptr;
        }
    }

    void set_new_tail(counted_node_ptr& old_tail,
        counted_node_ptr const& new_tail)
    {
        node* const current_tail_ptr = old_tail.ptr;
        while (!mTail.compare_exchange_weak(old_tail, new_tail) &&
            old_tail.ptr == current_tail_ptr);
        if (old_tail.ptr == current_tail_ptr)
            free_external_counter(old_tail);
        else
            current_tail_ptr->release_ref();
    }
public:
    CLockFreeQueue() {
        counted_node_ptr h;
        h.ptr = new node;
        h.external_count = 1;

        mHead.store(h);
        mTail.store(mHead.load());
    }

    CLockFreeQueue(const CLockFreeQueue&) = delete;
    CLockFreeQueue& operator=(const CLockFreeQueue&) = delete;

    ~CLockFreeQueue() {
        while (pop());
    }

    void push(T new_value)
    {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = mTail.load();
        for (;;)
        {
            increase_external_count(mTail, old_tail);
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong(
                old_data, new_data.get())) //if successfully changed the old_tail.ptr->data value atomically 
            {
                counted_node_ptr old_next = { 0 };
                if (!old_tail.ptr->next.compare_exchange_strong(
                    old_next, new_next)) 
                { 
                    delete new_next.ptr;
                    new_next = old_next; 
                }

              
                set_new_tail(old_tail, new_next);
                new_data.release();
                break;
            }
            else 
            {
                counted_node_ptr old_next = { 0 };
                if (old_tail.ptr->next.compare_exchange_strong(
                    old_next, new_next)) 
                { 

                    old_next = new_next;
                    new_next.ptr = new node; 
                                       
                } 
         
                set_new_tail(old_tail, old_next);
            }
        }
    }

    std::unique_ptr<T> pop()
    {
        counted_node_ptr old_head = mHead.load(std::memory_order_relaxed);
        for (;;)
        {
            increase_external_count(mHead, old_head);
            node* const ptr = old_head.ptr;
            if (ptr == mTail.load().ptr) //If the list is empty
            {
                ptr->release_ref();
                return std::unique_ptr<T>();
            }
            counted_node_ptr next = ptr->next.load();
            if (mHead.compare_exchange_strong(old_head, next))
            {
                T* const res = ptr->data.exchange(nullptr);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }
};

and right now I managed to get his final lock free queue working but something puzzles me - how does it get a new counted_node_ptr in push()?

    counted_node_ptr new_next;
    new_next.ptr = new node;
    new_next.external_count = 1;

I mean it looks like each node is encapsulated with a counted_node_ptr but in push() he seems to be using the stack allocation in the function to create a new counted_node_ptr (new_next) before having the tail exchanged with it using

          if (!old_tail.ptr->next.compare_exchange_strong(
                old_next, new_next))

Wouldn't new_next be invalid(deleted) when push() exits? Does compare_exchange_strong create a new instance of it? This has me curious.

My code works, I tested (not sure if there are memory leaks) but this one has me puzzled.

I know this is probably a mundane question especially since it is working but I am just curious. I am of the mind that if you are adding something to a dynamic queue, you should allocate each new node on the dynamic heap instead of the stack.

How does the new_next (counted_node_ptr) get allocated and maintained in the queue when the push() function exits since it appears to be stacked allocated.


Solution

  • From the definition of counted_node_ptr:

    struct counted_node_ptr 
    {
        int external_count;
        node* ptr; //This points to the actual nodes. 
    };
    

    This is just an int and a pointer, so copying it around is cheap and easy. Importantly, destroying a counted_node_ptr will not cause ptr to be deleted.

    When compare_exchange_strong is called, it could cause new_next to be copied to old_tail.ptr->next. This would just require copying the int and pointer members across.

    I am of the mind that if you are adding something to a dynamic queue, you should allocate each new node on the dynamic heap instead of the stack.

    The node is being dynamically allocated here:

    new_next.ptr = new node;
    

    new_next just associates that dynamic allocation with a count.