
I start 2 processes because I only have 2 GPUs, but it gives me an Exception: process 0 terminated with signal SIGSEGV. The code does work with multiple CPUs (or at least no error is thrown), and it also works with a single GPU. It only fails when world_size > 1 and multiple CUDA GPUs are present.

My error message is this:

(automl-meta-learning) miranda9~/ML4Coq $ python playground/multiprocessing_playground/ddp_hello_world.py


world_size=2


Traceback (most recent call last):
  File "playground/multiprocessing_playground/ddp_hello_world.py", line 49, in <module>
    main()
  File "playground/multiprocessing_playground/ddp_hello_world.py", line 43, in main
    mp.spawn(example,
  File "/home/miranda9/miniconda3/envs/automl-meta-learning/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 199, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/miranda9/miniconda3/envs/automl-meta-learning/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 157, in start_processes
    while not context.join():
  File "/home/miranda9/miniconda3/envs/automl-meta-learning/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 105, in join
    raise Exception(
Exception: process 0 terminated with signal SIGSEGV

This is the code that gives the error:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '8888'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    # world_size = 2
    world_size = torch.cuda.device_count()
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    main()
    print('Done\n\a')

[Optional] Larger self-contained example (gives same error)

Note, however, that this slightly more complete example (it is only missing a distributed dataloader) also gives me the same error:

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Correctness of code: https://stackoverflow.com/questions/66226135/how-to-parallelize-a-training-loop-ever-samples-of-a-batch-when-cpu-is-only-avai

Note: as opposed to the multiprocessing (torch.multiprocessing) package, torch.distributed processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import time

from typing import Tuple

import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class PerDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes, each with their own GPU or hardware)
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other and with the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def get_batch(batch: Tuple[torch.Tensor, torch.Tensor], rank):
    x, y = batch
    if torch.cuda.is_available():
        x, y = x.to(rank), y.to(rank)
    else:
        x, y = x.share_memory_(), y.share_memory_()
    return x, y

def get_ddp_model(model: nn.Module, rank):
    """
    Wrap the model in DDP. On CPU it first calls share_memory(), which moves the underlying
    storage to shared memory:

        This is a no-op if the underlying storage is already in shared memory
        and for CUDA tensors. Tensors in shared memory cannot be resized.

    :return: the DDP-wrapped model

    TODO: does this have to be done outside or inside the process? My guess is that it doesn't matter because
    1) if it's on GPU, then once it's in the right process it moves the model to the GPU with id rank via mdl.to(rank)
    2) if it's on CPU, then mdl.share_memory() or data.share_memory() is a no-op if it's already in shared memory, otherwise it moves it there.
    """
    # if gpu avail do the standard of creating a model and moving the model to the GPU with id rank
    if torch.cuda.is_available():
        # create model and move it to GPU with id rank
        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
    else:
        # if we want multiple cpus, just make sure the model is shared properly across the cpus with share_memory()
        # note that share_memory() is a no-op if it's already in shared memory
        model = model.share_memory()
        ddp_model = DDP(model)  # I think removing the device ids should be fine...?
    return ddp_model
    # return OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually run in each distributed process.

    Note: as DDP broadcasts model states from the rank 0 process to all other processes in the DDP constructor,
    you don't need to worry about different DDP processes starting from different model parameter initial values.
    """
    setup_process(rank, world_size)
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')

    # get ddp model
    model = PerDeviceModel()
    ddp_model = get_ddp_model(model, rank)

    # do training
    for batch_idx, batch in enumerate(data):
        x, y = get_batch(batch, rank)
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, y).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"End running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    if torch.cuda.is_available():
        world_size = torch.cuda.device_count()
    else:
        world_size = mp.cpu_count()
    print(f'world_size={world_size}')
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    start = time.time()
    main()
    print(f'execution length = {time.time() - start}')
    print('Done!\a\n')

cross posted: https://discuss.pytorch.org/t/why-is-mp-spawn-spawning-4-processes-when-i-only-want-2/112299

Charlie Parker
  • Why do you think it spawns 4 processes? Clearly, as can be seen from the stdout, `world_size==2`, so it will spawn only 2 processes. This is in agreement with the remaining stdout, where there is only `SpawnProcess-1` and `SpawnProcess-2`. – a_guest Feb 18 '21 at 21:17
  • @a_guest you are right. It's only doing it twice. It was a guess for my error; regardless, I still have that `Exception: process 0 terminated with signal SIGSEGV`, which I am trying to fix (will update the title). – Charlie Parker Feb 18 '21 at 21:27
  • For the `SIGSEGV` I suggest running your code without multiprocessing to see what goes wrong (i.e. `run_parallel_training_loop` without `setup_process`; see the sketch after these comments). – a_guest Feb 18 '21 at 22:01
  • @a_guest I already did that and it works fine. No errors at all. – Charlie Parker Feb 18 '21 at 22:04
  • At the moment there's lots of code in your question and it seems that the "4 instead of 2 processes" part is not relevant anymore? Could you perhaps update title and content of your question to reduce it to the absolute minimum that is necessary to reproduce your problem and also include the full output/error that you receive from running this code. – a_guest Feb 18 '21 at 22:46
  • @a_guest seems like a sensible suggestion. I've done that. I left the larger example at the end and labeled it optional since it is much more realistic but still completely runs by itself. The first example is much simpler and hope it helps. (yes the 4 & 2 part is not relevant) – Charlie Parker Feb 18 '21 at 23:06
  • The output above the (minimal) code example doesn't seem to correspond to that code. There's lots of extra print stuff, so it makes it more difficult to trace the actual error. By the way, is this code example identical to the [tutorial here](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)? If not, did you try to run that tutorial? – a_guest Feb 18 '21 at 23:15
  • @a_guest I will make sure to run it again and update the question. There are many tutorials with nearly identical code (and many of them don't run/work without the modification I made above). So let's use the one I am using. This blog comments about the weird state of pytorch's tutorials on distributed training not being super good https://github.com/yangkky/distributed_tutorial/blob/master/ddp_tutorial.md – Charlie Parker Feb 18 '21 at 23:20
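
For reference, the single-process sanity check suggested in the comments above could look roughly like this (a sketch: it drops DDP and the process group entirely and reuses the toy model and data from the larger example):

import torch
from torch import nn, optim

num_epochs, batch_size, Din, Dout = 5, 8, 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i * data_x, i * data_y) for i in range(num_epochs)]

# same architecture as PerDeviceModel, but run in a single ordinary process on CPU
model = nn.Sequential(nn.Linear(Din, Din), nn.ReLU(), nn.Linear(Din, Dout))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

for x, y in data:
    optimizer.zero_grad()
    loss_fn(model(x), y).backward()
    optimizer.step()
print('single-process run finished without errors')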

2 Answers



I ran your "(minimal) code example" without any change and without any error on a server with 4 GPUs (Python version: 3.6.9, PyTorch version: 1.5.0+cu101).
Does the problem still exist when you run the minimal code example?
If so, and if you are on a Linux machine, could you please run the following code instead and tell me what output you get:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

def get_visible_gpus():
    ns = os.popen('nvidia-smi')
    lines_ns = ns.readlines()
    # print(lines_ns)
    # find the first table separator line (the one containing '|='), i.e. the start of the GPU table
    for _i, _line in enumerate(lines_ns):
        if _line.find('|=') >= 0:
            break
    line_gpus = lines_ns[_i:]
    # cut the table off just before the 'Processes' section
    for _i, _line in enumerate(line_gpus):
        if _line.find('Processes') >= 0:
            break
    line_gpus = line_gpus[:_i-3]
    # print(line_gpus)
    # each GPU row sits one line below a '+----...----+' separator line
    idx_gpu_lines = []
    for _i, _line in enumerate(line_gpus):
        if _line.find('+') >= 0:
            idx_gpu_lines.append(_i+1)
    # the GPU id is the second whitespace-separated token of that row, e.g. '| 0  Tesla ...'
    idx_gpus = []
    for _line_gpu in idx_gpu_lines:
        idx_gpus.append(int(line_gpus[_line_gpu].split()[1]))
    # print(idx_gpus)
    return idx_gpus

def example(rank, world_size):
    print('rank:{}'.format(rank))
    # create default process group
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '8888'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    # world_size = 2
    world_size = torch.cuda.device_count()
    print('world_size:{}'.format(world_size))
    print('get_visible_gpus():{}'.format(get_visible_gpus()))
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__ == "__main__":
    print(torch.__version__)
    main()
    print('Done\n\a')

In my case, I simply get:

1.5.0+cu101
world_size:4
get_visible_gpus():[0, 1, 2, 3]
rank:1
rank:3
rank:0
rank:2
Done

get_visible_gpus() simply parses the text output of the nvidia-smi shell command to get the ids of the GPUs that CUDA can see.
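
As a cross-check, a minimal sketch that lists the devices through torch.cuda itself (instead of parsing nvidia-smi) could look like this; unlike nvidia-smi, torch.cuda reflects what the current process can actually use, e.g. it respects CUDA_VISIBLE_DEVICES:

import torch

def get_visible_gpus_torch():
    # enumerate the CUDA devices visible to this torch process
    return [(i, torch.cuda.get_device_name(i)) for i in range(torch.cuda.device_count())]

if __name__ == '__main__':
    print('get_visible_gpus_torch():{}'.format(get_visible_gpus_torch()))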

NB: Please excuse me, I would have commented instead of "answering" (as I am not directly solving your problem, but asking for more details), but my reputation is not good enough T.T

Romain Renard

Solution: increase shm-size

docker run -it \
    --shm-size=64g 

Reason: If you run inside a Docker container, it's probably because the shm size of the container is not large enough. By default, Docker containers are allocated 64 MB of shared memory. This shared memory is not a memory limit; it is a /dev/shm temporary file storage filesystem that uses RAM to store files, and it is used for IPC between the worker processes. To check the shm size, enter the container and use df to view /dev/shm.
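
For example, a minimal check from inside the container (a sketch; it assumes /dev/shm is mounted, which is the usual case, and is equivalent to reading the df output):

import shutil

# report the size of the /dev/shm tmpfs that the worker processes use for IPC
total, used, free = shutil.disk_usage('/dev/shm')
print('/dev/shm: total={:.0f} MiB, used={:.0f} MiB, free={:.0f} MiB'.format(
    total / 2**20, used / 2**20, free / 2**20))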

Luke