c++multithreadingvisual-studio-2022intel

Memory violation at execution of multithreaded code compiled with Intel C++ 2025 compiler


I am using the basic threadpool found at

https://codereview.stackexchange.com/questions/288042/c-implementation-of-a-concurrent-queue-and-of-a-thread-pool/288044?noredirect=1#comment593398_288044

to which I applied some of the corrections/enhancement proposed in the answer :

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <future>
#include <thread>
#include "concurrent_queue.h"
#include <chrono>



class ThreadPool
{
public:
    using Task = std::packaged_task<bool(void)>;
    using TaskHandle = std::future<bool>;
private:
    //  The one and only instance
    static ThreadPool instance_;
    //  The task queue
    concurrent_queue<Task> concurrent_queue_;
    //  The threads
    std::vector<std::thread> threads_;
    //  Active indicator
    bool is_active_;
    //  Interruption indicator
    bool interrupt_;
    //  Thread number
    static thread_local size_t my_tls_num_;

    //  The function that is executed on every thread
    void threadFunc(const size_t num)
    {
        my_tls_num_ = num;
        //  "Infinite" loop, only broken on destruction
        while (auto t = concurrent_queue_.pop())
        {
            (*t)();
        }
    }

    //  The constructor stays private, ensuring single instance
    ThreadPool() : is_active_(false), interrupt_(false) {}

public:
    //  Access the instance
    static ThreadPool* getInstance()
    {
        return &instance_;
    }
    //  Number of threads
    size_t numThreads() const { return threads_.size(); }
    //  The number of the caller thread
    static size_t threadNum() { return my_tls_num_; }
    //  Starter
    void start(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        if (!is_active_)  //  Only start once
        {
            threads_.reserve(nThread);

            //  Launch threads on threadFunc and keep handles in a vector
            for (size_t i = 0; i < nThread; i++)
            {
                std::thread loc = std::thread(&ThreadPool::threadFunc, this, i + 1);
                threads_.push_back(std::move(loc));
            }

            is_active_ = true;
        }
    }
    //dtor
    ~ThreadPool()
    {
        stop();
    }
    void stop()
    {
        if (is_active_)
        {
            //  Interrupt mode
            interrupt_ = true;
            //  Interrupt all waiting threads
            concurrent_queue_.interrupt();
            //  Wait for them all to join
            for_each(threads_.begin(), threads_.end(), std::mem_fn(&std::thread::join));
            //  Clear all threads
            threads_.clear();
            //  Clear the queue and reset interrupt
            concurrent_queue_.clear();
            concurrent_queue_.reset_interrupt();
            //  Mark as inactive
            is_active_ = false;
            //  Reset interrupt
            interrupt_ = false;
        }
    }
    //  Forbid copies etc
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ThreadPool(ThreadPool&& rhs) = delete;
    ThreadPool& operator=(ThreadPool&& rhs) = delete;
    //  Spawn task
    template<typename Callable>
    TaskHandle spawnTask(Callable c)
    {
        Task t(std::move(c));
        TaskHandle f = t.get_future();
        concurrent_queue_.push(std::move(t));
        return f;
    }

    //  Run queued tasks synchronously 
    //  while waiting on a future, 
    //  returns true if at least one task was run
    bool activeWait(const TaskHandle& f)
    {
        using namespace std::chrono_literals;
        bool b = false;
        //  Check whether or not the future is ready without blocking
        //  by waiting 0 seconds and returning status
        while (f.wait_for(0s) != std::future_status::ready)
        {
            //  Non blocking
            if (auto t = concurrent_queue_.try_pop())
            {
                (*t)();
                b = true;
            }
            else // Nothing in the queue: go to sleep
            {
                f.wait();
            }
        }
        return b;
    }
};

inline ThreadPool ThreadPool::instance_;
inline thread_local size_t ThreadPool::my_tls_num_ = 0;



#endif//THREAD_POOL_H

I coded a simple advisor class :

#include <mutex>
#include "ThreadPool.h"
#include "func.h"

class Advisor
{
private:
    static Advisor * pinstance_;
    static std::mutex mutex_;
protected:
    Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        ThreadPool::getInstance()->start(nThread);
    }
    virtual  ~Advisor()
    {
        ThreadPool::getInstance()->stop();
    }
public:
    Advisor(const Advisor& rhs) = delete;
    Advisor& operator=(const Advisor& rhs) = delete;
    Advisor(Advisor&& rhs) = delete;
    Advisor& operator=(Advisor&& rhs) = delete;

    static Advisor* GetInstance(const size_t nThread = std::thread::hardware_concurrency() - 1);

    double some_func(const size_t N)
    {
        return some_func_calc(N);
    }
};

/**
 * Static methods should be defined outside the class.
 */
