I am trying to spawn a couple of processes using PyTorch's multiprocessing module within an OpenMPI distributed back-end. What I have is the following code:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank_local, rank, world_size, maingp):
    print("I WAS SPAWNED ", rank_local, " OF ", rank)

    tensor = torch.zeros(1)
    tensor += 1

    if rank == 0:
        tensor += 100
        dist.send(tensor, dst=1)
    else:
        print("I am spawn: ", rank, "and my tensor value before receive: ", tensor[0])
        dist.recv(tensor, src=0)
        print("I am spawn: ", rank, "and my tensor value after  receive: ", tensor[0])


if __name__ == '__main__':

    # Initialize Process Group
    dist.init_process_group(backend="mpi", group_name="main")
    maingp = None #torch.distributed.new_group([0,1])
    mp.set_start_method('spawn')    

    # get current process information
    world_size = dist.get_world_size()
    rank = dist.get_rank()

    # Establish Local Rank and set device on this node
    mp.spawn(run, args=(rank, world_size, maingp), nprocs=1)

I run this code using OpenMPI as follows:

mpirun -n 2 python code.py

So my understanding is that mpirun creates two processes with ranks [0, 1], and each of these processes spawns a new process with local rank 0. Now when I try to communicate between these two sub-processes of the main processes, I get a traceback with the following error:

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 19, in _wrap
    fn(i, *args)
  File "/home/usama/code/test/code.py", line 19, in run
    dist.send(tensor, dst=1)
  File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/distributed/distributed_c10d.py", line 666, in send
    _check_default_pg()
  File "/home/usama/anaconda3/lib/python3.6/site-packages/torch/distributed/distributed_c10d.py", line 191, in _check_default_pg
    "Default process group is not initialized"
AssertionError: Default process group is not initialized

My question is: how do I make these sub-processes able to communicate, i.e., the [0, 0] process sending something to the [1, 0] process? Any ideas?

usamazf

1 Answer


Sometimes our questions become too restrictive due to premature optimization, like the choice of the MPI backend in this case. It may actually be impossible: the popular distributed training framework Ray, which supports the other two backends, NCCL and Gloo, does not support MPI at all; see its code:

RuntimeError for Backend.MPI
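
For comparison, here is a minimal sketch of the question's send/recv example reworked for the Gloo backend (one of the two backends that are widely supported). This is an assumption-laden illustration, not the questioner's setup: the key change is that each spawned child initializes its own default process group, since the group created in the parent is not inherited across spawn(). The worker function name and the TCP address/port are placeholders, and the script is launched with plain python rather than mpirun:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    # Each spawned child must create its own default process group;
    # this is what was missing in the question's spawned processes.
    dist.init_process_group(
        backend="gloo",
        init_method="tcp://127.0.0.1:23456",  # placeholder address/port
        rank=rank,
        world_size=world_size,
    )
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 100
        dist.send(tensor, dst=1)      # point-to-point send to rank 1
    else:
        dist.recv(tensor, src=0)      # blocking receive from rank 0
        print("rank", rank, "received", tensor[0].item())
    dist.destroy_process_group()

if __name__ == '__main__':
    # Launched as: python code.py (no mpirun needed)
    mp.spawn(worker, args=(2,), nprocs=2)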

An example of using Ray for distributed training of PyTorch models with backends other than MPI (source):

import pytorch_lightning as pl
from ray_lightning import RayPlugin

# Create your PyTorch Lightning model here.
ptl_model = MNISTClassifier(...)
plugin = RayPlugin(num_workers=4, num_cpus_per_worker=1, use_gpu=True)

# If using GPUs, set the ``gpus`` arg to a value > 0.
# The actual number of GPUs is determined by ``num_workers``.
trainer = pl.Trainer(..., gpus=1, plugins=[plugin])
trainer.fit(ptl_model)
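
Here RayPlugin (from the ray_lightning package) stands in for Lightning's built-in distributed plugin and runs the training loop on num_workers Ray actors, so inter-process communication goes through NCCL or Gloo rather than MPI.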

mirekphd