In this document we demonstrate how H2O can be used with Scala and Spark to run a Distributed Random Forest (DRF) classification.
H2O and the related documentation for Sparkling Water can be found at http://h2o-release.s3.amazonaws.com/h2o/rel-xia/1/index.html
We are using Jupyter with the BeakerX Scala kernel.
We install the full Sparkling Water package, which also includes Spark, via Maven:
%%classpath add mvn
ai.h2o:sparkling-water-package_2.11:2.3.17
To prevent runtime errors later on, we first disable the H2O web UI and the Sparkling Water REPL:
import water.H2O
H2O.ARGS.disable_web = true
System.setProperty("spark.ext.h2o.repl.enabled","false")
System.getProperty("spark.ext.h2o.repl.enabled")
false
Now we are ready to start Spark ...
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Iris DRF")
.master("local")
.config("spark.ui.enabled", "false")
.getOrCreate()
org.apache.spark.sql.SparkSession@1f1d9b1a
... and we can create the related H2O Context
import org.apache.spark.h2o._
val h2oContext = H2OContext.getOrCreate(spark)
Sparkling Water Context: * H2O name: sparkling-water-beakerx_local-1541437783196 * cluster size: 1 * list of used nodes: (executorId, host, port) ------------------------ (driver,928b6a866c15,54323) ------------------------ Open H2O Flow in browser: http://:54323 (CMD + click in Mac OSX)
We start by loading iris.csv directly into an H2OFrame and double-check the data types: the variety column has been converted to an Enum!
import water.fvec.H2OFrame
import java.net.URL
val h2oFrame = new H2OFrame(new URL("https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv").toURI)
h2oFrame.names
[sepal.length, sepal.width, petal.length, petal.width, variety]
h2oFrame.rename(0,"sepalLength")
h2oFrame.rename(1,"sepalWidth")
h2oFrame.rename(2,"petalLenth")
h2oFrame.rename(3,"petalWidth")
h2oFrame.names
[sepalLength, sepalWidth, petalLenth, petalWidth, variety]
h2oFrame.typesStr
[Numeric, Numeric, Numeric, Numeric, Enum]
var domains = h2oFrame.vec("variety").domain
[Setosa, Versicolor, Virginica]
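Conceptually, an Enum (categorical) column stores its values as integer indices into the domain array. A minimal plain-Scala sketch of that encoding (the names `domain`, `encoded`, and `decoded` are illustrative, not part of the H2O API):

```scala
// Conceptual sketch (not the H2O API): an Enum column stores each value
// as an integer index into the column's domain array.
val domain = Array("Setosa", "Versicolor", "Virginica")
val raw = Seq("Setosa", "Virginica", "Setosa")
val encoded = raw.map(domain.indexOf(_))  // Seq(0, 2, 0)
val decoded = encoded.map(domain(_))      // back to the original labels
```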
We can display the data by converting it to a DataFrame:
val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
val ds = h2oContext.asDataFrame(h2oFrame)(sqlContext).toDF
ds.show()
+-----------+----------+----------+----------+-------+ |sepalLength|sepalWidth|petalLenth|petalWidth|variety| +-----------+----------+----------+----------+-------+ | 5.1| 3.5| 1.4| 0.2| Setosa| | 4.9| 3.0| 1.4| 0.2| Setosa| | 4.7| 3.2| 1.3| 0.2| Setosa| | 4.6| 3.1| 1.5| 0.2| Setosa| | 5.0| 3.6| 1.4| 0.2| Setosa| | 5.4| 3.9| 1.7| 0.4| Setosa| | 4.6| 3.4| 1.4| 0.3| Setosa| | 5.0| 3.4| 1.5| 0.2| Setosa| | 4.4| 2.9| 1.4| 0.2| Setosa| | 4.9| 3.1| 1.5| 0.1| Setosa| | 5.4| 3.7| 1.5| 0.2| Setosa| | 4.8| 3.4| 1.6| 0.2| Setosa| | 4.8| 3.0| 1.4| 0.1| Setosa| | 4.3| 3.0| 1.1| 0.1| Setosa| | 5.8| 4.0| 1.2| 0.2| Setosa| | 5.7| 4.4| 1.5| 0.4| Setosa| | 5.4| 3.9| 1.3| 0.4| Setosa| | 5.1| 3.5| 1.4| 0.3| Setosa| | 5.7| 3.8| 1.7| 0.3| Setosa| | 5.1| 3.8| 1.5| 0.3| Setosa| +-----------+----------+----------+----------+-------+ only showing top 20 rows
Then we split the frame into a training and a test frame. The call to splitFrame also shuffles the data!
import water.support.H2OFrameSupport
val keys = Array("train.hex", "test.hex")
val ratios = Array(0.9, 0.1)
val Array(train, test) = H2OFrameSupport.splitFrame(h2oFrame,keys, ratios)
val h2oTraining = h2oContext.asH2OFrame(train)
val h2oTest = h2oContext.asH2OFrame(test)
h2oTraining+"-----------\n"+ h2oTest
Frame key: train.hex cols: 5 rows: 131 chunks: 1 size: 3177 ----------- Frame key: test.hex cols: 5 rows: 19 chunks: 1 size: 1625
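The shuffle-then-split behaviour can be sketched in plain Scala (`shuffleAndSplit` is a hypothetical helper used only to illustrate the idea, not part of the H2O API):

```scala
import scala.util.Random

// Conceptual sketch of what splitFrame does: shuffle the rows,
// then partition them according to the given training ratio.
def shuffleAndSplit[T](rows: Seq[T], trainRatio: Double, seed: Long): (Seq[T], Seq[T]) = {
  val shuffled = new Random(seed).shuffle(rows)
  val cut = math.round(rows.size * trainRatio).toInt
  (shuffled.take(cut), shuffled.drop(cut))
}

val (trainRows, testRows) = shuffleAndSplit((1 to 150).toSeq, 0.9, seed = 42L)
// 135 rows for training, 15 for testing
```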
We define the response column and set both the training and the validation frame, so that H2O reports metrics for both datasets.
The trained model is obtained by calling trainModel.get:
import hex.tree.drf.DRF
import hex.tree.drf.DRFModel
import h2oContext.implicits._
var parameters = new DRFModel.DRFParameters()
parameters._train = h2oTraining
parameters._valid = h2oTest
parameters._response_column = "variety"
var model = new DRF(parameters).trainModel.get
org.apache.spark.h2o.H2OContext$implicits$@5a3d5b41
The toString method of the model provides the evaluation on the training and test datasets:
model.toString
Model Metrics Type: Multinomial Description: Metrics reported on Out-Of-Bag training samples model id: DRF_model_1541437780529_1 frame id: train.hex MSE: 0.04673963 RMSE: 0.2161935 logloss: 0.39659226 mean_per_class_error: 0.06150583 hit ratios: [0.9389313, 1.0, 1.0] CM: Confusion Matrix (Row labels: Actual class; Column labels: Predicted class): Setosa Versicolor Virginica Error Rate Setosa 44 0 0 0.0000 0 / 44 Versicolor 0 37 4 0.0976 4 / 41 Virginica 0 4 42 0.0870 4 / 46 Totals 44 41 46 0.0611 8 / 131 Model Metrics Type: Multinomial Description: N/A model id: DRF_model_1541437780529_1 frame id: test.hex MSE: 0.009293742 RMSE: 0.09640405 logloss: 0.059128653 mean_per_class_error: 0.0 hit ratios: [1.0, 1.0, 1.0] CM: Confusion Matrix (Row labels: Actual class; Column labels: Predicted class): Setosa Versicolor Virginica Error Rate Setosa 6 0 0 0.0000 0 / 6 Versicolor 0 9 0 0.0000 0 / 9 Virginica 0 0 4 0.0000 0 / 4 Totals 6 9 4 0.0000 0 / 19 Variable Importances: Variable Relative Importance Scaled Importance Percentage petalLenth 1795.945801 1.000000 0.474394 petalWidth 1578.936157 0.879167 0.417072 sepalLength 327.999115 0.182633 0.086640 sepalWidth 82.884659 0.046151 0.021894 Model Summary: Number of Trees Number of Internal Trees Model Size in Bytes Min. Depth Max. Depth Mean Depth Min. Leaves Max. 
Leaves Mean Leaves 50 150 19906 1 8 3.62667 2 12 5.91333 Scoring History: Timestamp Duration Number of Trees Training RMSE Training LogLoss Training Classification Error Validation RMSE Validation LogLoss Validation Classification Error 2018-11-05 18:09:55 0.063 sec 0 NaN NaN NaN NaN NaN NaN 2018-11-05 18:09:55 0.316 sec 1 0.16366 0.64152 0.03571 0.22942 1.81783 0.05263 2018-11-05 18:09:55 0.367 sec 2 0.21490 1.31760 0.06250 0.07647 0.02134 0.00000 2018-11-05 18:09:55 0.395 sec 3 0.21937 1.40235 0.06000 0.05735 0.01514 0.00000 2018-11-05 18:09:55 0.416 sec 4 0.21750 1.27528 0.05405 0.04588 0.01174 0.00000 2018-11-05 18:09:55 0.435 sec 5 0.25178 1.52270 0.07627 0.08550 0.03094 0.00000 2018-11-05 18:09:55 0.450 sec 6 0.24614 1.49340 0.06667 0.07328 0.02582 0.00000 2018-11-05 18:09:55 0.471 sec 7 0.22902 0.92432 0.06400 0.08767 0.03961 0.00000 2018-11-05 18:09:55 0.488 sec 8 0.22442 0.90776 0.06299 0.07795 0.03560 0.00000 2018-11-05 18:09:55 0.509 sec 9 0.22097 0.89179 0.06202 0.07976 0.03716 0.00000 --- 2018-11-05 18:09:55 0.912 sec 41 0.21637 0.39776 0.06107 0.09405 0.05657 0.00000 2018-11-05 18:09:55 0.922 sec 42 0.21665 0.39893 0.06107 0.09180 0.05507 0.00000 2018-11-05 18:09:55 0.931 sec 43 0.21674 0.39871 0.06107 0.09459 0.05633 0.00000 2018-11-05 18:09:55 0.968 sec 44 0.21532 0.39679 0.06107 0.09285 0.05539 0.00000 2018-11-05 18:09:55 0.975 sec 45 0.21595 0.39729 0.06107 0.09262 0.05581 0.00000 2018-11-05 18:09:55 0.984 sec 46 0.21673 0.39842 0.06107 0.09600 0.05824 0.00000 2018-11-05 18:09:55 0.992 sec 47 0.21713 0.39887 0.06107 0.09899 0.05947 0.00000 2018-11-05 18:09:55 1.000 sec 48 0.21770 0.39978 0.06107 0.09932 0.06035 0.00000 2018-11-05 18:09:55 1.010 sec 49 0.21687 0.39822 0.06107 0.09737 0.05916 0.00000 2018-11-05 18:09:55 1.018 sec 50 0.21619 0.39659 0.06107 0.09640 0.05913 0.00000
We remove the variety column to make sure that the prediction works with only the four input columns, then display the data and save it as CSV:
import org.apache.spark.sql.SaveMode
h2oTest.remove("variety")
var predictionDF = h2oContext.asDataFrame(h2oTest)(sqlContext).toDF
predictionDF.write.format("csv").option("header", "true").mode(SaveMode.Overwrite).save("prediction.csv")
predictionDF.show()
+-----------+----------+----------+----------+ |sepalLength|sepalWidth|petalLenth|petalWidth| +-----------+----------+----------+----------+ | 5.0| 3.4| 1.5| 0.2| | 4.9| 3.1| 1.5| 0.1| | 5.7| 4.4| 1.5| 0.4| | 5.4| 3.4| 1.7| 0.2| | 4.8| 3.1| 1.6| 0.2| | 5.0| 3.5| 1.3| 0.3| | 6.9| 3.1| 4.9| 1.5| | 4.9| 2.4| 3.3| 1.0| | 5.0| 2.0| 3.5| 1.0| | 6.2| 2.2| 4.5| 1.5| | 5.6| 2.5| 3.9| 1.1| | 6.6| 3.0| 4.4| 1.4| | 6.8| 2.8| 4.8| 1.4| | 5.5| 2.6| 4.4| 1.2| | 5.6| 2.7| 4.2| 1.3| | 6.5| 3.2| 5.1| 2.0| | 6.1| 3.0| 4.9| 1.8| | 6.7| 3.1| 5.6| 2.4| | 6.2| 3.4| 5.4| 2.3| +-----------+----------+----------+----------+
We can execute a prediction by calling the score method on the model.
val predictionResult = model.score(h2oTest)
Frame key: _bbef6b85a84cad502123f5977b4548d1 cols: 4 rows: 19 chunks: 1 size: 1371
Finally we convert the result to a Spark DataFrame so that we can display it:
val df1 = h2oContext.asDataFrame(predictionResult)(sqlContext).toDF
df1.show()
+----------+--------------------+-------------------+--------------------+ | predict| Setosa| Versicolor| Virginica| +----------+--------------------+-------------------+--------------------+ | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.997470677599468| 0.0|0.002529322400532...| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| |Versicolor|0.001875732701114...| 0.7561547479537011| 0.24196951934518435| |Versicolor| 0.0| 0.8972644377915675| 0.10273556220843251| |Versicolor| 0.0| 0.9529729730689217| 0.04702702693107835| |Versicolor|0.001654777154018...| 0.8182873057105341| 0.1800579171354474| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001820664535787...| 0.7633136094726328| 0.23486572599157932| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| | Virginica|0.001863209378640...| 0.0| 0.9981367906213595| | Virginica|0.001774360491032...|0.09537187674828396| 0.9028537627606835| | Virginica|0.001863209378640...| 0.0| 0.9981367906213595| | Virginica|0.001725005390582263|0.07417523207136159| 0.9240997625380561| +----------+--------------------+-------------------+--------------------+
We can also export the trained model to disk and load it back:
import water.support.ModelSerializationSupport
ModelSerializationSupport.exportH2OModel(model,"model.bin",true)
var modelLoaded:DRFModel = ModelSerializationSupport.loadH2OModel("model.bin")
modelLoaded.getClass
class hex.tree.drf.DRFModel
val predictionResult = modelLoaded.score(h2oTest)
val df1 = h2oContext.asDataFrame(predictionResult)(sqlContext).toDF
df1.show()
+----------+--------------------+-------------------+--------------------+ | predict| Setosa| Versicolor| Virginica| +----------+--------------------+-------------------+--------------------+ | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.997470677599468| 0.0|0.002529322400532...| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| | Setosa| 0.9975669100660882| 0.0|0.002433089933911768| |Versicolor|0.001875732701114...| 0.7561547479537011| 0.24196951934518435| |Versicolor| 0.0| 0.8972644377915675| 0.10273556220843251| |Versicolor| 0.0| 0.9529729730689217| 0.04702702693107835| |Versicolor|0.001654777154018...| 0.8182873057105341| 0.1800579171354474| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001820664535787...| 0.7633136094726328| 0.23486572599157932| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| |Versicolor|0.001852500303911956| 0.9957189170620183|0.002428582634069...| | Virginica|0.001863209378640...| 0.0| 0.9981367906213595| | Virginica|0.001774360491032...|0.09537187674828396| 0.9028537627606835| | Virginica|0.001863209378640...| 0.0| 0.9981367906213595| | Virginica|0.001725005390582263|0.07417523207136159| 0.9240997625380561| +----------+--------------------+-------------------+--------------------+
Alternatively, the model can be exported in the MOJO format and loaded as a Spark H2OMOJOModel:
import org.apache.spark.ml.h2o.models._
ModelSerializationSupport.exportMOJOModel(model,"model.mojo",true)
val mojoModelLoaded = H2OMOJOModel.createFromMojo("model.mojo")
mojoModelLoaded.getClass
class org.apache.spark.ml.h2o.models.H2OMOJOModel
import org.apache.spark.sql.functions._
val df = mojoModelLoaded.transform(predictionDF)
.withColumn("probabilities", expr("prediction_output.probabilities"))
.drop("prediction_output")
df.show
+-----------+----------+----------+----------+--------------------+ |sepalLength|sepalWidth|petalLenth|petalWidth| probabilities| +-----------+----------+----------+----------+--------------------+ | 5.0| 3.4| 1.5| 0.2|[0.99756691006608...| | 4.9| 3.1| 1.5| 0.1|[0.99756691006608...| | 5.7| 4.4| 1.5| 0.4|[0.99747067759946...| | 5.4| 3.4| 1.7| 0.2|[0.99756691006608...| | 4.8| 3.1| 1.6| 0.2|[0.99756691006608...| | 5.0| 3.5| 1.3| 0.3|[0.99756691006608...| | 6.9| 3.1| 4.9| 1.5|[0.00187573270111...| | 4.9| 2.4| 3.3| 1.0|[0.0, 0.897264437...| | 5.0| 2.0| 3.5| 1.0|[0.0, 0.952972973...| | 6.2| 2.2| 4.5| 1.5|[0.00165477715401...| | 5.6| 2.5| 3.9| 1.1|[0.00185250030391...| | 6.6| 3.0| 4.4| 1.4|[0.00185250030391...| | 6.8| 2.8| 4.8| 1.4|[0.00182066453578...| | 5.5| 2.6| 4.4| 1.2|[0.00185250030391...| | 5.6| 2.7| 4.2| 1.3|[0.00185250030391...| | 6.5| 3.2| 5.1| 2.0|[0.00186320937864...| | 6.1| 3.0| 4.9| 1.8|[0.00177436049103...| | 6.7| 3.1| 5.6| 2.4|[0.00186320937864...| | 6.2| 3.4| 5.4| 2.3|[0.00172500539058...| +-----------+----------+----------+----------+--------------------+
import spark.implicits._
var domains = h2oFrame.vec("variety").domain.toSeq
println(domains)
var domainsDF = spark.sparkContext.parallelize(domains).zipWithIndex.toDF("variety","index")
domainsDF.show
WrappedArray(Setosa, Versicolor, Virginica) +----------+-----+ | variety|index| +----------+-----+ | Setosa| 0| |Versicolor| 1| | Virginica| 2| +----------+-----+
org.apache.spark.sql.SparkSession$implicits$@3c57edc7
def maxIndex: (collection.mutable.WrappedArray[Double] => Int) = { array => (array.indexOf(array.max)) }
spark.udf.register("maxIndex", maxIndex)
var predictionResultDF = df.select(df.col("*"), callUDF("maxIndex", df.col("probabilities")).name("index"))
.join(domainsDF,Seq("index"))
predictionResultDF.show
+-----+-----------+----------+----------+----------+--------------------+----------+ |index|sepalLength|sepalWidth|petalLenth|petalWidth| probabilities| variety| +-----+-----------+----------+----------+----------+--------------------+----------+ | 0| 5.0| 3.4| 1.5| 0.2|[0.99756691006608...| Setosa| | 0| 4.9| 3.1| 1.5| 0.1|[0.99756691006608...| Setosa| | 0| 5.7| 4.4| 1.5| 0.4|[0.99747067759946...| Setosa| | 0| 5.4| 3.4| 1.7| 0.2|[0.99756691006608...| Setosa| | 0| 4.8| 3.1| 1.6| 0.2|[0.99756691006608...| Setosa| | 0| 5.0| 3.5| 1.3| 0.3|[0.99756691006608...| Setosa| | 1| 6.9| 3.1| 4.9| 1.5|[0.00187573270111...|Versicolor| | 1| 4.9| 2.4| 3.3| 1.0|[0.0, 0.897264437...|Versicolor| | 1| 5.0| 2.0| 3.5| 1.0|[0.0, 0.952972973...|Versicolor| | 1| 6.2| 2.2| 4.5| 1.5|[0.00165477715401...|Versicolor| | 1| 5.6| 2.5| 3.9| 1.1|[0.00185250030391...|Versicolor| | 1| 6.6| 3.0| 4.4| 1.4|[0.00185250030391...|Versicolor| | 1| 6.8| 2.8| 4.8| 1.4|[0.00182066453578...|Versicolor| | 1| 5.5| 2.6| 4.4| 1.2|[0.00185250030391...|Versicolor| | 1| 5.6| 2.7| 4.2| 1.3|[0.00185250030391...|Versicolor| | 2| 6.5| 3.2| 5.1| 2.0|[0.00186320937864...| Virginica| | 2| 6.1| 3.0| 4.9| 1.8|[0.00177436049103...| Virginica| | 2| 6.7| 3.1| 5.6| 2.4|[0.00186320937864...| Virginica| | 2| 6.2| 3.4| 5.4| 2.3|[0.00172500539058...| Virginica| +-----+-----------+----------+----------+----------+--------------------+----------+
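What the maxIndex UDF together with the join accomplishes can be sketched in plain Scala (`labelFor` is an illustrative helper, not part of Spark or H2O): pick the index of the highest probability and look it up in the domain.

```scala
// Plain-Scala sketch of the maxIndex UDF plus the domain join:
// take the index of the highest probability and map it to its class label.
val labels = Seq("Setosa", "Versicolor", "Virginica")

def labelFor(probabilities: Seq[Double]): String =
  labels(probabilities.indexOf(probabilities.max))

labelFor(Seq(0.0018, 0.7561, 0.2420))  // "Versicolor"
```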
The MOJO model can be deployed to production without access to a running H2O or Spark instance; only the H2O scoring (genmodel) library has to be on the classpath.
Further details can be found at http://docs.h2o.ai/h2o/latest-stable/h2o-docs/productionizing.html
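For scoring a MOJO outside Spark, the H2O documentation points to the h2o-genmodel library; a minimal Maven sketch (the version must match the H2O release the MOJO was exported with):

```xml
<dependency>
  <groupId>ai.h2o</groupId>
  <artifactId>h2o-genmodel</artifactId>
  <version><!-- the H2O version matching your Sparkling Water release --></version>
</dependency>
```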
We read back the input rows we saved earlier to prediction.csv:
val csv = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("prediction.csv/*.csv")
csv.show
+-----------+----------+----------+----------+ |sepalLength|sepalWidth|petalLenth|petalWidth| +-----------+----------+----------+----------+ | 5.0| 3.4| 1.5| 0.2| | 4.9| 3.1| 1.5| 0.1| | 5.7| 4.4| 1.5| 0.4| | 5.4| 3.4| 1.7| 0.2| | 4.8| 3.1| 1.6| 0.2| | 5.0| 3.5| 1.3| 0.3| | 6.9| 3.1| 4.9| 1.5| | 4.9| 2.4| 3.3| 1.0| | 5.0| 2.0| 3.5| 1.0| | 6.2| 2.2| 4.5| 1.5| | 5.6| 2.5| 3.9| 1.1| | 6.6| 3.0| 4.4| 1.4| | 6.8| 2.8| 4.8| 1.4| | 5.5| 2.6| 4.4| 1.2| | 5.6| 2.7| 4.2| 1.3| | 6.5| 3.2| 5.1| 2.0| | 6.1| 3.0| 4.9| 1.8| | 6.7| 3.1| 5.6| 2.4| | 6.2| 3.4| 5.4| 2.3| +-----------+----------+----------+----------+
import _root_.hex.genmodel.GenModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.easy.prediction
import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.RowData
def predict(easyModel: EasyPredictModelWrapper, values: Array[Double]): String = {
  val row = new RowData()
  row.put("sepalLength", values(0).toString)
  row.put("sepalWidth", values(1).toString)
  row.put("petalLenth", values(2).toString) // must match the column name of the training frame (including its typo)
  row.put("petalWidth", values(3).toString)
  easyModel.predictMultinomial(row).label
}
val easyModel = new EasyPredictModelWrapper(MojoModel.load("model.mojo"))
predict(easyModel, Array(5.0,3.4,1.4,0.2))
Setosa
csv.collect
.map(line => Array(line.getDouble(0),line.getDouble(1),line.getDouble(2),line.getDouble(3)))
.foreach(a => println(s"${a(0)},${a(1)},${a(2)},${a(3)} => ${predict(easyModel, a)}"))
5.0,3.4,1.5,0.2 => Setosa 4.9,3.1,1.5,0.1 => Setosa 5.7,4.4,1.5,0.4 => Setosa 5.4,3.4,1.7,0.2 => Setosa 4.8,3.1,1.6,0.2 => Setosa 5.0,3.5,1.3,0.3 => Setosa 6.9,3.1,4.9,1.5 => Versicolor 4.9,2.4,3.3,1.0 => Versicolor 5.0,2.0,3.5,1.0 => Versicolor 6.2,2.2,4.5,1.5 => Versicolor 5.6,2.5,3.9,1.1 => Versicolor 6.6,3.0,4.4,1.4 => Versicolor 6.8,2.8,4.8,1.4 => Versicolor 5.5,2.6,4.4,1.2 => Versicolor 5.6,2.7,4.2,1.3 => Versicolor 6.5,3.2,5.1,2.0 => Virginica 6.1,3.0,4.9,1.8 => Virginica 6.7,3.1,5.6,2.4 => Virginica 6.2,3.4,5.4,2.3 => Virginica