I'm trying to build an MPI program that sends two types of messages from an undetermined number of processes to process A using nonblocking sends (the program is randomized), and I am using tags to mark which message is of which type. The first type of message sends a vector of data, and the second type tells the receiving process that no more messages are going to be sent. The code looks like this:
void send_data() {
    /* ... initialization of variables ... */
    MPI_Isend( loc_buffer1.data(), // Address of the message we are sending.
               bufferlen,          // Number of elements handled by that address.
               MPI_INT,            // MPI_TYPE of the message we are sending.
               new_proc,           // Rank of receiving process
               1,                  // Message Tag
               MPI_COMM_WORLD,     // MPI Communicator
               &request1 );
}
void check_for_criteria() {
    if (criteria_met) { send_data(); }
    // Telling the target process there is no more data to be sent
    MPI_Isend( NULL,            // Address of the message we are sending.
               0,               // Number of elements handled by that address.
               MPI_CHAR,        // MPI_TYPE of the message we are sending.
               new_proc,        // Rank of receiving process
               2,               // Message Tag
               MPI_COMM_WORLD,  // MPI Communicator
               &request1 );
}
void receive_parallel_comm_helper(...) {
    int test_flag1 = 0;
    int test_flag2 = 0;
    MPI_Status status1;
    MPI_Status status2;
    std::vector<int> new_loc_buffer1(9);
    std::vector<bool> stop_par_Array(num_procs, 0);
    stop_par_Array[rank] = 1;
    bool stop_par = 0;
    while (!stop_par) {
        for (int i = 0; i < num_procs; i++) {
            if (i != rank) {
                stop_par = 1;
                status1.MPI_TAG = 0;
                status2.MPI_TAG = 0;
                test_flag1 = 0;
                test_flag2 = 0;
                MPI_Iprobe(i, 1, MPI_COMM_WORLD, &test_flag1, &status1);
                if ((status1.MPI_TAG == 1) && (test_flag1)) {
                    MPI_Recv(new_loc_buffer1.data(), 9, MPI_INT, i, 1, MPI_COMM_WORLD, &status1);
                }
                test_flag2 = 0;
                MPI_Iprobe(MPI_ANY_SOURCE, 2, MPI_COMM_WORLD, &test_flag2, &status2);
                if ((status2.MPI_TAG == 2) && (test_flag2)) {
                    MPI_Recv(NULL, 0, MPI_CHAR, status2.MPI_SOURCE, par_done_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                    stop_par_Array[status2.MPI_SOURCE] = 1;
                }
                for (int j = 0; j < num_procs; j++) {
                    stop_par &= stop_par_Array[j];
                }
            }
        }
    }
    MPI_Barrier(MPI_COMM_WORLD);
}
void main_function() {
    while (some_criteria) {
        ...
        check_for_criteria();
        MPI_Barrier(MPI_COMM_WORLD);
        receive_parallel_comm_helper();
        ...
    }
}
Essentially, when main_function() is called, each process checks some criteria to see if it should send data to another process. Whether or not data is sent, a null message is then sent to indicate that the sending process is finished. Each process then enters receive_parallel_comm_helper(), where a while loop spins waiting to receive (1) data and (2) a null message indicating that data sending from that processor is complete. However, I am not getting the desired behavior: data is often sent but only received in a later iteration of the main_function() while loop, because the null message is for some reason received first. I know that message ordering is guaranteed by MPI, but this code relies on the assumption that sends with different tags are received in the order they were sent - is this not the case? If not, how could this code be corrected?
Even if MPI does maintain sequential queuing of different tags, you have a race condition because you do separate Iprobe calls for tags 1 and 2:
| time | sender | receiver |
| --- | --- | --- |
| 1 | | Loop 0: Iprobe tag 1 - no |
| 2 | send tag 1 | |
| 3 | send tag 2 | |
| 4 | | Loop 0: Iprobe tag 2 - yes |
| 5 | | Loop 0: recv tag 2 |
| 6 | | Loop 1: Iprobe tag 1 - yes |
| 7 | | Loop 1: recv tag 1 |
One possible fix is to use MPI_ANY_TAG in a single Iprobe call instead of (multiple) Iprobe calls: one with tag 1 followed by one with tag 2.

This would fix the issue if MPI does maintain sequential queuing across different tags. But I don't know MPI well enough to say whether it does sequential queuing. You could test this by modifying your code to do only one Iprobe with MPI_ANY_TAG, as mentioned.
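For reference, here is a minimal sketch of what that single-probe receive loop could look like, reusing the conventions from your code (tag 1 carries a 9-element int vector, tag 2 is the zero-length "done" message, and num_procs/rank are assumed to be known); the function name receive_with_any_tag and the local bookkeeping are illustrative, not anything prescribed by MPI:

#include <mpi.h>
#include <vector>

void receive_with_any_tag(int num_procs, int rank) {
    std::vector<int>  data_buffer(9);
    std::vector<bool> done(num_procs, false);
    done[rank] = true;                 // never wait on ourselves

    int remaining = num_procs - 1;     // peers that have not yet sent tag 2
    while (remaining > 0) {
        for (int i = 0; i < num_procs; i++) {
            if (done[i]) continue;

            int flag = 0;
            MPI_Status status;
            // One probe per peer: if MPI queues messages from the same sender
            // in send order (which this approach relies on), a pending data
            // message is reported here before that peer's "done" message.
            MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);
            if (!flag) continue;

            if (status.MPI_TAG == 1) {
                MPI_Recv(data_buffer.data(), 9, MPI_INT, i, 1,
                         MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                // ... process data_buffer ...
            } else if (status.MPI_TAG == 2) {
                MPI_Recv(NULL, 0, MPI_CHAR, i, 2,
                         MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                done[i] = true;
                remaining--;
            }
        }
    }
    MPI_Barrier(MPI_COMM_WORLD);
}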
Another way is to create a common message header struct:
#define MAX_SIZE 2048

struct mymsg {
    int msg_type;                        // message type
    unsigned int msg_seqno;              // sequence number
    unsigned int msg_count;              // number of elements
    union {
        unsigned int msg_1[MAX_SIZE];    // type 1 data
        float        msg_2[MAX_SIZE];    // type 2 data
    };
};
Then, always send using a single tag value (e.g. 1) and decode your data according to the msg_type field in your struct.
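A rough sketch of that idea, sending the struct as raw bytes on one tag and dispatching on msg_type; the helper names (send_msg1, recv_msg) and the variables dest, src, and seqno are illustrative, and using MPI_BYTE assumes sender and receiver share the same architecture (a portable version would define an MPI derived datatype for mymsg instead):

#include <mpi.h>

// Send one type-1 message to 'dest'; 'seqno' is a caller-maintained
// per-destination sequence counter. The whole struct is sent for simplicity,
// even when msg_count is small.
void send_msg1(const unsigned int *data, unsigned int count,
               int dest, unsigned int *seqno) {
    struct mymsg out;
    out.msg_type  = 1;
    out.msg_seqno = (*seqno)++;
    out.msg_count = count;
    for (unsigned int k = 0; k < count && k < MAX_SIZE; k++)
        out.msg_1[k] = data[k];
    MPI_Send(&out, (int)sizeof(out), MPI_BYTE, dest, 1, MPI_COMM_WORLD);
}

// Receive one message from 'src' on the single tag and decode by msg_type.
void recv_msg(int src) {
    struct mymsg in;
    MPI_Recv(&in, (int)sizeof(in), MPI_BYTE, src, 1,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (in.msg_type == 1) {
        // use in.msg_1[0 .. in.msg_count-1]
    } else if (in.msg_type == 2) {
        // use in.msg_2[0 .. in.msg_count-1]
    }
}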
With such a message struct, you could also use different MPI tags but hold a message in a local queue if it arrives with a sequence number that isn't the next one expected. That is, if you get a sequence such as 1, 4, 2, 5, 3: process 1, hold 4, process 2, hold 5, then process 3 and drain 4 and 5 from the queue. Loosely, this is how networking software reassembles TCP segments that arrive out of order.
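A minimal sketch of that holding queue, assuming the msg_seqno field from the struct above and per-sender numbering starting at 1 as in the example; Reassembler, next_expected, and pending are illustrative names, and you would keep one instance per sender:

#include <map>

struct Reassembler {
    unsigned int next_expected = 1;          // next sequence number to process
    std::map<unsigned int, mymsg> pending;   // out-of-order messages, keyed by seqno

    // Feed every received message through here; process() is whatever the
    // application does with an in-order message.
    template <typename Process>
    void accept(const mymsg &m, Process process) {
        if (m.msg_seqno != next_expected) {
            pending[m.msg_seqno] = m;        // hold it until the gap is filled
            return;
        }
        process(m);
        next_expected++;
        // Drain any held messages that are now in order.
        for (auto it = pending.find(next_expected); it != pending.end();
             it = pending.find(next_expected)) {
            process(it->second);
            pending.erase(it);
            next_expected++;
        }
    }
};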