#!/usr/bin/env python
# coding: utf-8

# Translated Beginners Guide to ML with Apache Spark
# ==================================================
#
# This is a translation into PySpark of the [*Beginners Guide: Apache Spark Machine Learning with Large Data*](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html) tutorial by Dmitry Petrov from KDNuggets. I'm not going to provide a lot of commentary; I'll refer you back to the original tutorial for that, but I will provide page numbers to make it easier to follow along. Since this was an exercise to familiarize myself with PySpark, there may be some awkward bits; please let me know if you find any.
#
# First, a few preliminaries: I used the Hortonworks sandbox running on VMware, although I'm sure VirtualBox would work as well. I set up the Jupyter notebook according to these nice [instructions](http://simnotes.github.io) by Simon Streubel. I had to bump up the executor and driver memory of PySpark using the instructions in the answer to [this](http://stackoverflow.com/questions/32336915/pyspark-java-lang-outofmemoryerror-java-heap-space) question on StackExchange; `spark-defaults.conf` was located in `/etc/spark/conf` on the Hortonworks sandbox VM (a minimal example follows at the end of this preamble). Of course, if you have a working PySpark installation, you can probably ignore most of the above.
#
# In the linked tutorial, either a large or a small version of the `Posts.xml` file can be used. I used the small version from [here](https://www.dropbox.com/s/n2skgloqoadpa30/Posts.small.xml?dl=0) since I wasn't running the example on a particularly high-powered machine. That file is assumed to be located at `/root/Posts.small.xml`, since the Hortonworks sandbox seems to want you to run as root for some reason (ugh!). If you have a different PySpark installation, you'll almost certainly want to change that location.
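# For reference, the memory bump boils down to two entries in `spark-defaults.conf`. The property names are standard Spark settings; the `2g` values below are only an illustration, so tune them for your machine:
#
#     spark.driver.memory    2g
#     spark.executor.memory  2g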
# So, without further ado, let the translation begin:

# [Page 2](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/2): **4 – Importing Libraries**

# In[1]:

import re
from lxml import etree
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import udf, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics


# [Page 2](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/2): **5 – Parsing XML**

# In[15]:

fileName = "file:/root/Posts.small.xml"
textFile = sc.textFile(fileName)

# Drop the XML declaration and the enclosing <posts> element so that each
# remaining line is a single <row .../> record.
postsXml = (textFile.map(lambda x: x.strip())
            .filter(lambda x: not x.startswith("<?xml version="))
            .filter(lambda x: x != "<posts>")
            .filter(lambda x: x != "</posts>"))

# Look at the first filtered row of XML
postsXml.take(1)


# In[14]:

junk = re.compile(r"<\S+>")
extra_space = re.compile(r"( )+")

def make_row(s):
    root = etree.fromstring(s)
    id = root.get("Id", "")
    tags = root.get("Tags", "")
    title = root.get("Title", "")
    bodyPlain = junk.sub(" ", root.get("Body", ""))
    text = extra_space.sub(" ", (title + bodyPlain.replace("\n", " ")))
    return Row(id, tags, text)

postsRdd = postsXml.map(make_row)

# Look at the first row of postsRdd
print(postsRdd.take(1))


# In[10]:

schemaString = "Id Tags Text"
schema = StructType(
    [StructField(x, StringType(), True) for x in schemaString.split(" ")])
postsDf = sqlContext.createDataFrame(postsRdd, schema)

# Now take a look at the data frame
postsDf.show()


# [Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **6 – Preparing training and testing datasets**

# In[5]:

targetTag = "java"
sqlfunc = udf(lambda x: 1.0 if (targetTag in x) else 0.0, DoubleType())
postsLabelled = postsDf.withColumn("Label", sqlfunc(postsDf.Tags))

positive = postsLabelled.filter(postsLabelled.Label == 1.0)
negative = postsLabelled.filter(postsLabelled.Label != 1.0)

positiveTrain = positive.sample(False, 0.9)
negativeTrain = negative.sample(False, 0.9)
training = positiveTrain.unionAll(negativeTrain)

negativeTrainTmp = (negativeTrain.withColumnRenamed("Label", "Flag")
                    .select('Id', 'Flag'))
negativeTest = (negative.join(negativeTrainTmp,
                              negative["Id"] == negativeTrainTmp["Id"],
                              "LeftOuter")
                .filter("Flag is null")
                .select(negative["Id"], 'Tags', 'Text', 'Label'))

positiveTrainTmp = (positiveTrain.withColumnRenamed("Label", "Flag")
                    .select('Id', 'Flag'))
positiveTest = (positive.join(positiveTrainTmp,
                              positive["Id"] == positiveTrainTmp["Id"],
                              "LeftOuter")
                .filter("Flag is null")
                .select(positive["Id"], 'Tags', 'Text', 'Label'))

testing = negativeTest.unionAll(positiveTest)
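# (Aside: the alternative below is not in the original tutorial.) If you don't need to mirror the Scala code line for line, and your PySpark is new enough to have `DataFrame.randomSplit` (1.4+), you can get a stratified 90/10 split directly and skip the left-outer-join bookkeeping above. A rough, untested sketch; the `*Alt` names are only illustrative:

positiveTrainAlt, positiveTestAlt = positive.randomSplit([0.9, 0.1])
negativeTrainAlt, negativeTestAlt = negative.randomSplit([0.9, 0.1])
trainingAlt = positiveTrainAlt.unionAll(negativeTrainAlt)
testingAlt = negativeTestAlt.unionAll(positiveTestAlt)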
# [Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **7 – Training a model**

# In[16]:

numFeatures = 64000
numEpochs = 30
regParam = 0.02

tokenizer = Tokenizer(inputCol = "Text", outputCol = "Words")
hashingTF = HashingTF(numFeatures = numFeatures,
                      inputCol = tokenizer.getOutputCol(),
                      outputCol = "Features")
lr = LogisticRegression(maxIter = numEpochs,
                        regParam = regParam,
                        featuresCol = "Features",
                        labelCol = "Label",
                        # Despite appearing in the docs, rawPredictionCol was not available
                        # on LogisticRegression in the version of PySpark I had. Perhaps
                        # my version was not up to date.
                        #rawPredictionCol = "Score",
                        predictionCol = "Prediction")

pipeline = Pipeline(stages = [tokenizer, hashingTF, lr])
model = pipeline.fit(training)


# [Page 3](http://www.kdnuggets.com/2015/11/petrov-apache-spark-machine-learning-large-data.html/3): **8 – Testing a model**

# In[17]:

# Predict the result for a single test case.
testTitle = "Easiest way to merge a release into one JAR file"
testBody = """Is there a tool or script which easily merges a bunch of
href="http://en.wikipedia.org/wiki/JAR_%28file_format%29" JAR files into one
JAR file? A bonus would be to easily set the main-file manifest and make it
executable. I would like to run it with something like: As far as I can tell,
it has no dependencies which indicates that it shouldn't be an easy
single-file tool, but the downloaded ZIP file contains a lot of libraries."""
testText = testTitle + testBody
testDF = sqlContext.createDataFrame([Row(Label=99.0, Text=testText)])
result = model.transform(testDF)
prediction = result.collect()[0].Prediction
print("Prediction: {0}".format(prediction))

# Evaluate the quality of the model on the testing dataset.
testingResult = model.transform(testing)
testingResultScores = (testingResult.select("Prediction", "Label").rdd
                       .map(lambda r: (float(r[0]), float(r[1]))))
bc = BinaryClassificationMetrics(testingResultScores)
print("Area under the ROC: {0}".format(bc.areaUnderROC))


# That's it. I hope you found this translation helpful.
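# P.S. (not in the original tutorial): if you want a second, cruder sanity check alongside the AUC, plain accuracy on the test set can be computed straight from the `testingResult` data frame built above:

correct = testingResult.filter(testingResult.Prediction == testingResult.Label).count()
total = testingResult.count()
print("Accuracy: {0}".format(correct / float(total)))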