Horovod Distributed Training

4 Key Features and How to Get Started

What Is Horovod?

Horovod is an open source distributed training framework that enables efficient scaling of machine learning models across multiple GPUs or machines.

Originating from Uber's engineering team, it has quickly gained popularity in the deep learning community for its simplicity and effectiveness in distributed model training. By adding a few lines of Horovod-specific code, developers can convert their single-GPU training scripts into scripts that run across multiple GPUs or hosts.

Horovod operates by wrapping around existing deep learning frameworks (it currently supports TensorFlow, Keras, PyTorch, and MXNet), using MPI (Message Passing Interface) for cross-process communication. It aims to minimize the modifications required to scale deep learning models. Horovod can run on-premises or in AWS, Azure, or Google Cloud.

You can get Horovod from the official GitHub repository.

This is part of a series of articles about deep learning for computer vision.

Key Features of Horovod

Horovod offers the following features for training ML models.

1. Framework-Agnostic Approach

Horovod can be used with a variety of deep learning frameworks, including TensorFlow, Keras, PyTorch, and Apache MXNet. This flexibility is useful for developers who work across different frameworks and want a unified way to distribute their training processes.

Additionally, being framework agnostic allows Horovod to serve a broader community. Users can apply the same principles and methods of distributed training across different tools without having to learn the ins and outs of each framework's specific parallel processing capabilities.

2. Ring-AllReduce Algorithm

Horovod's efficiency in distributed training partly stems from its use of the Ring-AllReduce algorithm. It enables efficient data averaging across all participating GPUs, ensuring that each one ends up with the same aggregated updates. Ring-AllReduce is particularly effective in reducing the bandwidth needed for communication between nodes, a common bottleneck in distributed training.

The algorithm divides the data to be communicated into chunks, distributing them across a ring of participating GPUs. Each GPU then processes its chunk, passes it to the next GPU in the ring, and receives another chunk to process. This cycle repeats until all GPUs have processed all chunks.
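
To make the idea concrete, here is a minimal, framework-free Python sketch of the Ring-AllReduce pattern. It only simulates the chunk rotation for intuition; Horovod's actual implementation runs inside NCCL/MPI, and all names below are illustrative.

def ring_allreduce(tensors):
    """Average equal-length vectors (one per worker) by simulating the
    scatter-reduce and all-gather phases of Ring-AllReduce.
    Note: mutates its inputs in place; fine for a demo."""
    n = len(tensors)              # number of workers in the ring
    chunk = len(tensors[0]) // n  # each vector is split into n chunks (assumes even split)
    sl = lambda c: slice(c * chunk, (c + 1) * chunk)

    # Phase 1 (scatter-reduce): in step s, worker r sends chunk (r - s) % n to its
    # right neighbor, which adds it to its own copy of that chunk.
    for s in range(n - 1):
        sends = [(r, (r - s) % n, tensors[r][sl((r - s) % n)]) for r in range(n)]
        for r, c, payload in sends:
            dst = (r + 1) % n
            for k, v in enumerate(payload):
                tensors[dst][c * chunk + k] += v

    # After phase 1, worker r holds the fully summed chunk (r + 1) % n.
    # Phase 2 (all-gather): each reduced chunk is forwarded around the ring
    # until every worker has every chunk.
    for s in range(n - 1):
        sends = [(r, (r + 1 - s) % n, tensors[r][sl((r + 1 - s) % n)]) for r in range(n)]
        for r, c, payload in sends:
            tensors[(r + 1) % n][sl(c)] = payload

    # Horovod averages gradients rather than just summing, so divide by the worker count.
    return [[v / n for v in t] for t in tensors]

# Four simulated workers, each with an 8-element "gradient" vector.
workers = [[float(r * 8 + i) for i in range(8)] for r in range(4)]
result = ring_allreduce(workers)
print(result[0] == result[3])  # True: every worker ends with the same averaged vector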

3. GPU-Awareness

The framework optimizes GPU-to-GPU communication, minimizing training times in distributed settings. By leveraging NVIDIA's NCCL library, Horovod achieves high-speed data transfer between GPUs, enhancing the overall performance of distributed training tasks.

Horovod also manages GPU memory carefully. Its Tensor Fusion feature batches many small tensors into a single fusion buffer before communication, and buffer allocations are managed so that the additional memory overhead stays bounded, allowing deep learning models to scale on high-dimensional datasets without exhausting GPU resources.

4. TensorFlow Integration

TensorFlow users can distribute their training with minimal changes to their existing codebase. Horovod is tightly integrated with TensorFlow, providing hooks and wrappers that integrate directly with TensorFlow's APIs. This simplifies the process of adjusting learning rates and other hyperparameters critical for optimized distributed training.

TensorFlow users also benefit from Horovod's support for advanced features such as gradient aggregation and checkpoint averaging. These features ensure consistent and reliable model performance across multiple GPUs or nodes.
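
As an illustration, a minimal sketch of this integration using Horovod's Keras API might look like the following. The model architecture, learning rate, and checkpoint path are placeholders, not part of the original article.

import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod and pin each process to one GPU.
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Placeholder model for illustration.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10, activation='softmax'),
])

# Scale the learning rate by the number of workers, then wrap the optimizer so
# gradients are averaged across workers. (With newer TF/Keras versions you may
# need the tf.keras.optimizers.legacy classes.)
opt = tf.keras.optimizers.SGD(learning_rate=0.01 * hvd.size())
opt = hvd.DistributedOptimizer(opt)
model.compile(loss='sparse_categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

callbacks = [
    # Broadcast initial variable states from rank 0 so all workers start identically.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    # Average validation metrics across workers at the end of each epoch.
    hvd.callbacks.MetricAverageCallback(),
]

# Save checkpoints only on rank 0 to avoid corruption by other workers.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

# model.fit(train_dataset, epochs=..., callbacks=callbacks,
#           verbose=1 if hvd.rank() == 0 else 0)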

Getting Started with Horovod

Requirements

Before installing Horovod, ensure your system meets the following prerequisites:

  • Operating system: GNU Linux or macOS (Windows is not supported)
  • Python version: 3.6 or newer
  • Compiler: g++-5 or above, supporting C++14. For TensorFlow 2.10 or later, a compiler supporting C++17 (like g++-8 or newer) is required.
  • CMake: Version 3.13 or newer
  • Deep learning frameworks: TensorFlow (>=1.15.0), PyTorch (>=1.5.0), or MXNet (>=1.4.1)
  • Optional: MPI for distributed training
  • GPU support: For optimal performance on GPUs, NCCL 2 is recommended.

Installation Steps

First, install the deep learning framework (TensorFlow, PyTorch, or MXNet) that you plan to use with Horovod; it must be in place before you install Horovod itself.

Use the following command to install Horovod with support for the frameworks you are using:

$ pip install horovod[tensorflow,keras,pytorch,mxnet,spark]

For all-framework support, use:

$ pip install horovod[all-frameworks]

To specifically enable or disable framework support, set environment variables accordingly (e.g., HOROVOD_WITH_TENSORFLOW=1 for TensorFlow support).
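
For example, to force the build to include TensorFlow support (and fail loudly if it cannot), you might run:

$ HOROVOD_WITH_TENSORFLOW=1 pip install horovod[tensorflow]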

If Horovod cannot find a suitable version of CMake, it will attempt to download and use a temporary binary. You can specify a CMake binary by setting the HOROVOD_CMAKE environment variable.

Additional Considerations

Other relevant components include:

  • Controllers: Horovod supports MPI and Gloo for coordinating work between processes. MPI is the default, but Gloo can be used without additional dependencies and is necessary for the elastic/fault-tolerant API.
  • Tensor operations on GPU: For the best performance, install Horovod with NCCL support. This requires setting HOROVOD_GPU_OPERATIONS=NCCL during installation, as shown in the example command below.
  • Tensor operations on CPU: By default, CPU operations use the controller's framework (MPI or Gloo), but you can also use Intel's oneCCL by setting HOROVOD_CPU_OPERATIONS=CCL.
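
For example, a GPU build that uses NCCL for tensor operations might be installed with a command along these lines:

$ HOROVOD_GPU_OPERATIONS=NCCL pip install horovod[pytorch]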

Verifying Installation

After installation, you can verify the build and check which features are enabled by running:

$ horovodrun --check-build

This command will list all the features enabled in your Horovod installation. If a required feature is not enabled, you may need to reinstall Horovod with the appropriate environment variables set.

Using Horovod to Scale PyTorch Training with Multiple GPUs

Let’s see what is involved in using Horovod to scale training with multiple GPUs in PyTorch. The code below is adapted from the Horovod documentation. For background on how PyTorch training works, see the PyTorch documentation.

When using Horovod with PyTorch, you’ll need to make several modifications to the PyTorch training script:

  1. Run hvd.init().
  2. Pin each GPU to a single process, using the process’s local rank to select the device. This ensures that the first GPU on a host is allocated to the first process, the second GPU to the second process, and so on:
    if torch.cuda.is_available():
        torch.cuda.set_device(hvd.local_rank())
  3. Scale the learning rate by the number of workers. The effective batch size in synchronous distributed training grows with the number of workers, and a larger batch size generally calls for a proportionally higher learning rate.
  4. Wrap the original optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the wrapped optimizer and averages the gradients across workers using allreduce or allgather.
  5. Broadcast the initial variable states from rank 0 to all other processes. This ensures that all workers are initialized consistently, whether training starts from random weights or is restored from a checkpoint:
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
  6. Save checkpoints only on worker 0 to prevent the other workers from corrupting them. You can do this by guarding the checkpointing code so that it runs only when hvd.rank() == 0 (a sketch of this follows the full example below).

Here is the complete PyTorch code that uses Horovod to achieve distributed training:

import torch
import torch.nn.functional as F
import torch.optim as optim
import horovod.torch as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

# Define dataset...
train_dataset = ...

# Partition dataset among workers using DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

# Build model...
model = ...
model.cuda()

# Scale a base learning rate (0.01 here is only illustrative) by the number of workers (step 3)
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Add Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# Broadcast initial parameters and optimizer state from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

log_interval = 10  # how often (in batches) to print training progress

for epoch in range(100):
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()  # move the batch to the pinned GPU
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
                epoch, batch_idx * len(data), len(train_sampler), loss.item()))
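
The listing above omits step 6. A minimal sketch of rank-0-only checkpointing, placed at the end of each epoch (the file name is an illustrative placeholder), might look like this:

# Save checkpoints only on worker 0 so the other workers cannot corrupt them (step 6).
if hvd.rank() == 0:
    torch.save({'model': model.state_dict(),
                'optimizer': optimizer.state_dict()},
               'checkpoint-{}.pt'.format(epoch))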

Optimizing Your AI Infrastructure with Run:ai

Run:ai automates resource management and orchestration and reduces cost for the infrastructure used for distributed training. With Run:ai, you can automatically run as many compute-intensive experiments as needed.

Here are some of the capabilities you gain when using Run:ai:

  • Advanced visibility—create an efficient pipeline of resource sharing by pooling GPU compute resources.
  • No more bottlenecks—you can set up guaranteed quotas of GPU resources, to avoid bottlenecks and optimize billing.
  • A higher level of control—Run:ai enables you to dynamically change resource allocation, ensuring each job gets the resources it needs at any given time.

Run:ai simplifies machine learning infrastructure pipelines, helping data scientists accelerate their productivity and improve the quality of their models.

Learn more about the Run:ai GPU virtualization platform.