The following example uses the (processed) Covertype dataset from the UCI Machine Learning Repository. It is a dataset with both categorical (`wilderness_area` and `soil_type`) and continuous (the rest) features. The target is the `cover_type` column:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# If the spark-tree-plotting JAR is not already on the classpath, add it via spark.jars, e.g.:
# conf = SparkConf().set("spark.jars", "/Users/per0/wa/spark_wa/spark-tree-plotting/target/scala-2.11/spark-tree-plotting_0.2.jar")
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
covertype_dataset = spark.read.parquet("covertype_dataset.snappy.parquet")
covertype_dataset.printSchema()
root
 |-- elevation: long (nullable = true)
 |-- aspect: long (nullable = true)
 |-- slope: long (nullable = true)
 |-- horizontal_distance_to_hydrology: long (nullable = true)
 |-- vertical_distance_to_hydrology: long (nullable = true)
 |-- horizontal_distance_to_roadways: long (nullable = true)
 |-- hillshade_9am: long (nullable = true)
 |-- hillshade_noon: long (nullable = true)
 |-- hillshade_3pm: long (nullable = true)
 |-- horizontal_distance_to_fire_points: long (nullable = true)
 |-- wilderness_area: string (nullable = true)
 |-- soil_type: string (nullable = true)
 |-- cover_type: string (nullable = true)
The first 10 rows:
covertype_dataset.limit(10).toPandas()
| | elevation | aspect | slope | horizontal_distance_to_hydrology | vertical_distance_to_hydrology | horizontal_distance_to_roadways | hillshade_9am | hillshade_noon | hillshade_3pm | horizontal_distance_to_fire_points | wilderness_area | soil_type | cover_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2596 | 51 | 3 | 258 | 0 | 510 | 221 | 232 | 148 | 6279 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 1 | 2590 | 56 | 2 | 212 | -6 | 390 | 220 | 235 | 151 | 6225 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 2 | 2804 | 139 | 9 | 268 | 65 | 3180 | 234 | 238 | 135 | 6121 | rawah wilderness area | Soil_Type_4744 | Lodgepole Pine |
| 3 | 2785 | 155 | 18 | 242 | 118 | 3090 | 238 | 238 | 122 | 6211 | rawah wilderness area | Soil_Type_7746 | Lodgepole Pine |
| 4 | 2595 | 45 | 2 | 153 | -1 | 391 | 220 | 234 | 150 | 6172 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 5 | 2579 | 132 | 6 | 300 | -15 | 67 | 230 | 237 | 140 | 6031 | rawah wilderness area | Soil_Type_7745 | Lodgepole Pine |
| 6 | 2606 | 45 | 7 | 270 | 5 | 633 | 222 | 225 | 138 | 6256 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 7 | 2605 | 49 | 4 | 234 | 7 | 573 | 222 | 230 | 144 | 6228 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 8 | 2617 | 45 | 9 | 240 | 56 | 666 | 223 | 221 | 133 | 6244 | rawah wilderness area | Soil_Type_7745 | Aspen |
| 9 | 2612 | 59 | 10 | 247 | 11 | 636 | 228 | 219 | 124 | 6230 | rawah wilderness area | Soil_Type_7745 | Aspen |
In order for Spark's `DecisionTreeClassifier` to work with the categorical features (as well as the target), we first need to use `pyspark.ml.feature.StringIndexer`s to generate a numeric representation for those columns:
from pyspark.ml.feature import StringIndexer
string_indexer_wilderness = StringIndexer(inputCol="wilderness_area", outputCol="wilderness_area_indexed")
string_indexer_soil = StringIndexer(inputCol="soil_type", outputCol="soil_type_indexed")
string_indexer_cover = StringIndexer(inputCol="cover_type", outputCol="cover_type_indexed")
To generate the new `StringIndexerModel`s, we call `.fit()` on each `StringIndexer` instance:
string_indexer_wilderness_model = string_indexer_wilderness.fit(covertype_dataset)
string_indexer_soil_model = string_indexer_soil.fit(covertype_dataset)
string_indexer_cover_model = string_indexer_cover.fit(covertype_dataset)
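Each fitted `StringIndexerModel` exposes its learned label-to-index mapping through the `labels` attribute (with the default ordering, position 0 is the most frequent label); we will reuse these lists later when plotting. An optional peek:

```python
# The list position is the numeric index assigned to each category
print(string_indexer_wilderness_model.labels)
print(string_indexer_cover_model.labels)
```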
And we create the new columns:
covertype_dataset_indexed_features = string_indexer_cover_model.transform(
    string_indexer_soil_model.transform(
        string_indexer_wilderness_model.transform(covertype_dataset)
    )
)
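The three chained `transform()` calls above can equivalently be expressed as a `pyspark.ml.Pipeline`; a minimal sketch (the `indexing_pipeline` name is ours):

```python
from pyspark.ml import Pipeline

# Fit all three indexers and apply them in one pass
indexing_pipeline = Pipeline(stages=[string_indexer_wilderness,
                                     string_indexer_soil,
                                     string_indexer_cover])
covertype_dataset_indexed_features = indexing_pipeline.fit(covertype_dataset) \
                                                      .transform(covertype_dataset)
```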
The new columns can be seen at the right of the DataFrame:
covertype_dataset_indexed_features.limit(10).toPandas()
| | elevation | aspect | slope | horizontal_distance_to_hydrology | vertical_distance_to_hydrology | horizontal_distance_to_roadways | hillshade_9am | hillshade_noon | hillshade_3pm | horizontal_distance_to_fire_points | wilderness_area | soil_type | cover_type | wilderness_area_indexed | soil_type_indexed | cover_type_indexed |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2596 | 51 | 3 | 258 | 0 | 510 | 221 | 232 | 148 | 6279 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 1 | 2590 | 56 | 2 | 212 | -6 | 390 | 220 | 235 | 151 | 6225 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 2 | 2804 | 139 | 9 | 268 | 65 | 3180 | 234 | 238 | 135 | 6121 | rawah wilderness area | Soil_Type_4744 | Lodgepole Pine | 0.0 | 7.0 | 0.0 |
| 3 | 2785 | 155 | 18 | 242 | 118 | 3090 | 238 | 238 | 122 | 6211 | rawah wilderness area | Soil_Type_7746 | Lodgepole Pine | 0.0 | 6.0 | 0.0 |
| 4 | 2595 | 45 | 2 | 153 | -1 | 391 | 220 | 234 | 150 | 6172 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 5 | 2579 | 132 | 6 | 300 | -15 | 67 | 230 | 237 | 140 | 6031 | rawah wilderness area | Soil_Type_7745 | Lodgepole Pine | 0.0 | 0.0 | 0.0 |
| 6 | 2606 | 45 | 7 | 270 | 5 | 633 | 222 | 225 | 138 | 6256 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 7 | 2605 | 49 | 4 | 234 | 7 | 573 | 222 | 230 | 144 | 6228 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 8 | 2617 | 45 | 9 | 240 | 56 | 666 | 223 | 221 | 133 | 6244 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
| 9 | 2612 | 59 | 10 | 247 | 11 | 636 | 228 | 219 | 124 | 6230 | rawah wilderness area | Soil_Type_7745 | Aspen | 0.0 | 0.0 | 5.0 |
Now, we just have to `VectorAssemble` our features to create the feature vector:
from pyspark.ml.feature import VectorAssembler
feature_columns = ["elevation",
"aspect",
"slope",
"horizontal_distance_to_hydrology",
"vertical_distance_to_hydrology",
"horizontal_distance_to_roadways",
"hillshade_9am",
"hillshade_noon",
"hillshade_3pm",
"horizontal_distance_to_fire_points",
"wilderness_area_indexed",
"soil_type_indexed"]
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
And we have our dataset prepared for ML:
covertype_dataset_prepared = feature_assembler.transform(covertype_dataset_indexed_features)
covertype_dataset_prepared.printSchema()
root
 |-- elevation: long (nullable = true)
 |-- aspect: long (nullable = true)
 |-- slope: long (nullable = true)
 |-- horizontal_distance_to_hydrology: long (nullable = true)
 |-- vertical_distance_to_hydrology: long (nullable = true)
 |-- horizontal_distance_to_roadways: long (nullable = true)
 |-- hillshade_9am: long (nullable = true)
 |-- hillshade_noon: long (nullable = true)
 |-- hillshade_3pm: long (nullable = true)
 |-- horizontal_distance_to_fire_points: long (nullable = true)
 |-- wilderness_area: string (nullable = true)
 |-- soil_type: string (nullable = true)
 |-- cover_type: string (nullable = true)
 |-- wilderness_area_indexed: double (nullable = false)
 |-- soil_type_indexed: double (nullable = false)
 |-- cover_type_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
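As an optional sanity check (not part of the original walkthrough), we can peek at a few assembled vectors:

```python
# Each row now carries a 12-dimensional feature vector plus the indexed label
covertype_dataset_prepared.select("features", "cover_type_indexed").show(3, truncate=False)
```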
Let's build a simple `pyspark.ml.classification.DecisionTreeClassifier`:
from pyspark.ml.classification import DecisionTreeClassifier

dtree = DecisionTreeClassifier(featuresCol="features",
                               labelCol="cover_type_indexed",
                               maxDepth=3,
                               maxBins=50)  # maxBins must be >= the category count of any categorical feature
We fit it, and we get our `DecisionTreeClassificationModel`:
dtree_model = dtree.fit(covertype_dataset_prepared)
dtree_model
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_99684a674979) of depth 3 with 11 nodes
A previously fitted and saved model can also be reloaded from disk:

from pyspark.ml.classification import DecisionTreeClassificationModel
dtree_model = DecisionTreeClassificationModel.load('tree_model')
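As a quick, optional inspection, the fitted model exposes `featureImportances`, which we can map back to the assembler's input columns:

```python
# featureImportances is a pyspark.ml.linalg Vector indexed like feature_columns
importances = dtree_model.featureImportances
for i, name in enumerate(feature_columns):
    print(name, float(importances[i]))
```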
The `toDebugString` attribute contains the decision rules for the tree, but its output is not very user-friendly:
print(dtree_model.toDebugString)
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_f84d275537cd) of depth 3 with 7 nodes
  If (feature 0 <= 3050.5)
   If (feature 0 <= 2540.5)
    If (feature 10 in {0.0})
     Predict: 0.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
   Else (feature 0 > 2540.5)
    Predict: 0.0
  Else (feature 0 > 3050.5)
   Predict: 1.0
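The opaque `feature N` indices refer to positions in the `VectorAssembler` input list, so they are easy to map back to column names (a small helper of our own):

```python
# "feature 0" is elevation, "feature 10" is wilderness_area_indexed, etc.
for i, name in enumerate(feature_columns):
    print(f"feature {i}: {name}")
```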
`spark_tree_plotting` may be helpful here ;)

import sys
sys.path.insert(0, '/Users/per0/wa/spark_wa/spark-tree-plotting/python')
from spark_tree_plotting import plot_tree
tree_plot = plot_tree(dtree_model,
featureNames=feature_columns,
categoryNames={"wilderness_area_indexed":string_indexer_wilderness_model.labels,
"soil_type_indexed":string_indexer_soil_model.labels},
classNames=string_indexer_cover_model.labels,
filled=True, # With color!
roundedCorners=True, # Rounded corners in the nodes
roundLeaves=True # Leaves will be ellipses instead of rectangles
)
This call fails with a TypeError raised deep inside pydot's DOT parser:

TypeError: isinstance() arg 2 must be a type or tuple of types

which appears to be an incompatibility between the installed pydot and pyparsing versions rather than a problem with the tree itself. The plot-friendly JSON representation can still be obtained directly from the JVM side:
json_tree = sc._jvm.com.vfive.spark.ml.SparkMLTree(dtree_model._java_obj)
print(json_tree.toJsonPlotFormat())
{ "featureIndex":0, "gain":0.08681394658400207, "impurity":0.6230942824070332, "threshold":3050.5, "nodeType":"internal", "splitType":"continuous", "prediction":0.0, "leftChild":{ "featureIndex":0, "gain":0.08616165361635758, "impurity":0.5539261911259398, "threshold":2540.5, "nodeType":"internal", "splitType":"continuous", "prediction":0.0, "leftChild":{ "featureIndex":10, "gain":0.04640621444482429, "impurity":0.6171371727013576, "nodeType":"internal", "splitType":"categorical", "leftCategories":[ 0.0 ], "rightCategories":[ 1.0, 2.0, 3.0 ], "prediction":2.0, "leftChild":{ "impurity":0.18642232564845895, "nodeType":"leaf", "prediction":0.0 }, "rightChild":{ "impurity":0.5893401621499551, "nodeType":"leaf", "prediction":2.0 } }, "rightChild":{ "impurity":0.4430125702798494, "nodeType":"leaf", "prediction":0.0 } }, "rightChild":{ "impurity":0.5109863148417016, "nodeType":"leaf", "prediction":1.0 } }
Once `plot_tree` succeeds, the returned image can be displayed inline:

from IPython.display import Image
Image(tree_plot)
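If `plot_tree` runs without error (for instance after resolving the pydot/pyparsing mismatch above), `tree_plot` appears to hold raw PNG bytes, as the `Image(tree_plot)` call suggests; under that assumption, the plot can also be written to disk (the filename is our choice):

```python
# Persist the rendered tree; assumes tree_plot contains raw PNG bytes
with open("covertype_tree.png", "wb") as f:
    f.write(tree_plot)
```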