Translated Beginners Guide to ML with Apache Spark

This is a translation into PySpark of the Beginners Guide: Apache Spark Machine Learning with Large Data tutorial by Dmitry Petrov from KDNuggets. I won't provide much commentary; for that, I refer you back to the original tutorial. I do give page numbers to make it easier to follow along. Since this was an exercise to familiarize myself with PySpark, there may be some awkward bits; please let me know if you find any.

First, a few preliminaries: I used the Hortonworks sandbox running on VMware, although I'm sure VirtualBox would work as well. I set up Jupyter notebook according to these nice instructions by Simon Streubel. I had to bump up the executor and driver memory of PySpark using the instructions in the answer to this question on StackExchange. spark-defaults.conf was located in /etc/spark/conf on the Hortonworks sandbox VM. Of course, if you have a working PySpark installation, you can probably ignore most of the above.

In the linked tutorial, either a large or a small version of the Posts.xml file can be used. I used the small version from here since I wasn't running the example on a particularly high-powered machine. That file is assumed to be located at /root/Posts.small.xml, since the Hortonworks sandbox seems to want you to run as root for some reason (ugh!). If you have a different PySpark installation, you'll almost certainly want to change that location.

So, without further ado, let the translation begin:

Page 2: 4 – Importing Libraries

In [1]:
import re
from lxml import etree
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import udf, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics

Page 2: 5 – Parsing XML

In [15]:
fileName = "file:/root/Posts.small.xml"
textFile = sc.textFile(fileName)
postsXml = (textFile.map(lambda x: x.strip())
                    .filter(lambda x: not x.startswith("<?xml version="))
                    .filter(lambda x: x != "<posts>")
                    .filter(lambda x: x != "</posts>"))
# Look at the first filtered row of XML
postsXml.take(1)
[u'<row Id="4" PostTypeId="1" AcceptedAnswerId="7" CreationDate="2008-07-31T21:42:52.667" Score="322" ViewCount="21888" Body="&lt;p&gt;I want to use a track-bar to change a form\'s opacity.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;This is my code:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#xA;this.Opacity = trans;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;When I try to build it, I get this error:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;Cannot implicitly convert type \'decimal\' to \'double\'.&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;I tried making &lt;code&gt;trans&lt;/code&gt; a &lt;code&gt;double&lt;/code&gt;, but then the control doesn\'t work. This code has worked fine for me in VB.NET in the past. &lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="451518" LastEditorDisplayName="Rich B" LastEditDate="2014-07-28T10:02:50.557" LastActivityDate="2014-12-20T17:18:47.807" Title="When setting a form\'s opacity should I use a decimal or double?" Tags="&lt;c#&gt;&lt;winforms&gt;&lt;type-conversion&gt;&lt;opacity&gt;" AnswerCount="13" CommentCount="1" FavoriteCount="27" CommunityOwnedDate="2012-10-31T16:42:47.213" />']
In [14]:
junk = re.compile(r"<\S+>")
extra_space = re.compile(r"( )+")

def make_row(s):
    root = etree.fromstring(s)
    id = root.get("Id", "")
    tags = root.get("Tags", "")
    title = root.get("Title", "")
    bodyPlain = junk.sub(" ", root.get("Body", ""))
    text = extra_space.sub(" ", (title + bodyPlain.replace("\n", " ")))
    return Row(id, tags, text)

postsRdd = postsXml.map(make_row)
# Look at the first row of postsRdd
postsRdd.take(1)
[<Row(4, <c#><winforms><type-conversion><opacity>, When setting a form's opacity should I use a decimal or double? I want to use a track-bar to change a form's opacity. This is my code: decimal trans = trackBar1.Value / 5000; this.Opacity = trans; When I try to build it, I get this error: Cannot implicitly convert type 'decimal' to 'double'. I tried making a , but then the control doesn't work. This code has worked fine for me in VB.NET in the past. )>]
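The gap in "I tried making a , but" in the output above comes from the junk pattern: `<\S+>` is greedy, so an inline element with no whitespace inside it, such as <code>trans</code>, is matched from its opening bracket to the end of its closing tag and removed together with its text. The sketch below reproduces this on a fragment of the first post and compares it with a narrower pattern, `<[^>]+>`. That second pattern is only a suggestion for illustration; it is not part of the original tutorial.

```python
import re

# Pattern used in make_row above: greedy, spans any run without whitespace.
junk = re.compile(r"<\S+>")
# Hypothetical alternative (not in the tutorial): match one tag at a time.
single_tag = re.compile(r"<[^>]+>")
extra_space = re.compile(r"( )+")

def clean(pattern, html):
    """Replace matches with a space, then collapse runs of spaces."""
    return extra_space.sub(" ", pattern.sub(" ", html))

fragment = "I tried making <code>trans</code> a <code>double</code>, but"

greedy_result = clean(junk, fragment)
narrow_result = clean(single_tag, fragment)

print(greedy_result)  # I tried making a , but
print(narrow_result)  # I tried making trans a double , but
```

Whether dropping inline code is desirable depends on the features you want; the original pattern is kept above to stay faithful to the tutorial.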
In [10]:
schemaString = "Id Tags Text"
schema = StructType( [StructField(x, StringType(), True) for x in schemaString.split(" ")] )
postsDf = sqlContext.createDataFrame(postsRdd, schema)
# Now take a look at the data frame
postsDf.show()
|Id|                Tags|                Text|
| 4|<c#><winforms><ty...|When setting a fo...|
| 6|<html><css><css3>...|Why doesn't the p...|
| 7|                    | An explicit cast...|
| 9|<c#><.net><datetime>|How do I calculat...|
|11|      <c#><datediff>|How do I calculat...|
|12|                    | Well, here's how...|
|13|<html><browser><t...|Determining a web...|
|14|              <.net>|Difference betwee...|
|16|<c#><linq><web-se...|Filling a DataSet...|
|17|<mysql><database>...|Binary Data in My...|
|18|                    | For a table like...|
|19|<performance><alg...|What is the faste...|
|21|                    | Many years ago, ...|
|22|                    | The best way tha...|
|24|<mysql><database>...|Throw an error in...|
|25|<c++><c><sockets>...|How to use the C ...|
|26|                    | The answer by is...|
 IMHO yours s...|
|29|                    | There are no HTT...|
|30|                    | I've had no trou...|

Page 3: 6 – Preparing training and testing datasets

In [5]:
targetTag = "java"
sqlfunc = udf(lambda x : 1.0 if (targetTag in x) else 0.0, DoubleType())
postsLabelled = postsDf.withColumn("Label", sqlfunc(postsDf.Tags))

positive = postsLabelled.filter(postsLabelled.Label == 1.0)
negative = postsLabelled.filter(postsLabelled.Label != 1.0)

positiveTrain = positive.sample(False, 0.9)
negativeTrain = negative.sample(False, 0.9)
training = positiveTrain.unionAll(negativeTrain)

negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id', 'Flag')
negativeTest = (negative.join(negativeTrainTmp, 
                              negative["Id"] == negativeTrainTmp["Id"], 
                              "LeftOuter").filter("Flag is null")
                        .select(negative["Id"], 'Tags', 'Text', 'Label'))

positiveTrainTmp = (positiveTrain.withColumnRenamed("Label", "Flag")
                                 .select('Id', 'Flag'))
positiveTest = (positive.join(positiveTrainTmp, 
                              positive["Id"] == positiveTrainTmp["Id"], 
                             "LeftOuter").filter("Flag is null")
                        .select(positive["Id"], 'Tags', 'Text', 'Label'))

testing = negativeTest.unionAll(positiveTest)
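One thing to watch in the labelling step: `targetTag in x` is a substring test on the raw Tags string, so "java" also matches posts tagged only <javascript>. Whether that matters depends on what you want the classifier to learn. Here is a minimal sketch of an exact-match alternative. The helper has_tag is hypothetical (my own name, not from the tutorial), and it could be wrapped in udf(..., DoubleType()) in the same way as the lambda above.

```python
import re

# Tags arrive as one string such as "<c#><winforms><opacity>".
tag_pattern = re.compile(r"<([^<>]+)>")

def has_tag(tags, target):
    """Return 1.0 if target is exactly one of the tags, else 0.0."""
    return 1.0 if target in tag_pattern.findall(tags) else 0.0

def contains_substring(tags, target):
    """The substring test used in the cell above, for comparison."""
    return 1.0 if target in tags else 0.0

print(contains_substring("<javascript><jquery>", "java"))  # 1.0
print(has_tag("<javascript><jquery>", "java"))             # 0.0
print(has_tag("<java><swing>", "java"))                    # 1.0
print(has_tag("", "java"))                                 # 0.0
```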

Page 3: 7 – Training a model

In [16]:
numFeatures = 64000
numEpochs = 30
regParam = 0.02

tokenizer = Tokenizer(inputCol = "Text",
                      outputCol = "Words")

hashingTF = HashingTF(numFeatures = numFeatures,
                      inputCol = tokenizer.getOutputCol(),
                      outputCol = "Features")

lr = LogisticRegression(maxIter = numEpochs,
                        regParam = regParam,
                        featuresCol = "Features",
                        labelCol = "Label",
                        # Despite appearing in the docs, rawPredictionCol was not available
                        # on LogisticRegression in the version of PySpark I had. Perhaps
                        # my version was not up to date.
                        #rawPredictionCol = "Score"
                        predictionCol = "Prediction")

pipeline = Pipeline(stages = [tokenizer, hashingTF, lr])  

model = pipeline.fit(training)
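For readers new to HashingTF: it turns each list of words into a fixed-length vector of term counts by hashing every word to an index below numFeatures. The sketch below shows that idea in plain Python. It uses zlib.crc32 as a stand-in hash, which is an assumption for illustration and not the hash function Spark uses, so the indices will not match Spark's.

```python
import zlib
from collections import Counter

def hashing_tf(words, num_features):
    """Map words to a sparse {index: count} dict via the hashing trick."""
    counts = Counter()
    for word in words:
        # crc32 is a stand-in; Spark uses its own hash function.
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

# Tokenizer lowercases the text and splits it on whitespace.
words = "Easiest way to merge a release into one JAR file".lower().split()
features = hashing_tf(words, 64000)

print(len(words))              # 10
print(sum(features.values()))  # 10
```

Two different words can land on the same index; a larger numFeatures makes such collisions less likely at the cost of a wider vector.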

Page 3: 8 – Testing a model

In [17]:
# Predict the result for a single test case.

testTitle = "Easiest way to merge a release into one JAR file"

testBody = """Is there a tool or script which easily merges a bunch 
 of href="
 %29" JAR files into one JAR file? A bonus would be to 
 easily set the main-file manifest and make it executable.
 I would like to run it with something like: As far as I 
 can tell, it has no dependencies which indicates that it 
 shouldn't be an easy single-file tool, but the downloaded
 ZIP file contains a lot of libraries."""

testText = testTitle + testBody

testDF = sqlContext.createDataFrame([Row(Label=99.0, Text=testText)])

result = model.transform(testDF)

# Select the column by name instead of relying on its position in the row.
prediction = result.select("Prediction").collect()[0][0]

print("Prediction: {0}".format(prediction))

# Evaluate the quality of the model on the testing dataset.

testingResult = model.transform(testing)

testingResultScores = (testingResult.select("Prediction", "Label").rdd
                                    .map(lambda r: (float(r[0]), float(r[1]))))

bc = BinaryClassificationMetrics(testingResultScores)
print("Area under the ROC: {0}".format(bc.areaUnderROC))
Prediction: 0.0
Area under the ROC: 0.601640398991
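A note on the evaluation: the cell above feeds the hard 0/1 Prediction column to BinaryClassificationMetrics rather than a continuous score. Thresholded predictions carry less ranking information than scores, which may be one reason the area under the ROC looks modest. The toy example below illustrates the effect with a pairwise AUC computed in plain Python. The labels and scores are made up for illustration and are not taken from the Posts data.

```python
def pairwise_auc(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair.
    """
    positives = [s for s, y in zip(scores, labels) if y == 1.0]
    negatives = [s for s, y in zip(scores, labels) if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up toy data: two positives, three negatives.
labels = [1.0, 1.0, 0.0, 0.0, 0.0]
scores = [0.9, 0.4, 0.35, 0.2, 0.1]
# Hard predictions from thresholding the scores at 0.5.
predictions = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_scores = pairwise_auc(scores, labels)
auc_predictions = pairwise_auc(predictions, labels)
print(auc_scores)       # 1.0
print(auc_predictions)  # 0.75
```

If your PySpark version exposes a raw score or probability column on the model output, passing that instead of Prediction would be worth trying.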

That's it. I hope you found this translation helpful.