cmultithreadingsortingpthreadsradix-sort

Threaded radixsort not faster


I´m having trouble understanding why my implementation is not faster than on a single thread.

I´m trying to implement a bitwise MSB-radixsort(which is working for now) with a certain number (n) of threads. In my approach I create a new thread up until log2(n) within the recursion. But this seems to just increase the time cause of the creation of the thread. I`m guessing that the created threads can not work at the same time on the array, even with the splitted responsibility within it.

If someone could explain it I would really appreciate it. Thanks!!

UPDATE ::

As it turns out I was missing the most obvious Solution as Ulrich commented. I was using a VM where it was locked to one CPU, which I did not know it did. After changing it I got the desired speedup for the radixsort.

Thank you all for the comments and general info on that topic. ^^


Solution

  • The maximum number of threads should be limited to the number of cores on your system, or two times the number of cores if each core has hyper-threading. You've since updated your question to note that you only had one core using a virtual machine, and the multi-threaded version is faster with multiple cores.

    Other potential issues: if the distribution of the data is not uniform, then splitting into bins will result in uneven sized bins. Using a base larger than 2, such as 256, will reduce this effect somewhat. On the initial pass, there can be cache line conflicts, where each write to a cache line on one core will invalidate all instances of that cache line on other cores.

    Example code that uses a single thread to do a MSD first radix sort base 256, to create 256 bins, which are then sorted using LSD first radix sort, using 4 threads. This example uses Windows specific thread API. This relies on having reasonably uniform data so that the 256 bins are somewhat equal in size. I didn't try to use multi-threading for the initial MSD pass used to create the 256 bins, as one potential issue is cache line conflicts on the array of index updates, which would need an atomic increment operation, which may limit the performance benefit.

    #include <iostream>
    #include <windows.h>
    
    #define COUNT (6000*6000)            // number of values to sort
    
    #define QP 1        // if != 0, use queryperformance for timer
    
    #if QP
    #include <math.h>
    #pragma comment(lib, "winmm.lib")
    typedef LARGE_INTEGER LI64;
    #else
    #include <ctime>
    #endif
    
    #if QP
    static LI64     liQPFrequency;  // cpu counter values
    static LI64     liStartTime;
    static LI64     liStopTime;
    static double   dQPFrequency;
    static double   dStartTime;
    static double   dStopTime;
    static double   dElapsedTime;
    #else
    clock_t ctTimeStart;            // clock values
    clock_t ctTimeStop;
    #endif
    
    typedef unsigned       int uint32_t;
    typedef unsigned long long uint64_t;
    
    static HANDLE hs0;                      // semaphore handles
    static HANDLE hs1;
    static HANDLE hs2;
    static HANDLE hs3;
    
    static HANDLE ht1;                      // thread handles
    static HANDLE ht2;
    static HANDLE ht3;
    
    static uint64_t *pa, *pb;               // ptrs to arrays
    static uint32_t aIndex[260];            // start + end indexes, 256 bins
    static uint32_t aIi;                    // current index for 4 bins
    
    static DWORD WINAPI Thread0(LPVOID lpvoid);
    
    void RadixSort(uint64_t * a, uint64_t *b, size_t count)
    {
    uint32_t i,m,n;
    uint64_t u;
        for(i = 0; i < count; i++)          // generate histogram
            aIndex[(size_t)(a[i] >> 56)]++;
        m = 0;                              // convert to indexes
        for(i = 0; i < 257; i++){           //  including end of bin[255]
            n = aIndex[i];
            aIndex[i] = m;
            m += n;
        }
        for(i = 0; i < count; i++){         // sort by msb
            u = a[i];
            m = (uint32_t)(u >> 56);
            b[aIndex[m]++] = u;
        }
        for(i = 256; i; i--)                // restore aIndex
            aIndex[i] = aIndex[i-1];
        aIndex[0] = 0;
        for(aIi = 0; aIi < 256; aIi += 4){
            ReleaseSemaphore(hs1, 1, NULL);     // start threads
            ReleaseSemaphore(hs2, 1, NULL);
            ReleaseSemaphore(hs3, 1, NULL);
            Thread0(NULL);
            WaitForSingleObject(hs0, INFINITE); // wait for threads done
            WaitForSingleObject(hs0, INFINITE);
            WaitForSingleObject(hs0, INFINITE);
        }        
    }
    
    void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
    {
    uint32_t mIndex[7][256] = {0};          // count / index matrix
    uint32_t i,j,m,n;
    uint64_t u;
        for(i = 0; i < count; i++){         // generate histograms
            u = a[i];
            for(j = 0; j < 7; j++){
                mIndex[j][(size_t)(u & 0xff)]++;
                u >>= 8;
            }
        }
        for(j = 0; j < 7; j++){             // convert to indices
            m = 0;
            for(i = 0; i < 256; i++){
                n = mIndex[j][i];
                mIndex[j][i] = m;
                m += n;
            }       
        }
        for(j = 0; j < 7; j++){             // radix sort
            for(i = 0; i < count; i++){     //  sort by current lsb
                u = a[i];
                m = (size_t)(u>>(j<<3))&0xff;
                b[mIndex[j][m]++] = u;
            }
            std::swap(a, b);                //  swap ptrs
        }
    }
    
    static DWORD WINAPI Thread0(LPVOID lpvoid)
    {
        RadixSort7(pb + aIndex[aIi+0], pa + aIndex[aIi+0], aIndex[aIi+1]-aIndex[aIi+0]);
        return 0;
    }
    
    static DWORD WINAPI Thread1(LPVOID lpvoid)
    {
        while(1){
            WaitForSingleObject(hs1, INFINITE); // wait for semaphore
            RadixSort7(pb + aIndex[aIi+1], pa + aIndex[aIi+1], aIndex[aIi+2]-aIndex[aIi+1]);
            ReleaseSemaphore(hs0, 1, NULL);     // indicate done
            if(aIi == 252)                      // exit if all bins done
                return 0;
        }
    }
    
    static DWORD WINAPI Thread2(LPVOID lpvoid)
    {
        while(1){
            WaitForSingleObject(hs2, INFINITE); // wait for semaphore
            RadixSort7(pb + aIndex[aIi+2], pa + aIndex[aIi+2], aIndex[aIi+3]-aIndex[aIi+2]);
            ReleaseSemaphore(hs0, 1, NULL);     // indicate done
            if(aIi == 252)                      // exit if all bins done
                return 0;
        }
    }
    
    static DWORD WINAPI Thread3(LPVOID lpvoid)
    {
        while(1){
            WaitForSingleObject(hs3, INFINITE); // wait for semaphore
            RadixSort7(pb + aIndex[aIi+3], pa + aIndex[aIi+3], aIndex[aIi+4]-aIndex[aIi+3]);
            ReleaseSemaphore(hs0, 1, NULL);     // indicate done
            if(aIi == 252)                      // exit if all bins done
                return 0;
        }
    }
    
    uint64_t rnd64()                        // random 64 bit integer
    {
    static uint64_t r = 1ull;
        r = r * 6364136223846793005ull + 1442695040888963407ull;
        return r;
    }
    
    int main( )
    {
        uint64_t * a = new uint64_t [COUNT];    // allocate data array
        uint64_t * b = new uint64_t [COUNT];    // allocate temp array
        size_t i;
        pa = a;                                 // set global ptrs
        pb = b;
    
        hs0 = CreateSemaphore(NULL,0,7,NULL);   // create semaphores
        hs1 = CreateSemaphore(NULL,0,1,NULL);
        hs2 = CreateSemaphore(NULL,0,1,NULL);
        hs3 = CreateSemaphore(NULL,0,1,NULL);
    
        ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);  // create threads
        ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
        ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
    
        for(i = 0; i < COUNT; i++){             // generate data
            a[i] = rnd64();
        }
    
    #if QP
        QueryPerformanceFrequency(&liQPFrequency);
        dQPFrequency = (double)liQPFrequency.QuadPart;
        timeBeginPeriod(1);                     // set ticker to 1000 hz
        Sleep(128);                             // wait for it to settle
        QueryPerformanceCounter(&liStartTime);
    #else
        ctTimeStart = clock();
    #endif
    
        RadixSort(a, b, COUNT);
    
    #if QP
        QueryPerformanceCounter(&liStopTime);
        dStartTime = (double)liStartTime.QuadPart;
        dStopTime  = (double)liStopTime.QuadPart;
        dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
        timeEndPeriod(1);                       // restore ticker to default
        std::cout << "# of seconds " << dElapsedTime << std::endl;
    #else
        ctTimeStop = clock();
        std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
    #endif
    
        WaitForSingleObject(ht1, INFINITE); // wait for threads
        WaitForSingleObject(ht2, INFINITE);
        WaitForSingleObject(ht3, INFINITE);
        CloseHandle(ht1);                   // close threads
        CloseHandle(ht2);
        CloseHandle(ht3);
        CloseHandle(hs0);                   // close semaphores
        CloseHandle(hs1);
        CloseHandle(hs2);
        CloseHandle(hs3);
        
        for(i = 1; i < COUNT; i++){
            if(a[i] < a[i-1]){
                break;}}
        if(i == COUNT)
            std::cout << "passed" << std::endl;
        else
            std::cout << "failed" << std::endl;
    
        delete[] b;
        delete[] a;
        return 0;
    }