c++multithreadingqtaccess-violation

Access Violation Error in Multithreaded Disk Scanning with C++


I am trying to use multithreading to scan the system disk and build a Trie tree with all the file paths. However, I encountered an issue during testing.

In my main.cpp, the program runs correctly when the loop index i is 2, 4, or 6. However, when i is 8 or 10 (sometimes i = 8 works), the program crashes with an access violation error at the line if (m_tasks.empty()) return false; in the WorkStealQueue::pop function, with the message "Access violation reading location 0x00000014".

I have tried various approaches to resolve this issue but have been unsuccessful. Any help or guidance to resolve this problem?

Below is the detailed code implementation(My operating platform is Windows.):

// workstealthreadpool.h
#pragma once

#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <future>
#include <vector>

class WorkStealQueue
{
public:
    WorkStealQueue() = default;
    WorkStealQueue(const WorkStealQueue& rhs) = delete;
    WorkStealQueue& operator=(const WorkStealQueue& rhs) = delete;
    ~WorkStealQueue() = default;

    void push(std::function<void()> task);
    bool pop(std::function<void()>& task);
    bool steal(std::function<void()>& task);

private:
    std::deque<std::function<void()>> m_tasks;
    std::mutex m_mutex;
};

class WorkStealThreadPool
{
public:
    explicit WorkStealThreadPool(std::size_t threadNums)
        : m_stop(false) { init(threadNums); }
    ~WorkStealThreadPool();

    template<typename Callback, typename... Args>
    auto addTask(Callback&& func, Args&&... args)->std::future<typename std::result_of<Callback(Args...)>::type>;

private:
    void init(std::size_t threadNums);
    bool stealTask(std::function<void()>& task);
    void worker(size_t index);

private:
    std::vector<std::thread> m_workThreads;
    std::vector<std::unique_ptr<WorkStealQueue>> m_taskQueues;
    std::atomic<bool> m_stop;
    static thread_local std::size_t m_index;
};

template<typename Callback, typename... Args>
auto WorkStealThreadPool::addTask(Callback&& func, Args&&... args) -> std::future<typename std::result_of<Callback(Args...)>::type>
{
    using returnType = typename std::result_of<Callback(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<Callback>(func), std::forward<Args>(args)...));
    std::future<returnType> result = task->get_future();
    {
        m_taskQueues[m_index]->push([task]() { (*task)(); });
    }
    return result;
}
// workstealthreadpool.cpp
#include "workstealthreadpool.h"

void WorkStealQueue::push(std::function<void()> task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    m_tasks.emplace_back(std::move(task));
}

bool WorkStealQueue::pop(std::function<void()>& task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_tasks.empty()) return false;
    task = std::move(m_tasks.front());
    m_tasks.pop_front();
    return true;
}

bool WorkStealQueue::steal(std::function<void()>& task)
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_tasks.empty()) return false;
    task = std::move(m_tasks.back());
    m_tasks.pop_back();
    return true;
}

thread_local std::size_t WorkStealThreadPool::m_index;

void WorkStealThreadPool::init(std::size_t threadNums)
{
    for (std::size_t i = 0; i < threadNums; ++i)
    {
        m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
        m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
    }
}

WorkStealThreadPool::~WorkStealThreadPool()
{
    m_stop = true;
    for (std::thread& workerThread : m_workThreads)
    {
        if (workerThread.joinable())
        {
            workerThread.join();
        }
    }
}

bool WorkStealThreadPool::stealTask(std::function<void()>& task)
{
    for (std::size_t i = 0; i < m_taskQueues.size(); ++i)
    {
        std::size_t index = (m_index + i + 1) % m_taskQueues.size();
        if (m_taskQueues[index]->steal(task))
        {
            return true;
        }
    }
    return false;
}

void WorkStealThreadPool::worker(std::size_t index)
{
    m_index = index;
    while (!m_stop)
    {
        std::function<void()> task;
        if (m_taskQueues[m_index]->pop(task) || stealTask(task))
        {
            task();
        }
        else
        {
            std::this_thread::yield();
        }
    }
}
// scanner.h
#pragma once

#include "workstealthreadpool.h"
#include <QMap>
#include <QString>

struct TrieNode
{
    QMap<TrieNode*, QString> childs;
    TrieNode* parent = nullptr;
};

class Scanner
{
public:
    explicit Scanner(std::size_t threadNums);
    virtual ~Scanner();
    virtual void scanDrives(const QStringList& drives);
    virtual bool isScanCompleted();
    virtual std::vector<TrieNode*> fetchScanResults();

private:
    void scanCore(const QString& currentPath, TrieNode* parent);
    void clearTrie(TrieNode* root);

private:
    TrieNode* m_root;
    WorkStealThreadPool* m_threadPool;
    std::mutex m_mutex;
    std::vector<TrieNode*> m_fileNodes;
    std::atomic<int> m_taskCount;
};

#ifdef _DEBUG
void Print(TrieNode* root);
void Print(std::vector<TrieNode*>* fileNodes);
#endif
// scanner.cpp
#include "scanner.h"
#include <QDir>

