I want to construct a resource pool that maintains a set of resources shared between several threads. Each thread pops a resource from the pool, uses it to perform some work, and pushes the resource back to the pool after the work is done. The resource pool is modelled as a vector of shared_ptr to resources.
template <typename T>
class resourcePool
{
public:
    resourcePool () : mElements (0) {
        mElements.reserve(128);
    }
    void push(const std::shared_ptr<T> ele)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.push_back(ele);
    }
    size_t size() const // NOTE! not thread safe
    {
        return mElements.size();
    }
    std::shared_ptr<T> pop()
    {
        // Returns a shared pointer if at least one element exists in the pool;
        // if the pool is empty, it returns a null pointer.
        std::lock_guard<std::mutex> lock(mMutex);
        if (mElements.size() > 0) {
            auto ptr = mElements.back();
            mElements.pop_back();
            return ptr;
        } else {
            return nullptr;
        }
    }
private:
    std::vector<std::shared_ptr<T>> mElements {};
    std::mutex mMutex {};
};
Since push() and pop() are mutex protected, I would imagine that there should only ever be one shared_ptr managing an object. However, that turns out not to be the case. Given below is the complete test program:
#include <iostream>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>
static constexpr size_t numIterations = 0xFFFF;
static constexpr uint32_t numThreads = 16;
static std::atomic<uint64_t> objCnt {};
class resource_t {
public:
    resource_t () : workCount (0), randGen( time(NULL) ), distrib(resource_t::WORK_TIME_MIN, resource_t::WORK_TIME_MAX)
    {
        objCnt++;
    }
    ~resource_t ()
    {
        objCnt--;
    }
    uint64_t get_workCount () const {return workCount;}
    const std::array<uint64_t, numThreads>& get_workCountPerThread () const {return workCountPerThread;}
    void doSomeWork (const size_t threadIdx)
    {
        auto const workTime = distrib(randGen);
        std::this_thread::sleep_for(std::chrono::microseconds(workTime));
        workCount++;
        workCountPerThread[threadIdx]++;
    }
    long sharedPtrUseCountMax {}; // maximum number of shared pointers observed managing this object at any instant
private:
    uint64_t workCount {}; // how many times this object has been used
    std::array<uint64_t, numThreads> workCountPerThread {};
    static constexpr int WORK_TIME_MIN = 1;
    static constexpr int WORK_TIME_MAX = 10;
    std::mt19937 randGen {};
    std::uniform_int_distribution<> distrib {};
};
template <typename T>
class resourcePool
{
public:
    resourcePool () : mElements (0) {
        mElements.reserve(128);
    }
    void push(const std::shared_ptr<T> ele)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.push_back(ele);
    }
    size_t size() const // NOTE! not thread safe
    {
        return mElements.size();
    }
    std::shared_ptr<T> pop()
    {
        // Returns a shared pointer if at least one element exists in the pool;
        // if the pool is empty, it returns a null pointer.
        std::lock_guard<std::mutex> lock(mMutex);
        if (mElements.size() > 0) {
            auto ptr = mElements.back();
            mElements.pop_back();
            return ptr;
        } else {
            return nullptr;
        }
    }
private:
    std::vector<std::shared_ptr<T>> mElements {};
    std::mutex mMutex {};
};
// The task function pops an element from the resource pool and does work with the returned object.
// It also records the maximum number of shared pointers managing a given resource_t object.
void task(std::shared_ptr<resourcePool<resource_t>> resources, const size_t numIterations, const size_t threadIdx)
{
    for (uint32_t itr = 0; itr < numIterations; ++itr) {
        auto ele = resources->pop();
        if (nullptr == ele) {
            ele = std::make_shared<resource_t> ();
        }
        auto use_count = ele.use_count();
        if (use_count > ele->sharedPtrUseCountMax) {
            ele->sharedPtrUseCountMax = use_count;
        }
        ele->doSomeWork( threadIdx );
        resources->push(ele);
    }
}
int runTest() {
    auto resources = std::make_shared<resourcePool<resource_t>> ();
    std::vector<std::thread> threads {};
    threads.reserve(numThreads);
    // create a number of threads, each executing the task for a given number of iterations
    for (uint32_t idx = 0; idx < numThreads; ++idx) {
        threads.emplace_back ( std::thread (task, resources, numIterations, idx) );
    }
    for (uint32_t idx = 0; idx < threads.size(); ++idx) {
        threads[idx].join();
    }
    size_t workCntTotal = 0;
    size_t workCntPerThreadTotal = 0;
    uint32_t idx = 0;
    while (0 < resources->size()) {
        auto ele = resources->pop();
        workCntTotal += ele->get_workCount();
        std::cout << "idx = " << idx++ << ",\t ";
        auto const& workCountPerThread = ele->get_workCountPerThread();
        std::cout << "workCountPerThread = {";
        for ( auto x : workCountPerThread) {
            std::cout << x << ", ";
            workCntPerThreadTotal += x;
        }
        std::cout << "}";
        std::cout << ", use_count = " << ele->sharedPtrUseCountMax << std::endl;
    }
    std::cout << "workCntTotal = " << workCntTotal << ", workCntPerThreadTotal = " << workCntPerThreadTotal << std::endl;
    std::cout << "numThreads x numIterations = " << numIterations * numThreads << std::endl;
    return 0;
}
int main() {
    runTest ();
}
Sample output of the above program is:
idx = 0, workCountPerThread = {2839, 4543, 3877, 3356, 5566, 3813, 7626, 4153, 3655, 4840, 3835, 4123, 4109, 4618, 4002, 9577, }, use_count = 3
idx = 1, workCountPerThread = {4072, 3732, 4325, 3332, 9313, 3249, 5995, 5448, 2964, 5220, 4172, 4081, 6152, 2942, 5023, 4985, }, use_count = 5
idx = 2, workCountPerThread = {4800, 7102, 2673, 4104, 7960, 5187, 5742, 3832, 5280, 3606, 3303, 3389, 2003, 4444, 5722, 4035, }, use_count = 3
idx = 3, workCountPerThread = {4096, 7805, 4252, 4594, 4002, 3125, 4292, 4666, 3825, 4495, 2161, 5365, 5164, 3781, 3321, 6327, }, use_count = 3
idx = 4, workCountPerThread = {3657, 3713, 4737, 5228, 4203, 2717, 3916, 4275, 2868, 2690, 4345, 3566, 2390, 5024, 2204, 4337, }, use_count = 3
idx = 5, workCountPerThread = {4928, 3144, 4156, 4475, 2776, 3734, 3357, 4760, 3235, 3585, 5092, 4457, 5287, 4911, 3798, 2933, }, use_count = 5
idx = 6, workCountPerThread = {5342, 3434, 4827, 4525, 2176, 4298, 2294, 4353, 4097, 3801, 3873, 4414, 5723, 4264, 3967, 2307, }, use_count = 3
idx = 7, workCountPerThread = {4450, 3762, 4267, 4280, 3113, 3097, 4653, 3300, 5604, 3802, 5051, 4055, 3692, 3580, 4142, 2501, }, use_count = 3
idx = 8, workCountPerThread = {4116, 4284, 4403, 3897, 3380, 4513, 3379, 3559, 4427, 3834, 5067, 4048, 4101, 3062, 4108, 3884, }, use_count = 3
idx = 9, workCountPerThread = {4227, 2615, 3758, 5343, 2286, 5285, 5209, 3265, 5280, 2531, 4693, 2235, 4388, 4807, 4067, 3360, }, use_count = 3
idx = 10, workCountPerThread = {4041, 3919, 2889, 4778, 4460, 4703, 2312, 3669, 4005, 5153, 3193, 3855, 4387, 5364, 3309, 2590, }, use_count = 3
idx = 11, workCountPerThread = {3729, 4244, 3518, 3321, 2015, 4229, 4562, 4184, 5638, 3982, 3295, 5072, 2613, 3240, 4273, 4290, }, use_count = 3
idx = 12, workCountPerThread = {4178, 2769, 5295, 4813, 3157, 7028, 2592, 3940, 4445, 4659, 3458, 5135, 3505, 3589, 2913, 3081, }, use_count = 3
idx = 13, workCountPerThread = {3118, 3299, 4062, 3682, 2478, 2256, 3203, 5424, 4324, 3659, 3301, 4987, 3623, 4396, 5445, 4931, }, use_count = 5
idx = 14, workCountPerThread = {5340, 2436, 4539, 3176, 2282, 5005, 3359, 3000, 3464, 5798, 4333, 4369, 4799, 4417, 5203, 2900, }, use_count = 3
idx = 15, workCountPerThread = {2602, 4734, 3957, 2631, 6368, 3296, 3044, 3707, 2424, 3880, 6363, 2384, 3599, 3096, 4038, 3497, }, use_count = 3
workCntTotal = 1048560, workCntPerThreadTotal = 1048560
numThreads x numIterations = 1048560
My expectation is that the last column in the output (use_count), which is the maximum number of shared pointers managing a given object, should be 1. However, I see it varying. Any idea why more than one shared_ptr ends up managing the same object?
Consider this line in the main loop of the task function:

resources->push(ele);

Note that a copy of ele is still present immediately after the push concludes, so use_count will be increased by 1 for a moment.
The push function is defined as

void push(const std::shared_ptr<T> ele) // creates a copy of the shared_ptr on the call
{
    std::lock_guard<std::mutex> lock(mMutex);
    mElements.push_back(ele); // creates another copy inside the vector
} // destroys the parameter copy on scope exit, after the mutex has been unlocked
So the use_count might be elevated by 2 for a short moment, which, given enough opportunities, will eventually be observed.
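If it helps to see the mechanism in isolation, here is a minimal single-threaded sketch (my own illustration, not part of the original program; the pool here is just a plain vector) that counts the copies:

#include <cassert>
#include <memory>
#include <vector>

std::vector<std::shared_ptr<int>> pool;

void push(const std::shared_ptr<int> ele) // copy #2: the by-value parameter
{
    pool.push_back(ele);                  // copy #3: the element stored in the vector
    assert(ele.use_count() == 3);         // caller's copy + parameter + vector element
}

int main()
{
    auto p = std::make_shared<int>(42);   // copy #1: the caller's shared_ptr
    push(p);                              // use_count peaks at 3 during the call
    assert(p.use_count() == 2);           // parameter destroyed; the vector copy remains
}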
To fix the issue, use move semantics. First fix the line with the push call:

resources->push(std::move(ele));
And modify the push function:

void push(std::shared_ptr<T> ele)
{
    std::lock_guard<std::mutex> lock(mMutex);
    mElements.emplace_back(std::move(ele));
}
This way, you'll never have any extra copies of the shared pointer during the push.
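As a hypothetical sanity check (my addition, not something the original code does), a std::weak_ptr can observe the owner count without contributing to it:

#include <cassert>
#include <memory>
#include <vector>

std::vector<std::shared_ptr<int>> pool;

void push(std::shared_ptr<int> ele)    // move-constructed from the caller's argument
{
    pool.emplace_back(std::move(ele)); // moved again into the vector; no copy is made
}

int main()
{
    auto p = std::make_shared<int>(7);
    std::weak_ptr<int> w = p;          // observes ownership without sharing it
    push(std::move(p));                // ownership is transferred, never duplicated
    assert(p == nullptr);              // a moved-from shared_ptr is guaranteed empty
    assert(w.use_count() == 1);        // exactly one owner throughout: now the vector element
}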
You can also improve the pop call:

auto ptr = std::move(mElements.back());
mElements.pop_back();

This replaces the unnecessary copy and destruction of a shared pointer with a move of the shared pointer plus the destruction of an empty one.
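Putting it together, the whole pop would read as follows (a sketch of the original pop with the move applied; the behavior is otherwise unchanged):

std::shared_ptr<T> pop()
{
    std::lock_guard<std::mutex> lock(mMutex);
    if (mElements.empty()) {
        return nullptr;
    }
    auto ptr = std::move(mElements.back()); // move out of the vector instead of copying
    mElements.pop_back();                   // destroys a now-empty shared_ptr
    return ptr;                             // moved (or elided) on return
}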