Tags: c++, parallel-processing, mpi, nonblocking, sequential

Are MPI messages of different tags matched sequentially?


I'm trying to build an MPI program that sends two types of messages from an undetermined number of processes to process A using nonblocking sends (the program is randomized), and I am using tags to mark which message is of which type. The first type of message is used to send a vector of data, and the second type is used to tell the receiving process that no more messages will be sent. The code looks like this:

void send_data() {
   /*... initialization of variables ...*/
   MPI_Isend( loc_buffer1.data(),        //Address of the message we are sending.
           bufferlen,                  //Number of elements handled by that address.
           MPI_INT,            //MPI_TYPE of the message we are sending.
           new_proc,           //Rank of receiving process
           1,                  //Message Tag
           MPI_COMM_WORLD,      //MPI Communicator
           &request1 );
}


void check_for_criteria() {
   if (criteria_met) { send_data(); }

   // Telling the target process there is no more data to be sent 

   MPI_Isend( NULL,        //Address of the message we are sending.
           0,                  //Number of elements handled by that address.
           MPI_CHAR,            //MPI_TYPE of the message we are sending.
           new_proc,           //Rank of receiving process
           2,                  //Message Tag
           MPI_COMM_WORLD,      //MPI Communicator
           &request1 );
}


void receive_parallel_comm_helper(...) {

            int test_flag1 = 0;
            int test_flag2 = 0;
            MPI_Status status1;
            MPI_Status status2;
            std::vector<int> new_loc_buffer1(9);
            
            std::vector<bool> stop_par_Array(num_procs, 0);
            stop_par_Array[rank] = 1;
            bool stop_par = 0;
            
            while ((!stop_par)) {
                for (int i=0; i<num_procs; i++) {
                    if (i != rank) {
                        stop_par = 1;

                        status1.MPI_TAG = 0;
                        status2.MPI_TAG = 0;

                        test_flag1 = 0;
                        test_flag2 = 0;
                        MPI_Iprobe(i, 1, MPI_COMM_WORLD, &test_flag1, &status1);

                        if ((status1.MPI_TAG == 1) && (test_flag1)) {
                            MPI_Recv(new_loc_buffer1.data(), 9, MPI_INT, i, 1, MPI_COMM_WORLD, &status1);
                        }

                        test_flag2 = 0;

                        MPI_Iprobe(MPI_ANY_SOURCE, 2, MPI_COMM_WORLD, &test_flag2, &status2);
                        if ((status2.MPI_TAG == 2) && (test_flag2)) {
                            MPI_Recv(NULL, 0, MPI_CHAR, status2.MPI_SOURCE, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                            stop_par_Array[status2.MPI_SOURCE] = 1;
                        }
                        for (int j = 0; j < num_procs; j++) {
                            stop_par &= stop_par_Array[j];
                        }

                    }
                }
            }
                
            MPI_Barrier(MPI_COMM_WORLD);

        }

 


void main_function() {
    while (some_criteria) {
       ...
       check_for_criteria();
       MPI_Barrier(MPI_COMM_WORLD);
       receive_parallel_comm_helper();
       ...
    }

}

Essentially, when main_function() is called, each process checks some criteria to decide whether it should send data to another process. Whether or not data is sent, a null message is then sent to indicate that the sender is finished. Each process then enters receive_parallel_comm_helper(), where a while loop spins waiting to receive (1) data and (2) a null message indicating that the sender has no more data. However, I am currently not getting the desired behavior: data is often sent but only received in a later iteration of the main_function() while loop, because the null message is for some reason received first. I know that MPI guarantees in-order delivery between a given pair of processes, but this code relies on that ordering holding across messages with different tags - is that not the case? If not, how can this code be corrected?


Solution

  • Even if MPI does maintain sequential queuing of different tags, you have a race condition because you do separate Iprobe calls for tags 1 and 2:

    time  sender       receiver
    1                  Loop 0: Iprobe tag 1 - no
    2     send tag 1
    3     send tag 2
    4                  Loop 0: Iprobe tag 2 - yes
    5                  Loop 0: recv tag 2
    6                  Loop 1: Iprobe tag 1 - yes
    7                  Loop 1: recv tag 1

    One possible fix is to use MPI_ANY_TAG in a single Iprobe call instead of (multiple) Iprobe with 1 followed by Iprobe with 2.

    This would fix the issue if MPI does maintain sequential queuing of different tags.

    But I don't know MPI well enough to say whether it does sequential queuing. You could test this by modifying your code to do only one Iprobe with MPI_ANY_TAG, as mentioned above.
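
    Here is a minimal sketch of that single-probe variant, reusing the names from your question (new_loc_buffer1, stop_par_Array, the fixed count of 9); treat it as illustrative rather than a drop-in replacement:

    int flag = 0;
    MPI_Status status;

    // One probe per peer with MPI_ANY_TAG, then dispatch on the tag
    // reported in the status.  If MPI does queue messages from a given
    // sender in order, the tag-1 message will be probed before the
    // later tag-2 message.
    MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status);

    if (flag) {
        if (status.MPI_TAG == 1) {              // data message
            MPI_Recv(new_loc_buffer1.data(), 9, MPI_INT,
                     i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            // ... process the received vector ...
        } else if (status.MPI_TAG == 2) {       // "no more data" message
            MPI_Recv(NULL, 0, MPI_CHAR,
                     i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            stop_par_Array[i] = 1;
        }
    }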


    Another way is to create a common message header struct:

    #define MAX_SIZE        2048
    
    struct mymsg {
        int msg_type;                           // message type
        unsigned int msg_seqno;                 // sequence number
        unsigned int msg_count;                 // number of elements
        union {
            unsigned int msg_1[MAX_SIZE];       // type 1 data
            float msg_2[MAX_SIZE];              // type 2 data
        };
    };
    

    Then, always send using a single tag value (e.g. 1) and decode your data according to the msg_type field in your struct.
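
    For example, a rough sketch of the sender and receiver with that struct (assuming a homogeneous cluster so the struct can be shipped as raw bytes; a registered datatype via MPI_Type_create_struct would be more portable; next_seqno is a made-up per-sender counter, the buffer names come from your code):

    // Sender side: everything goes out under tag 1; msg_type says what it is.
    // (needs <cstring> for memcpy; bufferlen must be <= MAX_SIZE)
    struct mymsg out;
    out.msg_type  = 1;                          // 1 = data, 2 = "I'm done"
    out.msg_seqno = next_seqno++;               // per-sender counter (assumed)
    out.msg_count = bufferlen;
    memcpy(out.msg_1, loc_buffer1.data(), bufferlen * sizeof(int));
    MPI_Send(&out, sizeof(out), MPI_BYTE, new_proc, 1, MPI_COMM_WORLD);
    // (With MPI_Isend, 'out' must stay alive until the request completes.)

    // Receiver side: one receive per message, then branch on msg_type.
    struct mymsg in;
    MPI_Recv(&in, sizeof(in), MPI_BYTE, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (in.msg_type == 1) {
        // ... consume in.msg_count elements of in.msg_1 ...
    } else if (in.msg_type == 2) {
        stop_par_Array[i] = 1;                  // this peer has finished sending
    }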


    With such a message struct, you could also use different MPI tags but hold the message in a local queue if you get a sequence number that isn't sequential.

    That is, if you get a sequence such as: 1, 4, 2, 5, 3:

    1. Receive 1 -- process 1
    2. Receive 4 -- hold 4
    3. Receive 2 -- process 2
    4. Receive 5 -- hold 5
    5. Receive 3 -- process 3, 4, 5

    Loosely, this is how networking software reassembles a TCP stream when its segments arrive out of order.
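
    A small sketch of that hold-and-release bookkeeping, one instance per sending rank (the names Reorderer, expected, and the process callback are made up for illustration):

    #include <map>

    struct Reorderer {
        unsigned int expected = 1;              // next seqno we can hand out
        std::map<unsigned int, mymsg> held;     // parked out-of-order messages

        template <typename Handler>
        void deliver(const mymsg& m, Handler process) {
            held[m.msg_seqno] = m;              // park it, even if already in order
            // Hand out everything that is now contiguous.
            for (auto it = held.find(expected); it != held.end();
                 it = held.find(expected)) {
                process(it->second);
                held.erase(it);
                ++expected;
            }
        }
    };

    Fed the sequence 1, 4, 2, 5, 3 from above, deliver() releases 1, parks 4, releases 2, parks 5, and finally releases 3, 4, and 5 in order, matching the walkthrough in the list.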