I use an MPI program to parallelize over batches.
Imagine multidimensional MRI images (3 spatial dimensions, coil data, ...), several of which are aggregated along a batch dimension.
The program should apply the same task to every entry along the batch dimension.
My problem is that I have nproc processes (COMM_SIZE), but my batch dimension is not divisible by nproc.
MPI_Scatter, or more precisely MPI_Gather, then blocks the application.
My question: is there a more elegant/handier way to overcome this than a for loop with MPI_Send/MPI_Recv?
I found this answer, but IMHO it explains a different problem, since I can't change the amount of sent/received data with MPI_Scatterv. Prove me wrong, please :)
Thanks in advance!
In the example below, mpirun -n 2 tut2 1000 works great, but mpirun -n 3 tut2 1000 gets stuck before MPI_Gather.
Taken and adapted from mpitutorial.com:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mpi.h>
#include <assert.h>

// Creates an array of random numbers. Each number has a value from 0 - 1
double *create_rand_nums(int num_elements) {
  double *rand_nums = (double *)malloc(sizeof(double) * num_elements);
  assert(rand_nums != NULL);
  int i;
  for (i = 0; i < num_elements; i++) {
    rand_nums[i] = (rand() / (double)RAND_MAX);
  }
  return rand_nums;
}

// Computes the average of an array of numbers
double compute_avg(double *array, int num_elements) {
  double sum = 0.f;
  int i;
  for (i = 0; i < num_elements; i++) {
    sum += array[i];
  }
  return sum / num_elements;
}

double compute_sum(double *array, int num_elements) {
  double sum = 0.f;
  int i;
  for (i = 0; i < num_elements; i++) {
    sum += array[i];
  }
  return sum;
}

int main(int argc, char** argv) {
  MPI_Init(NULL, NULL);

  if (argc != 2) {
    fprintf(stderr, "Usage: avg num_elements_per_proc\n");
    exit(1);
  }
  int num_elements_per_proc = atoi(argv[1]);

  // Seed the random number generator to get different results each time
  srand(time(NULL));

  int world_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  int world_size;
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  char processor_name[MPI_MAX_PROCESSOR_NAME]; // gets the name of the processor
  int name_len;
  MPI_Get_processor_name(processor_name, &name_len);

  // Gather all partial averages down to the root process
  double *sub_avgs = NULL;
  if (world_rank == 0) {
    sub_avgs = (double *)malloc(sizeof(double) * (world_size + 2));
    assert(sub_avgs != NULL);
  }

  double *rand_nums = NULL;
  if (world_rank == 0) {
    rand_nums = create_rand_nums(num_elements_per_proc * (world_size + 2));
  }

  double original_data_avg = 0;
  for (int i = world_rank; i < (world_size + 2); i += world_size) {
    int off = 0;
    if (world_rank == 0)
      off = world_size;
    MPI_Bcast(&off, 1, MPI_INT, 0, MPI_COMM_WORLD);

    double *sub_rand_nums = (double *)malloc(sizeof(double) * num_elements_per_proc);
    assert(sub_rand_nums != NULL);

    MPI_Scatter(rand_nums, num_elements_per_proc, MPI_DOUBLE, sub_rand_nums,
                num_elements_per_proc, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // Compute the average of your subset
    double sub_avg = compute_sum(sub_rand_nums, num_elements_per_proc);
    printf("Sub avg %s (%d/%d): %f\n", processor_name, world_rank, world_size, sub_avg);

    //!!! Here the block appears !!!
    // Gather all partial averages down to the root process
    MPI_Gather(&sub_avg, 1, MPI_DOUBLE, sub_avgs, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    if (world_rank == 0) {
      double avg = compute_sum(sub_avgs, world_size) / (num_elements_per_proc * world_size);
      printf("Avg of all elements is %f\n", avg);
      // Compute the average across the original data for comparison
      original_data_avg +=
          compute_avg(rand_nums + off, num_elements_per_proc * world_size);
    }
    free(sub_rand_nums);
  }

  // Clean up
  if (world_rank == 0) {
    printf("Avg computed across original data is %f\n", original_data_avg);
    free(rand_nums);
    free(sub_avgs);
  }

  printf("proc %s (%d/%d) signs off\n", processor_name, world_rank, world_size);

  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
}
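For reference, the mismatch comes from the chunk loop: with mpirun -n 3 there are world_size + 2 = 5 chunks, so the loop body runs twice on ranks 0 and 1 but only once on rank 2, and the collective calls no longer match up. A tiny standalone sketch (not part of the program above, world_size hard-coded to 3 just for illustration) that only counts the iterations per rank:

#include <stdio.h>

// Counts how often each rank would execute the chunk loop
//   for (int i = world_rank; i < world_size + 2; i += world_size)
// With 3 ranks and 5 chunks this prints 2, 2, 1 - the uneven counts
// are why MPI_Scatter/MPI_Gather stop matching up.
int main(void) {
  int world_size = 3;
  int chunks = world_size + 2;
  for (int rank = 0; rank < world_size; ++rank) {
    int iterations = 0;
    for (int i = rank; i < chunks; i += world_size)
      ++iterations;
    printf("rank %d runs %d iteration(s)\n", rank, iterations);
  }
  return 0;
}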
EDIT1: fixed double definition of original_data_avg, thanks for pointing this out @Armali
EDIT2: added missing argument, thanks for pointing this out @Armali
And MPI_Scatterv allows an uneven distribution of batches?
Well, if I'm not mistaken, that's its raison d'ĂȘtre. We can modify your program accordingly by replacing the MPI_Scatter call with
int sendcounts[world_size];
int recvcounts[world_size];
int placements[world_size];
int placementr[world_size];
for (int n = 0; n < world_size; ++n)
    sendcounts[n] = i+n < world_size+2 ? num_elements_per_proc : 0,
    recvcounts[n] = i+n < world_size+2,
    placements[n] = num_elements_per_proc*n,
    placementr[n] = n;
int employ = i < world_size+2;
MPI_Scatterv(rand_nums ? rand_nums+num_elements_per_proc*i : NULL,
             sendcounts, placements, MPI_DOUBLE,
             sub_rand_nums, employ ? num_elements_per_proc : 0, MPI_DOUBLE,
             0, MPI_COMM_WORLD);
and the MPI_Gather call with
MPI_Gatherv(&sub_avg, employ, MPI_DOUBLE,
            sub_avgs, recvcounts, placementr, MPI_DOUBLE,
            0, MPI_COMM_WORLD);
Of course, when using the resulting sub_avgs, we have to take into account that the last loop cycle yields fewer than world_size results.
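For completeness, here is a minimal, self-contained sketch of the same MPI_Scatterv/MPI_Gatherv pattern, stripped of the averaging logic. The identifiers (elems_per_batch, num_batches, partial, ...) are made up for this sketch. The two essential points are that the per-rank counts drop to zero for the surplus ranks in the last round, and that every rank loops over the same rounds (first = 0, size, 2*size, ...), so each collective is entered by all ranks:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int elems_per_batch = 4;     // length of one batch entry
  const int num_batches = size + 2;  // deliberately not divisible by size

  // Root holds all batches back to back; every rank holds one batch buffer.
  double *data = NULL, *partial = NULL;
  if (rank == 0) {
    data = malloc(sizeof(double) * num_batches * elems_per_batch);
    partial = malloc(sizeof(double) * num_batches);
    for (int i = 0; i < num_batches * elems_per_batch; ++i)
      data[i] = i;
  }
  double *batch = malloc(sizeof(double) * elems_per_batch);

  // Every rank executes the same number of rounds, even if it gets no data.
  for (int first = 0; first < num_batches; first += size) {
    int sendcounts[size], senddispls[size], recvcounts[size], recvdispls[size];
    for (int n = 0; n < size; ++n) {
      int has_work = first + n < num_batches;
      sendcounts[n] = has_work ? elems_per_batch : 0;
      senddispls[n] = n * elems_per_batch;
      recvcounts[n] = has_work ? 1 : 0;
      recvdispls[n] = n;
    }
    int my_count = first + rank < num_batches ? elems_per_batch : 0;

    MPI_Scatterv(rank == 0 ? data + first * elems_per_batch : NULL,
                 sendcounts, senddispls, MPI_DOUBLE,
                 batch, my_count, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // The "task" along the batch dimension - here just a sum.
    double result = 0.0;
    for (int i = 0; i < my_count; ++i)
      result += batch[i];

    MPI_Gatherv(&result, my_count ? 1 : 0, MPI_DOUBLE,
                rank == 0 ? partial + first : NULL,
                recvcounts, recvdispls, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  }

  if (rank == 0) {
    for (int b = 0; b < num_batches; ++b)
      printf("batch %d -> %f\n", b, partial[b]);
    free(data);
    free(partial);
  }
  free(batch);
  MPI_Finalize();
  return 0;
}

Compiled with mpicc and run with, say, mpirun -n 3, this should print one result per batch on rank 0, whether or not the batch count is divisible by the number of ranks.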