Op Iris Sample

Derived from https://github.com/salesforce/TransmogrifAI/tree/master/helloworld/notebooks

The following code illustrates how TransmogrifAI can be used to perform multiclass classification on the Iris dataset.

First we need to load the TransmogrifAI and Spark MLlib JARs.

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.3.3`
import $ivy.`org.apache.spark::spark-mllib:2.3.3`
import $ivy.`sh.almond::almond-spark:0.5.0`
import $ivy.`com.salesforce.transmogrifai::transmogrifai-core:0.5.1`
Out[1]:
import $ivy.$                                  

import $ivy.$                                    

import $ivy.$                              

import $ivy.$                                                       

We also want to avoid overly verbose logging and long outputs in our notebook.

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("breeze").setLevel(Level.WARN)
Logger.getLogger("com.salesforce.op").setLevel(Level.WARN)

repl.pprinter() = repl.pprinter().copy(defaultHeight = 15)
Out[2]:
import org.apache.log4j.{Level, Logger}

Define features

Let us first define the case class that describes the schema of the data.

For now, we also need a few workarounds for issues caused by the class wrapping required for serialization.

In [3]:
// Needed for now for case classes defined within Ammonite. Won't be necessary in future versions of Spark.
// See https://github.com/alexarchambault/ammonite-spark/issues/19 and https://github.com/apache/spark/pull/23607
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
case class Iris(
  sepalLength: Double,
  sepalWidth: Double,
  petalLength: Double,
  petalWidth: Double,
  irisClass: String
)
// Required to make sure the String representation of the case class doesn't change in later cells.
implicit val irisTypeTag = scala.reflect.runtime.universe.weakTypeTag[Iris]
Out[3]:
defined class Iris
irisTypeTag: reflect.runtime.package.universe.WeakTypeTag[Iris] = TypeTag[Helper.this.Iris]

Feature Engineering

We then define the set of raw features that we would like to extract from the data. The raw features are defined using FeatureBuilders, and are strongly typed. TransmogrifAI supports the following basic feature types: Text, Numeric, Vector, List, Set, and Map. In addition, it supports many specific feature types which extend these base types: Email extends Text; Integral, Real and Binary extend Numeric; Currency and Percentage extend Real. For a complete view of the supported types, see the Type Hierarchy and Automatic Feature Engineering section in the Documentation.
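To make the type hierarchy concrete, here is a small hedged sketch of how some of the more specific feature types could be declared. The Contact case class and its fields are invented for illustration, and the .toEmail / .toCurrency / .toIntegral conversions are assumed to come from the types._ implicits, like the .toReal and .toText used in the cells below:

import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._

// Hypothetical schema, for illustration only.
case class Contact(email: String, salary: Double, age: Long)

// Email extends Text; Currency extends Real; Integral extends Numeric.
val contactEmail  = FeatureBuilder.Email[Contact].extract(_.email.toEmail).asPredictor
val contactSalary = FeatureBuilder.Currency[Contact].extract(_.salary.toCurrency).asPredictor
val contactAge    = FeatureBuilder.Integral[Contact].extract(_.age.toIntegral).asPredictor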

Basic FeatureBuilders will be created for you if you use the TransmogrifAI CLI to bootstrap your project as described here. However, it is often useful to edit this code to customize feature generation and take full advantage of the Feature types available (selecting the appropriate type will improve automatic feature engineering steps).

When defining raw features, specify the extract logic to be applied to the raw data, and also annotate the features as either predictor or response variables via the FeatureBuilders:

In [4]:
import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._

val sepalLength = FeatureBuilder.Real[Iris].extract(_.sepalLength.toReal).asPredictor
val sepalWidth = FeatureBuilder.Real[Iris].extract(_.sepalWidth.toReal).asPredictor
val petalLength = FeatureBuilder.Real[Iris].extract(_.petalLength.toReal).asPredictor
val petalWidth = FeatureBuilder.Real[Iris].extract(_.petalWidth.toReal).asPredictor
val irisClass = FeatureBuilder.Text[Iris].extract(_.irisClass.toText).asResponse
Out[4]:
import com.salesforce.op.features.FeatureBuilder

import com.salesforce.op.features.types._


sepalLength: com.salesforce.op.features.Feature[Real] = Feature(
  "sepalLength",
  false,
  FeatureGeneratorStage_000000000001,
  List(),
  "Real_000000000001",
  List()
)
sepalWidth: com.salesforce.op.features.Feature[Real] = Feature(
  "sepalWidth",
  false,
  FeatureGeneratorStage_000000000002,
  List(),
  "Real_000000000002",
  List()
)
petalLength: com.salesforce.op.features.Feature[Real] = Feature(
  "petalLength",
  false,
  FeatureGeneratorStage_000000000003,
  List(),
  "Real_000000000003",
  List()
)
petalWidth: com.salesforce.op.features.Feature[Real] = Feature(
  "petalWidth",
  false,
  FeatureGeneratorStage_000000000004,
  List(),
  "Real_000000000004",
  List()
)
irisClass: com.salesforce.op.features.Feature[Text] = Feature(
  "irisClass",
  true,
  FeatureGeneratorStage_000000000005,
  List(),
  "Text_000000000005",
  List()
)
In [5]:
import org.apache.spark.sql._

implicit val spark = {
  NotebookSparkSession.builder()
    .progress(false)
    .master("local[*]")
    .getOrCreate()
}
Loading spark-stubs
Getting spark JARs
log4j:WARN No appenders could be found for logger (org.eclipse.jetty.util.log).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/04/04 22:59:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[5]:
import org.apache.spark.sql._


spark: SparkSession = org.apache.spark.sql.SparkSession@...
In [6]:
import com.salesforce.op._
import com.salesforce.op.evaluators.Evaluators
import com.salesforce.op.readers.DataReaders
import com.salesforce.op.stages.impl.classification.MultiClassificationModelSelector
import com.salesforce.op.stages.impl.tuning.DataCutter
import org.apache.spark.sql.Encoders
Out[6]:
import com.salesforce.op._

import com.salesforce.op.evaluators.Evaluators

import com.salesforce.op.readers.DataReaders

import com.salesforce.op.stages.impl.classification.MultiClassificationModelSelector

import com.salesforce.op.stages.impl.tuning.DataCutter

import org.apache.spark.sql.Encoders

The next step is to encode the case class using org.apache.spark.sql.Encoders.

In [7]:
implicit val irisEncoder = Encoders.product[Iris]
Out[7]:
irisEncoder: Encoder[Iris] = ExpressionEncoder(
  StructType(
    StructField("sepalLength", DoubleType, false, {}),
    StructField("sepalWidth", DoubleType, false, {}),
    StructField("petalLength", DoubleType, false, {}),
    StructField("petalWidth", DoubleType, false, {}),
    StructField("irisClass", StringType, true, {})
  ),
  false,
  List(
    Alias(
      Invoke(
        AssertNotNull(
          AssertNotNull(
...

Create a DataReader that will load the CSV and map it to the schema of type Iris.

In [8]:
val irisReader = DataReaders.Simple.csvCase[Iris]()
Out[8]:
irisReader: readers.CSVProductReader[Iris] = com.salesforce.op.readers.CSVProductReader@...

Feature Engineering

See Creating Shortcuts for Transformers and Estimators for more documentation on how shortcuts for stages can be created (the .indexed() call below is one such shortcut, sketched explicitly after this paragraph). We now define a feature of type OPVector, that is, a vector representation of all the features we would like to use as predictors in our workflow.
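For reference, the .indexed() shortcut applies an OpStringIndexerNoFilter stage under the hood (its name is visible in the stage output below). A roughly equivalent explicit form, as a sketch only, since the exact constructor and type parameters may differ between TransmogrifAI versions:

import com.salesforce.op.stages.impl.feature.OpStringIndexerNoFilter

// Explicit stage application; irisClass.indexed() is shorthand for this.
val labelsExplicit = new OpStringIndexerNoFilter[Text]().setInput(irisClass).getOutput()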

In [9]:
val labels = irisClass.indexed()
val features = Seq(sepalLength, sepalWidth, petalLength, petalWidth).transmogrify()
Out[9]:
labels: com.salesforce.op.features.FeatureLike[RealNN] = Feature(
  "irisClass_1-stagesApplied_RealNN_000000000006",
  true,
  OpStringIndexerNoFilter_000000000006,
  WrappedArray(
    Feature(
      "irisClass",
      true,
      FeatureGeneratorStage_000000000005,
      List(),
      "Text_000000000005",
      List()
    )
  ),
...
features: com.salesforce.op.features.FeatureLike[OPVector] = Feature(
  "petalLength-petalWidth-sepalLength-sepalWidth_2-stagesApplied_OPVector_000000000008",
  false,
  VectorsCombiner_000000000008,
  WrappedArray(
    Feature(
      "petalLength-petalWidth-sepalLength-sepalWidth_1-stagesApplied_OPVector_000000000007",
      false,
      RealVectorizer_000000000007,
      WrappedArray(
        Feature(
          "sepalLength",
...

The next step is to create a DataCutter: an instance that splits the data into a training and a test set, filtering out any labels that don't meet the minimum fraction cutoff or don't fall within the top N labels specified.
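Both cutoffs are configurable. A hedged sketch of a more explicit construction, assuming DataCutter's maxLabelCategories and minLabelFraction parameters (the values shown are illustrative defaults):

val explicitCutter = DataCutter(
  seed = 42L,
  reserveTestFraction = 0.2,  // fraction of the data held out as the test set
  maxLabelCategories = 100,   // keep at most the top N label categories
  minLabelFraction = 0.0      // drop labels rarer than this fraction of the data
)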

In [10]:
val randomSeed = 42L
val cutter = DataCutter(reserveTestFraction = 0.2, seed = randomSeed)
Out[10]:
randomSeed: Long = 42L
cutter: DataCutter = DataCutter_000000000009

Create a MultiClassificationModelSelector and specify the splitter created above. Then set the inputs: labels and features.

In [11]:
val prediction = MultiClassificationModelSelector
    .withCrossValidation(splitter = Option(cutter), seed = randomSeed)
    .setInput(labels, features).getOutput()
19/04/04 23:00:03 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
19/04/04 23:00:03 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Out[11]:
prediction: com.salesforce.op.features.FeatureLike[Prediction] = Feature(
  "irisClass-petalLength-petalWidth-sepalLength-sepalWidth_4-stagesApplied_Prediction_000000000011",
  true,
  ModelSelector_000000000011,
  WrappedArray(
    Feature(
      "irisClass_1-stagesApplied_RealNN_000000000006",
      true,
      OpStringIndexerNoFilter_000000000006,
      WrappedArray(
        Feature(
          "irisClass",
          true,
...
In [12]:
val evaluator = Evaluators.MultiClassification.f1().setLabelCol(labels).setPredictionCol(prediction)
Out[12]:
evaluator: evaluators.OpMultiClassificationEvaluator = OpMultiClassificationEvaluator_000000000012

Everything we’ve done so far has been purely at the level of definitions. We have defined how we would like to extract our raw features from data of type Iris, and we have defined how we would like to manipulate them. In order to actually manifest the data described by these features, we need to add them to a workflow and attach a data source to the workflow.

Please note that trainFilePath is derived from the folder where the host folder is mounted as a volume (/home/beakerx/helloworld in this case). It can be changed depending on the location and volume directory you are mounting the data from. You can also create a new DataReader with a new path.

In [13]:
import com.salesforce.op.readers.DataReaders

val trainFilePath = "datasets/IrisDataset/iris.data"
// Define a way to read data into our Iris class from our CSV file
val trainDataReader = DataReaders.Simple.csvCase[Iris](
  path = Option(trainFilePath)
  //key = _.id.toString
)
Out[13]:
import com.salesforce.op.readers.DataReaders


trainFilePath: String = "datasets/IrisDataset/iris.data"
trainDataReader: readers.CSVProductReader[Iris] = com.salesforce.op.readers.CSVProductReader@...

This is the workflow for TransmogrifAI. It takes the final features that the user wants to generate as inputs and constructs the full DAG needed to generate them from the features' lineage. It then fits any estimators in the pipeline DAG to create a sequence of transformations that are saved in a workflow model.

In [14]:
val workflow = new OpWorkflow().setResultFeatures(prediction, labels).setReader(trainDataReader)
Out[14]:
workflow: OpWorkflow = com.salesforce.op.OpWorkflow@...

When we now call ‘train’ on this workflow, it automatically computes and executes the entire DAG of stages needed to compute the features, fitting all the estimators on the training data in the process. Calling score on the fitted workflow then transforms the underlying training data to produce a DataFrame with all the features manifested. The score method can optionally be passed an evaluator that produces metrics. The workflow.train() method fits all of the estimators in the pipeline and returns a workflow model consisting only of transformers, using the data loaded as specified by the data reader to generate the initial dataset.

In [15]:
val fittedWorkflow = workflow.train()
println("Summary:\n" + fittedWorkflow.summaryPretty())
Summary:
Evaluated OpRandomForestClassifier, OpLogisticRegression models using Cross Validation and error metric.
Evaluated 18 OpRandomForestClassifier models with error metric between [0.06780584574255233, 0.7332635181732267].
Evaluated 8 OpLogisticRegression models with error metric between [0.057780559032123494, 0.2260946931964275].
+--------------------------------------------------------+
|         Selected Model - OpLogisticRegression          |
+--------------------------------------------------------+
| Model Param      | Value                               |
+------------------+-------------------------------------+
| aggregationDepth | 2                                   |
| elasticNetParam  | 0.5                                 |
| family           | auto                                |
| fitIntercept     | true                                |
| maxIter          | 50                                  |
| modelType        | OpLogisticRegression                |
| name             | OpLogisticRegression_00000000000c_3 |
| regParam         | 0.01                                |
| standardization  | true                                |
| tol              | 1.0E-6                              |
| uid              | OpLogisticRegression_00000000000c   |
+------------------+-------------------------------------+
+-------------------------------------------------------+
|               Model Evaluation Metrics                |
+-------------------------------------------------------+
| Metric Name | Training Set Value | Hold Out Set Value |
+-------------+--------------------+--------------------+
| error       | 0.0461538461538461 | 0.0                |
| f1          | 0.9542123135981102 | 1.0                |
| precision   | 0.9545787545787545 | 1.0                |
| recall      | 0.9538461538461539 | 1.0                |
+-------------+--------------------+--------------------+
+-----------------------------------------------------------+
|                    Top Model Insights                     |
+-----------------------------------------------------------+
| Top Positive Correlations       |       Correlation Value |
+---------------------------------+-------------------------+
| sepalWidth(sepalWidth = null)   | -1.7976931348623157E308 |
| sepalWidth                      | -1.7976931348623157E308 |
| sepalLength(sepalLength = null) | -1.7976931348623157E308 |
| sepalLength                     | -1.7976931348623157E308 |
| petalLength(petalLength = null) | -1.7976931348623157E308 |
| petalLength                     | -1.7976931348623157E308 |
| petalWidth(petalWidth = null)   | -1.7976931348623157E308 |
| petalWidth                      | -1.7976931348623157E308 |
+---------------------------------+-------------------------+
+----------------------------------------------------------+
| Top Negative Correlations       |      Correlation Value |
+---------------------------------+------------------------+
| petalWidth                      | 1.7976931348623157E308 |
| petalWidth(petalWidth = null)   | 1.7976931348623157E308 |
| petalLength                     | 1.7976931348623157E308 |
| petalLength(petalLength = null) | 1.7976931348623157E308 |
| sepalLength                     | 1.7976931348623157E308 |
| sepalLength(sepalLength = null) | 1.7976931348623157E308 |
| sepalWidth                      | 1.7976931348623157E308 |
| sepalWidth(sepalWidth = null)   | 1.7976931348623157E308 |
+---------------------------------+------------------------+
+------------------------------------------------------+
| Top Contributions               | Contribution Value |
+---------------------------------+--------------------+
| sepalWidth                      |  4.936926469035047 |
| petalWidth                      |  3.436492303761425 |
| sepalLength                     |  1.559475987118261 |
| petalLength                     | 0.8951550768404886 |
| sepalWidth(sepalWidth = null)   |                0.0 |
| sepalLength(sepalLength = null) |                0.0 |
| petalLength(petalLength = null) |                0.0 |
| petalWidth(petalWidth = null)   |                0.0 |
+---------------------------------+--------------------+
Out[15]:
fittedWorkflow: OpWorkflowModel = com.salesforce.op.OpWorkflowModel@...

After the model has been fitted, we use the scoreAndEvaluate() method to score the data and evaluate the metrics.

In [16]:
println("Scoring the model:\n=================")
val (dataframe, metrics) = fittedWorkflow.scoreAndEvaluate(evaluator = evaluator)

println("Transformed dataframe columns:\n--------------------------")
dataframe.columns.foreach(println)

println("Metrics:\n------------")
println(metrics)
Scoring the model:
=================
Transformed dataframe columns:
--------------------------
key
irisClass_1-stagesApplied_RealNN_000000000006
irisClass-petalLength-petalWidth-sepalLength-sepalWidth_4-stagesApplied_Prediction_000000000011
Metrics:
------------
{
  "Precision" : 0.9604700854700854,
  "Recall" : 0.96,
  "F1" : 0.9602349852021629,
  "Error" : 0.040000000000000036,
  "ThresholdMetrics" : {
    "topNs" : [ 1, 3 ],
    "thresholds" : [ 0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.2, 0.21, 0.22, 0.23, 0.24, 0.25, 0.26, 0.27, 0.28, 0.29, 0.3, 0.31, 0.32, 0.33, 0.34, 0.35, 0.36, 0.37, 0.38, 0.39, 0.4, 0.41, 0.42, 0.43, 0.44, 0.45, 0.46, 0.47, 0.48, 0.49, 0.5, 0.51, 0.52, 0.53, 0.54, 0.55, 0.56, 0.57, 0.58, 0.59, 0.6, 0.61, 0.62, 0.63, 0.64, 0.65, 0.66, 0.67, 0.68, 0.69, 0.7, 0.71, 0.72, 0.73, 0.74, 0.75, 0.76, 0.77, 0.78, 0.79, 0.8, 0.81, 0.82, 0.83, 0.84, 0.85, 0.86, 0.87, 0.88, 0.89, 0.9, 0.91, 0.92, 0.93, 0.94, 0.95, 0.96, 0.97, 0.98, 0.99, 1.0 ],
    "correctCounts" : {
      "1" : [ 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 144, 143, 142, 141, 140, 140, 138, 137, 137, 137, 136, 135, 134, 134, 132, 130, 128, 122, 117, 117, 115, 114, 111, 109, 107, 104, 103, 98, 94, 90, 84, 80, 79, 76, 71, 70, 68, 67, 64, 62, 58, 56, 55, 52, 51, 51, 50, 48, 45, 35, 0 ],
      "3" : [ 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 150, 149, 149, 148, 148, 148, 148, 147, 147, 147, 147, 146, 146, 146, 145, 144, 143, 142, 141, 140, 140, 138, 137, 137, 137, 136, 135, 134, 134, 132, 130, 128, 122, 117, 117, 115, 114, 111, 109, 107, 104, 103, 98, 94, 90, 84, 80, 79, 76, 71, 70, 68, 67, 64, 62, 58, 56, 55, 52, 51, 51, 50, 48, 45, 35, 0 ]
    },
    "incorrectCounts" : {
      "1" : [ 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 4, 4, 4, 4, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ],
      "3" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 5, 6, 4, 4, 4, 4, 3, 3, 3, 3, 2, 2, 2, 2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
    },
    "noPredictionCounts" : {
      "1" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 5, 6, 7, 9, 10, 10, 11, 12, 13, 14, 15, 17, 20, 22, 28, 33, 33, 35, 36, 39, 41, 43, 46, 47, 52, 56, 60, 66, 70, 71, 74, 79, 80, 82, 83, 86, 88, 92, 94, 95, 98, 99, 99, 100, 102, 105, 115, 150 ],
      "3" : [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 4, 5, 6, 7, 9, 10, 10, 11, 12, 13, 14, 15, 17, 20, 22, 28, 33, 33, 35, 36, 39, 41, 43, 46, 47, 52, 56, 60, 66, 70, 71, 74, 79, 80, 82, 83, 86, 88, 92, 94, 95, 98, 99, 99, 100, 102, 105, 115, 150 ]
    }
  }
}
Out[16]:
dataframe: DataFrame = [key: string, irisClass_1-stagesApplied_RealNN_000000000006: double ... 1 more field]
metrics: evaluators.EvaluationMetrics = MultiClassificationMetrics(
  0.9604700854700854,
  0.96,
  0.9602349852021629,
  0.040000000000000036,
  ThresholdMetrics(
    WrappedArray(1, 3),
    WrappedArray(
      0.0,
      0.01,
      0.02,
      0.03,
      0.04,
      0.05,
...
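
Finally, a fitted workflow model can be persisted and reloaded for later scoring. A minimal sketch, assuming OpWorkflowModel.save and OpWorkflow.loadModel (the path is illustrative):

// Persist the fitted model to disk (the path is illustrative).
fittedWorkflow.save("/tmp/iris-model", overwrite = true)

// Reload it against the same workflow definition and score again.
val loadedModel = workflow.loadModel("/tmp/iris-model")
val rescored = loadedModel.setReader(trainDataReader).score()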