Tags: c++, c++11, boost, multitasking, fiber

Multiple Shared Work Pools With Boost::Fiber


I have been looking into boost::fibers as a method for dealing with some of my problems with data processing and IO. The shared_work scheduler in particular looks promising because it would let me spin up one data processing task for every data processing source and then let them distribute themselves as needed across a few threads.

However this brings me to the source of my question: It looks like I can only have one shared_work 'pool' per process. What do I do if I want to have one set of 12 fibers processing data, shared among 4 threads, while at the same time a different set of 12 fibers is writing processed data to file, shared among another 4 threads?

Something like:

#include<string>
#include<iostream>
#include<vector>
#include<mutex>
#include<thread>
#include<random>
#include<map>
#include<sstream>
#include<boost/bind.hpp>
#include<boost/fiber/all.hpp>

typedef boost::fibers::fiber FiberType;
typedef std::unique_lock<boost::fibers::mutex> LockType;


//tuning constants: iterations per fiber, fibers per pool, threads per pool,
//and the range of the random "work size" drawn each iteration
static const int fiberIterationCount = 5000;
static const int fiberCount          = 12;
static const int threadCount         = 4;
static const int distLowerLimit      = 50;
static const int distUpperLimit      = 500;

//fiber-aware mutex/condition pairs used to signal completion of each pool
static boost::fibers::mutex firstMutex{};
static boost::fibers::mutex secondMutex{};
static boost::fibers::condition_variable firstCondition{};
static boost::fibers::condition_variable secondCondition{};
//rendezvous point for all 8 worker threads (both pools) before fibers run
static boost::fibers::barrier synchronize{2*threadCount};
//completion counters, guarded by firstMutex / secondMutex respectively
static int typeOneFibersFinished{0};
static int typeTwoFibersFinished{0};

//one RNG per fiber so each fiber draws an independent workload sequence
static std::mt19937 typeOneGenerators[fiberCount];
static std::mt19937 typeTwoGenerators[fiberCount];

static std::mutex typeMapMutex;//lock for writing unnecessary for reads
//NOTE(review): unsynchronized reads of threadTypeMap concurrent with the
//locked writes above are formally a data race — TODO confirm all writes
//complete before any fiber reads (the barrier does not guarantee this)
static std::map<std::thread::id, std::string> threadTypeMap;


//simple function to give a heavy cpu load of variable duration
//simple function to give a heavy cpu load of variable duration
//returns the n-th prime (1-based: n==1 -> 2) found by trial division;
//for n <= 0 no primes are counted and the function returns 1
unsigned long long findPrimeNumber(int n)
{
    unsigned long long candidate = 1;
    int primesFound = 0;
    while(primesFound < n)
    {
        ++candidate;
        bool composite = false;
        //trial division: only divisors up to sqrt(candidate) need checking
        for(unsigned long long divisor = 2; (divisor * divisor) <= candidate; ++divisor)
        {
            if((candidate % divisor) == 0)
            {
                composite = true;
                break;
            }
        }
        if(!composite)
        {
            ++primesFound;
        }
    }
    return candidate;
}

//body of each pool-0 fiber: repeatedly do a random-sized CPU-bound chunk of
//work, yield so siblings can run, then signal completion via firstCondition
void fiberTypeOne(int fiberNumber)
{
    std::cout<<"Starting Type One Fiber #"<<fiberNumber;
    std::uniform_int_distribution<int> workSizeDist(distLowerLimit, distUpperLimit);
    for(int iteration = 0; iteration < fiberIterationCount; ++iteration)
    {
        //draw a randomish load so this fiber does not take a regular time slice
        const int primeIndex = workSizeDist(typeOneGenerators[fiberNumber]);
        const unsigned long long prime = findPrimeNumber(primeIndex);
        std::cout << "T1 fiber #"<<fiberNumber<<" running on "<<threadTypeMap[std::this_thread::get_id()]
                  <<"\n    Generated: "<<primeIndex<<", "<<prime;
        boost::this_fiber::yield();
    }

    //bump the finished counter under the fiber mutex, then wake the waiters
    {
        LockType lock(firstMutex);
        ++typeOneFibersFinished;
    }
    firstCondition.notify_all();
}

