c++mutexshared-ptrpool

On shared_ptr use_count of resource pool C++ std::shared_ptr


I want to construct a resource pool which maintains a set of resources shared between several threads. Each thread pops a resource from the pool, uses it to perform some work and pushes the resource back to pool after work is done. The resource pool is modelled as vector of shared_ptr to resources.

template <typename T>
class resourcePool
{
public:

    resourcePool () : mElements (0) {
        mElements.reserve(128);
    }

    void push(const std::shared_ptr<T> ele)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.push_back(ele);
    }

    size_t size() const // NOTE! not thread safe
    {
        return mElements.size();
    }

    std::shared_ptr<T> pop()
    {
        // it returns a shared pointer if at least one element exists in the pool.
        // if no elements exist in the queue it returns a null
        std::lock_guard<std::mutex> lock(mMutex);
        if (mElements.size() > 0) {
            auto ptr = mElements.back();
            mElements.pop_back();
            return ptr;
        } else {
            return nullptr;
        }
    }
private:
    std::vector<std::shared_ptr<T>> mElements {};
    std::mutex                      mMutex {};
};

Since push() and pop() are mutex protected, I would imagine that there should only be one shared_ptr managing an object. However, it turns out not to be the case. Given below is the complete test program

#include <iostream>
#include <atomic>
#include <memory>
#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <cassert>
#include <random>
#include <ctime>

static constexpr size_t   numIterations = 0xFFFF;
static constexpr uint32_t numThreads = 16;


static std::atomic<uint64_t> objCnt {};

class resource_t {
public:
    resource_t () : workCount (0), randGen( time(NULL) ), distrib(resource_t::WORK_TIME_MIN, resource_t::WORK_TIME_MAX)
    {
        objCnt++;
    }
    ~resource_t ()
    {
        objCnt--;
    }

    uint64_t get_workCount () const {return workCount;}
    const std::array<uint64_t, numThreads>& get_workCountPerThread () const {return workCountPerThread;}

    void doSomeWork (const size_t threadIdx)
    {
        auto const workTime = distrib(randGen);
        std::this_thread::sleep_for(std::chrono::microseconds(workTime));
        workCount++;
        workCountPerThread[threadIdx]++;
    }

    long sharedPtrUseCountMax {};   // indicates max of number shared pointers managing the object at a given instant

private:
    uint64_t workCount {};          // indicates how many times this object is used
    std::array<uint64_t, numThreads> workCountPerThread {};

    static constexpr int WORK_TIME_MIN  =   1;
    static constexpr int WORK_TIME_MAX  =   10;
    std::mt19937 randGen {};
    std::uniform_int_distribution<> distrib {};
};

template <typename T>
class resourcePool
{
public:

    resourcePool () : mElements (0) {
        mElements.reserve(128);
    }

    void push(const std::shared_ptr<T> ele)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.push_back(ele);
    }

    size_t size() const // NOTE! not thread safe
    {
        return mElements.size();
    }

    std::shared_ptr<T> pop()
    {
        // it returns a shared pointer if elements exists in the pool.
        // if no elements exists in the queue it returns a null
        std::lock_guard<std::mutex> lock(mMutex);
        if (mElements.size() > 0) {
            auto ptr = mElements.back();
            mElements.pop_back();
            return ptr;
        } else {
            return nullptr;
        }
    }
private:
    std::vector<std::shared_ptr<T>> mElements {};
    std::mutex                      mMutex {};
};

// task function pops elements from resource pool, and does work with the returned object.
// task function also counts the number of shared pointers managing a given resource_t object.
void task(std::shared_ptr<resourcePool<resource_t>> resources, const size_t numIterations, const size_t threadIdx)
{
    for (uint32_t itr = 0; itr < numIterations; ++itr) {
        auto ele = resources->pop();
        if (nullptr == ele) {
            ele = std::make_shared<resource_t> ();
        }
        auto use_count = ele.use_count();
        if (use_count > ele->sharedPtrUseCountMax) {
            ele->sharedPtrUseCountMax = use_count;
        }
        ele->doSomeWork( threadIdx );
        resources->push(ele);
    }
}

int runTest() {

    auto resources = std::make_shared<resourcePool<resource_t>> ();

    std::vector<std::thread> threads {};
    threads.reserve(numThreads);

    // create a number of threads executing a task for a given number of counts
    for (uint32_t idx = 0; idx < numThreads; ++idx) {
        threads.emplace_back ( std::thread (task, resources, numIterations, idx) );
    }

    for (uint32_t idx = 0; idx < threads.size(); ++idx) {
        threads[idx].join();
    }

    size_t workCntTotal = 0;
    size_t workCntPerThreadTotal = 0;
    uint32_t idx = 0;
    while (0 < resources->size()) {
        auto ele = resources->pop();
        workCntTotal += ele->get_workCount();
        std::cout << "idx = " << idx++ << ",\t ";
        auto const& workCountPerThread = ele->get_workCountPerThread();
        std::cout << "workCountPerThread = {";
        for ( auto x : workCountPerThread) {
            std::cout << x << ", ";
            workCntPerThreadTotal += x;
        }
        std::cout << "}";
        std::cout << ",  use_count = " << ele->sharedPtrUseCountMax << std::endl;
    }

    std::cout << "workCntTotal = " << workCntTotal << ", workCntPerThreadTotal = " << workCntPerThreadTotal << std::endl;
    std::cout << "numThreads x numIterations = " << numIterations * numThreads << std::endl;

    return 0;
}