Scanner::Scanner(std::size_t threadNums)
    : m_root(nullptr)
    , m_threadPool(nullptr)
    , m_taskCount(0)
{
    if (threadNums != 0)
    {
        m_threadPool = new WorkStealThreadPool(threadNums);
    }
}

Scanner::~Scanner()
{
    delete m_threadPool;
    clearTrie(m_root);
}

void Scanner::scanDrives(const QStringList& drives)
{
    clearTrie(m_root);
    m_fileNodes.clear();
    m_root = new TrieNode();
    for (const QString& drive : drives)
    {
        TrieNode* child = new TrieNode();
        child->parent = m_root;
        m_root->childs[child] = drive;
        scanCore(drive, child);
    }
}

bool Scanner::isScanCompleted()
{
    return m_taskCount.load(std::memory_order_acquire) == 0;
}

std::vector<TrieNode*> Scanner::fetchScanResults()
{
    if (!isScanCompleted())
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_fileNodes;
    }
    return m_fileNodes;
}

void Scanner::scanCore(const QString& currentPath, TrieNode* parent)
{
    QDir dir(currentPath);
    if (!dir.exists()) return;

    QStringList fileNames = dir.entryList(QDir::Files);
    for (const QString& fileName : fileNames)
    {
        TrieNode* child = new TrieNode();
        child->parent = parent;
        parent->childs[child] = fileName;

        std::lock_guard<std::mutex> lock(m_mutex);
        m_fileNodes.emplace_back(child);
    }

    QStringList subdirNames = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot);
    for (const QString& subdirName : subdirNames)
    {
        QString childPath = currentPath + QDir::separator() + subdirName;
        TrieNode* child = new TrieNode();
        child->parent = parent;
        parent->childs[child] = subdirName + "/";

        if (m_threadPool)
        {
            m_taskCount.fetch_add(1, std::memory_order_release);
            m_threadPool->addTask([this, childPath, child]
                {
                    scanCore(childPath, child);
                    m_taskCount.fetch_sub(1, std::memory_order_acquire);
                }
            );
        }
        else {
            scanCore(childPath, child);
        }
    }

}

void Scanner::clearTrie(TrieNode* root)
{
    if (root == nullptr || root->childs.empty()) return;
    for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
    {
        clearTrie(iter.key());
    }
    delete root;
}

#ifdef _DEBUG
void Print(TrieNode* root)
{
    static int level = 0;
    if (root == nullptr || root->childs.empty()) return;
    for (auto iter = root->childs.begin(); iter != root->childs.end(); ++iter)
    {
        qDebug().noquote() << QString("    ").repeated(level) << iter.value();
        ++level;
        Print(iter.key());
        --level;
    }
}
#endif

#ifdef _DEBUG
void Print(std::vector<TrieNode*>* fileNodes)
{
    for (TrieNode* fileNode : *fileNodes)
    {
        qDebug().noquote() << fileNode->parent->childs[fileNode];
    }
}
#endif
//main.cpp
#include "scanner.h"
#include <QStorageInfo>
#include <chrono>

void TestScanDrives(Scanner& scanner, const QStringList& drives)
{
    scanner.scanDrives(drives);
    while (!scanner.isScanCompleted())
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main()
{
    QStringList drives;
    for (const QStorageInfo& drive : QStorageInfo::mountedVolumes())
    {
        if (drive.isValid() && drive.isReady())
        {
            drives << drive.rootPath();
        }
    }

    for (int i = 2; i <= 10; i += 2)
    {
        Scanner scanner(i);
        TestScanDrives(scanner, drives);
    }
    
    return 0;
}

I carefully reviewed the implementation of the WorkStealQueue class, particularly the pop and push methods, to ensure they handle tasks correctly. I adjusted the memory order of the atomic variable m_taskCount to different values to see if it affected the issue. I attempted to improve the thread synchronization and task termination logic in WorkStealThreadPool but did not see significant improvements.


Solution

  • you have unsynchronized access and modification to m_taskQueues.size() and m_taskQueues[index]

    for (std::size_t i = 0; i < threadNums; ++i)
        {
            m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
            m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
        }
    

    when a worker spawns, it can try to steal work from the thread after it, and m_taskQueues.size() will not be synchronized with m_taskQueues[index] (because there is no mutex on m_taskQueues, and the other thread may see size incremented but not see the vector data modified because that's a race, also it may not see the queue fully constructed because those are not atomic pointers), so split this loop into two.

    for (std::size_t i = 0; i < threadNums; ++i)
        {
            m_taskQueues.emplace_back(std::make_unique<WorkStealQueue>());
        }
    for (std::size_t i = 0; i < threadNums; ++i)
        {
            m_workThreads.emplace_back(&WorkStealThreadPool::worker, this, i);
        }
    

    spawning a thread is a synchronization point, when it happens the queues will all be fully constructed, so there will be no race on m_taskQueues.size().