Advisor* Advisor::pinstance_{ nullptr };
std::mutex Advisor::mutex_;

/**
 * The first time we call GetInstance we will lock the storage location
 *      and then we make sure again that the variable is null and then we
 *      set the value. RU:
 */
Advisor* Advisor::GetInstance(const size_t nThread)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (pinstance_ == nullptr)
    {
        pinstance_ = new Advisor(nThread);
    }
    return pinstance_;
}

based on a free function which does an average by splitting the for loop of differents threads :

inline double some_func_calc(const size_t nbPaths)
{
    const std::unique_ptr<my_rng> rng_unique_ptr = std::make_unique<sobol>();
    std::vector<double> resultMat(nbPaths);

    ThreadPool::getInstance()->start(std::thread::hardware_concurrency() - 1);
    ThreadPool* pool = ThreadPool::getInstance();
    const size_t nThread = pool->numThreads();

    std::vector<std::vector<double>> unifVecs(nThread + 1);    //  +1 for the main thread ;)
    for (auto& vec : unifVecs)
        vec.resize(1);
    std::vector<std::vector<double>> paths(nThread + 1);

    //  One RNG per thread
    std::vector<std::unique_ptr<my_rng>> rngs(nThread + 1);
    for (auto& random : rngs)
    {
        random = rng_unique_ptr->clone();
        random->init(1);
    }

#define BATCHSIZE size_t{64}

    //  Reserve memory for futures
    std::vector<ThreadPool::TaskHandle> futures;

    futures.reserve(nbPaths / BATCHSIZE + 1);

    //  Start
    size_t firstPath = 0;
    size_t pathsLeft = nbPaths;



    while (pathsLeft > 0)
    {
        size_t pathsInTask = std::min<size_t>(pathsLeft, BATCHSIZE);

        futures.push_back(pool->spawnTask([&, firstPath, pathsInTask]()
        {
            //  Inside the parallel task, 
            //  pick the right pre-allocated vectors
            const size_t threadNum = pool->threadNum();
            std::vector<double>& unifVec = unifVecs[threadNum];
            std::vector<double>& path = paths[threadNum];

            //  Get a RNG and position it correctly
            auto& random = rngs[threadNum];
            random->skipTo(firstPath);

            //  And conduct the simulations
            for (size_t i = 0; i < pathsInTask; i++)
            {
                //  Next Gaussian vector, dimension D
                random->nextU(unifVec);
                //value on path
                double avg_of_elems = std::reduce(unifVec.begin(), unifVec.end()) / unifVec.size();
                //store result
                resultMat[firstPath + i] = avg_of_elems;
            }
            //  Remember tasks must return bool
            return true;
        }));

        pathsLeft -= pathsInTask;
        firstPath += pathsInTask;
    }

    for (auto& future : futures)
        pool->activeWait(future);

    double actual = std::reduce(resultMat.begin(), resultMat.end()) / resultMat.size();

    return actual;
}

based on a simple random generator :

#ifndef MY_RNG_H
#define MY_RNG_H

#include <memory>
#include <vector>

enum RngType
{
    sob = 0,
};

class my_rng
{
public:

    //  Initialise with dimension simDim
    virtual void init(const size_t simDim) = 0;
    virtual size_t dim() const = 0;

    //  Compute the next vector[simDim] of independent Uniforms or Gaussians
    //  The vector is filled by the function and must be pre-allocated
    virtual void nextU(std::vector<double>& uVec) = 0;

    virtual std::unique_ptr<my_rng> clone() const = 0;

    virtual ~my_rng() = default;

    //  Skip ahead
    virtual void skipTo(const unsigned b) = 0;
};

#endif//MY_RNG_H

with concrete class :

#ifndef SOBOL_H
#define SOBOL_H

#include <algorithm>
#include "my_rng.h"



#define ONEOVER2POW32 2.3283064365387E-10

const unsigned* const* getjkDir();

class sobol : public my_rng
{
    //  Dimension
    size_t                      myDim;

    //  State Y
    std::vector<unsigned>           myState;

    //  Current index in the sequence
    unsigned                    myIndex;

    //  The direction numbers listed in sobol.cpp
    //  Note jkDir[i][dim] gives the i-th (0 to 31) 
    //      direction number of dimension dim
    const unsigned* const* jkDir;

public:
    size_t dim() const override
    {
        return myDim;
    }
    //  Virtual copy constructor
    std::unique_ptr<my_rng> clone() const override
    {
        return std::make_unique<sobol>(*this);
    }

    //  Initializer 
    void init(const size_t simDim) override
    {
        //  Set pointer on direction numbers 
        jkDir = getjkDir();

        //  Dimension
        myDim = simDim;
        myState.resize(myDim);

        //  Reset to 0
        reset();
    }

