c++tbbtbb-flow-graph

parallel_pipeline not terminating


i am using parallel_pipeline function in my code.Sometimes when my condition is satisfied it stops the pipeline and sometimes it does not.When the flow control calls stop even after that it does not terminate instead it calls its next part and prints on console and then console output becomes like it is been staying in infinite loop and doing nothing.

Code is :

#include <iostream> 
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <tbb/pipeline.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
#include <tbb/tbbmalloc_proxy.h>    
#include <tbb/tick_count.h>
using namespace std;
using namespace tbb;

#define pi 3.141593
#define FILTER_LEN  265

double coeffs[ FILTER_LEN ] =
{
  0.0033473431384214393,0.000032074683390218124,0.0033131082058404943,0.0024777666109278788,
  -0.0008968429179843104,-0.0031973449396977684,-0.003430943381749411,-0.0029796565504781646,
  -0.002770673157048994,-0.0022783059845596586,-0.0008531818129514857,0.001115432556294998,
  0.0026079871108133294,0.003012423848769931,0.002461420635709332,0.0014154004589753215,
  0.00025190669718400967,-0.0007608257014963959,-0.0013703600874774068,-0.0014133823230551277,
  -0.0009759556503342884,-0.00039687498737139273,-0.00007527524701314324,-0.00024181463305012626,
  -0.0008521761947454302,-0.00162618205097997,-0.002170446498273018,-0.002129903305507943,
  -0.001333859049002249,0.00010700092934983156,0.0018039564602637683,0.0032107930896349583,
  0.0038325849735515363,0.003416201274366522,0.002060848732332109,0.00017954815260431595,
  -0.0016358832300944531,-0.0028402136847527387,-0.0031256650498727384,-0.0025374271571154713,
  -0.001438370315670195,-0.00035115295209013755,0.0002606730012030533,0.0001969569787142967,
  -0.00039635535951198597,-0.0010886127490608972,-0.0013530057243606405,-0.0008123200399262436,
  0.0005730271959526784,0.0024419465938120906,0.004133717273258681,0.0049402122577746265,
  0.0043879285604252714,0.002449549610687005,-0.00040283102645093463,-0.003337730734820209,
  -0.0054508346511294775,-0.006093057767824609,-0.005117609782189977,-0.0029293645861970417,
  -0.0003251033117661085,0.0018074390555649442,0.0028351284091668164,0.002623563404428517,
  0.0015692864792199496,0.0004127664681096788,-0.00009249878881824428,0.0004690173244168184,
  0.001964334172374759,0.0037256715492873485,0.004809640399145206,0.004395274594482053,
  0.0021650921193604,-0.0014888595443799124,-0.005534807968511709,-0.008642334104607624,
  -0.009668950651149259,-0.008104732391434574,-0.004299972815463919,0.0006184612821881392,
  0.005136551428636121,0.007907786753766152,0.008241212326068366,0.00634786595941524,
  0.003235610213062744,0.00028882736660937287,-0.001320994685952108,-0.0011237433853145615,
  0.00044213409507615003,0.0022057106517524255,0.00277593527678719,0.0011909915058737617,
  -0.0025807757230413447,-0.007497632882437637,-0.011739520895818884,-0.013377018279057393,
  -0.011166543231844196,-0.005133056165990026,0.0032948631959114935,0.011673660427968408,
  0.017376415708412904,0.018548938130314566,0.014811760899506572,0.007450782505155853,
  -0.001019540069785369,-0.007805775815783898,-0.010898333714715424,-0.00985364043415772,
  -0.005988406030111452,-0.001818560524968024,0.000028552677472614846,-0.0019938756495376363,
  -0.007477684025727061,-0.013989430449615033,-0.017870518868849213,-0.015639422062597726,
  -0.005624959109456065,0.010993528170353541,0.03001263681283932,0.04527492462846608,
  0.050581340787164114,0.041949186532860346,0.019360612460662185,-0.012644336735920483,
  -0.0458782599058412,-0.07073838953156347,-0.0791205623455818,-0.06709535677423759,
  -0.03644544574795176,0.005505370370858695,0.04780486657828151,0.07898800597378192,
  0.0904453420042807,0.07898800597378192,0.04780486657828151,0.005505370370858695,
  -0.03644544574795176,-0.06709535677423759,-0.0791205623455818,-0.07073838953156347,
  -0.0458782599058412,-0.012644336735920483,0.019360612460662185,0.041949186532860346,
  0.050581340787164114,0.04527492462846608,0.03001263681283932,0.010993528170353541,
  -0.005624959109456065,-0.015639422062597726,-0.017870518868849213,-0.013989430449615033,
  -0.007477684025727061,-0.0019938756495376363,0.000028552677472614846,-0.001818560524968024,
  -0.005988406030111452,-0.00985364043415772,-0.010898333714715424,-0.007805775815783898,
  -0.001019540069785369,0.007450782505155853,0.014811760899506572,0.018548938130314566,
  0.017376415708412904,0.011673660427968408,0.0032948631959114935,-0.005133056165990026,
  -0.011166543231844196,-0.013377018279057393,-0.011739520895818884,-0.007497632882437637,
  -0.0025807757230413447,0.0011909915058737617,0.00277593527678719,0.0022057106517524255,
  0.00044213409507615003,-0.0011237433853145615,-0.001320994685952108,0.00028882736660937287,
  0.003235610213062744,0.00634786595941524,0.008241212326068366,0.007907786753766152,
  0.005136551428636121,0.0006184612821881392,-0.004299972815463919,-0.008104732391434574,
  -0.009668950651149259,-0.008642334104607624,-0.005534807968511709,-0.0014888595443799124,
  0.0021650921193604,0.004395274594482053,0.004809640399145206,0.0037256715492873485,
  0.001964334172374759,0.0004690173244168184,-0.00009249878881824428,0.0004127664681096788,
  0.0015692864792199496,0.002623563404428517,0.0028351284091668164,0.0018074390555649442,
  -0.0003251033117661085,-0.0029293645861970417,-0.005117609782189977,-0.006093057767824609,
  -0.0054508346511294775,-0.003337730734820209,-0.00040283102645093463,0.002449549610687005,
  0.0043879285604252714,0.0049402122577746265,0.004133717273258681,0.0024419465938120906,
  0.0005730271959526784,-0.0008123200399262436,-0.0013530057243606405,-0.0010886127490608972,
  -0.00039635535951198597,0.0001969569787142967,0.0002606730012030533,-0.00035115295209013755,
  -0.001438370315670195,-0.0025374271571154713,-0.0031256650498727384,-0.0028402136847527387,
  -0.0016358832300944531,0.00017954815260431595,0.002060848732332109,0.003416201274366522,
  0.0038325849735515363,0.0032107930896349583,0.0018039564602637683,0.00010700092934983156,
  -0.001333859049002249,-0.002129903305507943,-0.002170446498273018,-0.00162618205097997,
  -0.0008521761947454302,-0.00024181463305012626,-0.00007527524701314324,-0.00039687498737139273,
  -0.0009759556503342884,-0.0014133823230551277,-0.0013703600874774068,-0.0007608257014963959,
  0.00025190669718400967,0.0014154004589753215,0.002461420635709332,0.003012423848769931,
  0.0026079871108133294,0.001115432556294998,-0.0008531818129514857,-0.0022783059845596586,
  -0.002770673157048994,-0.0029796565504781646,-0.003430943381749411,-0.0031973449396977684,
  -0.0008968429179843104,0.0024777666109278788,0.0033131082058404943,0.000032074683390218124,
  0.0033473431384214393
};

