multithreading, c++17, thread-safety, mpi, race-condition

Race condition in MPI code when rank sends to/receives from itself


I am trying to implement a master-worker pattern in MPI: one master rank keeps track of all the work to be done, and the other ranks request work from it. However, in my case the master also needs to do work itself (in the full problem each rank gets its own GPU). To facilitate this, I am using multi-threading.

I have the following code

#include <array>
#include <atomic>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include <mpi.h>

const int TAG_REQUEST = 0;
const int TAG_TASK = 1;
const int TAG_STOP = 2;

const int master_rank = 0;

std::mutex task_mutex;
std::atomic<bool> done(false);

using Task = std::array<int, 2>;
using TaskList = std::vector<Task>;

TaskList generate_tasks(int N) {
  TaskList tasks;
  for (int i = 0; i < N; i++) {
    for (int j = i + 1; j < N; j++) {
      tasks.emplace_back(Task{i, j});
    }
  }
  return tasks;
}

int score2(int i) {
  return i;
}

int score1(const Task &task) {
  return task[0] * task[1];
}

int do_work(const Task &task, int rank) {
  int result;
  if (task[0] == task[1]) {
    result = score2(task[0]);
  } else {
    result = score1(task) + rank;
  }

  return result;
}

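// Communication thread, run only on the master rank: answers work requests,
// hands out tasks while any remain, then replies with TAG_STOP to every rank.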
void master_comm_thread(int N) {
  int tasks_finished = 0, ranks_finished = 0;
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  std::srand(static_cast<unsigned>(std::time(nullptr)));
  TaskList tasks = generate_tasks(N);
  const int tasks_size = tasks.size();
  Task task = {-1, -1};
  while (true) {
    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, TAG_REQUEST, MPI_COMM_WORLD, &status);

    MPI_Recv(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_REQUEST,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    bool have_task = !tasks.empty();
    {
      std::lock_guard<std::mutex> lock(task_mutex);
      if (!tasks.empty()) {
        // int idx = std::rand() % tasks.size();
        int idx = 0;
        task = tasks[idx];
        tasks.erase(tasks.begin() + idx);
        have_task = true;
        tasks_finished++;
      }
    }

    if (have_task) {
      MPI_Send(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_TASK,
               MPI_COMM_WORLD);
    } else {
      Task finish{-1, -1};
      MPI_Send(finish.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_STOP,
               MPI_COMM_WORLD);
      ranks_finished++;
      if (tasks_finished >= tasks_size) {
        done = true;
        break;
      }
    }
  }
  while (ranks_finished < size) {
    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, TAG_REQUEST, MPI_COMM_WORLD, &status);
    MPI_Recv(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_REQUEST,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    MPI_Send(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_STOP,
             MPI_COMM_WORLD);
    ranks_finished++;
  }
}

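// Runs on a short-lived thread alongside do_work on the main thread: reports
// the task currently being worked on to the master as a work request and
// stores the next assignment (if any) into `result`.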
void data_loop(int rank, bool &active, Task &result) {
  Task task = {-1, -1};
  MPI_Status status;

  // Send the task we are working on back to the master as a work request,
  // then block until the master replies with the next task (or TAG_STOP)
  MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, MPI_COMM_WORLD);
  MPI_Recv(task.data(), 2, MPI_INT, master_rank, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
  if (status.MPI_TAG == TAG_STOP) {
    result = {-1, -1};
    active = false;
    return;
  }

  std::cout << rank << "-(" << result[0] << "," << result[1] << "),("
            << task[0] << "," << task[1] << ")" << std::endl;

  result = task;
  active = true;
  return;
}

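// Worker loop executed by every rank, including the master: overlaps computing
// the current task with fetching the next one on a separate thread.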
void worker_loop_async() {
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  bool active = true;
  // First task is pair-wise for my local data
  Task task_current = {-1, -1};
  Task task_next = {rank, rank};
  while (active) {
    task_current = task_next;
    std::thread data_thread(data_loop, rank, std::ref(active),
                            std::ref(task_next));
    int result = do_work(task_current, rank);
    // Do something with result
    data_thread.join();
  }
}

int main(int argc, char *argv[]) {
  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided < MPI_THREAD_MULTIPLE) {
    std::cerr << "MPI threading not sufficient: " << provided << std::endl;
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  int N = 5;
  std::thread comm_thread;
  if (rank == master_rank) {
    comm_thread = std::thread(master_comm_thread, N);
  }

  worker_loop_async();

  if (rank == master_rank) {
    comm_thread.join();
  }

  MPI_Finalize();
  return 0;
}

However, if you run it, you will notice that the master rank does unnecessary work that makes no sense. Since the communication happens in a different thread, the master should be able to MPI_Send/MPI_Recv to itself (or error out if that isn't allowed), but instead I get a weird bug where the same task is handed out multiple times. Since the bug happens randomly, it looks like a race condition, but I can't pinpoint what exactly is causing it. Help would be greatly appreciated.

I am running this on OpenMPI 4.1.6 with gcc 13.3 on my local machine.

Edit: To explain the bug: each printed line has the form rank-(task currently being worked on),(next task received). When this code is run, the expected output should be

1-(1,1),(0,1)
2-(2,2),(0,2)
3-(3,3),(0,3)
3-(0,3),(3,4)
4-(4,4),(0,4)
5-(5,5),(1,2)
6-(6,6),(1,3)
7-(7,7),(1,4)
0-(0,0),(2,3)
1-(0,1),(2,4)

but I get something like

0-(0,0),(0,0)
1-(1,1),(0,1)
2-(2,2),(0,2)
3-(3,3),(0,3)
3-(0,3),(3,4)
4-(4,4),(0,4)
5-(5,5),(1,2)
6-(6,6),(1,3)
7-(7,7),(1,4)
0-(0,0),(2,3)
1-(0,1),(2,4)
0-(2,3),(2,3)

Notice how the master rank (0) gets assigned duplicate work that is not even part of the work queue. I suspect this is some weird race-condition bug, but it could also be a misunderstanding of MPI on my part.


Solution

  • So after some debugging, the issue turned out to be these two lines:

    MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, MPI_COMM_WORLD);  
    
    MPI_Recv(task.data(), 2, MPI_INT, master_rank, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    

    Because the receive uses MPI_ANY_TAG with source master_rank, on the master rank it can match the request the worker thread just sent to itself: that message has source 0 and tag TAG_REQUEST, so both the comm thread's receive (tag TAG_REQUEST, any source) and the worker thread's receive (any tag, source 0) are eligible for it, and with MPI_THREAD_MULTIPLE whichever thread matches it first wins. When the worker thread wins, rank 0 receives its own request back and treats it as the next task, which produces the duplicated, out-of-queue work shown above. A refactor that prevents the worker's receive from matching its own request fixes this.
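
    One possible refactor is sketched below; this is only a sketch under the assumption that requests get their own communicator (a name like request_comm, introduced here purely for illustration, duplicated once from MPI_COMM_WORLD with MPI_Comm_dup after MPI_Init_thread). With requests on their own communicator, the worker thread's MPI_ANY_TAG receive on MPI_COMM_WORLD can never match its own request send.

    // Sketch only: `request_comm` is a hypothetical global created once
    // during setup and freed before MPI_Finalize:
    //   MPI_Comm request_comm;
    //   MPI_Comm_dup(MPI_COMM_WORLD, &request_comm);
    //   ...
    //   MPI_Comm_free(&request_comm);
    void data_loop(int rank, bool &active, Task &result) {
      Task task = {-1, -1};
      MPI_Status status;

      // The request travels on request_comm ...
      MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, request_comm);
      // ... while the reply (TAG_TASK or TAG_STOP) travels on MPI_COMM_WORLD,
      // so this MPI_ANY_TAG receive cannot pick up the self-sent request.
      MPI_Recv(task.data(), 2, MPI_INT, master_rank, MPI_ANY_TAG, MPI_COMM_WORLD,
               &status);
      if (status.MPI_TAG == TAG_STOP) {
        result = {-1, -1};
        active = false;
        return;
      }

      result = task;
      active = true;
    }

    For this to work, the MPI_Probe/MPI_Recv pair for TAG_REQUEST in master_comm_thread has to listen on request_comm instead of MPI_COMM_WORLD. An alternative with the same effect is to drop MPI_ANY_TAG entirely, e.g. have the master reply with TAG_TASK in every case and signal shutdown through the {-1, -1} payload it already sends.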