I am trying to send a message from each MPI process to all other processes and to receive a message from each of them. It is basically an all-to-all communication in which every process sends a message to every other process (except itself) and receives a message from every other process.

The following code snippet shows what I am trying to achieve. The problem with MPI_Send is that for small messages it returns immediately, as if it were non-blocking, but for larger messages (on my machine, BUFFER_SIZE 16400) it blocks. I am aware that this is how MPI_Send behaves. As a workaround, I replaced the code below with the combined blocking send and receive, MPI_Sendrecv, called like this: MPI_Sendrecv(intSendPack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, intReceivePack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE). I make this call in a loop over every rank of MPI_COMM_WORLD, and this approach gives me what I am trying to achieve (all-to-all communication). However, it takes a lot of time, which I want to cut down with a more time-efficient approach.

I have also tried MPI scatter and gather for the all-to-all communication, but one issue is that the buffer size (16400 here) may differ between iterations, that is, between calls to MPI_all_to_all, in the actual implementation. I use MPI_TAG to differentiate the calls in different iterations, and I cannot pass a tag to the scatter and gather functions.

#define BUFFER_SIZE 16400

void MPI_all_to_all(int MPI_TAG)
{

    int size;
    int rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* intReceivePack = new int[BUFFER_SIZE]();

    for (int prId = 0; prId < size; prId++) {
        if (prId != rank) {
            MPI_Send(intSendPack, BUFFER_SIZE, MPI_INT, prId, MPI_TAG,
                     MPI_COMM_WORLD);
        }
    }

    for (int sId = 0; sId < size; sId++) {
        if (sId != rank) {
            // MPI_Recv takes a single status, so the constant is MPI_STATUS_IGNORE
            MPI_Recv(intReceivePack, BUFFER_SIZE, MPI_INT, sId, MPI_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }

    // Release the buffers allocated above
    delete[] intSendPack;
    delete[] intReceivePack;
}

I want to know whether there is a more efficient communication model for performing this all-to-all communication. I am not tied to MPI_Send; if some other approach achieves the same thing, I am happy with that. Any help or suggestion is much appreciated.

Faysal
  • I don't understand why you are coding your own all_to_all where there is already `MPI_Alltoall`? – AlexG May 24 '19 at 18:02
  • @AlexG, maybe the subject line of my question created the confusion, but I am not trying to achieve what MPI_Alltoall does. I am trying to achieve something similar to MPI_Gather. Please take a look at the following post https://stackoverflow.com/questions/15049190/difference-between-mpi-allgather-and-mpi-alltoall-functions – Faysal May 24 '19 at 18:14
  • [`MPI_Send`](https://www.mpich.org/static/docs/latest/www3/MPI_Send.html) is blocking. I suspect what you're referring to is buffering, but not sure. You shouldn't use point-to-point communication instead of global communication because it's most likely going to be less efficient (except if you're planning on something special/dedicated but keep in mind it's tricky). Keeping in mind the referenced post, what is it actually that you want to do? Just send around and overwrite arrays? Or something else? – atru May 24 '19 at 18:49
  • @atru, yes, by blocking I meant the certain limit of buffering after which MPI_Send acts like blocking call until a matching MPI_Recv is posted. I want a time-efficient global communication. About your question of what I am actually trying to do, consider every process in the MPI Comm world has some update to share with rest of the processes. So, every process is going to send its own computation results (update) to all other processes and also will receive the results from the rest of the processes. – Faysal May 24 '19 at 20:15
  • @Faysal if all of them update all others and need updates from others at the same point of the program, then it would probably be best to use Allgather. If one rank needs to update all the others, but the others don't send updates at that point and that rank does not need theirs, then it's a Bcast. If you clarify, I'll post an answer with some benchmarking. Also, you're overwriting the common array - is that intended? – atru May 24 '19 at 20:50
  • @atru, I tried with Allgather and made the recvbuffer large enough (sendbuffer*total_processes) to let the update of all the processes be written in a common array as how Allgather operates. However, the size of the sendbuffer changes in every call to MPI_all_to_all (at the end of MPI_all_to_all call, I gather the update using MPI_Allgather) in different iterations. That's why I need the MPI_tag to differentiate the messages in different iterations which MPI_Allgather does not have. I got the buffer size mismatch error by using MPI_Allgather. Please let me know if I could not clarify properly. – Faysal May 24 '19 at 21:17
  • You could create a large, reusable receive buffer and just allgather to it at each iteration with different send buffer sizes. If majority of sizes are within some range but some are much larger you can also resize it while iterating (though this will be costly). This assumes that message sizes are the same on all ranks at a given iteration. – atru May 24 '19 at 21:41
  • You can use the same buffer for send (this is basically a `MPI_Bcast()`) but your snippet keeps overwriting the receive buffer. Can you please update your code to clarify what you are trying to achieve? It seems `MPI_Allgather()` is a fit here. – Gilles Gouaillardet May 24 '19 at 23:36
  • @atru, I apologize for the delayed response as I was trying to make my code work with MPI_Allgather. Before posting my problem here I tried with MPI_Allgather and could not make it work. After getting your comments I wanted to give it another try and this time it worked. The issue was in my code that deals with the send buffer and the receive buffer sizes. However, I do like to have some benchmarking with point to point communication (send-receive) vs collective communication as I am not seeing much difference in performance after updating my code. Please post your answer as you mentioned. – Faysal May 27 '19 at 23:46
  • @Gilles, originally I wanted to post my complete problem (code), but I could not find a sensible way to present as it would be too big to be posted. If I posted some snippet, describing it would be even harder. That's why I went for the above code which basically shows what I am trying to achieve. I appreciate your suggestion. – Faysal May 27 '19 at 23:58

1 Answer

This benchmark compares the performance of collective and point-to-point communication in an all-to-all exchange:

#include <iostream>
#include <algorithm>
#include <mpi.h>

#define BUFFER_SIZE 16384

void point2point(int*, int*, int, int);

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int rank_id = 0, com_sz = 0;
    double t0 = 0.0, tf = 0.0;
    MPI_Comm_size(MPI_COMM_WORLD, &com_sz);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* result = new int[BUFFER_SIZE*com_sz]();
    std::fill(intSendPack, intSendPack + BUFFER_SIZE, rank_id);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);

    // Send-Receive
    t0 = MPI_Wtime();
    point2point(intSendPack, result, rank_id, com_sz);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Send-receive time: " << tf - t0 << std::endl;

    // Collective
    std::fill(result, result + BUFFER_SIZE*com_sz, 0);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);
    t0 = MPI_Wtime();
    MPI_Allgather(intSendPack, BUFFER_SIZE, MPI_INT, result, BUFFER_SIZE, MPI_INT, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Allgather time: " << tf - t0 << std::endl;

    MPI_Finalize();
    delete[] intSendPack;
    delete[] result;
    return 0;
}

// Send/receive communication
void point2point(int* send_buf, int* result, int rank_id, int com_sz)
{
    MPI_Status status;
    // Exchange and store the data
    for (int i=0; i<com_sz; i++){
        if (i != rank_id){
            MPI_Sendrecv(send_buf, BUFFER_SIZE, MPI_INT, i, 0, 
                result + i*BUFFER_SIZE, BUFFER_SIZE, MPI_INT, i, 0, MPI_COMM_WORLD, &status);
        }
    }
}

Here every rank contributes its own array intSendPack to the array result, which should end up identical on all ranks. result is flat; each rank owns BUFFER_SIZE entries starting at index rank_id*BUFFER_SIZE. After the point-to-point run, result is reset to its initial state before the collective run.

Time is measured after an MPI_Barrier, so the value printed by rank 0 reflects the slowest of all ranks.

I ran the benchmark on one node of NERSC Cori KNL using Slurm. I ran each case a few times to make sure the values were consistent and that I was not looking at an outlier, but you should run it around 10 times to collect more reliable statistics.

Here are some thoughts:

  • For a small number of processes (5) and a large buffer size (16384), collective communication is about twice as fast as point-to-point, and it becomes about 4-5 times faster with a larger number of ranks (64).
  • In this benchmark there is not much difference in performance between the recommended Slurm settings for that specific machine and the default settings. In real, larger programs with more communication the difference is very significant: a job that runs for less than a minute with the recommended settings can run for 20-30 minutes or more with the defaults. The point is to check your settings, because they may make a difference.
  • What you were seeing with Send/Receive for larger messages was actually a deadlock. I saw it too for the message size shown in this benchmark. In case you missed them, there are two SO posts on this worth reading: a buffering explanation and a word on deadlocking.

In summary, adjust this benchmark to represent your code more closely and run it on your system. Collective communication in all-to-all or one-to-all situations should be faster because of dedicated optimizations, such as better algorithms for arranging the communication. A 2-5 times speedup is considerable, since communication is often the largest contributor to the overall run time.

atru