int main() {
    runTest ();
}

Sample output of the above program is:

idx = 0,     workCountPerThread = {2839, 4543, 3877, 3356, 5566, 3813, 7626, 4153, 3655, 4840, 3835, 4123, 4109, 4618, 4002, 9577, },  use_count = 3
idx = 1,     workCountPerThread = {4072, 3732, 4325, 3332, 9313, 3249, 5995, 5448, 2964, 5220, 4172, 4081, 6152, 2942, 5023, 4985, },  use_count = 5
idx = 2,     workCountPerThread = {4800, 7102, 2673, 4104, 7960, 5187, 5742, 3832, 5280, 3606, 3303, 3389, 2003, 4444, 5722, 4035, },  use_count = 3
idx = 3,     workCountPerThread = {4096, 7805, 4252, 4594, 4002, 3125, 4292, 4666, 3825, 4495, 2161, 5365, 5164, 3781, 3321, 6327, },  use_count = 3
idx = 4,     workCountPerThread = {3657, 3713, 4737, 5228, 4203, 2717, 3916, 4275, 2868, 2690, 4345, 3566, 2390, 5024, 2204, 4337, },  use_count = 3
idx = 5,     workCountPerThread = {4928, 3144, 4156, 4475, 2776, 3734, 3357, 4760, 3235, 3585, 5092, 4457, 5287, 4911, 3798, 2933, },  use_count = 5
idx = 6,     workCountPerThread = {5342, 3434, 4827, 4525, 2176, 4298, 2294, 4353, 4097, 3801, 3873, 4414, 5723, 4264, 3967, 2307, },  use_count = 3
idx = 7,     workCountPerThread = {4450, 3762, 4267, 4280, 3113, 3097, 4653, 3300, 5604, 3802, 5051, 4055, 3692, 3580, 4142, 2501, },  use_count = 3
idx = 8,     workCountPerThread = {4116, 4284, 4403, 3897, 3380, 4513, 3379, 3559, 4427, 3834, 5067, 4048, 4101, 3062, 4108, 3884, },  use_count = 3
idx = 9,     workCountPerThread = {4227, 2615, 3758, 5343, 2286, 5285, 5209, 3265, 5280, 2531, 4693, 2235, 4388, 4807, 4067, 3360, },  use_count = 3
idx = 10,    workCountPerThread = {4041, 3919, 2889, 4778, 4460, 4703, 2312, 3669, 4005, 5153, 3193, 3855, 4387, 5364, 3309, 2590, },  use_count = 3
idx = 11,    workCountPerThread = {3729, 4244, 3518, 3321, 2015, 4229, 4562, 4184, 5638, 3982, 3295, 5072, 2613, 3240, 4273, 4290, },  use_count = 3
idx = 12,    workCountPerThread = {4178, 2769, 5295, 4813, 3157, 7028, 2592, 3940, 4445, 4659, 3458, 5135, 3505, 3589, 2913, 3081, },  use_count = 3
idx = 13,    workCountPerThread = {3118, 3299, 4062, 3682, 2478, 2256, 3203, 5424, 4324, 3659, 3301, 4987, 3623, 4396, 5445, 4931, },  use_count = 5
idx = 14,    workCountPerThread = {5340, 2436, 4539, 3176, 2282, 5005, 3359, 3000, 3464, 5798, 4333, 4369, 4799, 4417, 5203, 2900, },  use_count = 3
idx = 15,    workCountPerThread = {2602, 4734, 3957, 2631, 6368, 3296, 3044, 3707, 2424, 3880, 6363, 2384, 3599, 3096, 4038, 3497, },  use_count = 3
workCntTotal = 1048560, workCntPerThreadTotal = 1048560
numThreads x numIterations = 1048560

My expectation is that the last column in the output (use_count) which is the maximum of number of shared pointers managing a given object is 1. However I see it varying. Any idea why we have more than one shared_pointer managing the same object?


Solution

  • Consider the line main loop of function task

    resources->push(ele);
    

    And note that a copy of ele is still present immediately after the push concludes, so you'll have an increased number of use_count by 1 for a moment.

    The push function is defined as

    void push(const std::shared_ptr<T> ele) // creates an instance on call
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.push_back(ele); // creates another copy on call
    } // destroys element on scope exit after the mutex was unlocked
    

    So, the use_count might be elevated by 2 for a short moment. Which, given enough opportunities, will eventually happen.

    To fix the issues, utilize move semantics. Fix the line with push call

    resources->push(std::move(ele));
    

    And modify the push function:

    void push(std::shared_ptr<T> ele)
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mElements.emplace_back(std::move(ele));
    }
    

    This way, you'll never have any extra copies in this run.

    You can also improve pop call with

    auto ptr = std::move(mElements.back());
    mElements.pop_back();
    

    So, you remove the unnecessary copying and destruction of the shared pointer and switch it into a move of shared pointer and destruction of an empty shared pointer.