Training your PyTorch model using components and pipelines in Azure ML

Topic: Azure ML + PyTorch: better together

Introduction

In this post, we’ll explore how you can take your PyTorch model training to the next level, using Azure ML. In particular, we’ll see how you can split your training code into multiple steps that can be easily shared and reused by others, and how you can connect those steps into a training graph to be executed in the cloud.

If you’re looking to get started with bringing your PyTorch model to the cloud, make sure you read my post on Training and deploying your PyTorch model in the cloud with Azure ML first.

The project associated with this post can be found on GitHub.

Step 1: Train and test your PyTorch model locally

The first step is to make sure we can train and test our PyTorch model on our development machine. Our scenario for this post uses the Fashion MNIST dataset, and has two steps: a training step where we produce a model, and a test step where we make sure our trained model gives us good predictions. I discussed and showed the PyTorch training code in my previous post, so I won’t go over it again. The project for this post adds a PyTorch test step, which you can find in the test.py file, and also below:

"""Tests the model."""

import argparse
import logging

import mlflow
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

from common import DATA_DIR, MODEL_DIR
from utils_train_nn import evaluate


def load_test_data(data_dir: str, batch_size: int) -> DataLoader[torch.Tensor]:
    """
    Returns a DataLoader object that wraps test data.
    """
    test_data = datasets.FashionMNIST(data_dir,
                                      train=False,
                                      download=True,
                                      transform=ToTensor())
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

    return test_loader


def test(data_dir: str, model_dir: str, device: str) -> None:
    """
    Tests the model on test data.
    """
    batch_size = 64
    loss_fn = nn.CrossEntropyLoss()
    test_dataloader = load_test_data(data_dir, batch_size)
    model = mlflow.pytorch.load_model(model_uri=model_dir)

    (test_loss, test_accuracy) = evaluate(device, test_dataloader, model,
                                          loss_fn)

    # Loss and accuracy are measured results, so log them as metrics.
    mlflow.log_metric("test_loss", test_loss)
    mlflow.log_metric("test_accuracy", test_accuracy)
    logging.info("Test loss: %f", test_loss)
    logging.info("Test accuracy: %f", test_accuracy)


def main() -> None:
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument("--data_dir", dest="data_dir", default=DATA_DIR)
    parser.add_argument("--model_dir", dest="model_dir", default=MODEL_DIR)
    args = parser.parse_args()
    logging.info("input parameters: %s", vars(args))

    device = "cuda" if torch.cuda.is_available() else "cpu"

    test(**vars(args), device=device)


if __name__ == "__main__":
    main()

Just like the training code, my test code uses the MLflow open-source framework to log metrics. And because we used MLflow to save the model, we need to do the same to load it.

We now have two executable files that we need to run in sequence: first we execute train.py to produce the model, and then we execute test.py to test it. Our goal is to make sure that the training results in a model that has a satisfactory accuracy when we make predictions on test data. In this case, when you run the test code, you should get an accuracy of somewhere around 85%. Once we’re happy with the results, we’re ready to move our code to the cloud!
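
To make the "run in sequence" idea concrete, here is a minimal sketch of a local runner for the two scripts. It assumes that train.py accepts the same --data_dir and --model_dir arguments as test.py, and that you call it from the folder containing both scripts. The build_commands and run_locally names are made up for this illustration and aren’t part of the project.

```python
"""Runs train.py and then test.py in sequence (illustrative sketch)."""

import subprocess
import sys
from typing import List


def build_commands(data_dir: str, model_dir: str) -> List[List[str]]:
    """Returns the two commands in the order they need to run."""
    python = sys.executable  # Use the interpreter that runs this script.
    return [
        [python, "train.py", "--data_dir", data_dir, "--model_dir", model_dir],
        [python, "test.py", "--data_dir", data_dir, "--model_dir", model_dir],
    ]


def run_locally(data_dir: str, model_dir: str) -> None:
    """Runs each step; check=True stops the sequence if a step fails."""
    for command in build_commands(data_dir, model_dir):
        subprocess.run(command, check=True)
```

Calling run_locally("data", "model") (both folder names are placeholders) trains first and tests second, and it stops early if training fails. A pipeline gives us the same ordering guarantee in the cloud.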

Step 2: Train your model in the cloud

Training our model in the cloud using Azure ML requires a few cloud entities. As in our previous project, we need a compute resource (which specifies the details of the VM), a data asset (which contains a copy of our data), and an environment (which lists the software packages we want installed). In our previous project, we used a command job to bundle those resources with the command that runs our training script, and to execute that script in the cloud. We can’t do that this time, because we now have two scripts instead of one.

That’s where Azure ML components and pipelines come in. We can register each script as a component, and then connect those components into a pipeline job, which is capable of running multiple steps. Let’s take a look at the Azure ML code we need to write to create these resources, which you can find in the pipeline_job.py file:

"""Creates and runs an Azure ML pipeline."""

import logging
from pathlib import Path

from azure.ai.ml import MLClient, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import (AmlCompute, CommandComponent, Data,
                                  Environment, Model)
from azure.identity import DefaultAzureCredential

from common import MODEL_NAME

COMPUTE_NAME = "cluster-cpu"
DATA_NAME = "data-fashion-mnist"
DATA_PATH = Path(Path(__file__).parent.parent, "data")
COMPONENT_TRAIN_NAME = "component_pipeline_sdk_train"
COMPONENT_TEST_NAME = "component_pipeline_sdk_test"
COMPONENT_CODE = Path(Path(__file__).parent.parent, "src")
ENVIRONMENT_NAME = "environment-pipeline-sdk"
CONDA_PATH = Path(Path(__file__).parent, "conda.yml")
EXPERIMENT_NAME = "aml_pipeline_sdk"
DOWNLOADED_MODEL_PATH = Path(Path(__file__).parent.parent)


def main() -> None:
    logging.basicConfig(level=logging.INFO)
    credential = DefaultAzureCredential()
    ml_client = MLClient.from_config(credential=credential)

    # Create the compute cluster.
    cluster_cpu = AmlCompute(
        name=COMPUTE_NAME,
        type="amlcompute",
        size="Standard_DS4_v2",
        location="westus",
        min_instances=0,
        max_instances=4,
    )
    # begin_create_or_update returns a poller; wait until the cluster exists.
    ml_client.begin_create_or_update(cluster_cpu).result()

    # Create the data set.
    dataset = Data(
        name=DATA_NAME,
        description="Fashion MNIST data set",
        path=DATA_PATH.as_posix(),
        type=AssetTypes.URI_FOLDER,
    )
    registered_dataset = ml_client.data.create_or_update(dataset)

    # Create environment for components. We won't register it.
    environment = Environment(name=ENVIRONMENT_NAME,
                              image="mcr.microsoft.com/azureml/" +
                              "openmpi4.1.0-ubuntu20.04:latest",
                              conda_file=CONDA_PATH)

    # Create the components.
    train_component = CommandComponent(
        name=COMPONENT_TRAIN_NAME,
        inputs=dict(data_dir=Input(type="uri_folder"),),
        outputs=dict(model_dir=Output(type="mlflow_model")),
        environment=environment,
        code=COMPONENT_CODE.as_posix(),
        command="python train.py --data_dir ${{inputs.data_dir}} " +
        "--model_dir ${{outputs.model_dir}}",
    )

    test_component = CommandComponent(
        name=COMPONENT_TEST_NAME,
        inputs=dict(data_dir=Input(type="uri_folder"),
                    model_dir=Input(type="mlflow_model")),
        environment=environment,
        code=COMPONENT_CODE.as_posix(),
        command="python test.py --data_dir ${{inputs.data_dir}} " +
        "--model_dir ${{inputs.model_dir}}")

    registered_train_component = ml_client.components.create_or_update(
        train_component)

    registered_test_component = ml_client.components.create_or_update(
        test_component)

    # Create and submit pipeline.
    @pipeline(experiment_name=EXPERIMENT_NAME, default_compute=COMPUTE_NAME)
    def pipeline_func(data_dir: Input) -> dict[str, str]:
        train_job = registered_train_component(data_dir=data_dir)
        # Ignoring pylint because "test_job" shows up in the Studio UI.
        test_job = registered_test_component(  # pylint: disable=unused-variable
            data_dir=data_dir,
            model_dir=train_job.outputs.model_dir)

        return {
            "model_dir": train_job.outputs.model_dir,
        }

    pipeline_job = pipeline_func(
        data_dir=Input(type="uri_folder", path=registered_dataset.id))

    pipeline_job = ml_client.jobs.create_or_update(pipeline_job)
    ml_client.jobs.stream(pipeline_job.name)

    # Create the model.
    model_path = f"azureml://jobs/{pipeline_job.name}/outputs/model_dir"
    model = Model(name=MODEL_NAME,
                  path=model_path,
                  type=AssetTypes.MLFLOW_MODEL)
    registered_model = ml_client.models.create_or_update(model)

    # Download the model (this is optional).
    ml_client.models.download(name=MODEL_NAME,
                              download_path=DOWNLOADED_MODEL_PATH,
                              version=registered_model.version)


if __name__ == "__main__":
    main()

As you can see, each component specifies the folder containing the code, the inputs and outputs of the script, the environment to run it in, and the command that runs the script. The train component takes as input a path to the training data and produces a trained model. The test component takes as input the path to the test data and the trained model, and logs loss and accuracy.
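
The ${{inputs.…}} and ${{outputs.…}} placeholders in each command are filled in with real paths when the component runs on the compute. The sketch below only illustrates that idea with a regular expression. It is not how Azure ML implements the substitution, and the resolve_command helper and the /mnt/… paths are hypothetical.

```python
import re
from typing import Dict

# Matches placeholders such as ${{inputs.data_dir}} or ${{outputs.model_dir}}.
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def resolve_command(command: str, inputs: Dict[str, str],
                    outputs: Dict[str, str]) -> str:
    """Replaces each placeholder with the path bound to that input or output."""
    bindings = {"inputs": inputs, "outputs": outputs}

    def substitute(match: "re.Match[str]") -> str:
        kind, name = match.group(1), match.group(2)
        return bindings[kind][name]

    return PLACEHOLDER.sub(substitute, command)


command = ("python train.py --data_dir ${{inputs.data_dir}} "
           "--model_dir ${{outputs.model_dir}}")
resolved = resolve_command(
    command,
    inputs={"data_dir": "/mnt/inputs/data_dir"},      # hypothetical path
    outputs={"model_dir": "/mnt/outputs/model_dir"},  # hypothetical path
)
print(resolved)
```

With these made-up paths, the resolved command is "python train.py --data_dir /mnt/inputs/data_dir --model_dir /mnt/outputs/model_dir", which is the kind of command line the script’s argparse code receives.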

The pipeline runs the two components in sequence. In our demo we have a very simple pipeline with just two components, but in more realistic scenarios we typically have a graph of components, some of which have to run sequentially and others which can run in parallel. These components can contain any code you’d like, including pre-processing steps.
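
To picture how a graph of components turns into sequential and parallel work, here is a small sketch that uses Python’s standard graphlib module. The step names and their dependencies are invented for the example. In a real pipeline, Azure ML works out the order from how you wire outputs to inputs, as we did when passing train_job.outputs.model_dir to the test component.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical pipeline: each step maps to the set of steps it depends on.
dependencies: Dict[str, Set[str]] = {
    "preprocess": set(),
    "train": {"preprocess"},
    "test": {"train"},
    "data_report": {"preprocess"},
}


def execution_stages(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Groups steps into stages; steps in the same stage can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        # Every step returned here has all of its dependencies finished.
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(execution_stages(dependencies))
# → [['preprocess'], ['data_report', 'train'], ['test']]
```

Here "train" and "data_report" land in the same stage because both depend only on "preprocess", while "test" has to wait for "train".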

Running this script will register the trained model with Azure ML, and it will also download it to your development machine. You can verify that your model was registered by going to the Azure ML studio and clicking on “Models” in the left navigation pane. Once you have a trained model registered with Azure ML, you can deploy it by running the endpoint.py file. (If you do so, don’t forget to run the delete_endpoint.py file to delete the endpoint when you’re done and avoid unnecessary charges.)

My favorite aspect of components is that once they’re registered with Azure ML, I can easily reuse them in future projects by simply adding them to another pipeline. I can also share them with my teammates through the use of registries, a feature that the Azure ML team recently announced.
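
As a rough sketch of that reuse, another project could fetch the registered components by name and version through the client’s components.get method. The load_registered_components helper is hypothetical, and so is the version you pass in. Check the version that your workspace actually shows for each component.

```python
from typing import Any, Dict

# Names used when the components were registered in pipeline_job.py.
COMPONENT_NAMES = {
    "train": "component_pipeline_sdk_train",
    "test": "component_pipeline_sdk_test",
}


def load_registered_components(ml_client: Any, version: str) -> Dict[str, Any]:
    """Fetches previously registered components from the workspace.

    Assumes ml_client is an azure.ai.ml.MLClient connected to the workspace
    where the components were registered, and that both components exist
    with the given version.
    """
    return {
        step: ml_client.components.get(name=name, version=version)
        for step, name in COMPONENT_NAMES.items()
    }
```

The objects returned can then be called inside a function decorated with @pipeline, in the same way that registered_train_component and registered_test_component are called in pipeline_job.py.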

Hopefully I’ve inspired you to take your next step in your journey of moving your PyTorch training to the cloud. Thanks for reading!