Building Machine Learning Pipelines with Kubeflow

SciPy 2020

William Horton, Compass

Introduction

Training a machine learning model can be as simple as:

In [1]:
from sklearn import svm
from sklearn import datasets
clf = svm.SVC()
X, y = datasets.load_iris(return_X_y=True)
clf.fit(X, y)
Out[1]:
SVC()

But most of the time, building a real-world application using machine learning is more complicated. As machine learning engineers, we are often tasked with taking ad-hoc model training code and transforming it into a reliable and repeatable workflow. We have to answer questions like: How do we keep the training data fresh? How do we train models quickly, and make efficient use of compute resources? How do we evaluate the performance of new models?

In this notebook, I will explore the use of machine learning pipelines, and explain how we can use Kubeflow as a tool to tackle these important questions and build end-to-end pipelines to get from raw data to deployed model.

Machine Learning Pipelines

Most machine learning projects share a common set of steps that must be completed in order to reach the desired final state: a trained model serving predictions in a production environment.

At a high level, the steps of a basic ML pipeline include:

  1. Getting the data
  2. Validating the data
  3. Data pre-processing
  4. Training a model
  5. Evaluating the model
  6. Deploying the model

Together, these tasks form an end-to-end machine learning pipeline.

We can begin to model our machine learning code as a pipeline by first breaking it down into functions that correspond to these steps. Below I show the code for a minimal example of training a regression model to predict a car's fuel efficiency.

1. Getting the data

We pull the data from the public UCI Machine Learning Repository.

In [2]:
import pandas as pd

def get_data():
    df = pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data",
        sep=r"\s+",  # whitespace-delimited; raw string avoids an invalid-escape warning
        na_values="?",
        names=["mpg", "cylinders", "displacement", "horsepower", "weight",
               "acceleration", "model year", "origin", "car name"],
    )
    return df

2. Validating the data

For this example, we check how far the mean of the weight feature has drifted from a previously recorded value, and fail if the change exceeds a tolerance.

In [3]:
PREVIOUS_WEIGHT_MEAN = 2960
TOLERANCE = .05

def data_validation(df):
    change_in_mean = abs(PREVIOUS_WEIGHT_MEAN - df["weight"].mean())
    
    if change_in_mean / PREVIOUS_WEIGHT_MEAN > TOLERANCE:
        raise Exception("error detected in data validation")

3. Data preprocessing

We do some example preprocessing: imputing missing values in the horsepower column with its median, and dropping columns we don't want to use for training.

In [4]:
import pandas as pd
from sklearn.model_selection import train_test_split

def preprocess(df):
    # fill unknown values in column
    df = df.fillna({"horsepower": df["horsepower"].median()})
    
    # drop unused columns
    df = df.drop(["origin", "car name"], axis="columns")
    
    # split 20% for test set
    return train_test_split(df, test_size=0.2, random_state=42)

4. Training a model

Now we get to the training step and fit an sklearn LinearRegression model.

In [5]:
from sklearn import linear_model

def train(df, target_column):
    X = df.drop(target_column, axis="columns")
    y = df[target_column]
    regr = linear_model.LinearRegression()
    regr.fit(X, y)
    return regr

5. Evaluating the model

Compute the metric of interest. In this case we choose mean squared error (MSE).

In [6]:
from sklearn.metrics import mean_squared_error

def evaluate(regr, df, target_column):
    X_test = df.drop(target_column, axis="columns")
    y_test = df[target_column]
    y_pred = regr.predict(X_test)
    return mean_squared_error(y_test, y_pred)

6. Deploying the model

For this example, "deploy" is just saving the model artifact. In a real use case, you would likely push the artifact to object storage, like S3, or trigger a new deployment of your model service.

In [7]:
from joblib import dump

PREVIOUS_MSE = 12

def deploy(regr, mse):
    if mse < PREVIOUS_MSE:
        print("Saving model")
        dump(regr, "model.joblib")
    else:
        print("No improvement, skipping deploy")

So now the whole process looks like:

In [8]:
raw_df = get_data()
data_validation(raw_df)
train_df, val_df = preprocess(raw_df)
print(train_df.head())
regr = train(train_df, "mpg")
mse = evaluate(regr, val_df, "mpg")
print(f"MSE: {mse}")
deploy(regr, mse)
      mpg  cylinders  displacement  horsepower  weight  acceleration  \
3    16.0          8         304.0       150.0  3433.0          12.0   
18   27.0          4          97.0        88.0  2130.0          14.5   
376  37.0          4          91.0        68.0  2025.0          18.2   
248  36.1          4          91.0        60.0  1800.0          16.4   
177  23.0          4         115.0        95.0  2694.0          15.0   

     model year  
3            70  
18           70  
376          82  
248          78  
177          75  
MSE: 9.44006846526339
Saving model

We see that we can execute these functions in our Jupyter Notebook and end up with a trained model.

But in a real-world use-case, additional complexities arise. What if I don't want to run this just once, but instead schedule it to train every day? What if our preprocessing step has to make use of a distributed framework like Spark? What if our training step has to run on a GPU?

We can realize many benefits by moving away from executing these steps manually on a single machine and instead orchestrating them as part of a single automated workflow. For that, we will use Kubeflow Pipelines.

Kubeflow and Kubeflow Pipelines

Kubeflow Overview

Kubeflow is a tool for managing machine learning workflows on Kubernetes. The components of the Kubeflow platform are aligned with the broader open-source scientific Python community. Kubeflow allows you to easily deploy Jupyter Notebook servers to start the process of exploring your data, iterating on your models, and sharing results. It supports distributed training for computationally expensive workloads, using common frameworks like PyTorch and TensorFlow. It also allows you to visualize the outputs of training runs using custom Python code, leveraging familiar plotting tools like matplotlib and seaborn.

Deploying Kubeflow

Because Kubeflow runs on top of Kubernetes, it can be deployed on any infrastructure where you are able to run a Kubernetes cluster.

This includes the managed Kubernetes services of major cloud providers like GCP, AWS, and Azure, as well as on-premise clusters, and even a local machine using MiniKF.

Kubeflow Pipelines

Kubeflow Pipelines is one component of the Kubeflow platform. It allows you to author machine learning workflows and execute them on Kubernetes. Behind the scenes, it is built on top of Argo, a general tool for Kubernetes workflows.

Main Concepts of Kubeflow Pipelines

A Pipeline is a workflow that you have created and uploaded to Kubeflow Pipelines. A Pipeline is a Directed Acyclic Graph (DAG).

A Pipeline Definition is what defines the logic of the Pipeline. It is written in Python using the Kubeflow Pipelines SDK and is then compiled into a zipped YAML file. The YAML file is what is actually uploaded to Kubeflow to create the Pipeline.

A Run is an instance of a Pipeline. To create a Run, you select a Pipeline and then provide the parameters that the Pipeline requires. A Run must be part of an Experiment.

An Experiment is a way to group Runs. You can view all of the Runs for an Experiment as a list in the UI.
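
To make these concepts concrete, here is a minimal sketch of how they map onto the kfp SDK client (the host URL, package file name, and parameter value are placeholders, not taken from this notebook):

import kfp

# Hypothetical endpoint of a Kubeflow Pipelines deployment
client = kfp.Client(host="http://localhost:8080")

# A Pipeline is created by uploading a compiled Pipeline Definition
pipeline = client.upload_pipeline("pipeline_definition.yaml.zip",
                                  pipeline_name="example")

# An Experiment groups Runs
experiment = client.create_experiment(name="dev")

# A Run is an instance of a Pipeline, created with concrete parameter values
run = client.run_pipeline(experiment_id=experiment.id,
                          job_name="example-run",
                          pipeline_id=pipeline.id,
                          params={"normalize": "True"})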

For more detailed information, you can read the Pipelines Overview section of the Kubeflow documentation.

Example Pipeline

The first step in deploying our code as a Kubeflow Pipeline is to package it in a Docker container. I have already created a Dockerfile that builds an image with the sample code; you can view it here.

Once we have created a container, we can write a Pipeline Definition that lays out the steps of our pipeline. This Pipeline Definition is written in Python and uses the Kubeflow Pipelines DSL to define the logic of our specific workflow.

Here are the important parts of the DSL we will use in the example Pipeline Definition:

dsl.pipeline: A decorator around the main function where we will define the logic for our pipeline.

dsl.ContainerOp: The main unit of a Kubeflow Pipeline; it is an operation that executes a command in a specified container.

dsl.component: This is a decorator used on functions that return ContainerOps.

dsl.VolumeOp: A step that creates a Kubernetes volume (a PersistentVolumeClaim) to use in your pipeline.

You will also see that we can import from the Kubernetes Python Client library to handle Kubernetes concepts like Tolerations.

The Pipeline Definition for our example might look like:

In [14]:
import kfp
from kfp import dsl

from kubernetes.client import V1Toleration

IMAGE_ID = "test_image_id"

@dsl.component
def get_data_op():
    return dsl.ContainerOp(
        name="Get data",
        image=IMAGE_ID,
        command=['python', 'get_data.py']
    )

@dsl.component
def data_validation_op():
    return dsl.ContainerOp(
        name="Data validation",
        image=IMAGE_ID,
        command=['python', 'data_validation.py']
    )

@dsl.component
def preprocessing_op():
    return dsl.ContainerOp(
        name="Preprocessing",
        image=IMAGE_ID,
        command=['python', 'preprocessing.py']
    )

@dsl.component
def train_op(normalize):
    return dsl.ContainerOp(
        name="Training",
        image=IMAGE_ID,
        command=['python', 'train.py'],
        arguments=[
            "--normalize", normalize,
        ]
    )

@dsl.component
def evaluate_op():
    return dsl.ContainerOp(
        name="Evaluate",
        image=IMAGE_ID,
        command=['python', 'evaluate.py'],
        file_outputs={
            'mlpipeline-metrics': '/mlpipeline-metrics.json',
        },
    )


@dsl.component
def deploy_op():
    return dsl.ContainerOp(
        name="Deploy",
        image=IMAGE_ID,
        command=['python', 'deploy.py'],
    )


@dsl.pipeline(
    name='Example',
    description='Example of Kubeflow Pipeline'
)
def example_pipeline(normalize):
    vop = dsl.VolumeOp(
        name="volume_creation",
        resource_name="example",
        size="20Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )
    
    get_data = get_data_op()
    
    data_validation = data_validation_op()
    
    preprocessing = preprocessing_op()
    
    train = train_op(normalize)
    train.container.set_cpu_request("3")
    train.container.set_memory_request("5G")
    train.add_toleration(V1Toleration(effect="NoSchedule",
                                      key="node-role.kubernetes.io/Computeoptimized",
                                      operator="Equal",
                                      value="true"))
    
    evaluate = evaluate_op()
    
    deploy = deploy_op()
    
    for step in [get_data, data_validation, preprocessing, train, evaluate, deploy]:
        step.add_pvolumes({"/": vop.volume})

Within this example pipeline definition, you can see I'm doing several things in addition to defining the commands to be run in each step:

  1. I'm attaching a Volume to each step, which will be used for passing intermediate data between each step.
  2. I've used Kubernetes resource requests to make sure the training step will have a certain amount of CPU and memory.
  3. I've used Kubernetes Tolerations to allow the training step to run on compute-optimized nodes, if necessary.
  4. I've defined file_outputs for the evaluate step so that the metrics will display in the Kubeflow Pipelines UI.

These are a few examples of features that become useful as you develop real-world machine learning pipelines.
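
To make points 1 and 4 concrete, here is a minimal sketch (the file names are hypothetical, and the mount path matches the add_pvolumes call above) of how the step scripts might pass intermediate data through the shared volume and how evaluate.py might write the metrics file that the UI picks up:

import json
import pandas as pd

DATA_DIR = "/"  # the PVC is mounted at the root, as in add_pvolumes above

def save_intermediate(df, name):
    # Write a step's output to the shared volume for downstream steps
    df.to_csv(f"{DATA_DIR}{name}.csv", index=False)

def load_intermediate(name):
    # Read an upstream step's output from the shared volume
    return pd.read_csv(f"{DATA_DIR}{name}.csv")

def write_metrics(mse, path="/mlpipeline-metrics.json"):
    # The JSON structure below is what the Kubeflow Pipelines UI expects
    # for the mlpipeline-metrics entry declared in evaluate_op's file_outputs
    metrics = {"metrics": [{"name": "mse", "numberValue": mse, "format": "RAW"}]}
    with open(path, "w") as f:
        json.dump(metrics, f)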

You then use the KFP compiler to generate an output artifact that represents the pipeline: a zipped YAML file.

In [15]:
kfp.compiler.Compiler().compile(example_pipeline, 'example_pipeline_definition.yaml.zip')

We can examine the pipeline definition:

In [16]:
!unzip -o example_pipeline_definition.yaml.zip
Archive:  example_pipeline_definition.yaml.zip
  inflating: pipeline.yaml           
In [17]:
!cat pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: example-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 0.5.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-06-26T17:45:45.642072',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "Example of Kubeflow Pipeline",
      "inputs": [{"name": "normalize"}], "name": "Example"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 0.5.1}
spec:
  entrypoint: example
  templates:
  - name: data-validation
    container:
      command: [python, data_validation.py]
      image: test_image_id
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: volume-creation-name}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"name": "Data validation
          op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: deploy
    container:
      command: [python, deploy.py]
      image: test_image_id
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: volume-creation-name}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"name": "Deploy op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: evaluate
    container:
      command: [python, evaluate.py]
      image: test_image_id
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: volume-creation-name}
    outputs:
      artifacts:
      - {name: mlpipeline-metrics, path: /mlpipeline-metrics.json}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"name": "Evaluate op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: example
    inputs:
      parameters:
      - {name: normalize}
    dag:
      tasks:
      - name: data-validation
        template: data-validation
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - name: deploy
        template: deploy
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - name: evaluate
        template: evaluate
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - name: get-data
        template: get-data
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - name: preprocessing
        template: preprocessing
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - name: training
        template: training
        dependencies: [volume-creation]
        arguments:
          parameters:
          - {name: normalize, value: '{{inputs.parameters.normalize}}'}
          - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}
      - {name: volume-creation, template: volume-creation}
  - name: get-data
    container:
      command: [python, get_data.py]
      image: test_image_id
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: volume-creation-name}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"name": "Get data op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: preprocessing
    container:
      command: [python, preprocessing.py]
      image: test_image_id
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: volume-creation-name}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"name": "Preprocessing
          op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: training
    container:
      args: [--normalize, '{{inputs.parameters.normalize}}']
      command: [python, train.py]
      image: test_image_id
      resources:
        requests: {cpu: '3', memory: 5G}
      volumeMounts:
      - {mountPath: /, name: volume-creation}
    inputs:
      parameters:
      - {name: normalize}
      - {name: volume-creation-name}
    tolerations:
    - effect: NoSchedule
      key: node-role.kubernetes.io/Computeoptimized
      operator: Equal
      value: "true"
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
      annotations: {pipelines.kubeflow.org/component_spec: '{"inputs": [{"name": "normalize"}],
          "name": "Train op"}'}
    volumes:
    - name: volume-creation
      persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}
  - name: volume-creation
    resource:
      action: create
      manifest: |
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          name: '{{workflow.name}}-example'
        spec:
          accessModes:
          - ReadWriteOnce
          resources:
            requests:
              storage: 20Gi
    outputs:
      parameters:
      - name: volume-creation-manifest
        valueFrom: {jsonPath: '{}'}
      - name: volume-creation-name
        valueFrom: {jsonPath: '{.metadata.name}'}
      - name: volume-creation-size
        valueFrom: {jsonPath: '{.status.capacity.storage}'}
    metadata:
      labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}
  arguments:
    parameters:
    - {name: normalize}
  serviceAccountName: pipeline-runner

If you have a cluster running, you can use the KFP CLI to create your pipeline and kick off a run.

In [ ]:
!kfp pipeline upload -p test example_pipeline_definition.yaml.zip

Then list your pipelines and find the ID of the one you just created.

In [ ]:
!kfp pipeline list
In [175]:
PIPELINE_ID = "replace_with_pipeline_id"
In [ ]:
!kfp run submit -e Default -r test_run -p {PIPELINE_ID}

Within the Kubeflow Pipelines UI, you can view a visualization of your pipeline.

Real-World Applications

On the AI Services team at Compass, we are building machine learning capabilities that enhance the Compass platform for real estate agents and consumers. Kubeflow gives us a platform to iterate more quickly and effectively on machine learning projects.

Compass runs on AWS, so we have spent some time integrating Kubeflow with our existing infrastructure. For example, Kubeflow can be integrated with kiam to manage specific permissions for different pipelines. We can connect to Redshift and Athena to gather the necessary data for training and inference. We have also started to explore custom components for data processing with Spark on EMR.

Our first use case for Kubeflow at Compass was the Listing Editor Autofill project. This involved training models to infer different fields in a listing, to speed up the agent's workflow as they enter new listings. Training was initially done locally on our team members' laptops, but we soon ran into friction: long-running queries to fetch data, the amount of memory required for training, and the ability to train only one model at a time. Moving to Kubeflow Pipelines alleviated these issues and cut down the time needed to deliver new models to the product team.

Recently, we have also used it for training and inference of our Likely Seller model, which predicts properties that are likely to sell in the next year. We train a separate model for each city that Compass is in, so training locally, or even on a single instance, can take a long time. Using Kubeflow Pipelines, we were able to achieve better parallelism for the offline scoring job, bringing its running time down from over 4 hours to 26 minutes.
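
As a sketch of how that fan-out can be expressed in a pipeline definition (the city list, image, and script name below are hypothetical, not our production code), steps created in a loop with no dependencies between them will run in parallel:

from kfp import dsl

CITIES = ["new-york", "san-francisco", "seattle"]  # placeholder list

@dsl.pipeline(name="Per-city scoring", description="Sketch of parallel scoring")
def scoring_pipeline():
    # One scoring step per city; with no dependencies between them,
    # Kubeflow Pipelines schedules these steps concurrently.
    for city in CITIES:
        dsl.ContainerOp(
            name=f"score-{city}",
            image="scoring_image_id",
            command=["python", "score.py"],
            arguments=["--city", city],
        )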

Conclusion

If you're interested in digging deeper into the details of Kubeflow Pipelines, you can find the official documentation here.

For any questions you have for me, please reach out on Twitter (I'm @hortonhearsafoo) or via email at william.horton[at]compass.com.