We've seen how to deploy machine learning pipelines into production with s2i, and now we'll see how we can use these services to make predictions.
First, make sure that the model service you built with source-to-image is running. Next, and this step is required, change DEFAULT_BASE_URL in the first code cell. If you're running this notebook in OpenShift, change pipeline to the internal service name, or change pipeline:8080 to the external route hostname (if you've set one). If you're running this notebook locally and are running the pipeline as a container image you built, you can probably use localhost:8080.
You can get the internal service name from the OpenShift web console; in our lab, the service name is pipeline.
DEFAULT_BASE_URL = "http://pipeline:8080/%s"
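If you move between OpenShift and a local container, one option is to read the host from the environment instead of editing the cell each time. This is a minimal sketch, not part of the lab: the helper base_url_for and the variable name PIPELINE_HOST are hypothetical, and nothing in s2i or OpenShift sets that variable for you.

```python
import os


def base_url_for(host):
    """Build a URL template with a %s placeholder for the endpoint name."""
    return "http://" + host + "/%s"


# PIPELINE_HOST is a hypothetical variable name; when it is unset we fall
# back to the lab's internal service name and port.
DEFAULT_BASE_URL = base_url_for(os.environ.get("PIPELINE_HOST", "pipeline:8080"))
print(DEFAULT_BASE_URL % "predict")
```

With the variable unset, this produces the same template as the cell above.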
We'll use the requests library to interact with the REST service that our s2i builder created. Although we're running this in a notebook, you can certainly imagine how you'd interact with a similar service from an application using your favorite REST client.
import json
from urllib.parse import urlencode

import requests


def score_text(text, url=None):
    """Score one document (a string) or several (a list of strings)."""
    url = url or (DEFAULT_BASE_URL % "predict")
    if isinstance(text, str):
        text = [text]
    # The documents are sent as a JSON list in the json_args form field.
    payload = urlencode({"json_args": json.dumps(text)})
    headers = {"content-type": "application/x-www-form-urlencoded"}
    response = requests.request("POST", url, data=payload, headers=headers)
    return json.loads(response.text)


def get_metrics(url=None):
    """Fetch the service's metrics as a dict mapping metric key to value."""
    def parse_one_metric(line):
        # Each sample line ends with its value, separated by a space.
        ll = line.rsplit(" ", 1)
        return (ll[0], float(ll[1]))

    url = url or (DEFAULT_BASE_URL % "metrics")
    response = requests.request("POST", url)
    # Skip blank lines and lines starting with "#" (comments).
    return dict(
        parse_one_metric(line)
        for line in response.text.split("\n")
        if len(line) > 0 and line[0] != "#"
    )
The score_text function we just defined will let us pass in a single document (as a string) or a set of documents (as a list of strings). Let's try it with some very basic "documents."
score_text(["dog food", "It is a truth universally acknowledged"])
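To see what a call like this sends over the wire, it can help to build the request body separately and decode it again. The sketch below mirrors the payload construction in score_text without contacting the service; build_payload is a hypothetical helper name introduced only for illustration.

```python
import json
from urllib.parse import parse_qs, urlencode


def build_payload(text):
    """Return the form-encoded body for one document or a list of documents."""
    if isinstance(text, str):
        text = [text]
    return urlencode({"json_args": json.dumps(text)})


body = build_payload(["dog food", "It is a truth universally acknowledged"])
# Decoding the body recovers the original list of documents.
decoded = json.loads(parse_qs(body)["json_args"][0])
print(body)
print(decoded)
```

A single string and a one-element list produce the same body, which is why score_text accepts either.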
Let's try our service with some real documents:
import pandas as pd
import os.path
data = pd.read_parquet(os.path.join("data", "training.parquet"))
sample = data.sample(200)
sample["predictions"] = score_text(sample["text"].values.tolist())
sample
Running our models as services gives us an interesting opportunity to detect data drift by publishing the distribution of our predictions as metrics. If the distribution of predictions shifts over time, we can use that as an indication that the distribution of the data we're evaluating has shifted as well, and that we should re-train our model.
In this example, our pipeline service publishes metrics related to the predictions made by the model (keys beginning with pipeline_predictions_) as well as metrics related to the computational performance of our pipeline service (keys beginning with pipeline_processing_seconds_).
get_metrics()
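The raw counts are easier to compare once they are turned into shares. The sketch below assumes that the keys returned by get_metrics look like pipeline_predictions_total{app="pipeline",value="spam"}, which is an assumption about how the labels are rendered; check the keys your service actually returns. The helper name prediction_shares is hypothetical, and the numbers in the example are made up for illustration rather than taken from a real service.

```python
import re


def prediction_shares(metrics):
    """Map each predicted label to its share of all predictions counted so far."""
    counts = {}
    for key, count in metrics.items():
        if not key.startswith("pipeline_predictions_total"):
            continue
        # Assumed label syntax: value="<predicted label>"
        match = re.search(r'value="([^"]*)"', key)
        if match:
            label = match.group(1)
            counts[label] = counts.get(label, 0.0) + count
    total = sum(counts.values())
    if not total:
        return {}
    return {label: c / total for label, c in counts.items()}


# Illustrative values only, not output from a real service.
example = {
    'pipeline_predictions_total{app="pipeline",value="spam"}': 30.0,
    'pipeline_predictions_total{app="pipeline",value="legitimate"}': 10.0,
    "pipeline_processing_seconds_count": 40.0,
}
print(prediction_shares(example))
```

Against a running service you could call prediction_shares(get_metrics()) instead of passing the example dictionary.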
Since our service publishes Prometheus metrics, we can define alerting rules or visualize how our metric values change over time. If you're running the model service in a place where a Prometheus service can scrape the metrics (like OpenShift with the Open Data Hub installed), then you'll be able to add the following query to see the distribution of predictions over time:
sum(pipeline_predictions_total) by (app, value)
We're taking the sum of these counts because we could have multiple instances of the pipeline service running, and we're aggregating over the app label (in this case, pipeline) and the predicted label (spam or legitimate).
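To make the aggregation concrete, here is a small Python sketch of what grouping by labels and summing does. It only imitates the idea of the query and is not how Prometheus is implemented; the function sum_by, the instance label, the pod names, and the counts are all hypothetical.

```python
from collections import defaultdict


def sum_by(samples, labels):
    """Sum sample values grouped by the given label names."""
    totals = defaultdict(float)
    for sample_labels, value in samples:
        group = tuple(sample_labels.get(name, "") for name in labels)
        totals[group] += value
    return dict(totals)


# Illustrative samples from two hypothetical instances of the service.
samples = [
    ({"app": "pipeline", "instance": "pod-a", "value": "spam"}, 12.0),
    ({"app": "pipeline", "instance": "pod-b", "value": "spam"}, 8.0),
    ({"app": "pipeline", "instance": "pod-a", "value": "legitimate"}, 5.0),
]
print(sum_by(samples, ("app", "value")))
```

The two spam samples collapse into one total because the instance label is not part of the grouping.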
To see these metrics in Prometheus, go to the OpenShift console, select Networking -> Routes, and then click on the route for Prometheus. You can also visualize how each prediction count changes by taking the logarithm of each:
ln(sum(pipeline_predictions_total) by (app, value))
Now we'll set up an experiment to simulate data drift. The experiment function will take a percentage of legitimate and spam messages from our training set and score them against our live pipeline service.
✅ Change the distributions in the cells below to simulate data drift.
def experiment(data, size, **kwargs):
    for k, v in kwargs.items():
        sample = data[data.label == k].sample(int(size * v), replace=True)
        score_text(sample["text"].values.tolist())
experiment(data, 20000, legitimate=.05, spam=.95)
experiment(data, 20000, legitimate=.05, spam=.95)
experiment(data, 20000, legitimate=.05, spam=.95)
experiment(data, 20000, legitimate=.25, spam=.75)
experiment(data, 20000, legitimate=1)
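Besides watching the Prometheus graphs, you could put a number on how far the prediction distribution has moved between two points in time. The sketch below uses total variation distance, which is one possible choice and not something the lab prescribes. The function name is hypothetical and the counts are illustrative; they echo the proportions used in the experiments above but are not real service output.

```python
def total_variation(before, after):
    """Half the L1 distance between two normalized count distributions.

    Returns 0.0 for identical distributions and 1.0 for disjoint ones.
    """
    def normalize(counts):
        total = sum(counts.values())
        if not total:
            return {}
        return {label: c / total for label, c in counts.items()}

    p, q = normalize(before), normalize(after)
    labels = set(p) | set(q)
    return 0.5 * sum(abs(p.get(l, 0.0) - q.get(l, 0.0)) for l in labels)


# Illustrative prediction counts at two points in time.
baseline = {"legitimate": 50, "spam": 950}
shifted = {"legitimate": 250, "spam": 750}
print(total_variation(baseline, shifted))
```

For these counts the distance is roughly 0.2, since the share of each label moves by 20 percentage points. What threshold should trigger re-training depends on your data and is left open here.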