
Better way than looping over MPI_Send/MPI_Recv if the data is not divisible by COMM_SIZE?


I use an MPI program to parallelize work over batches.

Imagine multidimensional MRI images (3 spatial dimensions, coil data, ...), several of which are aggregated along a batch dimension.

The program should apply the same task along the batch dimension.

My problem is that I have nproc processes (COMM_SIZE), but my batch dimension is not divisible by nproc. MPI_Scatter, or more precisely MPI_Gather, blocks the application.

My question: Is there a more elegant/handy way than a for loop over MPI_Send/MPI_Recv to overcome this problem?
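
For clarity, this is roughly the kind of Send/Recv loop I mean (a minimal sketch, not my real code: the batch items are single doubles, the "task" is just doubling each value, and items are dealt out round-robin):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int batch = 5;                     /* batch not divisible by size */
  double data[5] = {1, 2, 3, 4, 5};        /* stands in for the MRI batch */

  for (int b = 0; b < batch; ++b) {
    int worker = b % size;                 /* round-robin assignment      */
    double item, result;
    if (rank == 0) {
      if (worker == 0) {
        result = 2.0 * data[b];            /* root keeps its own items    */
      } else {
        MPI_Send(&data[b], 1, MPI_DOUBLE, worker, b, MPI_COMM_WORLD);
        MPI_Recv(&result, 1, MPI_DOUBLE, worker, b, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
      }
      printf("item %d -> %f (rank %d)\n", b, result, worker);
    } else if (rank == worker) {
      MPI_Recv(&item, 1, MPI_DOUBLE, 0, b, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      result = 2.0 * item;                 /* stand-in for the real task  */
      MPI_Send(&result, 1, MPI_DOUBLE, 0, b, MPI_COMM_WORLD);
    }
  }
  MPI_Finalize();
}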

I found this answer, but IMHO it addresses a different problem, since I can't change the amount of sent/received data with MPI_Scatterv. Prove me wrong, please :)

Thanks in advance!

In the example, mpirun -n 2 tut2 1000 works great, but mpirun -n 3 tut2 1000 gets stuck before MPI_Gather. Taken and adapted from mpitutorial.com.

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <mpi.h>
#include <assert.h>

// Creates an array of random numbers. Each number has a value from 0 - 1
double *create_rand_nums(int num_elements) {
  double *rand_nums = (double *)malloc(sizeof(double) * num_elements);
  assert(rand_nums != NULL);
  int i;
  for (i = 0; i < num_elements; i++) {
    rand_nums[i] = (rand() / (double)RAND_MAX);
  }
  return rand_nums;
}

// Computes the average of an array of numbers
double compute_avg(double *array, int num_elements) {
  double sum = 0.f;
  int i;
  for (i = 0; i < num_elements; i++) {
    sum += array[i];
  }
  return sum / num_elements;
}
double compute_sum(double *array, int num_elements) {
  double sum = 0.f;
  int i;
  for (i = 0; i < num_elements; i++) {
    sum += array[i];
  }
  return sum;
}

int main(int argc, char** argv) {
  MPI_Init(NULL, NULL);

  if (argc != 2) {
    fprintf(stderr, "Usage: avg num_elements_per_proc\n");
    exit(1);
  }

  int num_elements_per_proc = atoi(argv[1]);
  // Seed the random number generator to get different results each time
  srand(time(NULL));


  int world_rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  int world_size;
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);

  char processor_name[MPI_MAX_PROCESSOR_NAME]; // gets the name of the processor
  int name_len;
  MPI_Get_processor_name(processor_name, &name_len);
  
  // Gather all partial averages down to the root process
  double *sub_avgs = NULL;
  if (world_rank == 0) {
    sub_avgs = (double *)malloc(sizeof(double) * (world_size + 2));
    assert(sub_avgs != NULL);
  }

  double *rand_nums = NULL;
  if (world_rank == 0) {
    rand_nums = create_rand_nums(num_elements_per_proc * (world_size + 2));
  }

  double original_data_avg = 0;
  for(int i = world_rank; i < (world_size + 2); i += world_size) {
    int off = 0;
    if (world_rank == 0)
      off = world_size;
    MPI_Bcast(&off, 1, MPI_INT, 0, MPI_COMM_WORLD);

    double *sub_rand_nums = (double *)malloc(sizeof(double) * num_elements_per_proc);
    assert(sub_rand_nums != NULL);

    MPI_Scatter(rand_nums, num_elements_per_proc, MPI_DOUBLE, sub_rand_nums,
                num_elements_per_proc, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // Compute the sum of your subset (the variable is still called sub_avg)
    double sub_avg = compute_sum(sub_rand_nums, num_elements_per_proc);

    printf("Sub avg %s (%d/%d): %f\n", processor_name, world_rank, world_size, sub_avg);

    //!!! Here the block appears !!!
    // Gather all partial averages down to the root process
    MPI_Gather(&sub_avg, 1, MPI_DOUBLE, sub_avgs, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    if (world_rank == 0) {
      double avg = compute_sum(sub_avgs, world_size)/(num_elements_per_proc * world_size);
      printf("Avg of all elements is %f\n", avg);
      // Compute the average across the original data for comparison
      original_data_avg +=
        compute_avg(rand_nums + off, num_elements_per_proc * world_size);
    }
    free(sub_rand_nums);
  }

  // Clean up
  if (world_rank == 0) {
    printf("Avg computed across original data is %f\n", original_data_avg);
    free(rand_nums);
    free(sub_avgs);
  }
  printf("proc %s (%d/%d) signes off\n", processor_name, world_rank, world_size);

  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
}
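
I think it gets stuck because the loop above runs a different number of times on each rank, so not all ranks reach the second round of collectives. A quick count for mpirun -n 3 (a plain C sketch, no MPI involved):

#include <stdio.h>

int main(void) {
  int world_size = 3;                      /* mpirun -n 3   */
  int batch = world_size + 2;              /* 5 batch items */
  for (int rank = 0; rank < world_size; ++rank) {
    int iters = 0;
    for (int i = rank; i < batch; i += world_size)  /* same bounds as above */
      ++iters;
    printf("rank %d enters the loop %d time(s)\n", rank, iters);
  }
  return 0;
  /* prints 2, 2, 1 -> ranks 0 and 1 call MPI_Scatter/MPI_Gather a second
     time while rank 2 has already left the loop, hence the hang */
}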

EDIT1: fixed the double definition of original_data_avg; thanks for pointing this out, @Armali

EDIT2: added the missing argument; thanks for pointing this out, @Armali


Solution

  • And MPI_Scatterv allows an uneven distribution of batches?

    Well, if I'm not mistaken, that's its raison d'être. We can modify your program accordingly by replacing the MPI_Scatter call with

        int sendcounts[world_size];   // elements the root sends to each rank (0 once the batch is exhausted)
        int recvcounts[world_size];   // partial results the root expects back from each rank (0 or 1)
        int placements[world_size];   // send displacements into rand_nums
        int placementr[world_size];   // receive displacements into sub_avgs
        for (int n = 0; n < world_size; ++n) {
            sendcounts[n] = i+n < world_size+2 ? num_elements_per_proc : 0;
            recvcounts[n] = i+n < world_size+2;
            placements[n] = num_elements_per_proc*n;
            placementr[n] = n;
        }
        int employ = i < world_size+2;   // does this rank still have a batch item in this cycle?
        MPI_Scatterv(rand_nums ? rand_nums+num_elements_per_proc*i : NULL,
                     sendcounts, placements, MPI_DOUBLE,
                     sub_rand_nums, employ ? num_elements_per_proc : 0, MPI_DOUBLE,
                     0, MPI_COMM_WORLD);
    

    and the MPI_Gather call with

        MPI_Gatherv(&sub_avg, employ, MPI_DOUBLE,
                    sub_avgs, recvcounts, placementr, MPI_DOUBLE,
                    0, MPI_COMM_WORLD);
    

    - of course, when using the resulting sub_avgs, we have to take into account that the last loop cycle yields fewer than world_size results (see the sketch below).
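
      For example (a sketch, not part of the answer's code; gathered is a new helper variable, and I assume the recvcounts array from above is still in scope), the root can count how many ranks actually contributed in this cycle and use only those entries of sub_avgs:

        int gathered = 0;                    // ranks that sent a result in this cycle
        for (int n = 0; n < world_size; ++n)
            gathered += recvcounts[n];
        if (world_rank == 0 && gathered > 0) {
            double avg = compute_sum(sub_avgs, gathered)
                         / ((double)num_elements_per_proc * gathered);
            printf("Avg of this cycle's elements is %f\n", avg);
        }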