class MyBuffer 
{
    public:
    double *acc;
    double *buffer;
    int start,end;
    static int j;

    MyBuffer()
    {
        start=0;
        end=0;

       buffer=new double[150264];
       acc=new double[150000];
       fill_n(buffer,150264,0);

    }

    ~MyBuffer()
    {
        delete[] buffer;
        delete[] acc;
    }

    int startnumber()
    {
        return start;
    }

    int endnumber()
    {
        return end;
    }

};

typedef concurrent_bounded_queue<MyBuffer>  QueueMyBufferType;
QueueMyBufferType chunk_queue;

atomic<bool> stop_flag;
atomic<bool> stop_filter;

int MyBuffer::j=0;
int queueloopcount=30;

void input_function()                               
{   
   stop_flag = false;     

   cout<<"thread reached to call input function " <<endl;
   ofstream o("testing sinewave.csv");

   int counter=0;
   while(counter<150000)    
  { 
    //  cout<<"value of counter is \t" <<counter << endl;

        MyBuffer *b=new MyBuffer;                                                       
        b->start=(FILTER_LEN-1+(counter));
        b->end=(5264+(counter));

    //  cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl;

        for(int i =b->startnumber(); i <b->endnumber(); i++)
         {
               b->buffer[i] = sin(700 * (2 * pi) * (i / 5000.0));
               o<<b->buffer[i]<<endl;   
         }
         chunk_queue.push(*b);

         counter+=5000;
        // cout<<"value of queueloopcount is "<< queueloopcount << endl;
     }

     cout<<"all data is perfectly generated" <<endl;
}

int main()
{

    int ntokens = 8;

    thread inputfunc(input_function);
    tick_count t1,t2;
    ofstream o("filter700Hz.csv");
    t1=tick_count::now();

     bool stop_pipeline = false;    
     stop_filter=false;


     inputfunc.join();

     parallel_pipeline(ntokens,make_filter<void,MyBuffer*>
     ( 
             filter::parallel,[&](flow_control& fc)->MyBuffer*
         {
             if(queueloopcount==0)
             {
                 fc.stop();
                 cout<<"pipeline stopped"<<endl;
             }
             else
             {
                  MyBuffer *b=new MyBuffer;
                  chunk_queue.pop(*b);
                 {
                     cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
                     queueloopcount--;
                 }
                 return b;
             }
         }
     )&

    make_filter<MyBuffer*, void>
    (
        filter::serial,[&](MyBuffer* b)
        {
             cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl;
        }
     )
    );

        cout<<"now i am out" <<endl;
    o.close();
    t2=tick_count::now();

    cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;        

    return 0;
}

please help to find where code is wrong.


Solution

  • The Problem in this code is with filter i.e parallel in first stage that is causing problem to flow_control object which tries to pop and it is a blocking call due to which it blocks and solution to this is you should probably have a serial first filter that only create an empty MyBuffer* and stop the pipeline if no more work is due. Then have a parallel second filter that performs the real work and finally a serial (in-order) output stage.