DataFrame: a table with built-in operations
Transformer: transforms one DataFrame into another DataFrame
Estimator: e.g., a learning algorithm that trains on a DataFrame and produces a Model
Pipeline: chains Transformers and Estimators to produce a Model
Evaluator: measures how well a fitted Model does on held-out test data
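A minimal sketch of how these abstractions fit together, on toy data (the column names 'text' and 'label' are illustrative; sqlContext is the SQLContext used throughout this notebook):

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

toy_df = sqlContext.createDataFrame(
    [('great book', 1.0), ('terrible book', 0.0)], ['text', 'label'])

tok = Tokenizer(inputCol='text', outputCol='words')     # Transformer
tf = HashingTF(inputCol='words', outputCol='features')  # Transformer
est = LogisticRegression()                              # Estimator
# Pipeline.fit() applies the Transformers, trains the Estimator,
# and returns a PipelineModel -- itself a Transformer.
model = Pipeline(stages=[tok, tf, est]).fit(toy_df)
model.transform(toy_df).select('text', 'prediction').show()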
We will use a dataset[1] that contains 8.9M book reviews from Amazon, spanning May 1996 - July 2014.
[1] J. McAuley, C. Targett, J. Shi, A. van den Hengel. "Image-based Recommendations on Styles and Substitutes." SIGIR, 2015. http://jmcauley.ucsd.edu/data/amazon/
%%time
raw_reviews = sqlContext.read.json('data/amazon/reviews_Books_5.json')
CPU times: user 8.26 ms, sys: 1.2 ms, total: 9.46 ms
Wall time: 37.8 s
raw_reviews.printSchema()
root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
%%time
all_reviews = raw_reviews.select('reviewText', 'overall')
all_reviews.cache()
all_reviews.show(2)
+--------------------+-------+
|          reviewText|overall|
+--------------------+-------+
|Spiritually and m...|    5.0|
|This is one my mu...|    5.0|
+--------------------+-------+
only showing top 2 rows

CPU times: user 3.88 ms, sys: 998 µs, total: 4.88 ms
Wall time: 3.01 s
%%time
all_reviews.groupBy('overall').count().show()
+-------+-------+
|overall|  count|
+-------+-------+
|    1.0| 323833|
|    3.0| 955189|
|    5.0|4980815|
|    4.0|2223094|
|    2.0| 415110|
+-------+-------+

CPU times: user 3.66 ms, sys: 3.28 ms, total: 6.94 ms
Wall time: 15.4 s
The counts above show a heavy class imbalance: 5-star reviews outnumber 1-star reviews by roughly 15 to 1. We will avoid neutral reviews by keeping only those with an overall score of 1 or 5 stars, and we will also filter out reviews that contain no text.
nonneutral_reviews = all_reviews.filter(
(all_reviews.overall == 1.0) | (all_reviews.overall == 5.0))
reviews = nonneutral_reviews.filter(nonneutral_reviews.reviewText != '')
reviews.cache()
all_reviews.unpersist()
DataFrame[reviewText: string, overall: double]
%%time
reviews.count()
CPU times: user 37 µs, sys: 2.06 ms, total: 2.09 ms
Wall time: 5.59 s
5304187
trainingData, testData = reviews.randomSplit([0.8, 0.2])
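randomSplit samples randomly, so the exact split differs from run to run; passing an explicit seed (the value 42 below is arbitrary) would make the split reproducible:

# reproducible 80/20 split -- the seed value is an arbitrary choice
trainingData, testData = reviews.randomSplit([0.8, 0.2], seed=42)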
A Transformer that converts a numerical feature to a binary (0/1) feature: values above the threshold map to 1.0, all others to 0.0.
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=2.5, inputCol='overall', outputCol='label')
Let's see what it will do on data:
bin_reviews = binarizer.transform(reviews)
bin_reviews.show(2)
+--------------------+-------+-----+
|          reviewText|overall|label|
+--------------------+-------+-----+
|Spiritually and m...|    5.0|  1.0|
|This is one my mu...|    5.0|  1.0|
+--------------------+-------+-----+
only showing top 2 rows
A Transformer that converts the input string to lowercase and then splits it on whitespace.
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
Let's see what it will do on data:
tokenized_reviews = tokenizer.transform(bin_reviews)
tokenized_reviews.show(10)
+--------------------+-------+-----+--------------------+
|          reviewText|overall|label|               words|
+--------------------+-------+-----+--------------------+
|Spiritually and m...|    5.0|  1.0|[spiritually, and...|
|This is one my mu...|    5.0|  1.0|[this, is, one, m...|
|This book provide...|    5.0|  1.0|[this, book, prov...|
|I first read THE ...|    5.0|  1.0|[i, first, read, ...|
|A timeless classi...|    5.0|  1.0|[a, timeless, cla...|
|Reading this made...|    5.0|  1.0|[reading, this, m...|
|As you read, Gibr...|    5.0|  1.0|[as, you, read,, ...|
|Deep, moving dram...|    5.0|  1.0|[deep,, moving, d...|
|This is a timeles...|    5.0|  1.0|[this, is, a, tim...|
|An amazing work. ...|    5.0|  1.0|[an, amazing, wor...|
+--------------------+-------+-----+--------------------+
only showing top 10 rows
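Notice in the output that punctuation stays attached to tokens ('read,', 'deep,'). If that mattered, RegexTokenizer could split on runs of non-word characters instead; a sketch (the pattern is one reasonable choice, not the only one):

from pyspark.ml.feature import RegexTokenizer
# splits on one or more non-word characters instead of whitespace
regex_tokenizer = RegexTokenizer(inputCol='reviewText', outputCol='words', pattern='\\W+')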
A Transformer that filters stop words out of the input. Note: null values in the input array are preserved unless null is explicitly added to stopWords.
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
Let's see what it will do on data:
removed_reviews = remover.transform(tokenized_reviews)
removed_reviews.show(2)
sample_review = removed_reviews.first()
print sample_review['words'][:10]
print sample_review['filtered'][:10]
+--------------------+-------+-----+--------------------+--------------------+
|          reviewText|overall|label|               words|            filtered|
+--------------------+-------+-----+--------------------+--------------------+
|Spiritually and m...|    5.0|  1.0|[spiritually, and...|[spiritually, men...|
|This is one my mu...|    5.0|  1.0|[this, is, one, m...|[books., masterpi...|
+--------------------+-------+-----+--------------------+--------------------+
only showing top 2 rows

[u'spiritually', u'and', u'mentally', u'inspiring!', u'a', u'book', u'that', u'allows', u'you', u'to']
[u'spiritually', u'mentally', u'inspiring!', u'book', u'allows', u'question', u'morals', u'help', u'discover', u'really']
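Tokens like 'inspiring!' and 'books.' survive because the default English stop-word list only covers function words. A custom list can be passed through the stopWords parameter; a sketch (the extra words are purely illustrative):

# custom stop words -- the domain words added here are just an example
custom_remover = StopWordsRemover(inputCol='words', outputCol='filtered',
                                  stopWords=['the', 'a', 'an', 'and', 'book', 'books'])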
A Transformer that converts a sequence of words into a fixed-length feature Vector: each term is hashed to an index, and the value at that index is the term's frequency.
from pyspark.ml.feature import HashingTF
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
Let's see what it will do on data:
hashed_reviews = hashingTF.transform(removed_reviews)
hashed_reviews.show(2)
sample = hashed_reviews.first()
sample['features']
+--------------------+-------+-----+--------------------+--------------------+--------------------+
|          reviewText|overall|label|               words|            filtered|            features|
+--------------------+-------+-----+--------------------+--------------------+--------------------+
|Spiritually and m...|    5.0|  1.0|[spiritually, and...|[spiritually, men...|(262144,[15260,30...|
|This is one my mu...|    5.0|  1.0|[this, is, one, m...|[books., masterpi...|(262144,[501,2326...|
+--------------------+-------+-----+--------------------+--------------------+--------------------+
only showing top 2 rows
SparseVector(262144, {15260: 1.0, 30697: 1.0, 53057: 1.0, 57834: 1.0, 96171: 1.0, 112792: 1.0, 118861: 1.0, 146153: 1.0, 146406: 1.0, 181834: 1.0, 192854: 1.0})
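The vector length 262144 (2^18) is the default numFeatures; each index is the term's hash modulo numFeatures, so a smaller table saves memory at the price of more hash collisions. A sketch with a smaller table (1000 is an arbitrary choice):

# same transformation with a much smaller hash table
small_tf = HashingTF(inputCol='filtered', outputCol='small_features', numFeatures=1000)
small_tf.transform(removed_reviews).first()['small_features']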
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.01)
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[binarizer, tokenizer, remover, hashingTF, lr])
%%time
pipeLineModel = pipeline.fit(trainingData)
CPU times: user 16 ms, sys: 2.71 ms, total: 18.7 ms Wall time: 38.7 s
%%time
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
predictions = pipeLineModel.transform(testData)
aur = evaluator.evaluate(predictions)
print 'Area under ROC: ', aur
/usr/hdp/2.4.2.0-258/spark/python/pyspark/ml/classification.py:207: UserWarning: weights is deprecated. Use coefficients instead.
  warnings.warn("weights is deprecated. Use coefficients instead.")
Area under ROC:  0.966076886439
CPU times: user 2.34 s, sys: 1.34 s, total: 3.68 s
Wall time: 19.9 s
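BinaryClassificationEvaluator compares the model's rawPrediction column against label and reports areaUnderROC by default. Given our class imbalance, the area under the precision-recall curve may be more informative; switching metrics is a one-liner:

# same evaluator, but reporting area under the precision-recall curve
evaluator_pr = BinaryClassificationEvaluator(metricName='areaUnderPR')
print 'Area under PR: ', evaluator_pr.evaluate(predictions)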
%%time
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
param_grid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10000, 100000]) \
.addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
.addGrid(lr.maxIter, [10, 20]) \
.build()
cv = (CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(param_grid)
.setNumFolds(3))
cv_model = cv.fit(trainingData)
CPU times: user 20.2 s, sys: 10.8 s, total: 31.1 s
Wall time: 12min 46s
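The grid above has 2 × 3 × 2 = 12 parameter combinations, and with 3 folds the cross-validator fits the pipeline 36 times, which explains the long wall time. A quick sanity check on the grid size:

print len(param_grid)  # 12 parameter combinations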
%%time
new_predictions = cv_model.transform(testData)
new_aur = evaluator.evaluate(new_predictions)
print 'Area under ROC: ', new_aur
Area under ROC:  0.968664347321
CPU times: user 942 ms, sys: 561 ms, total: 1.5 s
Wall time: 12.6 s
best_model = cv_model.bestModel
print 'bestModel Type: ', type(best_model)
print 'bestModel Stages: ', best_model.stages
best_hashingTF = best_model.stages[3]  # stage 3 is the HashingTF, not the Tokenizer
best_lr = best_model.stages[4]
bestModel Type:  <class 'pyspark.ml.pipeline.PipelineModel'>
bestModel Stages:  [Binarizer_4e75a232715a7b413190, Tokenizer_4341864d68f1fca3846c, StopWordsRemover_4de29c833c5925392818, HashingTF_4117a4ad7d3c7e10d67a, LogisticRegression_46eea6e84cab6d486cb2]
print best_hashingTF.extractParamMap()
print best_lr.extractParamMap()
# best params: numFeatures: 100000, regParam: 0.1, maxIter: 20
{Param(parent='HashingTF_4117a4ad7d3c7e10d67a', name='outputCol', doc='output column name.'): 'features',
 Param(parent='LogisticRegression_46eea6e84cab6d486cb2', name='maxIter', doc='max number of iterations (>= 0).'): 20,
 Param(parent='HashingTF_4117a4ad7d3c7e10d67a', name='inputCol', doc='input column name.'): 'filtered',
 Param(parent='HashingTF_4117a4ad7d3c7e10d67a', name='numFeatures', doc='number of features.'): 100000,
 Param(parent='LogisticRegression_46eea6e84cab6d486cb2', name='regParam', doc='regularization parameter (>= 0).'): 0.1}
print 'Intercept: ', best_lr.intercept
print 'First 5 LR weights: ', best_lr.weights[:5]
Intercept:  2.60999655811
First 5 LR weights:  [-0.0012044   0.03860943 -0.33466001  0.03434626  0.00059916]
We can now plot the top 10 most frequent words in positive and negative reviews.
%matplotlib inline
from bokeh.charts import Bar, show
from bokeh.io import output_notebook
from bokeh.charts.attributes import cat
output_notebook()
from operator import add
import pandas as pd
rdd = removed_reviews.filter('label = 1.0').select('filtered').rdd
top_10_positive_words = (rdd
.flatMap(lambda review: review.filtered)
.filter(lambda word: word != '')
.map(lambda word: (word, 1))
.reduceByKey(add)
.map(lambda (word, count): (count, word))
.sortByKey(ascending=False)
.take(10))
df = pd.DataFrame(top_10_positive_words, columns=['Count', 'Word'])
p = Bar(df, label=cat(columns='Word', sort=False), values='Count',
title='Top 10 words in positive reviews',
legend='top_right')
show(p)
<Bokeh Notebook handle for In[17]>
from operator import add
rdd = removed_reviews.filter('label = 0.0').select('filtered').rdd
top_10_negative_words = (rdd
.flatMap(lambda review: review.filtered)
.filter(lambda word: word != '')
.map(lambda word: (word, 1))
.reduceByKey(add)
.map(lambda (word, count): (count, word))
.sortByKey(ascending=False)
.take(10))
df = pd.DataFrame(top_10_negative_words, columns=['Count', 'Word'])
p = Bar(df, label=cat(columns='Word', sort=False), values='Count',
title='Top 10 words in negative reviews',
legend='top_right')
show(p)
<Bokeh Notebook handle for In[53]>
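As an aside, the same word counts could be computed with the DataFrame API instead of dropping down to RDDs; a sketch equivalent to the positive-review pipeline above (explode and col live in pyspark.sql.functions):

from pyspark.sql.functions import explode, col

# one row per word, then a plain group-by count
top_positive_df = (removed_reviews.filter('label = 1.0')
    .select(explode('filtered').alias('word'))
    .filter(col('word') != '')
    .groupBy('word').count()
    .orderBy(col('count').desc())
    .limit(10))
top_positive_df.show()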