I have a problem running distributed training with PyTorch using torchrun. First of all, this is the script I'm trying to run:
import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import argparse
import os
import random
import numpy as np
import time
import importlib
if 'LOCAL_RANK' in os.environ:
    # Environment variables set by torch.distributed.launch or torchrun
    LOCAL_RANK = int(os.environ['LOCAL_RANK'])
    WORLD_SIZE = int(os.environ['WORLD_SIZE'])
    WORLD_RANK = int(os.environ['RANK'])
elif 'OMPI_COMM_WORLD_LOCAL_RANK' in os.environ:
    # Environment variables set by mpirun
    LOCAL_RANK = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
    WORLD_SIZE = int(os.environ['OMPI_COMM_WORLD_SIZE'])
    WORLD_RANK = int(os.environ['OMPI_COMM_WORLD_RANK'])
else:
    import sys
    sys.exit("Can't find the environment variables for local rank")
def set_random_seeds(random_seed=0):
    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)
def evaluate(model, device, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return accuracy
def main():
    num_epochs_default = 10000
    batch_size_default = 256  # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"
    w = 32
    h = 32
    c = 3
    num_steps_syn = 20
    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    parser.add_argument("--backend", type=str, help="Backend for distributed training.", default='nccl', choices=['nccl', 'gloo', 'mpi'])
    parser.add_argument("--arch", type=str, help="Model architecture.", default='resnet50', choices=['resnet50', 'resnet18', 'resnet101', 'resnet152'])
    parser.add_argument("--use_syn", action="store_true", help="Use synthetic data")
    argv = parser.parse_args()
    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume
    backend = argv.backend
    use_syn = argv.use_syn
    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''
    model_filepath = os.path.join(model_dir, model_filename)
    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)
    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    print(WORLD_SIZE)
    torch.distributed.init_process_group(backend=backend, rank=WORLD_RANK, world_size=WORLD_SIZE)
    # Encapsulate the model on the GPU assigned to the current process
    model = getattr(torchvision.models, argv.arch)(pretrained=False)
    device = torch.device("cuda:{}".format(LOCAL_RANK))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)
    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume:
        map_location = {"cuda:0": "cuda:{}".format(LOCAL_RANK)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))
    if use_syn:
        # Synthetic data
        inputs_syn = torch.rand((batch_size, c, w, h)).to(device)
        labels_syn = torch.zeros(batch_size, dtype=torch.int64).to(device)
    else:
        # Prepare dataset and dataloader
        transform = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])
        # Data should be prefetched
        # Download should be set to be False, because it is not multiprocess safe
        train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform)
        test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)
        # Restricts data loading to a subset of the dataset exclusive to the current process
        train_sampler = DistributedSampler(dataset=train_set)
        train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
        # Test loader does not have to follow distributed sampling strategy
        test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)
    # Loop over the dataset multiple times
    times = []
    for epoch in range(num_epochs):
        print("Local Rank: {}, Epoch: {}, Training ...".format(LOCAL_RANK, epoch))
        # Save and evaluate model routinely
        if not use_syn:
            if epoch % 10 == 0:
                if LOCAL_RANK == 0:
                    accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                    torch.save(ddp_model.state_dict(), model_filepath)
                    print("-" * 75)
                    print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                    print("-" * 75)
        ddp_model.train()
        if not use_syn:
            train_loader.sampler.set_epoch(epoch)
        if use_syn:
            start_epoch = time.time()
            for count in range(num_steps_syn):
                optimizer.zero_grad()
                outputs = ddp_model(inputs_syn)
                loss = criterion(outputs, labels_syn)
                loss.backward()
                optimizer.step()
            torch.cuda.synchronize()
            end_epoch = time.time()
            elapsed = end_epoch - start_epoch
            if epoch > 0:
                times.append(elapsed)
                print('num_steps_per_gpu: {}, avg_step_time: {:.4f}'.format(count, elapsed / count))
        else:
            start_epoch = time.time()
            count = 0
            for data in train_loader:
                inputs, labels = data[0].to(device), data[1].to(device)
                optimizer.zero_grad()
                outputs = ddp_model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                count += 1
            torch.cuda.synchronize()
            end_epoch = time.time()
            elapsed = end_epoch - start_epoch
            if epoch > 0:
                times.append(elapsed)
                print('num_steps_per_gpu: {}, avg_step_time: {:.4f}'.format(count, elapsed / count))
    avg_time = sum(times) / (num_epochs - 1)
    print("Average epoch time: {}".format(avg_time))
if __name__ == "__main__":
    main()
I want to run this on 3 nodes. These are the commands I use on each of the nodes:
node1:
torchrun --nproc_per_node=1 --nnodes=3 --node_rank=0 --master_addr=192.168.1.92 --master_port=8000 main.py --backend=gloo --batch_size=64 --arch=resnet152
node2:
torchrun --nproc_per_node=1 --nnodes=3 --node_rank=1 --master_addr=192.168.1.92 --master_port=8000 main.py --backend=gloo --batch_size=64 --arch=resnet152
node3:
torchrun --nproc_per_node=1 --nnodes=3 --node_rank=2 --master_addr=192.168.1.92 --master_port=8000 main.py --backend=gloo --batch_size=64 --arch=resnet152
As you can see, I'm trying the gloo backend, but it hangs at the line:
torch.distributed.init_process_group(backend=backend, rank=WORLD_RANK, world_size=WORLD_SIZE)
and these are the messages I see when I set the debug flags on node 1:
[I debug.cpp:49] [c10d] The debug level is set to DETAIL.
[I socket.cpp:442] [c10d - debug] The server socket will attempt to listen on an IPv6 address.
[I socket.cpp:492] [c10d - debug] The server socket is attempting to listen on [::]:8000.
[I socket.cpp:566] [c10d] The server socket has started to listen on [::]:8000.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32920.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32920.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37382.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37384.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46816.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46818.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32922.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32922.
[I debug.cpp:49] [c10d] The debug level is set to DETAIL.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46820.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46822.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37386.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37388.
gloo
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32924.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32924.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32926.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32926.
I also tried with 2 nodes: it sometimes works and sometimes doesn't. That was with the gloo backend. Something similar happens with the nccl backend, but there it hangs on the line
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)
and this is the output of node1:
[I debug.cpp:49] [c10d] The debug level is set to DETAIL.
[I socket.cpp:442] [c10d - debug] The server socket will attempt to listen on an IPv6 address.
[I socket.cpp:492] [c10d - debug] The server socket is attempting to listen on [::]:8000.
[I socket.cpp:566] [c10d] The server socket has started to listen on [::]:8000.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32930.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32930.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37394.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37396.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46824.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46826.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32932.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32932.
[I debug.cpp:49] [c10d] The debug level is set to DETAIL.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46828.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc1]:46830.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37398.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc3]:37400.
nccl
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32934.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32934.
[I socket.cpp:624] [c10d - debug] The client socket will attempt to connect to an IPv6 address of (192.168.1.92, 8000).
[I socket.cpp:699] [c10d - trace] The client socket is attempting to connect to [mortezauoc2]:8000.
[I socket.cpp:787] [c10d] The client socket has connected to [mortezauoc2]:8000 on [mortezauoc2]:32936.
[I socket.cpp:295] [c10d - debug] The server socket on [::]:8000 has accepted a connection from [mortezauoc2]:32936.
[I ProcessGroupNCCL.cpp:629] [Rank 0] NCCL_BLOCKING_WAIT and NCCL_ASYNC_ERROR_HANDLING|NCCL_DESYNC_DEBUGshould not both be enabled. Only NCCL_BLOCKING_WAIT is being used in this process.
[I ProcessGroupNCCL.cpp:665] [Rank 0] ProcessGroupNCCL initialized with following options:
NCCL_ASYNC_ERROR_HANDLING: 0
NCCL_DESYNC_DEBUG: 0
NCCL_BLOCKING_WAIT: 1
TIMEOUT(ms): 1800000
USE_HIGH_PRIORITY_STREAM: 0
[I ProcessGroupNCCL.cpp:842] [Rank 0] NCCL watchdog thread started!
HELLO
/home/morteza/torch/lib/python3.8/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
warnings.warn(
/home/morteza/torch/lib/python3.8/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=None`.
warnings.warn(msg)
0
mortezauoc2:243289:243289 [0] NCCL INFO NCCL_SOCKET_IFNAME set by environment to ens3
mortezauoc2:243289:243289 [0] NCCL INFO NCCL_SOCKET_IFNAME set to ens3
mortezauoc2:243289:243289 [0] NCCL INFO Bootstrap : Using ens3:192.168.1.92<0>
mortezauoc2:243289:243289 [0] NCCL INFO NET/Plugin : No plugin found (libnccl-net.so), using internal implementation
mortezauoc2:243289:243289 [0] NCCL INFO cudaDriverVersion 11070
NCCL version 2.14.3+cuda11.7
mortezauoc2:243289:243289 [0] NCCL INFO init.cc:1147 Cuda Host Alloc Size 4 pointer 0x7f83e2000000
mortezauoc2:243289:243312 [0] NCCL INFO Failed to open libibverbs.so[.1]
mortezauoc2:243289:243312 [0] NCCL INFO NCCL_SOCKET_IFNAME set by environment to ens3
mortezauoc2:243289:243312 [0] NCCL INFO NET/Socket : Using [0]ens3:192.168.1.92<0>
mortezauoc2:243289:243312 [0] NCCL INFO Using network Socket
Each of these nodes is a VM; this may have something to do with the root cause of the problem, but I don't know how to investigate that. This is the output of ifconfig:
$ ifconfig
ens3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1450
inet 192.168.1.92 netmask 255.255.255.0 broadcast 192.168.1.255
inet6 fe80::f816:3eff:fee7:862 prefixlen 64 scopeid 0x20<link>
ether fa:16:3e:e7:08:62 txqueuelen 1000 (Ethernet)
RX packets 18581866 bytes 509072745967 (509.0 GB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 12684239 bytes 503527152043 (503.5 GB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
inet6 ::1 prefixlen 128 scopeid 0x10<host>
loop txqueuelen 1000 (Local Loopback)
RX packets 1380710 bytes 80791900 (80.7 MB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 1380710 bytes 80791900 (80.7 MB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
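Since every node has only the single ens3 interface besides loopback, one thing I'm considering is pinning both backends to that interface explicitly before init_process_group runs. This is just a sketch I'd put near the top of main.py, assuming ens3 is the correct interface name on all three nodes:

import os

# Assumption: ens3 is the NIC that carries the 192.168.1.x addresses on every node.
# GLOO_SOCKET_IFNAME selects the interface for the gloo backend,
# NCCL_SOCKET_IFNAME does the same for NCCL (it already appears in my NCCL logs).
os.environ.setdefault("GLOO_SOCKET_IFNAME", "ens3")
os.environ.setdefault("NCCL_SOCKET_IFNAME", "ens3")

I don't know yet whether interface selection is actually the issue, though.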
Do you know what the cause of the problem could be, or how I can find that out?
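For reference, this is the kind of minimal script I plan to run (with the same torchrun commands as above) to isolate the rendezvous from the rest of the training code. It is only a sketch: check.py is a hypothetical file name, and the short timeout is just meant to turn a hang into an error sooner; I'm not sure it covers every stage of the rendezvous.

# check.py - minimal rendezvous + all_reduce test (sketch)
import datetime
import os
import torch
import torch.distributed as dist

rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

# Short timeout so a stuck initialization fails loudly instead of blocking for 30 minutes.
dist.init_process_group(
    backend="gloo",
    rank=rank,
    world_size=world_size,
    timeout=datetime.timedelta(seconds=60),
)

t = torch.ones(1) * rank
dist.all_reduce(t)  # every rank should end up with the sum of all ranks (0 + 1 + 2 = 3)
print("rank {}/{}: all_reduce result = {}".format(rank, world_size, t.item()))
dist.destroy_process_group()

If this already hangs the same way, I assume the problem is in the networking between the VMs rather than in the training script itself.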