Orchestrating PyTorch ML Workflows on Vertex AI Pipelines

Previously in the PyTorch on Google Cloud series, we trained, tuned and deployed a PyTorch text classification model using the Training and Prediction services on Vertex AI. In this post, we will show how to automate and monitor a PyTorch based ML workflow by orchestrating the pipeline in a serverless manner using Vertex AI Pipelines. Let’s get started!

Why Pipelines?

Before we dive in, let’s first understand why pipelines are needed for ML workflows. As seen previously, training and deploying a PyTorch based model encapsulates a sequence of tasks such as processing data, training a model, hyperparameter tuning, evaluation, packaging the model artifacts, model deployment and the retraining cycle. Each of these steps has different dependencies, and if the entire workflow is treated as a monolith, it can quickly become unwieldy.

Machine Learning Pipelines

As ML systems and processes begin to scale, you might want to share your ML workflow with others on your team so they can execute the workflow or contribute to the code. Without a reliable, reproducible process, this can become difficult. With pipelines, each step in the ML process runs in its own container. This lets you develop steps independently and track the input and output of each step in a reproducible way, allowing you to iterate on experiments effectively. Automating these tasks and orchestrating them across multiple services enables repeatable and reproducible ML workflows that can be shared between different teams, such as data scientists and data engineers.

Pipelines are also a key component of MLOps when formalizing training and deployment operationalization to automatically retrain, deploy and monitor models, for example triggering a pipeline run when new training data is available, or retraining a model when its performance starts to decay.

Orchestrating PyTorch based ML workflows with Vertex AI Pipelines

PyTorch based ML workflows can be orchestrated on Vertex AI Pipelines, a fully managed, serverless way to automate, monitor, and orchestrate an ML workflow on the Vertex AI platform. Vertex AI Pipelines is the most effective way to orchestrate, automate and share ML workflows on Google Cloud for the following reasons:

Integration with other Google Cloud services: A pipeline step may import data from BigQuery, Cloud Storage or other sources, transform datasets using Cloud Dataflow or Dataproc, train models with Vertex AI, store pipeline artifacts in Cloud Storage, get model evaluation metrics, and deploy models to Vertex AI endpoints. Using the pre-built pipeline components for Vertex AI Pipelines makes it easy to call these steps in a pipeline, as sketched below.
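
For reference, here is a minimal sketch of how these pre-built components might be imported, matching the aip_components and aip_exp aliases used later in this post; the exact module paths depend on the installed version of the google-cloud-pipeline-components package:

from google_cloud_pipeline_components import aiplatform as aip_components
from google_cloud_pipeline_components import experimental as aip_exp
# depending on the package version, the custom job module may need to be
# imported explicitly, e.g.:
# from google_cloud_pipeline_components.experimental import custom_job

# Pre-built ops are called like any other KFP component inside a pipeline
# definition, for example aip_components.ModelUploadOp(...) or
# aip_exp.custom_job.CustomTrainingJobOp(...), as shown later in this post.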


NOTE: You can also orchestrate PyTorch based ML workflows on Google Cloud using open source Kubeflow Pipelines (KFP), which is a core component of the OSS Kubeflow project for building and deploying portable, scalable ML workflows based on Docker containers. The OSS KFP backend runs on a Kubernetes cluster such as Google Kubernetes Engine (GKE). OSS KFP includes pre-built PyTorch KFP components for different ML tasks such as data loading, model training, model profiling and many more. Refer to this blog post to learn more about orchestrating PyTorch based ML workflows on OSS KFP.


In this post, we will define a pipeline using the KFP SDK v2 to automate and orchestrate the model training, tuning and deployment workflow of the PyTorch text classification model covered previously. With KFP SDK v2, component and pipeline authoring is simplified, and first class support for metadata logging and tracking makes it easier to track the metadata and artifacts produced by the pipelines.
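
As a small illustration of that metadata support, a KFP SDK v2 component can log metrics through typed output artifacts. The following is a minimal, hypothetical sketch; the component and metric names are illustrative and not part of the pipeline built in this post:

from kfp.v2.dsl import component, Metrics, Output

@component(base_image="python:3.9")
def log_eval_metrics(accuracy: float, metrics: Output[Metrics]):
    """Hypothetical component: values logged on the Metrics output artifact
    are tracked in Vertex ML Metadata and surfaced on the Vertex AI
    Pipelines dashboard."""
    metrics.log_metric("eval_accuracy", accuracy)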

Following is the high level flow to define and submit a pipeline on Vertex AI Pipelines:

High-level flow to define and submit a pipeline
  1. Define pipeline components involved in training and deploying a PyTorch model
  2. Define a pipeline by stitching the components in the workflow including pre-built Google Cloud components and custom components
  3. Compile and submit the pipeline to Vertex AI Pipelines service to run the workflow
  4. Monitor the pipeline and analyze the metrics and artifacts generated

This post builds on the training and serving code from the previous posts. You can find the accompanying code and notebook for this blog post on the GitHub repository.

Concepts of a Pipeline

Let’s look at the terminology and concepts used in KFP SDK v2. If you are familiar with KFP SDK, skip to the next section – “Defining the Pipeline for PyTorch based ML workflow”.

Concepts of a pipeline

Learn more about KFP SDK v2 concepts here.
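
To make these concepts concrete, here is a minimal, hypothetical example (names and paths are placeholders): each decorated function is a component, simple typed inputs such as message are parameters, typed outputs such as Output[Dataset] are artifacts stored under the pipeline root, and each component call inside the pipeline function creates a task in the pipeline graph.

from kfp.v2 import dsl
from kfp.v2.dsl import component, Dataset, Input, Output

@component(base_image="python:3.9")
def make_dataset(message: str, data: Output[Dataset]):
    # "message" is a parameter; "data" is an output artifact whose storage
    # location under the pipeline root is provided by KFP via data.path
    with open(data.path, "w") as f:
        f.write(message)

@component(base_image="python:3.9")
def count_chars(data: Input[Dataset]) -> int:
    # consumes the artifact produced by the upstream task
    with open(data.path) as f:
        return len(f.read())

# pipeline_root below is a placeholder Cloud Storage path
@dsl.pipeline(name="concepts-demo", pipeline_root="gs://your-bucket/pipeline-root")
def concepts_demo_pipeline(message: str = "hello pipelines"):
    # each component call creates a task; passing outputs between tasks
    # defines the edges of the pipeline graph
    make_dataset_task = make_dataset(message=message)
    count_chars(data=make_dataset_task.outputs["data"])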

Defining the Pipeline for PyTorch based ML workflow

Now that the pipeline concepts are familiar, let’s look at building a pipeline for the PyTorch based text classification model. The following pipeline schematic shows the high-level steps involved, including inputs and outputs:

PyTorch training and deployment pipeline schematic

Following are the steps in the pipeline:

  1. Build a custom training container image with the training application code and its dependencies
  2. Run a custom training job on Vertex AI to train and evaluate the model
  3. Get the training job details, including the trained model artifacts and the evaluation metric
  4. If the evaluation metric meets the set threshold, create the model archive (MAR) file using the trained model and the custom handler
  5. Build a custom serving container image running TorchServe
  6. Upload the model with the custom serving container to Vertex AI as a Model resource
  7. Create a Vertex AI Endpoint and deploy the model to the endpoint
  8. Test the deployment by sending online prediction requests to the endpoint

There are a couple of things to note about the pipeline:

  1. The pipeline starts with building a training container image, because the text classification model we are working on has data preparation and pre-processing steps in the training code itself. When working with your own datasets, you can include data preparation or pre-processing tasks as a separate component from the model training. 
  2. Building custom training and serving containers can be done either as part of the ML pipeline or within the existing CI/CD pipeline (Continuous Integration/ Continuous Delivery). In this post, we chose to include building custom containers as part of the ML pipeline. In a future post, we will go in depth on CI/CD for ML pipelines and models.

Please refer to the accompanying notebook for the complete definition of pipeline and component spec.

Component specification

With this pipeline schematic in place, the next step is to define the individual components that perform the steps in the pipeline using the KFP SDK v2 component spec. We use a mix of pre-built components from the Google Cloud Pipeline Components SDK and custom components in the pipeline.

Let’s look at the component spec for one of the steps: building the custom training container image. Here we define a Python function-based component, where the component code is written as a standalone Python function. The function accepts the Cloud Storage path to the training application code, along with the project ID and the target image URI, as input parameters and outputs the Container Registry (GCR) URI of the custom training container. The function runs a Cloud Build job that pulls the training application code and the Dockerfile and builds the custom training image, which is pushed to Container Registry (or Artifact Registry). In the previous post, this step was performed in the notebook using docker commands; now the task is automated by making the step self-contained within a component and including it in the pipeline.

@component(
    base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest",
    packages_to_install=["google-cloud-build"],
    output_component_file="./pipelines/build_custom_train_image.yaml",
)
def build_custom_train_image(
    project: str,
    gs_train_src_path: str,
    training_image_uri: str
) -> NamedTuple(
    "Outputs", [("training_image_uri", str)]
):
    """ custom pipeline component to build custom training image using 
        Cloud Build and the training application code and dependencies 
        defined in the Dockerfile
    """
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    # initialize client for cloud build
    build_client = cloudbuild.services.cloud_build.CloudBuildClient()

    # parse step inputs to get path to Dockerfile and training application code
    gs_dockerfile_path = os.path.join(gs_train_src_path, 'Dockerfile')
    gs_train_src_path = os.path.join(gs_train_src_path, 'trainer/')

    # define build steps to pull the training code and Dockerfile 
    # and build/push the custom training container image
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", "-r", gs_train_src_path, "."],
        },
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"]
        },
        # enabling Kaniko cache in a Docker build that caches intermediate 
        # layers and pushes image automatically to Container Registry
        # https://cloud.google.com/build/docs/kaniko-cache
        {
            "name": 'gcr.io/kaniko-project/executor:latest',
            "args": [ f'--destination={training_image_uri}', '--cache=true' ]
        }
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout
    
    # create build
    operation = build_client.create_build(project_id=project, build=build)
    result = operation.result()
    
    # return step outputs
    return (training_image_uri,)

There are a few things to notice about the component spec:

The inputs and outputs in the standalone function above are defined as Parameters, which are simple data types representing values. Inputs and outputs can also be Artifacts, which represent any files or objects generated during component execution; these arguments are annotated as kfp.dsl.Input or kfp.dsl.Output artifacts. For example, the component specification for creating the model archive file refers to the model artifacts generated by the training job as an input, Input[Model] in the following snippet:

@component(
    base_image="python:3.9",
    packages_to_install=["torch-model-archiver"],
    output_component_file="./pipelines/generate_mar_file.yaml")
def generate_mar_file( 
    model_display_name: str,
    model_version: str,
    handler: str,
    model: Input[Model],
    model_mar: Output[Model]
) -> NamedTuple(
    "Outputs", [
        ("mar_env_var", list), 
        ("mar_export_uri", str)
    ]
):
    ...

Refer here for the artifact types in the KFP v2 SDK and here for the artifact types for Google Cloud Pipeline Components.
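
Inside a component body, these typed artifacts expose properties such as path (a locally accessible path), uri (the Cloud Storage location) and metadata. The hypothetical fragment below (not the notebook's actual implementation) illustrates how a component like generate_mar_file can consume and annotate such artifacts:

from kfp.v2.dsl import Input, Model, Output

def use_model_artifacts(model: Input[Model], model_mar: Output[Model]):
    # model.uri is the Cloud Storage location of the trained model artifact;
    # model.path is the corresponding locally accessible path
    print("trained model stored at:", model.uri)

    # extra key/value pairs recorded on the output artifact are tracked in
    # Vertex ML Metadata alongside the artifact itself
    model_mar.metadata["framework"] = "pytorch"
    model_mar.metadata["model_version"] = "v1"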

For component specs of other steps in the pipeline, refer to the accompanying notebook.
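
Since the @component decorator also writes each component specification to a YAML file (via output_component_file), a component can be shared and loaded back without its Python source. A minimal sketch, using the YAML file emitted for the training image component above:

from kfp import components

# load the component spec emitted by the @component decorator so the
# component can be reused in other pipelines without its Python source
build_custom_train_image_op = components.load_component_from_file(
    "./pipelines/build_custom_train_image.yaml"
)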

Pipeline definition

After defining the components, the next step is to build the pipeline definition describing how input and output parameters and artifacts are passed between the steps. The following code snippet shows the components chained together:

@dsl.pipeline(
    name=cfg.PIPELINE_NAME,
    # GCS path to set the root directory for storing input/output
    pipeline_root=cfg.PIPELINE_ROOT,
)
def pytorch_text_classifier_pipeline(
    pipeline_job_id: str,
    gs_train_script_path: str,
    gs_serving_dependencies_path: str,
    eval_acc_threshold: float,
    is_hp_tuning_enabled: str = 'n'
):
    # build custom training container image
    build_custom_train_image_task = build_custom_train_image(
        project=cfg.PROJECT_ID,
        gs_train_src_path=gs_train_script_path,
        training_image_uri=cfg.TRAIN_IMAGE_URI
    ).set_display_name("Build custom training image")

    # model training
    #   define training code arguments
    training_args = ["--num-epochs", "2", "--model-name", cfg.MODEL_NAME]
    #   define job name
    JOB_NAME = f"{cfg.MODEL_NAME}-train-pytorch-cstm-cntr-{TIMESTAMP}"
    GCS_BASE_OUTPUT_DIR = f"{cfg.GCS_STAGING}/{TIMESTAMP}"
    #   define worker pool specs
    worker_pool_specs = [{
        "machine_spec": {
                "machine_type": cfg.MACHINE_TYPE,
                "accelerator_type": cfg.ACCELERATOR_TYPE,
                "accelerator_count": cfg.ACCELERATOR_COUNT
            },
            "replica_count": cfg.REPLICA_COUNT,
            "container_spec": {
                "image_uri": cfg.TRAIN_IMAGE_URI,
                "args": training_args
            }
        }]
    run_train_task = aip_exp.custom_job.CustomTrainingJobOp(
        project=cfg.PROJECT_ID,
        location=cfg.REGION,
        display_name=JOB_NAME,
        base_output_directory=GCS_BASE_OUTPUT_DIR,
        worker_pool_specs=worker_pool_specs
    ).set_display_name("Run custom training job").after(build_custom_train_image_task)

    # get training job details
    training_job_details_task = get_training_job_details(
        project=cfg.PROJECT_ID,
        location=cfg.REGION,
        job_resource=run_train_task.output,
        eval_metric_key="eval_accuracy",
        model_display_name=cfg.MODEL_NAME
    ).set_display_name("Get custom training job details")

    # model deployment when condition is met
    with dsl.Condition(
        training_job_details_task.outputs["eval_metric"] > eval_acc_threshold, 
        name="model-deploy-decision"
    ):
        # create model archive file
        create_mar_task = generate_mar_file(
            model_display_name=cfg.MODEL_NAME,
            model_version=cfg.VERSION,
            handler=gs_serving_dependencies_path,
            model=training_job_details_task.outputs["model"]
        ).set_display_name("Create MAR file")
        
        # build custom serving container running TorchServe
        build_custom_serving_image_task = build_custom_serving_image(
            project=cfg.PROJECT_ID,
            gs_serving_dependencies_path=gs_serving_dependencies_path,
            serving_image_uri=cfg.SERVE_IMAGE_URI
        ).set_display_name("Build custom serving image")

        # create model resource by uploading model to vertex ai
        model_upload_task = aip_components.ModelUploadOp(
            project=cfg.PROJECT_ID, 
            display_name=cfg.MODEL_DISPLAY_NAME,
            serving_container_image_uri=cfg.SERVE_IMAGE_URI,
            serving_container_predict_route=cfg.SERVING_PREDICT_ROUTE,
            serving_container_health_route=cfg.SERVING_HEALTH_ROUTE,
            serving_container_ports=cfg.SERVING_CONTAINER_PORT,
            serving_container_environment_variables=create_mar_task.outputs["mar_env_var"],
            artifact_uri=create_mar_task.outputs["mar_export_uri"]
        ).set_display_name("Upload model").after(build_custom_serving_image_task)

        # create Vertex AI Endpoint
        endpoint_create_task = aip_components.EndpointCreateOp(
            project=cfg.PROJECT_ID,
            display_name=cfg.MODEL_NAME + "-endpoint",
        ).set_display_name("Create endpoint").after(create_mar_task)

        # deploy model to Vertex AI Endpoint
        model_deploy_task = aip_components.ModelDeployOp(
            endpoint=endpoint_create_task.outputs["endpoint"],
            model=model_upload_task.outputs["model"],
            deployed_model_display_name=cfg.MODEL_NAME,
            dedicated_resources_machine_type=cfg.SERVING_MACHINE_TYPE,
            dedicated_resources_min_replica_count=cfg.SERVING_MIN_REPLICA_COUNT,
            dedicated_resources_max_replica_count=cfg.SERVING_MAX_REPLICA_COUNT,
            traffic_split=cfg.SERVING_TRAFFIC_SPLIT
        ).set_display_name("Deploy model to endpoint")

        # test model deployment
        test_instances = [
            "Jaw dropping visual affects and action! One of the best I have seen to date.",
            "Take away the CGI and the A-list cast and you end up with film with less punch.",
        ]
        predict_test_instances_task = make_prediction_request(
            project=cfg.PROJECT_ID,
            bucket=cfg.BUCKET,
            endpoint=model_deploy_task.outputs["gcp_resources"],
            instances=test_instances
        ).set_display_name("Test model deployment making online predictions")

Let’s unpack this code and understand a few things:

  1. The pipeline mixes custom components (such as build_custom_train_image and generate_mar_file) with pre-built Google Cloud Pipeline Components (such as CustomTrainingJobOp, ModelUploadOp, EndpointCreateOp and ModelDeployOp).
  2. Data dependencies are expressed by passing one step’s outputs as another step’s inputs, for example training_job_details_task.outputs["model"] is passed to generate_mar_file. Where there is no data dependency, the execution order is set explicitly with .after(), such as running the custom training job only after the training image is built.
  3. The dsl.Condition block ensures the model is packaged, uploaded and deployed only when the evaluation metric returned by the training job exceeds the eval_acc_threshold pipeline parameter.
  4. set_display_name() gives each step a readable name that appears in the pipeline runtime graph on the Vertex AI Pipelines dashboard.

To learn more about building pipelines, refer to the building Kubeflow pipelines section, and follow the samples and tutorials.

Compiling and submitting the Pipeline

The pipeline must be compiled before it can be executed on the Vertex AI Pipelines service. When a pipeline is compiled, the KFP SDK analyzes the data dependencies between the components to create a directed acyclic graph (DAG). The compiled pipeline is a JSON file containing all the information required to run the pipeline.

from kfp.v2 import compiler

PIPELINE_JSON_SPEC_PATH = './pipelines/pytorch_text_classifier_pipeline_spec.json'
compiler.Compiler().compile(
    pipeline_func=pytorch_text_classifier_pipeline,
    package_path=PIPELINE_JSON_SPEC_PATH)

The pipeline is submitted to Vertex AI Pipelines by defining a PipelineJob with the Vertex AI SDK for Python, passing the required pipeline inputs.

import google.cloud.aiplatform as aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

# define pipeline inputs
PIPELINE_JOB_ID = f"pipeline-{APP_NAME}-{get_timestamp()}"
TRAIN_APP_CODE_PATH = f"{BUCKET_NAME}/{APP_NAME}/train/"
SERVE_DEPENDENCIES_PATH = f"{BUCKET_NAME}/{APP_NAME}/serve/"
pipeline_params = {
    "pipeline_job_id": PIPELINE_JOB_ID,
    "gs_train_script_path": TRAIN_APP_CODE_PATH,
    "gs_serving_dependencies_path": SERVE_DEPENDENCIES_PATH,
    "eval_acc_threshold": 0.87,
    "is_hp_tuning_enabled": "n"
}

# configure pipeline run
pipeline_job = aiplatform.PipelineJob(
    display_name=PIPELINE_NAME,
    job_id=PIPELINE_JOB_ID,
    template_path=PIPELINE_JSON_SPEC_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_params,
    enable_caching=True
)

# run the pipeline on Vertex AI Pipelines
pipeline_job.submit()

When the pipeline is submitted, the logs show a link to view the pipeline run on the Google Cloud Console. You can also access the run by opening the Pipelines dashboard on Vertex AI.

Accessing Pipeline dashboard

Here is the runtime graph of the pipeline for the PyTorch text classification model:

Pipeline runtime graph

A pipeline execution can be scheduled to run at a specific frequency using Cloud Scheduler or triggered based on an event.
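
For example, the event-driven pattern could be implemented with a small Pub/Sub-triggered Cloud Function that submits the compiled pipeline spec, with Cloud Scheduler publishing to the topic on a schedule. The sketch below is a hypothetical illustration; the function name, bucket paths and parameter values are placeholders, not part of the accompanying notebook:

# main.py of a hypothetical Pub/Sub-triggered Cloud Function (placeholders
# throughout); Cloud Scheduler can publish to the topic on a schedule
from google.cloud import aiplatform

def trigger_pipeline(event, context):
    aiplatform.init(project="your-project-id", location="us-central1")

    pipeline_job = aiplatform.PipelineJob(
        display_name="pytorch-text-classifier",
        template_path="gs://your-bucket/pipelines/pytorch_text_classifier_pipeline_spec.json",
        pipeline_root="gs://your-bucket/pipeline-root",
        parameter_values={
            "pipeline_job_id": f"pipeline-run-{context.event_id}",
            "gs_train_script_path": "gs://your-bucket/your-app/train/",
            "gs_serving_dependencies_path": "gs://your-bucket/your-app/serve/",
            "eval_acc_threshold": 0.87,
            "is_hp_tuning_enabled": "n",
        },
        enable_caching=True,
    )
    # submit without waiting for the pipeline run to complete
    pipeline_job.submit()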

You can view the compiled JSON from the Pipeline Run summary tab on the Vertex AI Pipelines dashboard, which can be useful for debugging.

Compiled pipeline proto

Monitoring the Pipeline

The pipeline run page shows the run summary as well as details about individual steps, including step inputs and the outputs generated, such as models, artifacts, metrics, and visualizations.

Pipeline artifacts

Vertex AI Pipelines automatically tracks pipeline execution information in Vertex ML Metadata, including metadata and artifacts, enabling you to compare pipeline runs and trace the lineage of ML artifacts.

Pipeline lineage

You can compare pipeline runs, including their inputs, parameters, metrics and visualizations, from the Vertex AI Pipelines dashboard. You can also use the aiplatform.get_pipeline_df() method from the Vertex AI SDK to fetch the execution metadata for a pipeline as a Pandas dataframe.

df_pipeline_runs = aiplatform.get_pipeline_df(pipeline=PIPELINE_NAME.replace("_", "-"))
Pipeline execution metadata as Pandas dataframe

Cleaning up resources

After you are done experimenting, you can either stop or delete the Notebooks instance. If you want to save your work, you can choose to stop the instance. When you stop an instance, you are charged only for the persistent disk storage.

To clean up all the Google Cloud resources created in this post, follow the Cleaning Up section in the accompanying Jupyter Notebook to delete the individual resources.

What’s next?

This post continues from training and deploying the PyTorch based text classification model on Vertex AI and shows how to automate a PyTorch based ML workflow using Vertex AI Pipelines and the Kubeflow Pipelines v2 SDK. As a next step, you can work through this pipeline example on Vertex AI or orchestrate one of your own PyTorch models.
