c++ asynchronous concurrency stdatomic

Program using Atomic Variable gets stuck unexpectedly


I have a program utilizing a thread pool and atomic variables for synchronization within an event-driven architecture. However, it appears to get stuck unexpectedly under certain conditions, and I'm struggling to identify the root cause.

I want a fence event to act as a synchronization point that blocks the event loop until all preceding events have finished executing. This is implemented with an atomic variable, ntasks.

The ntasks variable serves to count the number of ongoing event tasks. It is incremented when an event begins execution (begin_event()) and decremented when an event completes (end_event()). If the count reaches zero, it signals the event loop to proceed.

Here's a simplified version of my code.

#include <cstdint>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include <cstdio>
#include "ThreadPool.h"

enum event_t {
    update, render, fence = 0xffffffff
};

std::mutex queue_mutex;
std::queue<event_t> queue;
std::atomic<long long> ntasks;
ThreadPool thread_pool(4);

std::mutex loop_mutex;
std::condition_variable loop_cv;

void publish_event(event_t t) {
    std::unique_lock<std::mutex> lock(queue_mutex);
    queue.push(t);
}

bool try_get_event(event_t& x) {
    std::unique_lock<std::mutex> lock(queue_mutex);
    if (queue.empty())
        return false;
    x = queue.front();
    queue.pop();
    return true;
}

void onUpdate() {
    printf("%s", "Update!");
    publish_event(render);
    publish_event(fence); // Wait until render event finished
    publish_event(update); // Start the next pass
}

void onRender() {
    // Pass
}

// Called before event execution start
void begin_event()
{
    ntasks.fetch_add(1, std::memory_order_release); // Increase the count of executing tasks
}

void end_event()
{
    if (ntasks.fetch_sub(1, std::memory_order_acq_rel) == 1) // Decrease the count of executing tasks
        loop_cv.notify_one(); // If there are no more tasks, we can notify to release the fence
}


int main() {
    publish_event(update); // Initial event to start the event loop

    // Event Loop
    while (true) {
        event_t event;
        if (try_get_event(event)) {

            switch (event) {
            case update:
                begin_event();
                thread_pool.enqueue([] { onUpdate(); end_event(); });
                break;
            case render:
                begin_event();
                thread_pool.enqueue([] { onRender(); end_event(); });
                break;
            case fence:
            {
                std::unique_lock<std::mutex> lock(loop_mutex);
                // Wait until there are no more tasks
                loop_cv.wait(lock, [] { return ntasks.load(std::memory_order_acquire) <= 0; });
            }
                break;
            }
        }
    }
}

Strangest of all, the issue does not reproduce when I run under a debugger or add some output statements.

The program seems to get stuck at the loop_cv.wait() call: when I replace the wait with a busy-wait (spin) loop, the issue disappears.

case fence:
    while (ntasks.load(std::memory_order_acquire) > 0);

The thread pool used is from: https://github.com/progschj/ThreadPool

My platform is the MSVC compiler on Windows, with an Intel i7-10750H CPU.

Despite these mechanisms in place, the program encounters unexpected blocking issues, possibly due to synchronization or deadlock problems.

Could someone review my code and explain why it might get stuck unexpectedly? I would also welcome any suggestions or alternative approaches for achieving similar functionality.


Solution

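  • You forgot to acquire loop_mutex before notifying the event loop. end_event() calls loop_cv.notify_one() without holding the mutex, so a wakeup can be lost: the event loop evaluates the wait predicate while ntasks is still above zero, and before it actually blocks, a worker decrements ntasks to zero and calls notify_one(). Nothing is waiting yet, so the notification is dropped and the loop then sleeps forever. Locking loop_mutex in end_event() before notifying closes that window. This also explains why a debugger or extra output hides the problem: both change the timing.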
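
A minimal sketch of that fix, kept separate from your thread pool so it stands alone. begin_event(), end_event(), ntasks, loop_mutex and loop_cv are the names from the question; wait_fence() and run_fence_rounds() are hypothetical helpers added here only to exercise the fence with plain std::thread workers, and are not part of the original code.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

std::atomic<long long> ntasks{0};
std::mutex loop_mutex;
std::condition_variable loop_cv;

// Called by the event loop before a task is handed to a worker.
void begin_event() {
    ntasks.fetch_add(1, std::memory_order_release);
}

// Called by the worker when its task is done.
void end_event() {
    if (ntasks.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        // The fix: take loop_mutex before notifying. The waiter holds this
        // mutex from its predicate check until it is actually blocked, so the
        // notification can no longer slip into the gap between the two.
        std::lock_guard<std::mutex> lock(loop_mutex);
        loop_cv.notify_one();
    }
}

// Hypothetical helper: the body of the `case fence:` branch.
void wait_fence() {
    std::unique_lock<std::mutex> lock(loop_mutex);
    loop_cv.wait(lock, [] { return ntasks.load(std::memory_order_acquire) <= 0; });
}

// Hypothetical stress helper: repeatedly starts `workers` tasks, then fences.
// Returns the final task count, which should be 0.
long long run_fence_rounds(int rounds, int workers) {
    for (int r = 0; r < rounds; ++r) {
        std::vector<std::thread> threads;
        for (int i = 0; i < workers; ++i) {
            begin_event();  // counted before the task starts, as in the question
            threads.emplace_back([] { end_event(); });
        }
        wait_fence();  // must return once every task has called end_event()
        assert(ntasks.load() == 0);
        for (auto& t : threads) t.join();
    }
    return ntasks.load();
}
```

Calling run_fence_rounds(200, 4) from main should return 0 without hanging. The only change you need in your own program is the lock_guard inside end_event(); the rest of the event loop can stay as it is.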