Distributed training with PyTorch and Azure ML

Topic: Azure ML + PyTorch: better together

Introduction

Suppose you have a very large PyTorch model, and you’ve already tried many common tricks to speed up training: you optimized your code, you moved training to the cloud and selected a fast GPU VM, you installed software packages that improve training performance (for example, by using the ACPT curated environment on Azure ML). And yet, you still wish your model could train faster. Maybe it’s time to give distributed training a try! Continue reading to learn the simplest way to do distributed training with PyTorch and Azure ML.

Overview of distributed training

Distributed training divides the work of training a model across multiple processes, working in parallel on multiple GPUs and possibly on multiple machines to accelerate progress. This technique can be used for any model, but it’s particularly useful when training very large deep learning models.

There are two main types of distributed training: data parallelism and model parallelism. In data parallelism, the full model is copied to each process and is trained on a portion of the input data. In model parallelism, the model is segmented into separate parts that run concurrently in each process, and each model piece receives the full data as input. In this article I’ll cover data parallelism, since it’s easier to implement, more commonly used, and sufficient for most purposes.
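To make the distinction concrete, here’s a toy sketch in plain Python (no PyTorch involved). The "model" is just a list of three made-up functions, and the names `run_layers`, `data_parallel`, and `model_parallel` are hypothetical helpers I invented for illustration. The workers are simulated one after another rather than run concurrently, so treat this as a picture of who holds what, not as a real implementation.

```python
from typing import Callable, List

Layer = Callable[[float], float]

# A toy "model": three layers applied in sequence to each input value.
layers: List[Layer] = [lambda x: x + 1.0, lambda x: x * 2.0, lambda x: x - 3.0]


def run_layers(segment: List[Layer], values: List[float]) -> List[float]:
    """Apply a sequence of layers to every value."""
    for layer in segment:
        values = [layer(v) for v in values]
    return values


def data_parallel(values: List[float], num_workers: int) -> List[float]:
    """Every worker holds ALL layers but sees only its slice of the data."""
    shards = [values[w::num_workers] for w in range(num_workers)]
    outputs = [run_layers(layers, shard) for shard in shards]
    # Interleave the per-worker outputs back into the original order.
    merged = [0.0] * len(values)
    for w, out in enumerate(outputs):
        merged[w::num_workers] = out
    return merged


def model_parallel(values: List[float], num_workers: int) -> List[float]:
    """Every worker holds a SEGMENT of the layers and sees all the data."""
    per_worker = -(-len(layers) // num_workers)  # ceiling division
    for w in range(num_workers):
        segment = layers[w * per_worker:(w + 1) * per_worker]
        values = run_layers(segment, values)  # output feeds the next worker
    return values


data = [1.0, 2.0, 3.0, 4.0, 5.0]
print(run_layers(layers, data))   # single-process reference
print(data_parallel(data, 2))     # same result, data split in two
print(model_parallel(data, 2))    # same result, layers split in two
```

All three calls produce the same outputs. The only thing that changes is whether the data or the model gets divided among the workers.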

You can find the code that accompanies this article on GitHub. The code for our demo extends an earlier post I wrote about moving training code to the cloud. You can consult that post to understand the PyTorch code that trains Fashion MNIST, and the Azure ML code that trains our base scenario in the cloud. In this article, we’ll focus on the code changes required to add distributed training to the original project. There are two parts to this task: first, we’ll need to change how we launch our training job on Azure ML, and second, we’ll need to alter the PyTorch code that does the training.

Adding distributed training to Azure ML code

Let’s assume that we want to distribute our training across two machines (also called nodes or instances), each with 4 processes, for a total of 8 processes. We start by choosing a machine size with 4 GPUs, such as the “Standard_NC24” size. You can find this code in the job.py file:

    cluster = AmlCompute(
        ...
        type="amlcompute",
        ...
    )

You can learn about choosing your Azure ML machine size in this article. Whichever option you choose, make sure you take note of how many GPUs the machine has — you’ll need to specify the number of processes per instance later.

Since we want to take advantage of the GPU hardware, we need to choose an Azure ML environment that includes CUDA. For example:

    environment = Environment(image="mcr.microsoft.com/azureml/" +
                              "openmpi4.1.0-cuda11.1-cudnn8-ubuntu20.04:latest",
                              conda_file=CONDA_PATH)

You can learn more about choosing an Azure ML environment in this article.

We can specify the number of instances and processes per instance when we create the command job:

    job = command(
        ...
        resources=dict(instance_count=2),
        distribution=dict(type="PyTorch", process_count_per_instance=4),
        ...
    )

On your first distributed training run, you may want to set “instance_count” to 1 and “process_count_per_instance” to the number of GPUs your VM supports. You can always increase the number of instances later if needed.

When executing this code, Azure ML will automatically set the following environment variables on each instance:

  • WORLD_SIZE — The total number of processes across all instances. With 2 instances and 4 processes per instance, WORLD_SIZE is 8.
  • NODE_RANK — The index of the current instance. The first instance has NODE_RANK zero.
  • MASTER_ADDR — The IP address of the first instance.
  • MASTER_PORT — An available port on the first instance.

And it will set the following environment variables on each process:

  • LOCAL_RANK — The index of the current process within its instance.
  • RANK — The global index of the current process (among all processes on all instances).

PyTorch needs these environment variables in order to execute the distributed training code. It’s handy that Azure ML takes care of setting these variables for us.
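If you’d like to picture what each process sees, here’s a small sketch in plain Python that lists the values for our 2-instance, 4-process configuration. The helper `expected_env` is hypothetical, and it assumes that global ranks are assigned instance by instance (RANK = NODE_RANK × processes per instance + LOCAL_RANK). That’s the usual convention, but I haven’t confirmed it against Azure ML’s implementation. MASTER_ADDR and MASTER_PORT are left out because their values depend on the cluster.

```python
from typing import Dict, List


def expected_env(instance_count: int,
                 process_count_per_instance: int) -> List[Dict[str, int]]:
    """List the rank-related variables for every process in the job.

    Real environment variables are strings; integers are used here
    only to keep the sketch readable.
    """
    world_size = instance_count * process_count_per_instance
    layout = []
    for node_rank in range(instance_count):
        for local_rank in range(process_count_per_instance):
            layout.append({
                "WORLD_SIZE": world_size,  # total processes in the job
                "NODE_RANK": node_rank,    # index of the instance
                "LOCAL_RANK": local_rank,  # index within the instance
                # Assumption: global ranks are numbered instance by instance.
                "RANK": node_rank * process_count_per_instance + local_rank,
            })
    return layout


for env in expected_env(instance_count=2, process_count_per_instance=4):
    print(env)
```

Under that assumption, the process with RANK 5 runs on the second instance (NODE_RANK 1) with LOCAL_RANK 1, so it would use the second GPU of that machine.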

Adding distributed training to PyTorch code

In order to do distributed training, PyTorch creates a group of processes that communicate with each other. We initialize the group using the torch.distributed.init_process_group() function, specifying two important pieces of information:

  • The backend, which determines how the processes communicate with each other. The backends available to us are “gloo”, “mpi”, and “nccl”. We choose “nccl” because we want distributed GPU training.

  • The initialization method, which determines how we want to initialize information needed during training. This information can be initialized using TCP, a shared file system, or environment variables. We’ll choose environment variable initialization, so that PyTorch will look for the environment variables that Azure ML sets automatically.

The initialization of the process group is done in train.py:

    torch.distributed.init_process_group(backend="nccl", init_method="env://")

For convenience, we’ll create variables for the rank and local rank of the current process. We’ll need them later:

    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])

We’ll use the local rank to select the correct GPU when we specify our torch device:

    device = torch.device("cuda", local_rank)

We’ll also use the local rank when creating a DistributedDataParallel object in our train() function. This is the class that actually implements the data parallelism:

    from torch import nn
    ...
    model = nn.parallel.DistributedDataParallel(
        module=NeuralNetwork().to(device), device_ids=[local_rank])

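DistributedDataParallel wraps the model replica held by each process and, when it’s constructed, broadcasts the parameters from the process with rank 0 so that every replica starts from the same weights. Each replica then trains on its own portion of the input data. During the backward pass, DistributedDataParallel averages the gradients across all processes, so every replica applies the same update and the models stay in sync.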
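To see why averaging gradients is a sensible thing to do, here’s a minimal sketch in plain Python. It doesn’t use PyTorch at all: it takes a one-parameter linear model with a mean squared error loss, splits a batch across simulated processes, and compares the average of the per-process gradients with the gradient computed on the whole batch. The function names and the data are made up for illustration, and the sketch assumes every process receives the same number of samples.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (input x, target y)


def mse_gradient(w: float, batch: List[Sample]) -> float:
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


def averaged_gradient(w: float, batch: List[Sample], world_size: int) -> float:
    """Simulate data parallelism: one gradient per process, then average."""
    # Each simulated process gets an equally sized slice of the batch.
    shards = [batch[rank::world_size] for rank in range(world_size)]
    local_grads = [mse_gradient(w, shard) for shard in shards]
    return sum(local_grads) / world_size


batch = [(1.0, 2.0), (2.0, 4.5), (3.0, 5.5), (4.0, 8.5)]
w = 0.5

print(mse_gradient(w, batch))                     # whole batch, one process
print(averaged_gradient(w, batch, world_size=2))  # two simulated processes
```

With equally sized shards, the two numbers match. This is why a data-parallel step with N processes behaves like a single-process step on a batch that is N times larger.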

We’ll use the global rank to ensure that we only save the trained model once, across all processes:

    if rank == 0:
        save_model(model_dir, model)

There’s one more change we need to make. In order to ensure that each process loads its own unique subset of the training data, we pass a DistributedSampler to the DataLoader:

    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data)
    train_loader = DataLoader(train_data,
                              batch_size=batch_size,
                              sampler=train_sampler)

According to the documentation, when using the DistributedSampler we need to set the epoch in the training sampler at the beginning of each epoch, to ensure that shuffling works properly across multiple epochs:

    for epoch in range(epochs):
        ...
        train_sampler.set_epoch(epoch)
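Here’s a simplified imitation of that behavior in plain Python, to show why the epoch matters. This is not PyTorch’s actual code: `shard_indices` is a hypothetical helper, and it uses Python’s `random` module rather than a torch generator. The idea it illustrates is that every process shuffles the full index list with the same seed, derived from the epoch, and then keeps every `world_size`-th index starting at its own rank.

```python
import random
from typing import List


def shard_indices(dataset_size: int, world_size: int, rank: int,
                  epoch: int, seed: int = 0) -> List[int]:
    """Return the dataset indices one process would load in one epoch.

    Assumes dataset_size >= world_size.
    """
    indices = list(range(dataset_size))
    # All processes use the same seed, so they agree on the shuffled order.
    random.Random(seed + epoch).shuffle(indices)
    # Pad by repeating leading indices so every rank gets the same count.
    remainder = len(indices) % world_size
    if remainder:
        indices += indices[:world_size - remainder]
    # Each rank keeps a strided slice, so the shards don't overlap
    # (apart from the few padded duplicates).
    return indices[rank::world_size]


for epoch in range(2):
    for rank in range(4):
        print(epoch, rank, shard_indices(10, world_size=4, rank=rank, epoch=epoch))
```

If the epoch never changed, the seed would never change either, and each process would see the same samples in the same order in every epoch. Passing a new epoch number each time gives a fresh shuffle while keeping all processes in agreement.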

And we’re done!

Not that long ago, multi-machine multi-GPU distributed training was a niche technique used only by a few trailblazers. But these days, as tools advance and become more user-friendly, it’s within the reach of all machine learning engineers and data scientists. I hope that you’ll give it a try next time you have a large model to train.

Thank you Ankit Singhal and Hanchi Wang from the Azure ML team for reviewing this post.