    void reset()
    {
        //  Set state to 0
        memset(myState.data(), 0, myDim * sizeof(unsigned));
        //  Set index to 0
        myIndex = 0;
    }

    //  Next point
    void next()
    {
        //  Gray code, find position j 
        //      of rightmost zero bit of current index n
        unsigned n = myIndex, j = 0;
        while (n & 1)
        {
            n >>= 1;
            ++j;
        }

        //  Direction numbers
        const unsigned* dirNums = jkDir[j];

        //  XOR the appropriate direction number 
        //      into each component of the integer sequence
        for (int i = 0; i < myDim; ++i)
        {
            myState[i] ^= dirNums[i];
        }

        //  Update count
        ++myIndex;
    }

    void nextU(std::vector<double>& uVec) override
    {
        next();
        transform(myState.begin(), myState.end(), uVec.begin(),
            [](const unsigned long i)
            {
                return ONEOVER2POW32 * i;
            });
    }

    //  Skip ahead (from 0 to b)
    void skipTo(const unsigned b) override
    {
        //  Check skip
        if (!b) return;

        //  Reset Sobol to 0 
        reset();

        //  The actual Sobol skipping algo
        unsigned im = b;
        unsigned two_i = 1, two_i_plus_one = 2;

        unsigned i = 0;
        while (two_i <= im)
        {
            if (((im + two_i) / two_i_plus_one) & 1)
            {
                for (unsigned k = 0; k < myDim; ++k)
                {
                    myState[k] ^= jkDir[i][k];
                }
            }

            two_i <<= 1;
            two_i_plus_one <<= 1;
            ++i;
        }

        //  End of skipping algo

        //  Update next entry
        myIndex = unsigned(b);
    }
};

#endif//SOBOL_H

whose source I add at the end of the question.

The client code is :

#include <iostream>
#include "Advisor.h"


void ASSERT_NEAR(double actual, double expected, double precision)
{
    if (std::abs(actual - expected) <= precision)
        std::cout << "Ok !" << std::endl;
    else
        std::cout << "Not ok !" << std::endl;
}



static Advisor* pricingAdvisorSingleton = Advisor::GetInstance();

int main()
{
    try
    {
        constexpr size_t nbPaths = 131072;
        double expected = 0.5;
        double actual = pricingAdvisorSingleton->some_func(nbPaths);

        ASSERT_NEAR(actual, expected, 1e-3);

        std::cout << "Ok!\n";
    }
    catch (std::exception &e)
    {
        std::cout << e.what() << "\n";
    }
}

This code compiled with Microsoft Visual Studio 2002 v143 compiler in debug or release executes fine with the expecte result Ok!. More precisely : very complex numerical code (with some_func analogues performing intensive (CPU and RAM wise) numerical computations), multi-threaded in the same fashion as it my code, has been in production and is working fine with the intended numerical results. (Which doesn't imply that it is sane from a multithreading pov as I am not an expert. But at least no race condtions nor leaks etc.)

Compiled with Intel C++ 2O25 compiler though, and executed, it crashes in

void start(const size_t nThread = std::thread::hardware_concurrency() - 1)

on the instruction

threads_.push_back(std::thread(&ThreadPool::threadFunc, this, i + 1));

which triggers a memory violation (-1073741819 (0xc0000005)) at

_CONSTEXPR20 void _Orphan_range_unlocked(pointer _First, pointer _Last) const

from <vector>, in its instruction while (*_Pnext) {.

It is as if objects std::thread(&ThreadPool::threadFunc, this, i + 1) where corrupted. I am not at all an multi-threading expert in C++, so any idea would be welcome.

Source of sobol class for the sake of completeness can be found in a gist.


Solution

  • the problem is in the linked code review thread, with your code causing Static Initialization Order Fiasco.

    class ThreadPool
    {
        //  The one and only instance
        static ThreadPool instance_;
    public:
        //  Access the instance
        static ThreadPool* getInstance() { return &instance_; }
    
    

    and in your code

    Advisor::Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        ThreadPool::getInstance()->start(nThread);
    }
    
    Advisor* Advisor::GetInstance(const size_t nThread)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pinstance_ == nullptr)
        {
            pinstance_ = new Advisor(nThread);
        }
        return pinstance_;
    }
    
    static Advisor* pricingAdvisorSingleton = Advisor::GetInstance();
    
    

    this will very much cause fiasco and is generally not threadsafe, nothing guarantees that ThreadPool is initialized before Advisor, and you crash for calling push_back on an uninitialized vector, instead the C++11 way is using meyers singleton, which is guaranteed to be threadsafe and no fiasco.

    static ThreadPool& ThreadPool::getInstance() 
    { 
        static ThreadPool instance_;
        return instance_; 
    }
    
    static Advisor& Advisor::GetInstance()
    {
        static Advisor instance_;
        return instance_;
    }