This notebook is an example of training a model with Spark, serializing it with MLeap, uploading it to the data lake, and then calling the model server's API so that it loads the new version.
#import MLeap's serialization support for PySpark
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
# Import standard PySpark Transformers and packages
from pyspark.sql import SparkSession
from sklearn import datasets
import pandas as pd
import numpy as np
#create the Spark session. Make sure the Spark master URL matches your cluster
#and that the MLeap package's Scala version (2.11 here) matches your Spark build.
spark = SparkSession.builder\
    .master("spark://my-project-my-spark-bdl-spark-master.my-project.svc.cluster.local:7077")\
    .config('spark.jars.packages', 'ml.combust.mleap:mleap-spark_2.11:0.13.0')\
    .getOrCreate()
#load the iris dataset into a pandas DataFrame
#(sl/sw = sepal length/width, pl/pw = petal length/width, in cm)
iris = datasets.load_iris()
pdDF = pd.DataFrame(np.c_[iris['data'], iris['target']], columns=['sl', 'sw', 'pl', 'pw', 'target'])
pdDF.sample(10)
|     | sl  | sw  | pl  | pw  | target |
|-----|-----|-----|-----|-----|--------|
| 132 | 6.4 | 2.8 | 5.6 | 2.2 | 2.0 |
| 84  | 5.4 | 3.0 | 4.5 | 1.5 | 1.0 |
| 1   | 4.9 | 3.0 | 1.4 | 0.2 | 0.0 |
| 5   | 5.4 | 3.9 | 1.7 | 0.4 | 0.0 |
| 33  | 5.5 | 4.2 | 1.4 | 0.2 | 0.0 |
| 115 | 6.4 | 3.2 | 5.3 | 2.3 | 2.0 |
| 100 | 6.3 | 3.3 | 6.0 | 2.5 | 2.0 |
| 31  | 5.4 | 3.4 | 1.5 | 0.4 | 0.0 |
| 87  | 6.3 | 2.3 | 4.4 | 1.3 | 1.0 |
| 14  | 5.8 | 4.0 | 1.2 | 0.2 | 0.0 |
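The dataset holds 50 samples of each of the three classes; if you want to confirm the class balance before training, a quick pandas check:
#confirm the class distribution (should be 50 rows per class)
pdDF['target'].value_counts()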
#convert to spark dataframe
df = spark.createDataFrame(pdDF)
#build the classification pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
input_features = ['sl','sw','pl','pw']
feature_assembler = VectorAssembler(inputCols=input_features, outputCol='features')
random_forest_classifier = RandomForestClassifier(featuresCol='features', labelCol='target')
pipeline = Pipeline(stages=[
feature_assembler,
random_forest_classifier
])
#split the dataset into train and test sets
train, test = df.randomSplit([0.8, 0.2])
#################
#train the model
#################
model = pipeline.fit(train)
#evaluate the model on the test data
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='target')
predicted = model.transform(test)
evaluator.evaluate(predicted)
0.9497767857142857
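Note that MulticlassClassificationEvaluator defaults to the F1 score. To report plain accuracy as well, pass metricName explicitly:
#evaluate accuracy in addition to the default F1 score
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='target', metricName='accuracy')
accuracy_evaluator.evaluate(predicted)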
#serialize the model to an MLeap bundle
model_version = 1.3
model.serializeToBundle("jar:file:/tmp/mleap-bundle-{}.zip".format(model_version), model.transform(df))
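MLeap also patches PipelineModel with a deserializeFromBundle method, so you can sanity-check the bundle before uploading it; a minimal sketch:
#optional: load the bundle back and re-score a few test rows
reloaded = PipelineModel.deserializeFromBundle("jar:file:/tmp/mleap-bundle-{}.zip".format(model_version))
reloaded.transform(test).select('target', 'prediction').show(5)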
Upload the model to the data lake.
%%script env model_version="$model_version" bash
bdl -mkdir -p /models
bdl -copyFromLocal -f /tmp/mleap-bundle-$model_version.zip /models/
rm /tmp/mleap-bundle-$model_version.zip
Apr 12, 2019 2:50:18 PM com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase <clinit> INFO: GHFS version: hadoop2-1.9.7
Apr 12, 2019 2:50:20 PM com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase configure WARNING: No working directory configured, using default: 'gs://project-mleapdemo-my-project/'
Apr 12, 2019 2:50:22 PM com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase <clinit> INFO: GHFS version: hadoop2-1.9.7
Apr 12, 2019 2:50:23 PM com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase configure WARNING: No working directory configured, using default: 'gs://project-mleapdemo-my-project/'
Before executing the following code, you first need to deploy a "model server" application.
#update the inference server with the new model
import requests
import json
endpoint="https://zuul.bigstep.cc/api/v1"
#get your API key from the Lentiq application; it is available in your account details
api_key="<use_your_own>"
headers = {'Authorization': "APIKEY "+api_key}
data_pool_name = "mleapdemo"
project_name = "my-project"
application_name = "server"
url = "{}/applications/{}/{}/{}".format(endpoint,data_pool_name, project_name,application_name)
r=requests.get(url, headers=headers)
app=r.json()
#point the server's configuration at the new model version
app['configuration']['modelPath'] = model_version
#push the updated configuration so the server loads the new model
requests.put(url, headers=headers, json=app['configuration'])
<Response [200]>
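To confirm the change was applied, you can re-fetch the application and inspect its modelPath:
#re-fetch the application and check that modelPath was updated
r = requests.get(url, headers=headers)
r.json()['configuration']['modelPath']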
#generate a sample json for calling the inference server
obj = {
    'schema': {
        'fields': [{'name': feature_name, 'type': 'double'} for feature_name in input_features]
    },
    'rows': test.toPandas()[input_features].sample(10).values.tolist()
}
json.dumps(obj)
#to test the model serving, put the JSON output below in a file (test.json)
#and POST it to the model server's /transform endpoint; copy the model
#server URL from Lentiq's interface:
#curl -XPOST -H "accept: application/json" \
# -H "content-type: application/json" \
# -d @test.json \
# 35.202.248.129:65327/transform
'{"schema": {"fields": [{"name": "sl", "type": "double"}, {"name": "sw", "type": "double"}, {"name": "pl", "type": "double"}, {"name": "pw", "type": "double"}]}, "rows": [[6.7, 2.5, 5.8, 1.8], [6.1, 2.9, 4.7, 1.4], [5.7, 2.9, 4.2, 1.3], [5.1, 2.5, 3.0, 1.1], [5.1, 3.5, 1.4, 0.3], [6.0, 2.2, 4.0, 1.0], [7.3, 2.9, 6.3, 1.8], [6.4, 2.7, 5.3, 1.9], [6.5, 3.2, 5.1, 2.0], [7.6, 3.0, 6.6, 2.1]]}'