Tags: c, parallel-processing, pthreads, pthread-barriers

Implement barrier with pthreads in C


I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array among the threads, then merging the threads' results. The way I'm trying to merge the results is something like this:

thread 0                     |   thread 1        |   thread 2         |   thread 3

sort(A0)                     |   sort(A1)        |   sort(A2)         | sort(A3)
merge(A0,A1)                 |                   |   merge(A2,A3)     | 
merge(A0A1, A2A3)            |                   |                    |

So, at the end of my function sortManager I call the function mergeThreadResults, which should implement the above logic. In it I iterate over pairs to merge the corresponding threads. Then, if needed, I merge the last items onto thread 0. It looks like this:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

It appears to be working as intended. The problem is that I'm using a sleep call to synchronize the threads, which is far from the best approach. So I'm trying to implement a barrier with pthreads.
In it I try to calculate how many threads should take part in that cycle and use that as the breakpoint. When all the threads have reached the same point I release the merge and wait again in the next cycle. This is what I've tried:

        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

But it's not working as intended. Some merges trigger before the previous cycle has fully ended, leaving me with a partially sorted array.

This is a minimal example of my code for testing:

#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <unistd.h>

// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

struct ThreadTask {
    long rank;
    int size;
    int threads;
};

void merge(int arr[], int left, int mid, int right) {
    /* Merge arrays */

    int i, j, k;
    int n1 = mid - left + 1;
    int n2 = right - mid;

    // Allocate temp arrays
    int *L = malloc((n1 + 2) * sizeof(int));
    int *R = malloc((n2 + 2) * sizeof(int));
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory for temp arrays.\n");
        exit(EXIT_FAILURE);
    }

    // Populate temp arrays
    for (i = 1; i <= n1; i++) {
        L[i] = arr[left + i - 1];
    }
    for (j = 1; j <= n2; j++) {
        R[j] = arr[mid + j];
    }

    L[n1 + 1] = INT_MAX;
    R[n2 + 1] = INT_MAX;
    i = 1;
    j = 1;

    // Merge arrays
    for (k = left; k <= right; k++) {
        if (L[i] <= R[j]) {
            arr[k] = L[i];
            i++;
        } else {
            arr[k] = R[j];
            j++;
        }
    }

    free(L);
    free(R);
}


void mergeSort(int arr[], int left, int right) {
    /* Sort and then merge arrays */

    if (left < right) {
        int mid = left + (right - left) / 2;

        mergeSort(arr, left, mid);
        mergeSort(arr, mid + 1, right);

        merge(arr, left, mid, right);
    }
}


void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        // barrier
        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(2); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

void *sortManager(void *threadInfo) {
    /* Manage mergeSort between threads */

    struct ThreadTask *currentTask = threadInfo;

    // Get task arguments
    long rank = currentTask->rank;
    int left= rank * ((float)currentTask->size / (float)currentTask->threads);
    int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
    int mid = left + (right - left) / 2;

    // Execute merge for task division
    if (left < right) {
        mergeSort(sortingArray, left, mid);
        mergeSort(sortingArray, mid + 1, right);
        merge(sortingArray, left, mid, right);
    }

    // Merge thread results
    if (rank % 2 == 0)  {
        mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
    }

    return 0;
}


struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    /* Create threads with each task info */

    struct ThreadTask *threadTask;

    for (long thread = 0; thread < threads; thread++){
        threadTask = &tasksHolder[thread];
        threadTask->rank = thread;
        threadTask->size = size;
        threadTask->threads = threads;

        pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
    }

    return tasksHolder;
}


void printArray(int arr[], int size) {
    /* Print array */

    for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
        printf("%d ", arr[arrayIndex]);
    printf("\n");
}


int main(int argc, char *argv[]) {

    // Initialize arguments
    int arraySize = 20;
    int totalThreads = 16;

    
    // Display input
    printf("\nInput array:\n");
    printArray(sortingArray, arraySize);
    

    // Initialize threads
    pthread_t *thread_handles;
    thread_handles = malloc(totalThreads * sizeof(pthread_t));

    // Create threads
    struct ThreadTask threadTasksHolder[totalThreads];
    *threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
    
    // Execute merge sort in each thread
    for (long thread = 0; thread < totalThreads; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    

    // Display output
    printf("\nSorted array:\n");
    printArray(sortingArray, arraySize);
    
    return 0;
}

Solution

  • As @John Bollinger said, your approach looks unnecessarily difficult, and a solution would be equally complicated. But if you want to implement a barrier, I would suggest you put it after the merge in mergeThreadResults. That way, you can wait for all the threads doing work on that cycle to finish before moving on to the next one.

    To create this, you will need a new barrier for every iteration, because at each cycle the number of threads doing the merge decreases. So start by declaring some globals:

    int mergeCycleFlag = 0;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_barrier_t *mergeBarrier = NULL;
    

    The flag is used to create a new barrier for each iteration, and we need a separate mergeBarrier for each cycle. Don't forget to allocate the array in your main function; since it is indexed by the group size, which doubles from 2 up to the thread count, size it accordingly: mergeBarrier = realloc(mergeBarrier, (totalThreads + 1) * sizeof(pthread_barrier_t));
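
    For reference, a minimal sketch of that setup in main (the error check and the cleanup loop are my additions, not part of the original code; the array is sized so it can be indexed by the group size, which doubles from 2 up to totalThreads):

        // In main(), before creating the threads:
        mergeBarrier = realloc(mergeBarrier, (totalThreads + 1) * sizeof(pthread_barrier_t));
        if (mergeBarrier == NULL) {
            fprintf(stderr, "Fatal: failed to allocate merge barriers.\n");
            exit(EXIT_FAILURE);
        }

        // ... create and join the threads as before ...

        // After the join, release the barriers that the merge loop initialized
        for (int groupSize = 2; groupSize <= totalThreads; groupSize *= 2) {
            pthread_barrier_destroy(&mergeBarrier[groupSize]);
        }
        free(mergeBarrier);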

    Then we can create a barrier like this:

            pthread_mutex_lock(&mutex);
            if (mergeCycleFlag != iter) { 
                mergeCycleFlag = iter;
                int mergesInLoop = threads % iter == 0 ? threads/iter : threads/iter+1;
                pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
            }
            pthread_mutex_unlock(&mutex);
    
            ... MERGE ...
    
            // Wait for everyone to finish merging
            pthread_barrier_wait(&mergeBarrier[iter]);
    

    Note that I use a lock around the barrier creation, because we don't want two threads setting it up at the same time. If there is no barrier set for this iter yet, we create one sized to the number of threads that should work now. Also, I've changed your breakpoint expression so it computes how many threads we expect to perform a merge in this cycle: for example, with 16 threads and iter = 4, only ranks 0, 4, 8 and 12 merge, so the barrier is initialized with a count of 4.

    After some adjustment, this is what your mergeThreadResults should look like:

    void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
        
        int nextThread, nextThreadRight;
        int groupSize = 2;
    
        while (groupSize <= threads) {
            if (myRank % groupSize != 0) { // Release threads that no longer perform merges
                break;
            }
    
            nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
            nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
     
            printf("Merging threads %ld to %d\n", myRank, nextThread-1);
    
            // Init barrier with number of threads you will wait merging 
            pthread_mutex_lock(&mutex);  // Just one thread can set the barrier
            if (mergeCycleFlag != groupSize) { 
                mergeCycleFlag = groupSize;
                int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
                pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop);  // set barrier
            }
            pthread_mutex_unlock(&mutex);
    
            // Merge thread group with neighbour group
            merge(sortingArray, myLeft, myRight, nextThreadRight);
    
            // Wait for everyone to finish merging
            pthread_barrier_wait(&mergeBarrier[groupSize]);
    
            myRight = nextThreadRight;
            groupSize = groupSize * 2;
        }
    
        // Merge thread 0
        if (myRank == 0 && nextThread < threads-1) {
            nextThreadRight = threads * ((float)size / (float)threads) - 1;
            merge(sortingArray, myLeft, myRight, nextThreadRight);
        }
    }
    

    Finally, for this solution to work, every thread needs to have finished its sorting job before the results are merged. So you need to either call mergeThreadResults after the join in main, or implement another barrier for all threads before the call to mergeThreadResults in sortManager, as sketched below.
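
    A minimal sketch of that second option, assuming a global barrier (here called sortBarrier, a name that is not in the original code) initialized in main with the total thread count:

        pthread_barrier_t sortBarrier;
        // In main(), before pthread_create:
        //     pthread_barrier_init(&sortBarrier, NULL, totalThreads);

        // In sortManager(), after the local sort and before the merge phase:
        pthread_barrier_wait(&sortBarrier);  // nobody merges until every local sort is done
        if (rank % 2 == 0) {
            mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
        }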

    Also, an even better approach would be for each thread to wait only for the specific threads it is going to merge with: thread 0 waits only for thread 1, then for thread 2, and so on.
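
    One possible sketch of that pairwise scheme (the done array and the waitForThread/announceDone helpers are hypothetical names, not part of the original code): each thread announces when its own range is fully merged, and a merging thread waits only for the leader of the group it is about to absorb.

        #include <stdbool.h>

        bool done[16];  // done[r] is set once thread r's range is ready; sized to totalThreads
        pthread_mutex_t doneMutex = PTHREAD_MUTEX_INITIALIZER;
        pthread_cond_t doneCond = PTHREAD_COND_INITIALIZER;

        void announceDone(long rank) {
            pthread_mutex_lock(&doneMutex);
            done[rank] = true;
            pthread_cond_broadcast(&doneCond);  // wake any thread waiting on this rank
            pthread_mutex_unlock(&doneMutex);
        }

        void waitForThread(long rank) {
            pthread_mutex_lock(&doneMutex);
            while (!done[rank])                 // re-check the flag after every wakeup
                pthread_cond_wait(&doneCond, &doneMutex);
            pthread_mutex_unlock(&doneMutex);
        }

    In mergeThreadResults, each cycle would then call waitForThread(myRank + groupSize/2) before its merge (when that partner rank exists), and a thread would call announceDone(myRank) as it drops out of the loop (odd ranks would announce right after their local sort in sortManager), so each merger waits only for its direct partner instead of for the whole cycle.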