//entry point for each of the 4 pool-0 threads: install the pool-0 scheduler,
//(thread 0 only) spawn the worker fibers, then block until all 12 finish
void threadTypeOne(int threadNumber)
{
    //make a shared work scheduler that associates its fibers with "fiber pool 0"
    //NOTE(review): multi_pool_scheduler is the hypothetical scheduler this
    //question asks for — it does not exist in boost; see the Solution below
    boost::fibers::use_scheduling_algorithm< multi_pool_scheduler<0> >();
    std::cout<<"Starting Type One Thread #"<<threadNumber<<" With Thread ID: "<<std::this_thread::get_id();

    {
        //record a human-readable label for this thread id, under the map lock
        std::unique_lock<std::mutex> lock{typeMapMutex};
        std::ostringstream gen;
        gen<<"Thread Type 1 - Number: "<<threadNumber<<" with id: "<<std::this_thread::get_id();
        threadTypeMap[std::this_thread::get_id()] = gen.str();
    }
    if(threadNumber == 0)
    { //if we are thread zero, create the fibers then join them to take ourselves off the "fiber list"
        std::cout<<"Spawning Type One Fibers";
        for(int fiberNumber=0; fiberNumber<fiberCount; ++fiberNumber)
        {//create the fibers and instantly detach them
            FiberType(boost::bind(&fiberTypeOne, fiberNumber)).detach();
        }
    }
    //wait for all 8 threads (both pools) before letting any fiber run long
    synchronize.wait();
    std::cout<<"T1 Thread preparing to wait";
    //now let the fibers do their thing
    //waiting on a fiber condition_variable suspends this thread's main fiber,
    //freeing the thread to execute worker fibers until the predicate holds
    LockType lock(firstMutex);
    firstCondition.wait(lock, [](){return (typeOneFibersFinished == fiberCount);});
}

//body of each pool-1 fiber: mirror of fiberTypeOne but using the second
//generator set, mutex, counter, and condition variable
void fiberTypeTwo(int fiberNumber)
{
    std::cout<<"Starting Type Two Fiber #"<<fiberNumber;
    std::uniform_int_distribution<int> workSizeDist(distLowerLimit, distUpperLimit);
    for(int iteration = 0; iteration < fiberIterationCount; ++iteration)
    {
        //draw a randomish load so this fiber does not take a regular time slice
        const int primeIndex = workSizeDist(typeTwoGenerators[fiberNumber]);
        const unsigned long long prime = findPrimeNumber(primeIndex);
        std::cout << "T2 fiber #"<<fiberNumber<<" running on "<<threadTypeMap[std::this_thread::get_id()]
                  <<"\n    Generated: "<<primeIndex<<", "<<prime;
        boost::this_fiber::yield();
    }

    //bump the finished counter under the fiber mutex, then wake the waiters
    {
        LockType lock(secondMutex);
        ++typeTwoFibersFinished;
    }
    secondCondition.notify_all();
}

//entry point for each of the 4 pool-1 threads: install the pool-1 scheduler,
//(thread 0 only) spawn the worker fibers, then block until all 12 finish
void threadTypeTwo(int threadNumber)
{
    //make a shared work scheduler that associates its fibers with "fiber pool 1"
    //NOTE(review): multi_pool_scheduler is the hypothetical scheduler this
    //question asks for — it does not exist in boost; see the Solution below
    boost::fibers::use_scheduling_algorithm< multi_pool_scheduler<1> >();
    std::cout<<"Starting Type Two Thread #"<<threadNumber<<" With Thread ID: "<<std::this_thread::get_id();
    {
        //record a human-readable label for this thread id, under the map lock
        std::unique_lock<std::mutex> lock{typeMapMutex};
        std::ostringstream gen;
        gen<<"Thread Type 2 - Number: "<<threadNumber<<" with id: "<<std::this_thread::get_id();
        threadTypeMap[std::this_thread::get_id()] = gen.str();
    }
    if(threadNumber == 0)
    { //if we are thread zero, create the fibers then join them to take ourselves off the "fiber list"
        std::cout<<"Spawning Type Two Fibers";
        for(int fiberNumber=0; fiberNumber<fiberCount; ++fiberNumber)
        {//create the fibers and instantly detach them
            FiberType(boost::bind(&fiberTypeTwo, fiberNumber)).detach();
        }
    }
    //wait for all 8 threads (both pools) before letting any fiber run long
    synchronize.wait();
    std::cout<<"T2 Thread preparing to wait";
    //now let the fibers do their thing
    //waiting on a fiber condition_variable suspends this thread's main fiber,
    //freeing the thread to execute worker fibers until the predicate holds
    LockType lock(secondMutex);
    secondCondition.wait(lock, [](){return (typeTwoFibersFinished == fiberCount);});
}

