I'm having trouble understanding why my implementation is not faster with multiple threads than it is on a single thread.
I'm trying to implement a bitwise MSB radix sort (which is working for now) with a certain number (n) of threads. In my approach I create a new thread within the recursion until a depth of log2(n) is reached. But this seems to just increase the running time because of the cost of creating the threads. I'm guessing that the created threads cannot work on the array at the same time, even though each one is responsible for its own separate part of it.
If someone could explain it I would really appreciate it. Thanks!!
UPDATE ::
As it turns out, I was missing the most obvious solution, as Ulrich commented. I was using a VM that was locked to one CPU, which I did not know. After changing that, I got the desired speedup for the radix sort.
Thank you all for the comments and general info on that topic. ^^
The maximum number of threads should be limited to the number of cores on your system, or two times the number of cores if each core has hyper-threading. You've since updated your question to note that you only had one core using a virtual machine, and the multi-threaded version is faster with multiple cores.
Other potential issues: if the distribution of the data is not uniform, then splitting into bins will result in uneven sized bins. Using a base larger than 2, such as 256, will reduce this effect somewhat. On the initial pass, there can be cache line conflicts, where each write to a cache line on one core will invalidate all instances of that cache line on other cores.
The example code below uses a single thread to do an MSD-first radix sort pass in base 256, creating 256 bins, which are then sorted with an LSD-first radix sort using 4 threads. The example uses the Windows-specific thread API. It relies on having reasonably uniform data, so that the 256 bins are roughly equal in size. I didn't try to use multi-threading for the initial MSD pass that creates the 256 bins, because one potential issue is cache line conflicts on the array of index updates, which would need an atomic increment operation, and that may limit the performance benefit.
#include <iostream>
#include <windows.h>
// ---------------------------------------------------------------------------
// Benchmark configuration and timer state.
// ---------------------------------------------------------------------------
// COUNT is the number of 64-bit values main() allocates, fills and sorts
// (6000 * 6000 = 36,000,000).
#define COUNT (6000*6000) // number of values to sort
// QP selects the timing source used by main():
//   QP != 0 : QueryPerformanceCounter / QueryPerformanceFrequency
//   QP == 0 : clock() from <ctime>
#define QP 1 // if != 0, use queryperformance for timer
#if QP
// NOTE(review): nothing in this file visibly uses <math.h>; confirm before
// removing it.
#include <math.h>
// winmm.lib supplies timeBeginPeriod / timeEndPeriod, which main() calls
// around the timed section.
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#else
#include <ctime>
#endif
#if QP
// Raw performance-counter readings (filled in by main()).
static LI64 liQPFrequency; // cpu counter values
static LI64 liStartTime;
static LI64 liStopTime;
// The same readings converted to double so the elapsed time in seconds can
// be computed as (stop - start) / frequency.
static double dQPFrequency;
static double dStartTime;
static double dStopTime;
static double dElapsedTime;
#else
// clock() readings taken immediately before and after the sort.
clock_t ctTimeStart; // clock values
clock_t ctTimeStop;
#endif
// Local fixed-width aliases.
// NOTE(review): assumes unsigned int is 32 bits and unsigned long long is
// 64 bits on the target compiler -- confirm if this is ported.
typedef unsigned int uint32_t;
typedef unsigned long long uint64_t;
// Semaphores used for the hand-off between RadixSort() and the workers:
//   hs1..hs3 : released by RadixSort() to start Thread1..Thread3 on the
//              current group of bins.
//   hs0      : released once by each worker when its bin is sorted;
//              RadixSort() waits on it three times per group.
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
// Handles of the three worker threads created in main().
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
// pa is the data array and pb the temporary array; both are set by main().
// The worker routines sort each bin from pb back into pa.
static uint64_t *pa, *pb; // ptrs to arrays
// After the first pass in RadixSort(), aIndex[k] is the start offset of
// bin k and aIndex[k+1] is its end; aIndex[256] is therefore the end of
// bin 255.
static uint32_t aIndex[260]; // start + end indexes, 256 bins
// First bin of the group of four currently being sorted: Thread0 handles
// bin aIi+0, Thread1 bin aIi+1, Thread2 bin aIi+2, Thread3 bin aIi+3.
// Written by RadixSort() and read by the worker routines.
static uint32_t aIi; // current index for 4 bins
// Declared ahead of RadixSort(), which calls Thread0 directly on the
// calling thread instead of running it as a separate thread.
static DWORD WINAPI Thread0(LPVOID lpvoid);
// Top-level sort driver.
//
// Step 1 (calling thread only): distribute the `count` keys of `a` into
// `b`, grouped into 256 bins by their most significant byte.
// Step 2: sort the bins four at a time. Thread1..Thread3 are woken through
// hs1..hs3, bin aIi+0 is sorted on the calling thread via Thread0(), and
// the three hs0 waits block until all workers have reported completion.
//
// a     : keys to sort
// b     : temporary buffer with room for at least `count` keys
// count : number of keys
//
// Notes:
//  - The bin sorters do not use `a` and `b`; they read the globals pb and pa.
//    The caller must have set pa == a and pb == b (main() does).
//  - The histogram is accumulated into the global aIndex without clearing
//    it first, and each worker thread returns after the last group, so this
//    function supports a single call per program run.
void RadixSort(uint64_t * a, uint64_t *b, size_t count)
{
    uint32_t pos;
    uint32_t bin;
    uint32_t running;

    // Histogram of the top byte of every key.
    for (pos = 0; pos < count; ++pos) {
        const size_t top = (size_t)(a[pos] >> 56);
        aIndex[top] += 1;
    }

    // Exclusive prefix sum: counts become bin start offsets. Entry 256 is
    // included so that it ends up holding the end of bin 255.
    running = 0;
    for (bin = 0; bin <= 256; ++bin) {
        const uint32_t binSize = aIndex[bin];
        aIndex[bin] = running;
        running += binSize;
    }

    // Scatter keys into their bins; each aIndex[k] advances to the end of
    // bin k as the bin fills up.
    for (pos = 0; pos < count; ++pos) {
        const uint64_t key = a[pos];
        const uint32_t top = (uint32_t)(key >> 56);
        b[aIndex[top]] = key;
        aIndex[top] += 1;
    }

    // aIndex[k] now holds the END of bin k. Shift everything up by one slot
    // so that aIndex[k] is the START of bin k again and aIndex[k+1] its end.
    for (bin = 256; bin != 0; --bin)
        aIndex[bin] = aIndex[bin - 1];
    aIndex[0] = 0;

    // Sort the bins in groups of four.
    aIi = 0;
    while (aIi < 256) {
        ReleaseSemaphore(hs1, 1, NULL);         // wake the three workers
        ReleaseSemaphore(hs2, 1, NULL);
        ReleaseSemaphore(hs3, 1, NULL);
        Thread0(NULL);                          // bin aIi+0 on this thread
        for (int done = 0; done < 3; ++done)    // one release per worker
            WaitForSingleObject(hs0, INFINITE);
        aIi += 4;
    }
}
// Sorts `count` 64-bit keys by their low 7 bytes (bits 0..55) with a
// least-significant-digit-first radix sort, base 256. The top byte
// (bits 56..63) is ignored; keys that are equal in their low 7 bytes keep
// their relative input order.
//
// a     : input keys. Also used as scratch space: for count > 0 its
//         contents are overwritten with intermediate data.
// b     : buffer with room for at least `count` keys. Receives the sorted
//         output: the seven passes alternate between the two buffers, and
//         because seven is odd the last pass writes into the caller's `b`.
// count : number of keys; 0 is allowed and leaves both buffers untouched.
//
// All counters and offsets are size_t so that they match the type of
// `count`; 32-bit counters would wrap (and the loops would never terminate)
// for inputs of 2^32 or more keys.
void RadixSort7(uint64_t * a, uint64_t *b, size_t count)
{
    size_t mIndex[7][256] = {};                 // count / index matrix

    // One read of the input builds the histograms for all seven digits.
    for (size_t i = 0; i < count; i++) {
        uint64_t u = a[i];
        for (size_t j = 0; j < 7; j++) {
            mIndex[j][(size_t)(u & 0xff)]++;
            u >>= 8;
        }
    }

    // Turn each histogram into starting offsets (exclusive prefix sum).
    for (size_t j = 0; j < 7; j++) {
        size_t m = 0;
        for (size_t i = 0; i < 256; i++) {
            const size_t n = mIndex[j][i];
            mIndex[j][i] = m;
            m += n;
        }
    }

    // Seven stable scatter passes, least significant byte first.
    for (size_t j = 0; j < 7; j++) {
        const unsigned shift = (unsigned)(j << 3);
        for (size_t i = 0; i < count; i++) {
            const uint64_t u = a[i];
            const size_t digit = (size_t)(u >> shift) & 0xff;
            b[mIndex[j][digit]++] = u;
        }
        std::swap(a, b);                        // swap ptrs (local copies only)
    }
}
// Sorts bin aIi+0 of the current group from pb into pa.
// Unlike Thread1..Thread3 this routine has no wait loop: RadixSort() calls
// it directly, once per group, on the calling thread.
// lpvoid is unused. Always returns 0.
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
    (void)lpvoid;
    const uint32_t first = aIndex[aIi + 0];     // start offset of the bin
    const uint32_t limit = aIndex[aIi + 1];     // end offset of the bin
    RadixSort7(pb + first, pa + first, limit - first);
    return 0;
}
// Worker thread for bin aIi+1 of every group of four bins.
//
// Each iteration waits on hs1 for the start signal, sorts its bin from pb
// into pa, and releases hs0 to report completion. The thread returns 0
// after handling the last group (aIi == 252). lpvoid is unused.
//
// aIi is copied into a local BEFORE hs0 is released. Once hs0 has been
// released, the thread that drives the groups may advance aIi at any time.
// Re-reading the global after the release could make this worker exit one
// group too early (seeing 252 while it was finishing group 248) or never
// exit (seeing 256 after the last group). Either would leave a waiter
// blocked forever.
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
    (void)lpvoid;
    for(;;){
        WaitForSingleObject(hs1, INFINITE);     // wait for semaphore
        const uint32_t group = aIi;             // stable until hs0 is released
        RadixSort7(pb + aIndex[group+1], pa + aIndex[group+1], aIndex[group+2]-aIndex[group+1]);
        ReleaseSemaphore(hs0, 1, NULL);         // indicate done
        if(group == 252)                        // exit if all bins done
            return 0;
    }
}
// Worker thread for bin aIi+2 of every group of four bins.
//
// Each iteration waits on hs2 for the start signal, sorts its bin from pb
// into pa, and releases hs0 to report completion. The thread returns 0
// after handling the last group (aIi == 252). lpvoid is unused.
//
// aIi is copied into a local BEFORE hs0 is released. Once hs0 has been
// released, the thread that drives the groups may advance aIi at any time.
// Re-reading the global after the release could make this worker exit one
// group too early (seeing 252 while it was finishing group 248) or never
// exit (seeing 256 after the last group). Either would leave a waiter
// blocked forever.
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
    (void)lpvoid;
    for(;;){
        WaitForSingleObject(hs2, INFINITE);     // wait for semaphore
        const uint32_t group = aIi;             // stable until hs0 is released
        RadixSort7(pb + aIndex[group+2], pa + aIndex[group+2], aIndex[group+3]-aIndex[group+2]);
        ReleaseSemaphore(hs0, 1, NULL);         // indicate done
        if(group == 252)                        // exit if all bins done
            return 0;
    }
}
// Worker thread for bin aIi+3 of every group of four bins.
//
// Each iteration waits on hs3 for the start signal, sorts its bin from pb
// into pa, and releases hs0 to report completion. The thread returns 0
// after handling the last group (aIi == 252). lpvoid is unused.
//
// aIi is copied into a local BEFORE hs0 is released. Once hs0 has been
// released, the thread that drives the groups may advance aIi at any time.
// Re-reading the global after the release could make this worker exit one
// group too early (seeing 252 while it was finishing group 248) or never
// exit (seeing 256 after the last group). Either would leave a waiter
// blocked forever.
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
    (void)lpvoid;
    for(;;){
        WaitForSingleObject(hs3, INFINITE);     // wait for semaphore
        const uint32_t group = aIi;             // stable until hs0 is released
        RadixSort7(pb + aIndex[group+3], pa + aIndex[group+3], aIndex[group+4]-aIndex[group+3]);
        ReleaseSemaphore(hs0, 1, NULL);         // indicate done
        if(group == 252)                        // exit if all bins done
            return 0;
    }
}
// Returns the next value of a deterministic 64-bit pseudo-random sequence.
// Each call updates the function-local state as
//     state = state * multiplier + increment
// using unsigned 64-bit arithmetic (which wraps modulo 2^64) and returns the
// new state. The state starts at 1, so every program run produces the same
// sequence. The state is an unguarded static: do not call concurrently.
uint64_t rnd64() // random 64 bit integer
{
    const uint64_t multiplier = 6364136223846793005ull;
    const uint64_t increment  = 1442695040888963407ull;
    static uint64_t state = 1ull;

    state *= multiplier;
    state += increment;
    return state;
}
int main( )
{
uint64_t * a = new uint64_t [COUNT]; // allocate data array
uint64_t * b = new uint64_t [COUNT]; // allocate temp array
size_t i;
pa = a; // set global ptrs
pb = b;
hs0 = CreateSemaphore(NULL,0,7,NULL); // create semaphores
hs1 = CreateSemaphore(NULL,0,1,NULL);
hs2 = CreateSemaphore(NULL,0,1,NULL);
hs3 = CreateSemaphore(NULL,0,1,NULL);
ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0); // create threads
ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
for(i = 0; i < COUNT; i++){ // generate data
a[i] = rnd64();
}
#if QP
QueryPerformanceFrequency(&liQPFrequency);
dQPFrequency = (double)liQPFrequency.QuadPart;
timeBeginPeriod(1); // set ticker to 1000 hz
Sleep(128); // wait for it to settle
QueryPerformanceCounter(&liStartTime);
#else
ctTimeStart = clock();
#endif
RadixSort(a, b, COUNT);
#if QP
QueryPerformanceCounter(&liStopTime);
dStartTime = (double)liStartTime.QuadPart;
dStopTime = (double)liStopTime.QuadPart;
dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
timeEndPeriod(1); // restore ticker to default
std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
ctTimeStop = clock();
std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif
WaitForSingleObject(ht1, INFINITE); // wait for threads
WaitForSingleObject(ht2, INFINITE);
WaitForSingleObject(ht3, INFINITE);
CloseHandle(ht1); // close threads
CloseHandle(ht2);
CloseHandle(ht3);
CloseHandle(hs0); // close semaphores
CloseHandle(hs1);
CloseHandle(hs2);
CloseHandle(hs3);
for(i = 1; i < COUNT; i++){
if(a[i] < a[i-1]){
break;}}
if(i == COUNT)
std::cout << "passed" << std::endl;
else
std::cout << "failed" << std::endl;
delete[] b;
delete[] a;
return 0;
}