c++ multithreading mpi

What is the best practice for protecting a shared communicator in an MPI_THREAD_MULTIPLE context?


I am writing some code on top of an existing library which uses MPI_THREAD_SERIALIZED internally. However, I need to use MPI_THREAD_MULTIPLE. For ease of presentation, let's say each process needs to compute with two threads. Each thread has its own duplicate of MPI_COMM_WORLD so that the threads can use some non-shared objects concurrently. This works totally fine.
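A minimal sketch of that working baseline, assuming two threads per process and leaving out the library's objects (the comms vector and the worker lambda here are just placeholders of my own, not anything from the library), looks roughly like this:

#include <mpi.h>
#include <thread>
#include <vector>

int main(int argc, char** argv)
{
    int provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    // 'provided' should be checked; an implementation may offer less
    // than MPI_THREAD_MULTIPLE.

    // One duplicate of MPI_COMM_WORLD per thread, so the two threads
    // can issue MPI calls concurrently without sharing a communicator.
    std::vector<MPI_Comm> comms(2);
    MPI_Comm_dup(MPI_COMM_WORLD, &comms[0]);
    MPI_Comm_dup(MPI_COMM_WORLD, &comms[1]);

    auto worker = [&](int tid) {
        // Each thread only ever touches its own duplicate; the real
        // per-thread work with the non-shared objects goes here.
        MPI_Barrier(comms[tid]);
    };

    std::thread t0(worker, 0);
    std::thread t1(worker, 1);
    t0.join();
    t1.join();

    MPI_Comm_free(&comms[0]);
    MPI_Comm_free(&comms[1]);
    MPI_Finalize();
}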

However, the library I am making use of does not anticipate this use case, and as far as I can tell there are essentially two options: 1) duplicate the object for each thread, or 2) ensure that only one thread attempts to use the object at any given time.

These objects can be so large that duplicating them would run me out of memory, so option 2 is the only workable solution.

With all that said, I think the issue can be reduced to, "How can I protect a call to MPI_COMM_WORLD while using MPI_THREAD_MULTIPLE?"

Assume that the function foo may be called from multiple threads at one time. I attempted to solve my issue with a std::mutex, something like this:

void foo1(int thread_id)
{
  MPI_Barrier(communicator[thread_id]);
  my_mutex.lock();
  // Some MPI call which needs to happen over MPI_COMM_WORLD
  MPI_Barrier(communicator[thread_id]);
  my_mutex.unlock();
}

void foo2(int thread_id)
{
  MPI_Barrier(communicator[thread_id]);
  my_mutex.lock();
  // Some other MPI call which needs to happen over MPI_COMM_WORLD
  MPI_Barrier(communicator[thread_id]);
  my_mutex.unlock();
}

communicator is a std::vector<MPI_Comm> so that each thread ID has its own communicator to use.

Let us assume that thread 0 on each process calls foo1 at the same time and that thread 1 on each process calls foo2 at the same time. I thought this would work, but I get a deadlock of the following form: thread 0 on process 0 holds the mutex and is doing some MPI_COMM_WORLD call; thread 1 on process 0 is waiting for the mutex; thread 0 on process 1 is waiting for the mutex; and thread 1 on process 1 is doing some other call over MPI_COMM_WORLD. In other words, the two processes end up in mismatched collectives on MPI_COMM_WORLD, and neither can complete. The goal would be for the same thread ID to obtain the mutex on every process at the same time.

If it matters, I am launching with mpirun -n 2 -bind-to none ./my_program.

I have been asked for an example program. The following seems to hang in pretty much the way my real program does and is just about as small as I can make it.

I am compiling with mpicxx ./example.cpp -std=c++11 -g, and running with mpirun -n 2 -bind-to none ./a.out or, to inspect how it is hanging, with mpirun -n 2 -bind-to none xterm -e gdb -ex run ./a.out.

#include <cstdlib>   // for rand()
#include <iostream>
#include <mutex>
#include <vector>
#include <mpi.h>
#include <thread>

void foo(int){}

class MyClass
{
    public:
        MyClass()
            : communicators()
        {
            MPI_Comm new_comm_1;
            MPI_Comm new_comm_2;
            MPI_Comm_dup(MPI_COMM_WORLD, &new_comm_1);
            MPI_Comm_dup(MPI_COMM_WORLD, &new_comm_2);
            communicators.push_back(new_comm_1);
            communicators.push_back(new_comm_2);
        }

        void run()
        {
            unsigned int count = 0;
            int my_rank = 0;
            MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
            while(true){
                std::thread t1 = std::thread(&MyClass::foo1,this);
                std::thread t2 = std::thread(&MyClass::foo2,this);
                if (my_rank == 0)
                    std::cout << "Currently in iteration " << count << "\n";

                t1.join();
                t2.join();
                count++;
            }
        }

    private:
        std::vector<MPI_Comm> communicators;
        std::mutex my_mutex;

        void foo1()
        {
            int local_int = rand();
            int reduced_int = 0;
            MPI_Barrier(communicators[0]);
            std::unique_lock<std::mutex> lock(my_mutex);
            MPI_Allreduce(&local_int, &reduced_int, 1, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD);
            MPI_Barrier(communicators[0]);
        }

        void foo2()
        {
            // With two ranks, the root's send buffer must hold two ints,
            // one per receiving rank.
            int local_ints[2] = {rand(), rand()};
            int reduced_int = 0;
            MPI_Barrier(communicators[1]);
            std::unique_lock<std::mutex> lock(my_mutex);
            MPI_Scatter(local_ints, 1, MPI_INT, &reduced_int, 1, MPI_INT, 0,
                    MPI_COMM_WORLD);
            MPI_Barrier(communicators[1]);
        }
};

int main(int n_args, char** args)
{
    int provided = 0;
    MPI_Init_thread(&n_args,&args,MPI_THREAD_MULTIPLE,&provided);

    std::cout << "Your MPI implementation provided thread level " << provided
        << ".\t You requested level " << MPI_THREAD_MULTIPLE << "\n";

    MyClass obj{};
    obj.run();
}

Solution

  • I was able to find a solution. As I explained, though perhaps not very clearly, the problem with the approach in the post is that after MPI_Barrier it is possible for different threads on each process to obtain the mutex, which leads to issues for all sorts of reasons: conflicting calls over MPI_COMM_WORLD, deadlocks, and so on.

    After thinking about it for a couple of days, I came up with the following solution, which I think is airtight. The key is to make the root process the only one that decides which thread may proceed. This means replacing

    MPI_Barrier(communicators[*]);
    std::unique_lock<std::mutex> lock(my_mutex);
    

    in each of foo1 and foo2 with

    int rank;
    MPI_Comm_rank(communicators[*], &rank);
    std::unique_lock<std::mutex> lock;
    if (rank == 0){
        std::unique_lock<std::mutex> lock_(my_mutex);
        MPI_Barrier(communicators[*]);
        lock.swap(lock_);
    }
    else{
        MPI_Barrier(communicators[*]);
    }
    

    Note that I am using * as a placeholder for either 0 or 1. Each function must still use its own communicator for the MPI_Barrier calls. This may not work in a more complicated situation in which communicators is not filled with copies of MPI_COMM_WORLD, but that is my use case, so it works for me. A full version of foo1 with the replacement applied is sketched at the end of this answer.

    I also wanted to mention that I forgot MPI_Finalize in the example program, but as it was designed to have an infinite loop maybe I can be forgiven.
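
    For concreteness, here is roughly how foo1 from the example program above reads with that replacement dropped in (just a sketch; foo2 is analogous with communicators[1] and its MPI_Scatter call):

    void foo1()
    {
        int local_int = rand();
        int reduced_int = 0;

        int rank;
        MPI_Comm_rank(communicators[0], &rank);

        // Only rank 0 takes the mutex; the other ranks are held back in the
        // barrier until rank 0's thread with the same ID holds the lock.
        std::unique_lock<std::mutex> lock;
        if (rank == 0){
            std::unique_lock<std::mutex> lock_(my_mutex);
            MPI_Barrier(communicators[0]);
            lock.swap(lock_);
        }
        else{
            MPI_Barrier(communicators[0]);
        }

        MPI_Allreduce(&local_int, &reduced_int, 1, MPI_INT, MPI_SUM,
                MPI_COMM_WORLD);

        // On rank 0 the mutex is released (at end of scope) only after every
        // rank has entered this barrier, i.e. after the MPI_COMM_WORLD call
        // has completed everywhere, so the other thread ID cannot start a
        // conflicting call in between.
        MPI_Barrier(communicators[0]);
    }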