I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array among the threads, sorting each part, and then merging the threads' results. The way I'm trying to merge the results is something like this:
thread 0 | thread 1 | thread 2 | thread 3
sort(A0) | sort(A1) | sort(A2) | sort(A3)
merge(A0,A1) | | merge(A2,A3) |
merge(A0A1, A2A3) | | |
So, at the end of my function `sortManager` I call the function `mergeThreadResults`, which should implement the above logic. In it I iterate over pairs to merge the corresponding threads. Then, if needed, I merge the remaining items into thread 0. It looks like this:
/*
 * Combine the per-thread sorted chunks of sortingArray pairwise, as a tree:
 * at step `iter` (2, 4, 8, ...) every rank that is a multiple of `iter`
 * merges its range with the ranks up to (but not including) myRank + iter,
 * clamped to `threads`. Other ranks stop participating. Rank 0 finally
 * merges in any ranks the power-of-two steps did not reach.
 *
 * Steps are "synchronized" only by sleep(3): this is correct only if every
 * partner finishes its previous step within that delay.
 *
 * myRank          rank of the calling thread
 * myLeft, myRight inclusive bounds of this thread's sorted range
 * size            number of elements in sortingArray being sorted
 * threads         total number of threads
 */
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    /* One variable, initialised: a re-declaration inside the loop would
     * shadow it and leave the check after the loop reading an indeterminate
     * value (also the case when threads < 2 and the loop never runs). */
    int nextThread = threads;
    int iter = 2;
    while (iter <= threads) {
        if (myRank % iter != 0) {
            break; /* from now on a lower rank merges this rank's range */
        }
        nextThread = (myRank + iter) < threads ? (myRank + iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
        printf("Merging threads %ld to %d\n", myRank, nextThread);
        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep
        myRight = nextThreadRight;
        iter = iter * 2;
    }
    /* Rank 0 now covers ranks [0, nextThread). Whatever is left has been
     * merged into one run by rank nextThread; fold it in. Using `threads-1`
     * as the bound would miss the last chunk, e.g. for threads == 3. */
    if (myRank == 0 && nextThread < threads) {
        int lastRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, lastRight);
    }
}
It appears to be working as intended. The problem is that I'm using a `sleep` function to synchronize the threads, which is far from the best approach. So I'm trying to implement a barrier with pthreads.
In it I try to calculate how many threads will arrive at the barrier in that cycle and pass it as `breakpoint`. When all those threads are at the same point, I release the merge function and wait again in the next cycle. This is what I've tried:
// Hand-rolled barrier: each arriving thread bumps the shared counter under
// the mutex; the arrival that reaches `breakpoint` resets the counter and
// wakes the waiting threads.
pthread_mutex_lock(&mutex);
counter++;
// Expected number of arrivals for this `iter` step.
// NOTE(review): this tests `threads % 2`, not `threads % iter`; e.g. for
// threads = 6, iter = 4 it yields 1, although ranks 0 and 4 both take part.
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
// NOTE(review): this retries only when pthread_cond_wait returns an error.
// A spurious wakeup returns 0 and lets the thread through early; POSIX
// expects the wait to loop on a predicate (e.g. a generation counter).
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
But it's not working as intended. Some `merge` calls run before the previous cycle has fully ended, leaving me with a partially sorted array.
This is a minimal example of my code for testing:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Initialize global variables
/* Shared buffer, sorted in place by all worker threads. */
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
/* Arrival count for the hand-rolled barrier; only touched while holding mutex. */
int counter = 0;
/* Statically initialised: POSIX requires a mutex / condition variable to be
 * initialised (static initializer or pthread_*_init) before first use, and no
 * pthread_mutex_init / pthread_cond_init call is made for these. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
/*
 * Work description for one sorting thread (handed to sortManager):
 *   rank    - 0-based index of the thread
 *   size    - element count of the whole array being sorted
 *   threads - total number of sorting threads
 */
struct ThreadTask {
    long rank;
    int size, threads;
};
/*
 * Merge the adjacent sorted runs arr[left..mid] and arr[mid+1..right]
 * (inclusive bounds) into one sorted run occupying arr[left..right].
 * If either run is empty the range is left untouched.
 *
 * Run ends are detected by index, not by an INT_MAX sentinel: with a
 * sentinel, a real INT_MAX in one run could be out-ordered by the other run's
 * sentinel, after which the loop read past the temporary buffer.
 * Ties are taken from the left run, so the merge is stable.
 *
 * Exits the process if the temporary buffers cannot be allocated.
 */
void merge(int arr[], int left, int mid, int right) {
    int n1 = mid - left + 1;
    int n2 = right - mid;
    if (n1 <= 0 || n2 <= 0) {
        return; /* nothing to interleave */
    }

    // Allocate temp copies of both runs
    int *L = malloc((size_t)n1 * sizeof *L);
    int *R = malloc((size_t)n2 * sizeof *R);
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory for temp arrays.\n");
        exit(EXIT_FAILURE);
    }
    memcpy(L, &arr[left], (size_t)n1 * sizeof *L);
    memcpy(R, &arr[mid + 1], (size_t)n2 * sizeof *R);

    int i = 0, j = 0, k = left;
    while (i < n1 && j < n2) {
        if (L[i] <= R[j]) {
            arr[k++] = L[i++];
        } else {
            arr[k++] = R[j++];
        }
    }
    // At most one of these still has elements left
    while (i < n1) {
        arr[k++] = L[i++];
    }
    while (j < n2) {
        arr[k++] = R[j++];
    }

    free(L);
    free(R);
}
/*
 * Recursive top-down merge sort of arr[left..right] (inclusive bounds),
 * in place. Ranges holding fewer than two elements are left untouched.
 */
void mergeSort(int arr[], int left, int right) {
    if (right <= left) {
        return;
    }
    int middle = left + (right - left) / 2; /* avoids overflow of left + right */
    mergeSort(arr, left, middle);
    mergeSort(arr, middle + 1, right);
    merge(arr, left, middle, right);
}
/*
 * Tree-merge of the per-thread sorted chunks of sortingArray: at step `iter`
 * (2, 4, 8, ...) every rank that is a multiple of `iter` merges its range
 * with the range ending just before rank myRank+iter (clamped to `threads`);
 * other ranks leave the loop. The counter/cond_var block is an attempt to
 * replace the sleep-based synchronisation with a barrier.
 *
 * NOTE(review): problems visible in this version:
 *  - the inner `int nextThread` shadows the outer one, so the check after
 *    the loop reads the outer `nextThread`, which is never assigned
 *    (indeterminate value); even with the last loop value, `< threads-1`
 *    would skip the final chunk when threads == 3;
 *  - `breakpoint` tests `threads % 2` instead of `threads % iter`, so for
 *    e.g. threads = 6, iter = 4 it expects 1 arrival while ranks 0 and 4 arrive;
 *  - the barrier sits *before* merge(): it only orders arrivals, so a rank
 *    can start step `iter` while its partner is still merging step `iter/2`
 *    (the sleep(2) after merge() presumably masks this in practice);
 *  - pthread_cond_wait is retried only on an error return, not on a
 *    predicate, so a spurious wakeup also releases a thread early;
 *  - "Merging threads ..." is printed before the rank check, i.e. also by
 *    ranks that then break out without merging.
 */
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
// Merge this group with the neighbouring group for this step.
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
// Rank 0 merges whatever the power-of-two steps left over (see NOTE above).
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
/*
 * Block until `participants` callers have arrived, then release them all.
 * Reusable counting barrier built from a mutex and a condition variable; the
 * generation number makes it safe against spurious wakeups and back-to-back
 * rounds. All callers of one round must pass the same `participants`.
 */
static void waitForAllChunksSorted(int participants) {
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t released = PTHREAD_COND_INITIALIZER;
    static int arrived = 0;
    static unsigned long generation = 0;

    pthread_mutex_lock(&lock);
    unsigned long myGeneration = generation;
    arrived++;
    if (arrived >= participants) {
        arrived = 0;
        generation++;
        pthread_cond_broadcast(&released);
    } else {
        while (generation == myGeneration) {
            pthread_cond_wait(&released, &lock);
        }
    }
    pthread_mutex_unlock(&lock);
}

/*
 * Thread entry point. Sorts this thread's slice of sortingArray, waits until
 * every thread has sorted its slice, then (even ranks only) takes part in
 * merging the slices through mergeThreadResults.
 *
 * threadInfo - pointer to this thread's struct ThreadTask
 * Returns NULL.
 */
void *sortManager(void *threadInfo) {
    struct ThreadTask *currentTask = threadInfo;
    long rank = currentTask->rank;
    /* Slice bounds use the same float formula mergeThreadResults uses for
     * thread boundaries, so both agree on where each slice ends. */
    int left = rank * ((float)currentTask->size / (float)currentTask->threads);
    int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;

    mergeSort(sortingArray, left, right);

    /* Merging reads neighbouring slices, so no thread may start merging
     * until every thread has finished sorting its own slice. */
    waitForAllChunksSorted(currentTask->threads);

    if (rank % 2 == 0) {
        mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
    }
    return NULL;
}
/*
 * Fill tasksHolder[0..threads-1] with each thread's rank/size/threads and
 * start one sortManager thread per entry, storing the handles in
 * thread_handles[0..threads-1]. Both arrays must hold `threads` elements, and
 * tasksHolder must stay valid while the threads run (each receives a pointer
 * to its own entry).
 *
 * Returns tasksHolder. Exits the process if a thread cannot be created,
 * because the caller would otherwise join a handle that was never set.
 */
struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    for (long thread = 0; thread < threads; thread++) {
        struct ThreadTask *threadTask = &tasksHolder[thread];
        threadTask->rank = thread;
        threadTask->size = size;
        threadTask->threads = threads;
        /* pthread_create returns an error number; it does not set errno. */
        int rc = pthread_create(&thread_handles[thread], NULL, sortManager, threadTask);
        if (rc != 0) {
            fprintf(stderr, "Fatal: failed to create thread %ld: %s\n", thread, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }
    return tasksHolder;
}
/* Write the first `size` elements of arr to stdout, each followed by a
 * space, then terminate the line. */
void printArray(int arr[], int size) {
    int idx = 0;
    while (idx < size) {
        printf("%d ", arr[idx]);
        idx++;
    }
    putchar('\n');
}
int main(int argc, char *argv[]) {
// Initialize arguments
int arraySize = 20;
int totalThreads = 16;
// Display input
printf("\nInput array:\n");
printArray(sortingArray, arraySize);
// Initialize threads
pthread_t *thread_handles;
thread_handles = malloc(totalThreads * sizeof(pthread_t));
// Create threads
struct ThreadTask threadTasksHolder[totalThreads];
*threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
// Execute merge sort in each thread
for (long thread = 0; thread < totalThreads; thread++) {
pthread_join(thread_handles[thread], NULL);
}
free(thread_handles);
// Display output
printf("\nSorted array:\n");
printArray(sortingArray, arraySize);
return 0;
}
As @John Bollinger said, your approach looks unnecessarily difficult, and a solution would be equally complicated. But if you want to implement a barrier, I would suggest you put it after the `merge` in `mergeThreadResults`. That way, you can wait for all the threads doing work in that cycle to finish before moving on to the next one.
To do this, you need a new barrier for every iteration, because the number of threads doing merges decreases at each cycle. So start by declaring some globals:
/* Group size whose barrier was initialised most recently (0 = none yet). */
int mergeCycleFlag = 0;
/* Serialises the check-and-initialise of each cycle's barrier. Statically
 * initialised: POSIX requires a mutex to be initialised before first use. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* One barrier per merge cycle, indexed by the group size (2, 4, 8, ...), so
 * it must be allocated with at least threads + 1 entries; NULL until then. */
pthread_barrier_t *mergeBarrier = NULL;
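The flag is used to create a barrier for each iteration, and we need a separate `mergeBarrier` for each cycle. Don't forget to allocate them in your `main` function. Because the barriers are indexed by the group size (`mergeBarrier[groupSize]`, where `groupSize` can reach `threads`), you need `threads + 1` entries, not just one per iteration. Also note that `malloc`/`realloc` take a size in bytes: `mergeBarrier = malloc((totalThreads + 1) * sizeof *mergeBarrier);`
Then we can create a barrier like this: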
// The first rank to reach cycle `iter` initialises that cycle's barrier;
// holding the mutex makes the check-and-initialise atomic, so later ranks of
// the same cycle see mergeCycleFlag == iter and skip it.
pthread_mutex_lock(&mutex);
if (mergeCycleFlag != iter) {
mergeCycleFlag = iter;
// ceil(threads / iter): ranks 0, iter, 2*iter, ... below `threads` merge now.
int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter+1;
pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
}
pthread_mutex_unlock(&mutex);
... MERGE ...
// Wait until everyone has finished merging, so the next cycle only reads
// fully merged ranges.
// NOTE(review): no pthread_barrier_destroy appears in this snippet.
pthread_barrier_wait (&mergeBarrier[iter]);
Note that I use a lock to create the barrier, because we don't want two threads messing around here at the same time. If there is no barrier set for this `iter`, we create one with the number of threads that should work now. Also, I've changed your `breakpoint` statement to match the number of threads we expect to perform a `merge`.
After some adjustments, this is what your `mergeThreadResults` should look like:
/*
 * Tree-merge of the per-thread sorted chunks with one barrier per cycle.
 * At cycle `groupSize` (2, 4, 8, ...) every rank that is a multiple of
 * groupSize merges its range with the following group (clamped to the end
 * of the array), then waits on mergeBarrier[groupSize] until every merge of
 * that cycle is done. Rank 0 finally merges in any leftover ranks when
 * `threads` is not a power of two.
 *
 * Preconditions (not checked here): mutex is initialised, mergeBarrier has
 * at least threads + 1 entries, and every thread has finished sorting its own
 * chunk before any thread calls this.
 * NOTE(review): mergeCycleFlag is never reset and barriers are never
 * destroyed, so this presumably supports a single sort per process.
 */
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    /* Initialised so the check after the loop is well defined even when the
     * loop body never runs (threads < 2). */
    int nextThread = threads;
    int nextThreadRight;
    int groupSize = 2;
    while (groupSize <= threads) {
        if (myRank % groupSize != 0) { // Release threads that no longer perform merges
            break;
        }
        nextThread = (myRank + groupSize) < threads ? (myRank + groupSize) : threads;
        nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
        printf("Merging threads %ld to %d\n", myRank, nextThread-1);
        // Init barrier with the number of threads merging in this cycle
        pthread_mutex_lock(&mutex); // Just one thread can set the barrier
        if (mergeCycleFlag != groupSize) {
            mergeCycleFlag = groupSize;
            int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // ceil(threads / groupSize)
            pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop);
        }
        pthread_mutex_unlock(&mutex);
        // Merge thread group with neighbour group
        merge(sortingArray, myLeft, myRight, nextThreadRight);
        // Wait until everyone has finished merging
        pthread_barrier_wait(&mergeBarrier[groupSize]);
        myRight = nextThreadRight;
        groupSize = groupSize * 2;
    }
    /* Rank 0 now holds ranks [0, nextThread) merged. Any remaining ranks were
     * combined by rank nextThread, which passed the same last barrier; merge
     * them in. The bound `threads-1` would skip this for e.g. threads == 3. */
    if (myRank == 0 && nextThread < threads) {
        nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}
Finally, for this solution to work, every thread must have finished sorting its own part before the results are merged. So you need to either call it after your `pthread_join` calls in `main`, or implement another barrier with all threads before your call to `mergeThreadResults` in `sortManager`.
Also, an even better approach would be for each thread to wait only for the threads it will merge with. For example, thread 0 first waits only for thread 1, then for thread 2, and so on.