//program entry: seed the per-fiber RNGs, launch both thread pools, and join
int main(int argc, char* argv[])
{
    std::cout<<"Initializing Random Number Generators";
    for(unsigned i=0; i<fiberCount; ++i)
    {
        //BUG FIX: the original called typeOneGenerators->seed(...), which
        //decays the array to a pointer and reseeds only element [0] on every
        //iteration, leaving generators [1..11] default-seeded (and therefore
        //identical). Each generator must be indexed explicitly.
        //(i*500U - 1U wraps to UINT_MAX for i==0 — well-defined for unsigned,
        //and preserved here since any seed value is acceptable.)
        typeOneGenerators[i].seed(i*500U - 1U);
        typeTwoGenerators[i].seed(i*1500U - 1U);
    }

    std::cout<<"Commencing Main Thread Startup Startup";
    std::vector<std::thread> typeOneThreads;
    std::vector<std::thread> typeTwoThreads;
    for(int i=0; i<threadCount; ++i)
    {
        typeOneThreads.emplace_back(std::thread(boost::bind(&threadTypeOne, i)));
        typeTwoThreads.emplace_back(std::thread(boost::bind(&threadTypeTwo, i)));
    }
    //now let the threads do their thing and wait for them to finish with join
    for(unsigned i=0; i<threadCount; ++i)
    {
        typeOneThreads[i].join();
    }
    for(unsigned i=0; i<threadCount; ++i)
    {
        typeTwoThreads[i].join();
    }
    std::cout<<"Shutting Down";
    return 0;
}

Is this possible without writing your own fiber scheduler? If so, how?


