PyTorch Parallel Training with DDP

The Basics and a Quick Tutorial

What Is PyTorch Parallel Training?

PyTorch is a popular open source machine learning framework with a Python front end. In PyTorch, parallel training allows you to leverage multiple GPUs or computing nodes to speed up the process of training neural networks and other complex machine learning models. This technique is particularly beneficial when dealing with large-scale models and massive datasets that would otherwise take a very long time to process.

PyTorch offers several mechanisms for parallel training:

  • DataParallel: Enables single-machine multi-GPU training with just one line of code.
  • DistributedDataParallel (DDP): Enables single-machine multi-GPU training that is faster and more efficient than DataParallel, but requires some setup code. The same method can also run training across multiple machines, using the torchrun launch utility provided by PyTorch.
  • RPC-Based Distributed Training (RPC): Suitable for more advanced training structures that span multiple processes or machines, such as distributed pipeline parallelism and the parameter server paradigm.

This is part of a series of articles about distributed computing.

Simple Parallel Training with PyTorch DataParallel

To run parallel training on multiple GPUs, you can simply use this syntax:

net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
output = net(input_var)  # input_var can be on any device, including CPU
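
For context, here is a minimal self-contained sketch of the same idea. The toy model, input sizes, and device list are illustrative assumptions, not anything prescribed by PyTorch:

import torch
import torch.nn as nn

# A toy model used purely for illustration
model = nn.Sequential(nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 10))
model = model.to("cuda:0")  # DataParallel expects the parameters on the first listed device

# Replicate the model across GPUs 0, 1 and 2; each batch is split along dimension 0
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])

input_var = torch.randn(96, 128)  # the input can live on any device, including CPU
output = net(input_var)           # results are gathered back onto device 0
print(output.shape)               # torch.Size([96, 10])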

Keep in mind that DataParallel has several important limitations: it runs as a single process with multiple threads, so it suffers from Python GIL contention and replicates the model on every forward pass. For these reasons, the PyTorch documentation recommends using Distributed Data-Parallel instead, as explained below.

What Is PyTorch Distributed Data-Parallel (DDP)?

PyTorch's Distributed Data-Parallel (DDP) is a module designed for multi-GPU and multi-node training. Unlike DataParallel, which is limited to single-node scenarios, DDP enables parallel computation across multiple nodes, each with multiple GPUs. But even on a single node, it is more efficient than DataParallel.

DDP distributes the model and data across different GPUs and synchronizes the gradients during the backward pass. It uses collective communication algorithms, like all-reduce, to ensure that each node has the updated model parameters after every training step.
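
To make the all-reduce idea concrete, here is a small standalone sketch (a hypothetical two-process example using the gloo backend so it runs on CPU) that averages a tensor across ranks the same way DDP averages gradients during the backward pass:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Pretend this tensor is the local gradient computed on this rank
    grad = torch.full((4,), float(rank))

    # All-reduce sums the tensors from every rank; dividing by the world size
    # yields the average, which is what DDP applies to gradients
    dist.all_reduce(grad, op=dist.ReduceOp.SUM)
    grad /= world_size
    print(f"rank {rank} sees averaged gradient {grad.tolist()}")

    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2, join=True)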

DDP offers several advantages over DataParallel:

  • It allows for better utilization of multiple GPUs, leading to faster training times.
  • It minimizes the communication overhead during the backward pass by efficiently managing the gradient synchronization process. This is especially important in multi-node environments where network communication can become a bottleneck.
  • It overlaps gradient communication with backward computation (asynchronous gradient reduction), which reduces idle time on GPUs and helps optimize the training process.

Implementing DDP requires careful consideration of several aspects:

  • The data should be evenly distributed across all GPUs to prevent imbalances that could lead to inefficient training. This involves partitioning the dataset so that each process receives a distinct subset of the data; PyTorch's DistributedSampler, shown in Step 4 below, handles this.
  • Developers must be mindful of the cluster's network infrastructure, as DDP relies heavily on inter-node communication. Optimizing the network setup, for example by using a high-bandwidth interconnect, can lead to significant improvements in training speed.
  • DDP requires explicit initialization and cleanup code, which involves setting up the process group and ensuring proper allocation and deallocation of resources across different nodes and GPUs.

Quick Tutorial: Multi-GPU Training in PyTorch with DDP

In this tutorial, we will guide you through the steps to set up Distributed Data-Parallel training in PyTorch.

Step 1: Set Up the Environment

Ensure you have a multi-GPU setup and PyTorch installed with the necessary libraries. Install PyTorch using the following command:

pip install torch torchvision

Next, download the sample MNIST data and extract it into the data folder using these commands:

mkdir -p data
wget www.di.ens.fr/~lelarge/MNIST.tar.gz
tar -zxvf MNIST.tar.gz -C data
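
Before going further, you can confirm that PyTorch can see your GPUs (the exact count printed will depend on your machine):

python -c "import torch; print(torch.cuda.is_available(), torch.cuda.device_count())"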

Step 2: Initialize the DDP Environment

Start by initializing the DDP environment. This includes setting up the process group and specifying the backend for communication. The init_process_group function is used for this purpose:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'  # Change this to the master node's IP address if using multiple machines
    os.environ['MASTER_PORT'] = '12355'  # Pick a free port on the master node
    dist.init_process_group("nccl", rank=rank, world_size=world_size)  # NCCL is the recommended backend for GPUs
    torch.cuda.set_device(rank)  # Bind this process to its own GPU

def cleanup():
    dist.destroy_process_group()
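
If you want to smoke-test the distributed setup on a machine without GPUs, you can swap the backend for gloo, which communicates over CPU; in that case, skip the torch.cuda.set_device call and keep tensors on the CPU:

dist.init_process_group("gloo", rank=rank, world_size=world_size)  # CPU-friendly alternative to NCCL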

Step 3: Define the Model and Wrap with DDP

Define your model; each worker process will wrap its local copy with DistributedDataParallel, which ensures that gradients are synchronized across all GPU replicas. The wrapping itself is shown after the model definition and again in Step 5:

import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(28*28, 10)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(10, 10)
        self.fc3 = nn.Linear(10, 10)

    def forward(self, x):
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x

def create_model():
    return SimpleModel()
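
The wrapping happens inside each worker process, once the process knows which GPU (rank) it owns. These two lines reappear in the training loop in Step 5:

model = create_model().to(rank)            # Move the local replica to this process's GPU
ddp_model = DDP(model, device_ids=[rank])  # Gradient synchronization is now automatic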

Step 4: Partition the Dataset

To ensure an even distribution of data across all GPUs, use torch.utils.data.DistributedSampler:

from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms

def create_dataloader(rank, world_size, batch_size=32):
    transform = transforms.Compose([transforms.ToTensor()])
    dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
    return dataloader
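
One detail worth noting: DistributedSampler shuffles by default and derives its shuffling order from the epoch number, so the training loop should tell it which epoch it is on. This call appears in Step 5:

dataloader.sampler.set_epoch(epoch)  # Call at the start of every epoch for a fresh, consistent shuffle across ranks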

Step 5: Implement the Training Loop

Finally, implement the training loop. DDP synchronizes gradients automatically during the backward pass, so the loop itself needs no extra synchronization; call dist.barrier() only when you need an explicit synchronization point, for example before every rank loads a checkpoint that rank 0 has just saved. Because the sampler shuffles, call set_epoch() at the start of each epoch so that every rank draws a fresh, consistent shuffle:

def train(rank, world_size, epochs=5):
    setup(rank, world_size)
    
    dataloader = create_dataloader(rank, world_size)
    model = create_model().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        dataloader.sampler.set_epoch(epoch)  # Give each epoch a fresh shuffle across ranks
        ddp_model.train()
        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = ddp_model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        
        if rank == 0:
            print(f"Epoch {epoch} complete")
    
    cleanup()

def main():
    world_size = torch.cuda.device_count()  # One process per available GPU
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)

if __name__ == "__main__":
    main()

Only rank 0 prints progress, so with the default of 5 epochs the output should look like this:
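
Epoch 0 complete
Epoch 1 complete
Epoch 2 complete
Epoch 3 complete
Epoch 4 complete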

This code initializes the DDP environment, sets up the model and data loader, and runs the training loop on multiple GPUs. The mp.spawn function is used to start multiple processes, each corresponding to a GPU.
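
As an alternative to mp.spawn, PyTorch ships a launcher, torchrun, that starts one process per GPU and sets the RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, and MASTER_PORT environment variables for you. Assuming the script above were saved as train_ddp.py (a hypothetical filename), launching it on a single 4-GPU machine would look like this:

torchrun --nproc_per_node=4 train_ddp.py

With torchrun, the script would need a small change: instead of calling mp.spawn, it would read the rank and world size from the environment (for example, int(os.environ["LOCAL_RANK"])) and call init_process_group without setting MASTER_ADDR and MASTER_PORT manually.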

Parallel Computing and Run:ai

Run:ai has built Atlas, an AI computing platform that functions as a foundational layer within the MLOps and AI infrastructure stack. Its automated resource management capabilities allow organizations to properly align resources across the different MLOps platforms and tools running on top of Run:ai Atlas. Deep integration with the NVIDIA ecosystem through Run:ai's GPU Abstraction technology maximizes the utilization and value of the NVIDIA GPUs in your environment.

Learn more about Run:ai