I am trying to implement a master-worker pattern in MPI, where one rank is designated the master: it keeps track of all the work to be done, and the other ranks request work from it. However, in my case the master also needs to do work itself (each rank will get a GPU in the overall problem). To facilitate this, I am using multi-threading.
I have the following code:
#include <array>
#include <atomic>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include <mpi.h>

const int TAG_REQUEST = 0;
const int TAG_TASK = 1;
const int TAG_STOP = 2;
const int master_rank = 0;

std::mutex task_mutex;
std::atomic<bool> done(false);

using Task = std::array<int, 2>;
using TaskList = std::vector<Task>;

TaskList generate_tasks(int N) {
  TaskList tasks;
  for (int i = 0; i < N; i++) {
    for (int j = i + 1; j < N; j++) {
      tasks.emplace_back(Task{i, j});
    }
  }
  return tasks;
}

int score2(int i) {
  return i;
}

int score1(const Task &task) {
  return task[0] * task[1];
}

int do_work(const Task &task, int rank) {
  int result;
  if (task[0] == task[1]) {
    result = score2(task[0]);
  } else {
    result = score1(task) + rank;
  }
  return result;
}

void master_comm_thread(int N) {
  int tasks_finished = 0, ranks_finished = 0;
  int size;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  std::srand(static_cast<unsigned>(std::time(nullptr)));
  TaskList tasks = generate_tasks(N);
  const int tasks_size = tasks.size();
  Task task = {-1, -1};
  while (true) {
    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, TAG_REQUEST, MPI_COMM_WORLD, &status);
    MPI_Recv(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_REQUEST,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    bool have_task = !tasks.empty();
    {
      std::lock_guard<std::mutex> lock(task_mutex);
      if (!tasks.empty()) {
        // int idx = std::rand() % tasks.size();
        int idx = 0;
        task = tasks[idx];
        tasks.erase(tasks.begin() + idx);
        have_task = true;
        tasks_finished++;
      }
    }
    if (have_task) {
      MPI_Send(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_TASK,
               MPI_COMM_WORLD);
    } else {
      Task finish{-1, -1};
      MPI_Send(&finish, 2, MPI_INT, status.MPI_SOURCE, TAG_STOP,
               MPI_COMM_WORLD);
      ranks_finished++;
      if (tasks_finished >= tasks_size) {
        done = true;
        break;
      }
    }
  }
  while (ranks_finished < size) {
    MPI_Status status;
    MPI_Probe(MPI_ANY_SOURCE, TAG_REQUEST, MPI_COMM_WORLD, &status);
    MPI_Recv(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_REQUEST,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    MPI_Send(task.data(), 2, MPI_INT, status.MPI_SOURCE, TAG_STOP,
             MPI_COMM_WORLD);
    ranks_finished++;
  }
}

void data_loop(int rank, bool &active, Task &result) {
  Task task = {-1, -1};
  MPI_Status status;
  // Post non-blocking send and receive for the next task
  MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, MPI_COMM_WORLD);
  MPI_Recv(task.data(), 2, MPI_INT, master_rank, MPI_ANY_TAG, MPI_COMM_WORLD,
           &status);
  if (status.MPI_TAG == TAG_STOP) {
    result = {-1, -1};
    active = false;
    return;
  }
  std::cout << rank << "-" << "(" << result[0] << "," << result[1] << "),("
            << task[0] << "," << task[1] << ")" << std::endl;
  result = task;
  active = true;
  return;
}

void worker_loop_async() {
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  bool active = true;
  // First task is pair-wise for my local data
  Task task_current = {-1, -1};
  Task task_next = {rank, rank};
  while (active) {
    task_current = task_next;
    std::thread data_thread(data_loop, rank, std::ref(active),
                            std::ref(task_next));
    int result = do_work(task_current, rank);
    // Do something with result
    data_thread.join();
  }
}

int main(int argc, char *argv[]) {
  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided < MPI_THREAD_MULTIPLE) {
    std::cerr << "MPI threading not sufficient: " << provided << std::endl;
    MPI_Abort(MPI_COMM_WORLD, 1);
  }
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  int N = 5;
  std::thread comm_thread;
  if (rank == master_rank) {
    comm_thread = std::thread(master_comm_thread, N);
  }
  worker_loop_async();
  if (rank == master_rank) {
    comm_thread.join();
  }
  MPI_Finalize();
  return 0;
}
However, if you run it, you will notice that the master rank does unnecessary work that doesn't make sense. Since the communication happens in a different thread, the master should be able to MPI_Send/MPI_Recv to itself (see the minimal self-send sketch below), and if not, it should error out; instead I get a weird bug where the same task is repeated multiple times. Since the bug happens randomly, it looks like a race condition, but I can't point to what exactly is causing it. Help would be greatly appreciated.
I am running this on OpenMPI 4.1.6 with gcc 13.3 on my local machine.
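To illustrate what I mean by a rank talking to itself across threads, here is a minimal, standalone sketch (my understanding of how this should behave, separate from the program above): with MPI_THREAD_MULTIPLE, a self-directed MPI_Send completes once another thread posts the matching MPI_Recv (or earlier, if the implementation buffers the small message).

  #include <cstdio>
  #include <thread>
  #include <mpi.h>

  int main(int argc, char *argv[]) {
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided < MPI_THREAD_MULTIPLE) MPI_Abort(MPI_COMM_WORLD, 1);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    // Receiver thread: posts the matching receive for the self-send below.
    std::thread receiver([rank] {
      int value = 0;
      MPI_Recv(&value, 1, MPI_INT, rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      std::printf("rank %d received %d from itself\n", rank, value);
    });
    int payload = 42;
    MPI_Send(&payload, 1, MPI_INT, rank, 0, MPI_COMM_WORLD);  // send to self
    receiver.join();
    MPI_Finalize();
    return 0;
  }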
Edit: To explain the bug, when this code is run the expected output should be:
1-(1,1),(0,1)
2-(2,2),(0,2)
3-(3,3),(0,3)
3-(0,3),(3,4)
4-(4,4),(0,4)
5-(5,5),(1,2)
6-(6,6),(1,3)
7-(7,7),(1,4)
0-(0,0),(2,3)
1-(0,1),(2,4)
but I get something like
0-(0,0),(0,0)
1-(1,1),(0,1)
2-(2,2),(0,2)
3-(3,3),(0,3)
3-(0,3),(3,4)
4-(4,4),(0,4)
5-(5,5),(1,2)
6-(6,6),(1,3)
7-(7,7),(1,4)
0-(0,0),(2,3)
1-(0,1),(2,4)
0-(2,3),(2,3)
Notice how the master rank (0) gets assigned duplicate work that is not even part of the work queue. I feel like this is some weird race-condition bug that makes no sense, but it could also be my misunderstanding of MPI itself.
So after some debugging, the issue turned out to be these two lines:
MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, MPI_COMM_WORLD);
MPI_Recv(task.data(), 2, MPI_INT, master_rank, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
Because the receive uses MPI_ANY_TAG, on the master rank the TAG_REQUEST message it has just sent to itself can be matched by its own MPI_Recv on the second line, instead of being consumed by the comm thread. This creates the weird behaviour I noted. A refactor of the code fixes this.
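For example, one possible refactor (a sketch of one way to avoid the self-match, not necessarily the exact change I made): have the master always reply on TAG_TASK and signal "stop" with the {-1, -1} sentinel, so the worker-side receive uses a specific tag and can never match its own TAG_REQUEST message:

  void data_loop(int rank, bool &active, Task &result) {
    Task task = {-1, -1};
    // Request new work from the master.
    MPI_Send(result.data(), 2, MPI_INT, master_rank, TAG_REQUEST, MPI_COMM_WORLD);
    // Receive only TAG_TASK replies; the TAG_REQUEST message this rank just sent
    // to itself no longer matches here and is left for the comm thread.
    MPI_Recv(task.data(), 2, MPI_INT, master_rank, TAG_TASK, MPI_COMM_WORLD,
             MPI_STATUS_IGNORE);
    if (task[0] == -1 && task[1] == -1) {  // sentinel replaces TAG_STOP
      result = {-1, -1};
      active = false;
      return;
    }
    result = task;
    active = true;
  }

(On the master side, the stop reply would then be sent with TAG_TASK carrying {-1, -1} instead of TAG_STOP.)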