Solution

  • I determined that I did require writing my own scheduler. However, the actual amount of work was minimal. The boost::fibers::shared_work scheduler manages the list of fibers that are shared between threads using a single static queue, guarded by a static mutex. There is another queue that governs the main fiber for each thread (since each thread has its own scheduler) but that is local to the class instance instead of shared between all instances of the class the way the static members are.

    The solution then, to prevent the static queue and lock from being shared between separate sets of threads, is to add a (mostly unused) template parameter to the class. Each thread then passes a different argument for this parameter. Since every specialization of the template is a distinct type, each instantiation with a different pool number gets its own set of static variables.

    Below is my implementation of this solution, (mostly a copy of boost::fiber::shared_work with a few variables and types more clearly named and the template parameter added).

    #include <condition_variable>
    #include <chrono>
    #include <deque>
    #include <mutex>
    #include <boost/config.hpp>
    #include <boost/fiber/algo/algorithm.hpp>
    #include <boost/fiber/context.hpp>
    #include <boost/fiber/detail/config.hpp>
    #include <boost/fiber/scheduler.hpp>
    #include <boost/assert.hpp>
    #include "boost/fiber/type.hpp"
    
    #ifdef BOOST_HAS_ABI_HEADERS
    #  include BOOST_ABI_PREFIX
    #endif
    
    #ifdef _MSC_VER
    # pragma warning(push)
    # pragma warning(disable:4251)
    #endif
    
    /*!
    * @class SharedWorkPool
    * @brief A scheduler for boost::fibers that operates in a manner similar to the
    * shared work scheduler, except that it takes a template parameter determining
    * which pool to draw fibers from. In this fashion, one group of threads can share
    * a pool of fibers among themselves while another group of threads can work with
    * a completely separate pool
    * @tparam PoolNumber The index of the pool number for this thread
    */
    template <int PoolNumber>
    class SharedWorkPool : public boost::fibers::algo::algorithm
    {
        typedef std::deque<boost::fibers::context * >      ReadyQueueType;
        typedef boost::fibers::scheduler::ready_queue_type LocalQueueType;
        typedef std::unique_lock<std::mutex>               LockType;
    
    public:
        SharedWorkPool() = default;
        ~SharedWorkPool() override {}
    
        //suspend==true makes idle threads sleep in suspend_until() instead of spinning
        SharedWorkPool( bool suspend) : suspendable{suspend}{}
    
        SharedWorkPool( SharedWorkPool const&) = delete;
        SharedWorkPool( SharedWorkPool &&) = delete;
    
        SharedWorkPool& operator=(const SharedWorkPool&) = delete;
        SharedWorkPool& operator=(SharedWorkPool&&) = delete;
    
        //called by the fiber manager whenever a fiber becomes runnable
        void awakened(boost::fibers::context* ctx) noexcept override;
    
        //called by the fiber manager to obtain the next fiber to resume
        boost::fibers::context* pick_next() noexcept override;
    
        bool has_ready_fibers() const noexcept override
        {
            //lock guards the shared queue; localQueue is per-instance and only
            //touched by this thread's scheduler, so it needs no lock here
            LockType lock{readyQueueMutex};
            return ((!readyQueue.empty()) || (!localQueue.empty()));
        }
    
        void suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept override;
    
        void notify() noexcept override;
    
    private:
        //static members: ONE queue + mutex per PoolNumber specialization — this
        //is the whole trick that separates the pools (see static defs below)
        static ReadyQueueType readyQueue;
        static std::mutex     readyQueueMutex;
    
        //per-thread state: this thread's pinned (main/dispatcher) fibers and
        //the flag/condvar machinery backing suspend_until()/notify()
        LocalQueueType          localQueue{};
        std::mutex              instanceMutex{};
        std::condition_variable suspendCondition{};
        bool                    waitNotifyFlag{false};
        bool                    suspendable{false};
    
    };
    
    //route a newly-runnable fiber: worker fibers go on the queue shared by the
    //whole pool; a thread's own pinned (main/dispatcher) fibers stay local
    template <int PoolNumber>
    void SharedWorkPool<PoolNumber>::awakened(boost::fibers::context* ctx) noexcept
    {
        if(!ctx->is_context(boost::fibers::type::pinned_context))
        {
            //worker fiber: sever it from this thread's scheduler so any thread
            //in the pool may later attach and run it, then publish it
            ctx->detach();
            LockType lock{readyQueueMutex};
            readyQueue.push_back(ctx);
        }
        else
        { //never put a thread's main fiber in the shared queue
            localQueue.push_back(*ctx);
        }
    }
    
    
    //choose the next fiber to run on the calling thread: prefer a shared
    //worker fiber; fall back to this thread's pinned main/dispatcher fiber
    template <int PoolNumber>
    boost::fibers::context* SharedWorkPool<PoolNumber>::pick_next() noexcept
    {
        boost::fibers::context * ctx = nullptr;
        LockType lock{readyQueueMutex};
        if(!readyQueue.empty())
        { //pop an item from the ready queue
            ctx = readyQueue.front();
            readyQueue.pop_front();
            //unlock BEFORE attach: attach can re-enter scheduler machinery and
            //must not run while the shared-queue mutex is held
            lock.unlock();
            BOOST_ASSERT( ctx != nullptr);
            boost::fibers::context::active()->attach( ctx); //attach context to current scheduler via the active fiber of this thread
        }
        else
        {
            lock.unlock();
            //localQueue is only ever touched by this thread, so no lock needed
            if(!localQueue.empty())
            { //nothing in the ready queue, return main or dispatcher fiber
                ctx = & localQueue.front();
                localQueue.pop_front();
            }
        }
        return ctx;
    }
    
    //park the calling thread until timePoint or until notify() is called;
    //does nothing unless the scheduler was constructed as suspendable
    template <int PoolNumber>
    void SharedWorkPool<PoolNumber>::suspend_until(const std::chrono::steady_clock::time_point& timePoint) noexcept
    {
        if(!suspendable)
        {
            return;
        }
        LockType lock{instanceMutex};
        const auto notified = [this](){return waitNotifyFlag;};
        if(std::chrono::steady_clock::time_point::max() == timePoint)
        { //no real deadline: sleep until an explicit notify() arrives
            suspendCondition.wait(lock, notified);
        }
        else
        { //sleep until the deadline or a notify(), whichever comes first
            suspendCondition.wait_until(lock, timePoint, notified);
        }
        waitNotifyFlag = false; //consume the notification
    }
    
    //wake any thread parked in suspend_until(); no-op when not suspendable
    template <int PoolNumber>
    void SharedWorkPool<PoolNumber>::notify() noexcept
    {
        if(suspendable)
        {
            { //set the flag under the lock, but release it before notifying so
              //woken waiters do not immediately block on a held mutex
                LockType lock{instanceMutex};
                waitNotifyFlag = true;
            }
            suspendCondition.notify_all();
        }
    }
    
    //definitions of the static members: each distinct PoolNumber specialization
    //gets its OWN queue and mutex, which is what keeps the pools independent
    template <int PoolNumber>
    std::deque<boost::fibers::context*> SharedWorkPool<PoolNumber>::readyQueue{};
    
    template <int PoolNumber>
    std::mutex SharedWorkPool<PoolNumber>::readyQueueMutex{};
    

    Note, I am not entirely sure what will happen if you try to use the same pool number from declarations in different compilation units. However, under normal circumstances, i.e. you have only written boost::fibers::use_scheduling_algorithm< Threads::Fibers::SharedWorkPool<WorkPoolNumber> >(); in a single location for each WorkPoolNumber, it works perfectly. Fibers assigned to a given set of threads always run within the same set of threads, never being run by a different set of threads.