I am using the basic thread pool from the linked code review thread, to which I applied some of the corrections/enhancements proposed in the answer:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <future>
#include <thread>
#include <chrono>
#include <vector>
#include <algorithm>
#include <functional>
#include "concurrent_queue.h"

class ThreadPool
{
public:

    using Task = std::packaged_task<bool(void)>;
    using TaskHandle = std::future<bool>;

private:

    // The one and only instance
    static ThreadPool instance_;

    // The task queue
    concurrent_queue<Task> concurrent_queue_;

    // The threads
    std::vector<std::thread> threads_;

    // Active indicator
    bool is_active_;

    // Interruption indicator
    bool interrupt_;

    // Thread number
    static thread_local size_t my_tls_num_;

    // The function that is executed on every thread
    void threadFunc(const size_t num)
    {
        my_tls_num_ = num;

        // "Infinite" loop, only broken on destruction
        while (auto t = concurrent_queue_.pop())
        {
            (*t)();
        }
    }

    // The constructor stays private, ensuring single instance
    ThreadPool() : is_active_(false), interrupt_(false) {}

public:

    // Access the instance
    static ThreadPool* getInstance()
    {
        return &instance_;
    }

    // Number of threads
    size_t numThreads() const { return threads_.size(); }

    // The number of the caller thread
    static size_t threadNum() { return my_tls_num_; }

    // Starter
    void start(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        if (!is_active_) // Only start once
        {
            threads_.reserve(nThread);

            // Launch threads on threadFunc and keep handles in a vector
            for (size_t i = 0; i < nThread; i++)
            {
                std::thread loc = std::thread(&ThreadPool::threadFunc, this, i + 1);
                threads_.push_back(std::move(loc));
            }

            is_active_ = true;
        }
    }

    // dtor
    ~ThreadPool()
    {
        stop();
    }

    void stop()
    {
        if (is_active_)
        {
            // Interrupt mode
            interrupt_ = true;

            // Interrupt all waiting threads
            concurrent_queue_.interrupt();

            // Wait for them all to join
            std::for_each(threads_.begin(), threads_.end(), std::mem_fn(&std::thread::join));

            // Clear all threads
            threads_.clear();

            // Clear the queue and reset interrupt
            concurrent_queue_.clear();
            concurrent_queue_.reset_interrupt();

            // Mark as inactive
            is_active_ = false;

            // Reset interrupt
            interrupt_ = false;
        }
    }

    // Forbid copies etc
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ThreadPool(ThreadPool&& rhs) = delete;
    ThreadPool& operator=(ThreadPool&& rhs) = delete;

    // Spawn task
    template<typename Callable>
    TaskHandle spawnTask(Callable c)
    {
        Task t(std::move(c));
        TaskHandle f = t.get_future();
        concurrent_queue_.push(std::move(t));
        return f;
    }

    // Run queued tasks synchronously
    // while waiting on a future,
    // returns true if at least one task was run
    bool activeWait(const TaskHandle& f)
    {
        using namespace std::chrono_literals;

        bool b = false;

        // Check whether or not the future is ready without blocking
        // by waiting 0 seconds and returning status
        while (f.wait_for(0s) != std::future_status::ready)
        {
            // Non blocking
            if (auto t = concurrent_queue_.try_pop())
            {
                (*t)();
                b = true;
            }
            else // Nothing in the queue: go to sleep
            {
                f.wait();
            }
        }

        return b;
    }
};

inline ThreadPool ThreadPool::instance_;
inline thread_local size_t ThreadPool::my_tls_num_ = 0;

#endif // THREAD_POOL_H
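For reference, the intended usage pattern of this pool, as I understand it, is along these lines (a minimal sketch, the lambda being purely illustrative):

#include "ThreadPool.h"

int main()
{
    ThreadPool* pool = ThreadPool::getInstance();
    pool->start();   // launches hardware_concurrency() - 1 workers by default

    // Tasks must return bool: ThreadPool::Task is std::packaged_task<bool(void)>
    ThreadPool::TaskHandle h = pool->spawnTask([]() { return true; });

    // Run other queued tasks on this thread while waiting for this future
    pool->activeWait(h);

    pool->stop();
}

The Advisor class and the free function below drive the pool with exactly these calls.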
I coded a simple Advisor class:
#include <mutex>
#include "ThreadPool.h"
#include "func.h"

class Advisor
{
private:
    static Advisor* pinstance_;
    static std::mutex mutex_;

protected:
    Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        ThreadPool::getInstance()->start(nThread);
    }
    virtual ~Advisor()
    {
        ThreadPool::getInstance()->stop();
    }

public:
    Advisor(const Advisor& rhs) = delete;
    Advisor& operator=(const Advisor& rhs) = delete;
    Advisor(Advisor&& rhs) = delete;
    Advisor& operator=(Advisor&& rhs) = delete;

    static Advisor* GetInstance(const size_t nThread = std::thread::hardware_concurrency() - 1);

    double some_func(const size_t N)
    {
        return some_func_calc(N);
    }
};
/**
 * Static members should be defined outside the class.
 */
Advisor* Advisor::pinstance_{ nullptr };
std::mutex Advisor::mutex_;

/**
 * The first time we call GetInstance we lock the storage location,
 * check again that the pointer is still null, and only then create
 * the instance.
 */
Advisor* Advisor::GetInstance(const size_t nThread)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (pinstance_ == nullptr)
    {
        pinstance_ = new Advisor(nThread);
    }
    return pinstance_;
}
based on a free function which computes an average by splitting the for loop across different threads:
#include <algorithm>
#include <memory>
#include <numeric>
#include <vector>
#include "ThreadPool.h"
#include "sobol.h"

inline double some_func_calc(const size_t nbPaths)
{
    const std::unique_ptr<my_rng> rng_unique_ptr = std::make_unique<sobol>();
    std::vector<double> resultMat(nbPaths);
    ThreadPool::getInstance()->start(std::thread::hardware_concurrency() - 1);
    ThreadPool* pool = ThreadPool::getInstance();
    const size_t nThread = pool->numThreads();
    std::vector<std::vector<double>> unifVecs(nThread + 1); // +1 for the main thread ;)
    for (auto& vec : unifVecs)
        vec.resize(1);
    std::vector<std::vector<double>> paths(nThread + 1);
    // One RNG per thread
    std::vector<std::unique_ptr<my_rng>> rngs(nThread + 1);
    for (auto& random : rngs)
    {
        random = rng_unique_ptr->clone();
        random->init(1);
    }
#define BATCHSIZE size_t{64}
    // Reserve memory for futures
    std::vector<ThreadPool::TaskHandle> futures;
    futures.reserve(nbPaths / BATCHSIZE + 1);
    // Start
    size_t firstPath = 0;
    size_t pathsLeft = nbPaths;
    while (pathsLeft > 0)
    {
        size_t pathsInTask = std::min<size_t>(pathsLeft, BATCHSIZE);
        futures.push_back(pool->spawnTask([&, firstPath, pathsInTask]()
        {
            // Inside the parallel task,
            // pick the right pre-allocated vectors
            const size_t threadNum = pool->threadNum();
            std::vector<double>& unifVec = unifVecs[threadNum];
            std::vector<double>& path = paths[threadNum];
            // Get a RNG and position it correctly
            auto& random = rngs[threadNum];
            random->skipTo(firstPath);
            // And conduct the simulations
            for (size_t i = 0; i < pathsInTask; i++)
            {
                // Next Gaussian vector, dimension D
                random->nextU(unifVec);
                // value on path
                double avg_of_elems = std::reduce(unifVec.begin(), unifVec.end()) / unifVec.size();
                // store result
                resultMat[firstPath + i] = avg_of_elems;
            }
            // Remember tasks must return bool
            return true;
        }));
        pathsLeft -= pathsInTask;
        firstPath += pathsInTask;
    }
    for (auto& future : futures)
        pool->activeWait(future);
    double actual = std::reduce(resultMat.begin(), resultMat.end()) / resultMat.size();
    return actual;
}
based on a simple random number generator:
#ifndef MY_RNG_H
#define MY_RNG_H
#include <memory>
#include <vector>

enum RngType
{
    sob = 0,
};

class my_rng
{
public:
    // Initialise with dimension simDim
    virtual void init(const size_t simDim) = 0;
    virtual size_t dim() const = 0;
    // Compute the next vector[simDim] of independent Uniforms or Gaussians
    // The vector is filled by the function and must be pre-allocated
    virtual void nextU(std::vector<double>& uVec) = 0;
    virtual std::unique_ptr<my_rng> clone() const = 0;
    virtual ~my_rng() = default;
    // Skip ahead
    virtual void skipTo(const unsigned b) = 0;
};
#endif // MY_RNG_H
with the concrete class:
#ifndef SOBOL_H
#define SOBOL_H
#include <algorithm>
#include <cstring>
#include "my_rng.h"

#define ONEOVER2POW32 2.3283064365387E-10

const unsigned* const* getjkDir();

class sobol : public my_rng
{
    // Dimension
    size_t myDim;
    // State Y
    std::vector<unsigned> myState;
    // Current index in the sequence
    unsigned myIndex;
    // The direction numbers listed in sobol.cpp
    // Note jkDir[i][dim] gives the i-th (0 to 31)
    // direction number of dimension dim
    const unsigned* const* jkDir;

public:
    size_t dim() const override
    {
        return myDim;
    }
    // Virtual copy constructor
    std::unique_ptr<my_rng> clone() const override
    {
        return std::make_unique<sobol>(*this);
    }
    // Initializer
    void init(const size_t simDim) override
    {
        // Set pointer on direction numbers
        jkDir = getjkDir();
        // Dimension
        myDim = simDim;
        myState.resize(myDim);
        // Reset to 0
        reset();
    }
    void reset()
    {
        // Set state to 0
        std::memset(myState.data(), 0, myDim * sizeof(unsigned));
        // Set index to 0
        myIndex = 0;
    }
    // Next point
    void next()
    {
        // Gray code, find position j
        // of rightmost zero bit of current index n
        unsigned n = myIndex, j = 0;
        while (n & 1)
        {
            n >>= 1;
            ++j;
        }
        // Direction numbers
        const unsigned* dirNums = jkDir[j];
        // XOR the appropriate direction number
        // into each component of the integer sequence
        for (size_t i = 0; i < myDim; ++i)
        {
            myState[i] ^= dirNums[i];
        }
        // Update count
        ++myIndex;
    }
    void nextU(std::vector<double>& uVec) override
    {
        next();
        std::transform(myState.begin(), myState.end(), uVec.begin(),
            [](const unsigned long i)
            {
                return ONEOVER2POW32 * i;
            });
    }
    // Skip ahead (from 0 to b)
    void skipTo(const unsigned b) override
    {
        // Check skip
        if (!b) return;
        // Reset Sobol to 0
        reset();
        // The actual Sobol skipping algo
        unsigned im = b;
        unsigned two_i = 1, two_i_plus_one = 2;
        unsigned i = 0;
        while (two_i <= im)
        {
            if (((im + two_i) / two_i_plus_one) & 1)
            {
                for (unsigned k = 0; k < myDim; ++k)
                {
                    myState[k] ^= jkDir[i][k];
                }
            }
            two_i <<= 1;
            two_i_plus_one <<= 1;
            ++i;
        }
        // End of skipping algo
        // Update next entry
        myIndex = unsigned(b);
    }
};
#endif // SOBOL_H
whose source I add at the end of the question.
The client code is:
#include <cmath>
#include <iostream>
#include "Advisor.h"

void ASSERT_NEAR(double actual, double expected, double precision)
{
    if (std::abs(actual - expected) <= precision)
        std::cout << "Ok !" << std::endl;
    else
        std::cout << "Not ok !" << std::endl;
}

static Advisor* pricingAdvisorSingleton = Advisor::GetInstance();

int main()
{
    try
    {
        constexpr size_t nbPaths = 131072;
        double expected = 0.5;
        double actual = pricingAdvisorSingleton->some_func(nbPaths);
        ASSERT_NEAR(actual, expected, 1e-3);
        std::cout << "Ok!\n";
    }
    catch (std::exception& e)
    {
        std::cout << e.what() << "\n";
    }
}
This code, compiled with the Microsoft Visual Studio 2022 v143 compiler in debug or release, executes fine with the expected result "Ok!". More precisely: very complex numerical code (with some_func analogues performing intensive, CPU- and RAM-wise, numerical computations), multi-threaded in the same fashion as my code here, has been in production and is working fine with the intended numerical results. (Which doesn't imply that it is sane from a multithreading point of view, as I am not an expert, but at least there are no race conditions, leaks, etc.)
Compiled with the Intel C++ 2025 compiler, though, it crashes when executed, in
void start(const size_t nThread = std::thread::hardware_concurrency() - 1)
on the instruction
threads_.push_back(std::thread(&ThreadPool::threadFunc, this, i + 1));
which triggers an access violation (-1073741819, i.e. 0xC0000005) in
_CONSTEXPR20 void _Orphan_range_unlocked(pointer _First, pointer _Last) const
from <vector>, in its instruction while (*_Pnext) {.
It is as if the objects std::thread(&ThreadPool::threadFunc, this, i + 1) were corrupted. I am not at all a multi-threading expert in C++, so any idea would be welcome.
The source of the sobol class, for the sake of completeness, can be found in a gist.
The problem is in the code from the linked code review thread combined with your code: together they cause the Static Initialization Order Fiasco.
class ThreadPool
{
    // The one and only instance
    static ThreadPool instance_;
public:
    // Access the instance
    static ThreadPool* getInstance() { return &instance_; }
and in your code
Advisor::Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
{
    ThreadPool::getInstance()->start(nThread);
}

Advisor* Advisor::GetInstance(const size_t nThread)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (pinstance_ == nullptr)
    {
        pinstance_ = new Advisor(nThread);
    }
    return pinstance_;
}
static Advisor* pricingAdvisorSingleton = Advisor::GetInstance();
This will very much cause the fiasco and is generally not thread-safe: nothing guarantees that ThreadPool is initialized before Advisor, and you crash because push_back is called on an uninitialized vector. Instead, the C++11 way is to use a Meyers singleton, which is guaranteed to be thread-safe and suffers no fiasco:
// Note: "static" belongs only on the in-class declarations
// (static ThreadPool& getInstance(); / static Advisor& GetInstance();),
// it must not be repeated on the out-of-class definitions.
ThreadPool& ThreadPool::getInstance()
{
    static ThreadPool instance_;
    return instance_;
}

Advisor& Advisor::GetInstance()
{
    static Advisor instance_;
    return instance_;
}
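For completeness, here is a minimal sketch (not a drop-in replacement) of how the call sites could then look. The condensed Advisor below is illustrative only; it assumes ThreadPool.h has been changed to the reference-returning getInstance above, and that func.h gets the same '->' to '.' adaptation.

#include <iostream>
#include "ThreadPool.h"   // assumed: getInstance() now returns ThreadPool&
#include "func.h"         // assumed: some_func_calc adapted to the reference API

class Advisor
{
protected:
    Advisor(const size_t nThread = std::thread::hardware_concurrency() - 1)
    {
        ThreadPool::getInstance().start(nThread);   // '.' instead of '->'
    }
    virtual ~Advisor()
    {
        ThreadPool::getInstance().stop();
    }
public:
    Advisor(const Advisor&) = delete;
    Advisor& operator=(const Advisor&) = delete;

    static Advisor& GetInstance()
    {
        static Advisor instance_;   // Meyers singleton: no pinstance_, no mutex_
        return instance_;
    }

    double some_func(const size_t N) { return some_func_calc(N); }
};

int main()
{
    // No namespace-scope pricingAdvisorSingleton any more: both singletons
    // are created on first use, in a well-defined order.
    std::cout << Advisor::GetInstance().some_func(131072) << "\n";
}

Function-local statics are constructed on first use and destroyed in reverse order of construction, so the ThreadPool (constructed from inside Advisor's constructor) is guaranteed to outlive the Advisor, and the stop() call in the destructor remains safe.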