We start by creating the 3 RDDs from the different datasets from Amazon product reviews. The data has been kindly provided to us by Dr Julian McAuley and here is a reference of the original papers:
fashion = sc.textFile('Data/Reviews/fashion.json')
electronics = sc.textFile('Data/Reviews/electronics.json')
sports = sc.textFile('Data/Reviews/sports.json')
Note that Spark does not read or move any data at this stage, because RDDs are evaluated lazily. Next, we do some data exploration.
print "fashion has {0} rows, electronics {1} rows and sports {2} rows".format(fashion.count(), electronics.count(), sports.count())
print "fashion first row:"
fashion.first()
fashion has 10000 rows, electronics 10000 rows and sports 10000 rows
fashion first row:
u'{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}'
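To make the lazy-evaluation remark concrete, here is a minimal plain-Python sketch (no Spark involved) that uses generators as a loose analogy: building the pipeline reads nothing, and only asking for the first element pulls data through. The names `read_lines` and `reads`, and the two sample rows, are made up for illustration.

```python
import json

reads = []  # records every line that is actually read

def read_lines():
    # Hypothetical stand-in for sc.textFile: yields lines one at a time.
    for line in ['{"overall": 5.0}', '{"overall": 1.0}']:
        reads.append(line)
        yield line

# Analogue of a transformation: describes the work but runs nothing.
parsed = (json.loads(line) for line in read_lines())
reads_after_building = len(reads)

# Analogue of an action such as first(): pulls just enough data through.
first_row = next(parsed)
reads_after_first = len(reads)

print(reads_after_building)
print(reads_after_first)
print(first_row['overall'])
```

In this analogy, count() would correspond to exhausting the generator, which is why counting touches every row while first() does not.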
We can union them.
union_of_rdds = fashion.union(electronics).union(sports)
print union_of_rdds.first()
{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}
We can now parse the file using the json library.
import json
parsed_fashion = fashion.map(lambda x: json.loads(x))
parsed_fashion.first()
{u'asin': u'0000031887', u'helpful': [0, 0], u'overall': 5.0, u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', u'reviewTime': u'11 4, 2013', u'reviewerID': u'A2XVJBSRI3SWDI', u'reviewerName': u'abigail', u'summary': u'Nice tutu', u'unixReviewTime': 1383523200}
Another way of loading files is by using a list of comma-separated file paths or a wildcard.
data = sc.textFile('Data/Reviews/fashion.json,Data/Reviews/electronics.json,Data/Reviews/sports.json').map(lambda x: json.loads(x))
# QUESTION: How many partitions does the rdd have?
data.getNumPartitions()
3
Now let's imagine we want to know the number of lines in each partition. For that, we need to access the data in each single partition and run operations on them instead of on each row.
For this, we will use mapPartitionsWithIndex, which applies a function to each partition; the function receives the partition index and an iterator over that partition's data. Each function in the API is documented at: https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD
indexed_data = data.mapPartitionsWithIndex(lambda splitIndex, it: [(splitIndex, len([x for x in it]))])
indexed_data.collect()
[(0, 10000), (1, 10000), (2, 10000)]
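The per-partition idea can be sketched without Spark. In the plain-Python emulation below, `map_partitions_with_index` is a hypothetical helper (not part of PySpark) that treats a list of lists as the partitions. `count_rows` has the same shape as the function passed to mapPartitionsWithIndex above, but it counts with a running sum instead of building a list, which avoids holding a whole partition in memory.

```python
def map_partitions_with_index(partitions, func):
    # Hypothetical emulation: call func(index, iterator) once per partition
    # and concatenate whatever each call yields.
    out = []
    for index, part in enumerate(partitions):
        out.extend(func(index, iter(part)))
    return out

def count_rows(split_index, iterator):
    # Consume the iterator lazily; no intermediate list is created.
    yield (split_index, sum(1 for _ in iterator))

toy_partitions = [['a', 'b', 'c'], ['d'], []]
counts = map_partitions_with_index(toy_partitions, count_rows)
print(counts)
```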
The next thing we have been tasked to do is to get the total number of reviews per product.
product_num = data.map(lambda x: (x['asin'], 1)).reduceByKey(lambda x,y: x+y)
# The rdd product_num will contain (product_asin, total_number_reviews)
# What are the maximum and minimum number of reviews?
max_num = product_num.map(lambda x: x[1]).max()
min_num = product_num.map(lambda x: x[1]).min()
print "Max number of reviews is {0}, min number of reviews is {1}".format(max_num, min_num)
Max number of reviews is 2033, min number of reviews is 1
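As a rough mental model of what reduceByKey computes (ignoring partitioning and shuffling), the plain-Python sketch below folds the values of each key together with the supplied function. `reduce_by_key` and the toy ASINs are illustrative only; they are neither Spark API nor tutorial data.

```python
def reduce_by_key(pairs, func):
    # Fold the values of each key together, one pair at a time.
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

toy_reviews = [{'asin': 'A1'}, {'asin': 'B2'}, {'asin': 'A1'}, {'asin': 'A1'}]
pairs = [(r['asin'], 1) for r in toy_reviews]
product_counts = reduce_by_key(pairs, lambda x, y: x + y)

max_reviews = max(product_counts.values())
min_reviews = min(product_counts.values())
print(sorted(product_counts.items()))
```

Because the combining function is applied pairwise, it should be associative and commutative, as addition is here.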
EXERCISE: what is the max score for each product?
We want to join the product reviews by users to the product metadata.
product_metadata = sc.textFile('Data/Products/sample_metadata.json').map(lambda x: json.loads(x))
print product_metadata.first()
{u'asin': u'0000037214', u'title': u'Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory', u'price': 6.99, u'imUrl': u'http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg', u'related': {u'also_viewed': [u'B00JO8II76', u'B00DGN4R1Q', u'B00E1YRI4C']}, u'salesRank': {u'Clothing': 1233557}, u'brand': u'Big Dreams', u'categories': [[u'Clothing, Shoes & Jewelry', u'Girls'], [u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'More Accessories', u'Kids & Baby']]}
def flatten_categories(line):
    old_cats = line['categories']
    line['categories'] = [item for sublist in old_cats for item in sublist]
    return line

product_metadata = product_metadata.map(lambda x: flatten_categories(x))
We want to join the review data to the metadata about the product. We can use the 'asin' for that, which is a unique identifier for each product. In order to do a join, we need to turn each structure into key-value pairs.
key_val_data = data.map(lambda x: (x['asin'], x))
key_val_metadata = product_metadata.map(lambda x: (x['asin'], x))
print "We are joining {0} product reviews to {1} rows of metadata information about the products".format(key_val_data.count(),key_val_metadata.count())
print "First row of key_val_data:"
print key_val_data.first()
We are joining 30000 product reviews to 2469 rows of metadata information about the products
First row of key_val_data:
(u'0000031887', {u'reviewerID': u'A2XVJBSRI3SWDI', u'asin': u'0000031887', u'reviewerName': u'abigail', u'helpful': [0, 0], u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', u'overall': 5.0, u'summary': u'Nice tutu', u'unixReviewTime': 1383523200, u'reviewTime': u'11 4, 2013'})
print "number partitions key_val_data: ",
print key_val_data.getNumPartitions()
print "number partitions key_val_metadata: ",
print key_val_metadata.getNumPartitions()
joined = key_val_data.join(key_val_metadata)
joined.first()
number partitions key_val_data: 3
number partitions key_val_metadata: 2
(u'8179050874', ({u'asin': u'8179050874', u'helpful': [0, 0], u'overall': 1.0, u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..", u'reviewTime': u'05 17, 2009', u'reviewerID': u'A1IQJSHCMW69O5', u'reviewerName': u'Jose Perez', u'summary': u'This is not for Bold is for Curve', u'unixReviewTime': 1242518400}, {u'asin': u'8179050874', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'salesRank': {u'Electronics': 324466}, u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))
What is the number of output partitions of the join? The best way to find out is to refer to the PySpark source code: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/join.py
# QUESTION: what is the number of partitions of the joined dataset?
print "There are {0} partitions".format(joined.getNumPartitions())
joined.take(2)
There are 5 partitions
[(u'8179050874', ({u'asin': u'8179050874', u'helpful': [0, 0], u'overall': 1.0, u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..", u'reviewTime': u'05 17, 2009', u'reviewerID': u'A1IQJSHCMW69O5', u'reviewerName': u'Jose Perez', u'summary': u'This is not for Bold is for Curve', u'unixReviewTime': 1242518400}, {u'asin': u'8179050874', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'salesRank': {u'Electronics': 324466}, u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'})), (u'8179050874', ({u'asin': u'8179050874', u'helpful': [0, 0], u'overall': 1.0, u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of "air margin" down either one or both sides.The problems are therefore:1. It looks ugly2. It will fill with dust3. Case-mate support have been messing me around for over a month now and I\'m beginning to suspect they are just hoping that I\'ll go away and stop annoying them. 
In other words, the tech support is as useless as the product...', u'reviewTime': u'03 26, 2009', u'reviewerID': u'A2HC8YQVZ4HMF5', u'reviewerName': u'Wowbagger the Infinitely Prolonged', u'summary': u"Doesn't even fit the screen...", u'unixReviewTime': 1238025600}, {u'asin': u'8179050874', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'salesRank': {u'Electronics': 324466}, u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))]
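Reading the linked join.py, the Python join appears to tag the values of each side, union the two RDDs and then group by key. With no explicit partition count (and assuming spark.default.parallelism is unset), the grouping would keep the partition count of the union, which is consistent with 3 + 2 = 5. The plain-Python sketch below imitates that tag, union and group pattern on toy data; `toy_join` is a hypothetical helper, not the Spark implementation.

```python
from collections import defaultdict

def toy_join(left, right):
    # Tag each value with the side it came from, then "union" the two lists.
    tagged = [(k, (1, v)) for k, v in left] + [(k, (2, w)) for k, w in right]

    # Group by key, keeping left and right values in separate buffers.
    groups = defaultdict(lambda: ([], []))
    for key, (side, value) in tagged:
        groups[key][side - 1].append(value)

    # Emit every (left, right) combination for keys present on both sides.
    return [(key, (v, w))
            for key, (vs, ws) in sorted(groups.items())
            for v in vs for w in ws]

toy_reviews = [('p1', 'review a'), ('p1', 'review b'), ('p2', 'review c')]
toy_metadata = [('p1', 'title 1'), ('p3', 'title 3')]
joined_toy = toy_join(toy_reviews, toy_metadata)
print(joined_toy)
```

Keys that appear on only one side (`p2` and `p3` here) produce no output, which is the inner-join behaviour.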
To make it easier to manipulate, we will change the structure of the joined rdd to be a single dictionary.
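def merge_dictionaries(review_line, metadata_line):
    # Arguments follow the order of the joined values: (review, metadata).
    # Note: the metadata dictionary is updated in place and returned; no copy is made.
    new_dict = metadata_line
    new_dict.update(review_line)
    return new_dict

nice_joined = joined.map(lambda x: merge_dictionaries(x[1][0], x[1][1]))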
nice_joined.take(2)
[{u'asin': u'8179050874', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'helpful': [0, 0], u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'overall': 1.0, u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..", u'reviewTime': u'05 17, 2009', u'reviewerID': u'A1IQJSHCMW69O5', u'reviewerName': u'Jose Perez', u'salesRank': {u'Electronics': 324466}, u'summary': u'This is not for Bold is for Curve', u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!', u'unixReviewTime': 1242518400}, {u'asin': u'8179050874', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'helpful': [0, 0], u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'overall': 1.0, u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of "air margin" down either one or both sides.The problems are therefore:1. It looks ugly2. It will fill with dust3. Case-mate support have been messing me around for over a month now and I\'m beginning to suspect they are just hoping that I\'ll go away and stop annoying them. 
In other words, the tech support is as useless as the product...', u'reviewTime': u'03 26, 2009', u'reviewerID': u'A2HC8YQVZ4HMF5', u'reviewerName': u'Wowbagger the Infinitely Prolonged', u'salesRank': {u'Electronics': 324466}, u'summary': u"Doesn't even fit the screen...", u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!', u'unixReviewTime': 1238025600}]
A couple of questions to probe your understanding of Spark
# Testing Spark understanding
# QUESTION: if I run this, what will it print?
def change_title(line):
    line['title'] = 'this is the title'
    return line
categories = nice_joined.map(lambda x: change_title(x))
# ANSWER:
print categories.map(lambda x: x['title']).first()
this is the title
# QUESTION: if I run this, what will be the title of the first row?
nice_joined.map(lambda x: x['title']).first()
u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'
# QUESTION: if I run this, what will it print?
def get_first_category(line):
    line['categories'] = line['categories'][0]
    return line
print "BEFORE"
print "the categories in the first 2 fields are: "
nice_joined.map(lambda x: x['categories']).take(2)
BEFORE
the categories in the first 2 fields are:
[[u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries']]
# QUESTION: if I run this, what will it print?
print "AFTER"
nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)
AFTER
[u'Electronics', u'E']
What if we cache nice_joined first?
nice_joined.cache()
nice_joined.count()
print "AFTER CACHING"
nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)
AFTER CACHING
[u'Electronics', u'Electronics']
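A plausible explanation for the difference (an interpretation, not something stated in the tutorial): merge_dictionaries updates a dictionary in place, so before caching, consecutive reviews of the same product can end up referring to one shared dictionary, and get_first_category then modifies that same object twice. After caching, each row presumably comes back as its own object. The plain-Python sketch below reproduces both outcomes on toy data; `shared`, `aliased_rows` and `independent_rows` are illustrative names.

```python
import copy

def get_first_category(line):
    # Mutates the dictionary it is given, like the function above.
    line['categories'] = line['categories'][0]
    return line

shared = {'categories': ['Electronics', 'Batteries']}

# Two rows that are really the same dictionary object.
aliased_rows = [shared, shared]
aliased_result = [get_first_category(row)['categories'] for row in aliased_rows]

# Two rows that are independent copies.
fresh = {'categories': ['Electronics', 'Batteries']}
independent_rows = [copy.deepcopy(fresh), copy.deepcopy(fresh)]
independent_result = [get_first_category(row)['categories'] for row in independent_rows]

print(aliased_result)      # ['Electronics', 'E']
print(independent_result)  # ['Electronics', 'Electronics']
```

The general lesson is that functions passed to map are safer when they return a new object instead of modifying their input.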
Now that we have joined two data sources, we can start doing some ad-hoc analysis of the data! Let's start by counting the number of reviews per category. The categories are encoded as a list of categories, so we need to count 1 for each 'sub-category'.
nice_joined.first()
nice_joined.cache()
nice_joined.count()
30000
# We want to get the number of distinct categories
all_categories = nice_joined.flatMap(lambda x: x['categories'])
print "all_categories.take(5): ",
print all_categories.take(5)
num_categories = all_categories.distinct().count()
print
print "There are {0} categories.".format(num_categories)
all_categories.take(5): [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries', u'Electronics']

There are 925 categories.
We are going to take the categories in each review and count them as being reviewed once.
category_count = nice_joined.flatMap(lambda x: [(y,1) for y in x['categories']])
category_total_count = category_count.reduceByKey(lambda x,y: x+y)
print category_total_count.take(10)
[(u'Screen Protectors', 10), (u'Jazz', 166), (u'Stands', 6), (u'Touch Screen Tablet Accessories', 320), (u'Bike Baskets', 1), (u'Thongs', 1), (u'Rink Equipment', 199), (u'Gun Safes & Cabinets', 3), (u'Soft Boxes', 2), (u'Video Games & Accessories', 17)]
sorted_categories = sorted(category_total_count.collect(), key=lambda x: x[1], reverse=True)
print "The top 5 categories are:"
print sorted_categories[:5]
The top 5 categories are:
[(u'Clothing, Shoes & Jewelry', 23983), (u'Sports & Outdoors', 10443), (u'Electronics', 10367), (u'Novelty, Costumes & More', 6384), (u'Men', 4880)]
Next, we have been tasked to get the average product review length for each category. We can solve this using groupByKey!
category_review = nice_joined.flatMap(lambda x: [(y, len(x['reviewText'])) for y in x['categories']])
print "After the flatMap: " + str(category_review.first())
print "After the groupByKey: " + str(category_review.groupByKey().map(lambda x: (x[0], list(x[1]))).first())
print
grouped_category_review = category_review.groupByKey().map(lambda x: (x[0], sum(x[1])/float(len(x[1]))))
print "grouped_category_review.first(): " + str(grouped_category_review.first())
### Now we can sort the categories by average product review length
print "The top 10 categories are: " + str(sorted(grouped_category_review.collect(), key=lambda x: x[1], reverse=True)[:10])
After the flatMap: (u'Electronics', 293)
After the groupByKey: (u'Screen Protectors', [191, 135, 135, 782, 782, 782, 446, 446, 446, 85])

grouped_category_review.first(): (u'Screen Protectors', 423.0)
The top 10 categories are: [(u'Photos', 7570.0), (u'Bags, Packs & Accessories', 6411.0), (u'Rifles', 5079.888888888889), (u'Motets', 3404.0), (u'Free-Weight Racks', 3404.0), (u'Weight Racks', 3404.0), (u'Magnificats', 3404.0), (u'Sonatinas', 3239.2), (u'Sonatas', 3239.2), (u'Rugby', 3156.0)]
EXERCISE: Do the same thing, but this time you are not allowed to use groupByKey()!
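One possible direction for the exercise, sketched in plain Python and not in Spark: carry a (sum, count) pair per category and combine pairs as they arrive, so the full list of review lengths for a category never has to be materialised. In Spark the same shape could be expressed with mapValues followed by reduceByKey; `average_by_key` and the toy data are illustrative only.

```python
def average_by_key(pairs):
    # Keep a running (sum, count) per key instead of a list of values.
    acc = {}
    for key, value in pairs:
        total, count = acc.get(key, (0, 0))
        acc[key] = (total + value, count + 1)
    return dict((key, total / float(count)) for key, (total, count) in acc.items())

toy_lengths = [('Electronics', 100), ('Batteries', 40), ('Electronics', 300)]
averages = average_by_key(toy_lengths)
print(sorted(averages.items()))
```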
from math import exp
from datetime import datetime
def get_part_index(splitIndex, iterator):
    for it in iterator:
        yield (splitIndex, it)

def count_elements(splitIndex, iterator):
    n = sum(1 for _ in iterator)
    yield (splitIndex, n)
print "***Creating the large rdd***"
num_parts = 16
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x)))).mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x)).cache()
print "first 5 items:" + str(skewed_large_rdd.take(5))
print "num rows: " + str(skewed_large_rdd.count())
print "num partitions: " + str(skewed_large_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())
print
print "***Creating the small rdd***"
small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
print "first 5 items:" + str(small_rdd.take(5))
print "num rows: " + str(small_rdd.count())
print "num partitions: " + str(small_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(small_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())
print
print "Joining them"
t0 = datetime.now()
result = skewed_large_rdd.leftOuterJoin(small_rdd)
result.count()
print "The direct join takes %s"%(str(datetime.now() - t0))
print "The joined rdd has {0} partitions and {1} rows".format(result.getNumPartitions(), result.count())
***Creating the large rdd***
first 5 items:[(0, 0), (1, 0), (1, 1), (2, 0), (2, 1)]
num rows: 5171502
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 2), (2, 7), (3, 20), (4, 54), (5, 148), (6, 403), (7, 1096), (8, 2980), (9, 8103), (10, 22026), (11, 59874), (12, 162754), (13, 442413), (14, 1202604), (15, 3269017)]

***Creating the small rdd***
first 5 items:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
num rows: 16
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1)]

Joining them
The direct join takes 0:00:30.362042
The joined rdd has 32 partitions and 5171502 rows
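The row counts above follow directly from how the large RDD is built: partition i contributes int(exp(i)) rows, each keyed by i. The plain-Python sketch below recomputes that distribution and the share of rows held by the largest key. Because a hash-partitioned join sends all rows with the same key to the same partition, one task ends up handling close to two thirds of the data; the variable names are illustrative.

```python
from math import exp

num_parts = 16
# Partition i holds int(exp(i)) rows, all carrying the key i.
rows_per_key = [int(exp(i)) for i in range(num_parts)]

total_rows = sum(rows_per_key)
largest_share = rows_per_key[-1] / float(total_rows)

print(total_rows)
print(round(largest_share, 2))
```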
import sklearn
import pickle
model = pickle.load(open('Data/classifiers/classifier.pkl', 'rb'))  # open in binary mode, as pickle expects
model_b = sc.broadcast(model)
fashion.map(lambda x: json.loads(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()
('Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', 'fashion')
This is the latter part of the tutorial. The main focus will be on Spark DataFrames and Spark SQL.
review_filepaths = 'Data/Reviews/*'
textRDD = sc.textFile(review_filepaths)
print 'number of reviews : {0}'.format(textRDD.count())
print 'sample row : \n{0}'.format(textRDD.first())
number of reviews : 30000 sample row : {"reviewerID": "AKM1MP6P0OYPR", "asin": "0132793040", "reviewerName": "Vicki Gibson \"momo4\"", "helpful": [1, 1], "reviewText": "Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.", "overall": 5.0, "summary": "Very thorough", "unixReviewTime": 1365811200, "reviewTime": "04 13, 2013"}
A DataFrame requires a schema. There are two main ways to attach a schema to the data: let Spark infer it, or specify it by hand.
# You need a SQLContext to work with DataFrames
from pyspark.sql import SQLContext
# Instantiate SQL Context
sqc = SQLContext(sc)
print sqc
<pyspark.sql.context.SQLContext object at 0x10951d590>
inferredDF = sqc.jsonFile(review_filepaths)
inferredDF.first()
Row(asin=u'0132793040', helpful=[1, 1], overall=5.0, reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', reviewerID=u'AKM1MP6P0OYPR', reviewerName=u'Vicki Gibson "momo4"', summary=u'Very thorough', unixReviewTime=1365811200)
inferredDF.printSchema()
root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
Documentation for the available data types can be found in the Spark SQL DataTypes section of the Spark documentation.
# Import the data types
from pyspark.sql.types import *
# Define Schema
REVIEWS_SCHEMA_DEF = StructType([
    StructField('reviewerID', StringType(), True),
    StructField('asin', StringType(), True),
    StructField('reviewerName', StringType(), True),
    StructField('helpful', ArrayType(IntegerType(), True), True),
    StructField('reviewText', StringType(), True),
    StructField('reviewTime', StringType(), True),
    StructField('overall', DoubleType(), True),
    StructField('summary', StringType(), True),
    StructField('unixReviewTime', LongType(), True)
])
print REVIEWS_SCHEMA_DEF
StructType(List(StructField(reviewerID,StringType,true),StructField(asin,StringType,true),StructField(reviewerName,StringType,true),StructField(helpful,ArrayType(IntegerType,true),true),StructField(reviewText,StringType,true),StructField(reviewTime,StringType,true),StructField(overall,DoubleType,true),StructField(summary,StringType,true),StructField(unixReviewTime,LongType,true)))
# Use the handcrafted schema to create a DataFrame
appliedDF = sqc.jsonFile(review_filepaths, schema=REVIEWS_SCHEMA_DEF)
appliedDF.first()
Row(reviewerID=u'AKM1MP6P0OYPR', asin=u'0132793040', reviewerName=u'Vicki Gibson "momo4"', helpful=[1, 1], reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', overall=5.0, summary=u'Very thorough', unixReviewTime=1365811200)
The Spark DataFrame API lets you perform many operations on the data. Its primary advantage is that you can express data transformations through the high-level API instead of writing custom Python functions. Using the high-level API has advantages that will be explained later in the tutorial.
The DataFrame API has functionality similar to that of the core RDD API. For example:
You can use select to choose columns from your DataFrame.
columnDF = appliedDF.select(appliedDF.asin,
                            appliedDF.overall,
                            appliedDF.reviewText,
                            appliedDF.helpful[0]/appliedDF.helpful[1],
                            appliedDF.reviewerID,
                            appliedDF.unixReviewTime).\
    withColumnRenamed('(helpful[0] / helpful[1])','helpful')
columnDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200
0321732944 5.0     While many beginn... null               A2CX7LUOHB2NDG 1341100800
0439886341 1.0     It never worked. ... 1.0                A2NWSAGRHCP8N5 1367193600
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200
0439886341 1.0     Do not waste your... 1.0                A1GI0U4ZRJA8WN 1334707200
0511189877 5.0     Dog got the old r... null               A1QGNMC6O1VW39 1397433600
0511189877 2.0     This remote, for ... 1.0                A3J3BRHTDRFJ2G 1397433600
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400
0511189877 5.0     This unit works j... null               A34ATBPOK6HCHY 1395532800
0511189877 5.0     It is an exact du... null               A89DO69P0XZ27  1395446400
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600
0528881469 5.0     Love it has every... null               A1DA3W4GTFXP6O 1405641600
0528881469 1.0     I have owned two ... null               A29LPQQDG7LD5J 1352073600
0528881469 5.0     We got this GPS f... null               AO94DHGC771SJ  1370131200
0528881469 1.0     I'm a professiona... 0.8                AMO214LNFCEI4  1290643200
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400
0528881469 2.0     Not going to writ... 0.9                A1H8PY3QHMQQA0 1290556800
0528881469 2.0     My brother is a t... 0.71875            A2CPBQ5W4OGBX  1277078400
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400
Similar to Pandas, DataFrames come equipped with functions to address missing data.
# Drop rows with a missing rating and replace missing helpfulness ratios with 0.0
densedDF=columnDF.dropna(subset=["overall"]).fillna(0.0,subset=["helpful"])
densedDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200
0321732944 5.0     While many beginn... 0.0                A2CX7LUOHB2NDG 1341100800
0439886341 1.0     It never worked. ... 1.0                A2NWSAGRHCP8N5 1367193600
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200
0439886341 1.0     Do not waste your... 1.0                A1GI0U4ZRJA8WN 1334707200
0511189877 5.0     Dog got the old r... 0.0                A1QGNMC6O1VW39 1397433600
0511189877 2.0     This remote, for ... 1.0                A3J3BRHTDRFJ2G 1397433600
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400
0511189877 5.0     This unit works j... 0.0                A34ATBPOK6HCHY 1395532800
0511189877 5.0     It is an exact du... 0.0                A89DO69P0XZ27  1395446400
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600
0528881469 5.0     Love it has every... 0.0                A1DA3W4GTFXP6O 1405641600
0528881469 1.0     I have owned two ... 0.0                A29LPQQDG7LD5J 1352073600
0528881469 5.0     We got this GPS f... 0.0                AO94DHGC771SJ  1370131200
0528881469 1.0     I'm a professiona... 0.8                AMO214LNFCEI4  1290643200
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400
0528881469 2.0     Not going to writ... 0.9                A1H8PY3QHMQQA0 1290556800
0528881469 2.0     My brother is a t... 0.71875            A2CPBQ5W4OGBX  1277078400
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400
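The null values in the helpful column presumably come from reviews with no votes, where helpful[1] is 0 and Spark SQL returns null for the division instead of raising an error. Below is a plain-Python sketch of the ratio and fillna steps, with `helpful_ratio` and the toy rows as illustrative assumptions:

```python
def helpful_ratio(helpful):
    # helpful is [helpful_votes, total_votes]; None plays the role of null.
    up, total = helpful
    return None if total == 0 else up / float(total)

toy_rows = [{'overall': 5.0, 'helpful': [1, 1]},
            {'overall': 5.0, 'helpful': [0, 0]},
            {'overall': 1.0, 'helpful': [4, 5]}]

ratios = [helpful_ratio(row['helpful']) for row in toy_rows]

# Analogue of fillna(0.0, subset=["helpful"]): replace missing ratios by 0.0.
filled = [0.0 if r is None else r for r in ratios]

print(ratios)
print(filled)
```

Note that filling with 0.0 makes "no votes at all" indistinguishable from "votes, none of them helpful", which may or may not matter for later analysis.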
Filtering lets you select rows that satisfy a condition. The pattern is similar to filtering RDDs, but simpler.
filteredDF=densedDF.filter(densedDF.overall>=3)
filteredDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200
0321732944 5.0     While many beginn... 0.0                A2CX7LUOHB2NDG 1341100800
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200
0511189877 5.0     Dog got the old r... 0.0                A1QGNMC6O1VW39 1397433600
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400
0511189877 5.0     This unit works j... 0.0                A34ATBPOK6HCHY 1395532800
0511189877 5.0     It is an exact du... 0.0                A89DO69P0XZ27  1395446400
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600
0528881469 5.0     Love it has every... 0.0                A1DA3W4GTFXP6O 1405641600
0528881469 5.0     We got this GPS f... 0.0                AO94DHGC771SJ  1370131200
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400
0528881469 5.0     I did a lot of co... 1.0                A37K02NKUIT68K 1293235200
0528881469 4.0     I purchased this ... 0.5                A2AW1SSVUIYV9Y 1289001600
0528881469 5.0     EXCELLENT. BEST T... 0.7142857142857143 A2AEHUKOV014BP 1284249600
0528881469 4.0     Well as one of th... 1.0                A2O8FIJR9EBU56 1278547200
0528881469 4.0     Was fast and what... 0.0                AYTBGUX49LF3W  1398470400
0528881469 5.0     We had the GPS fo... 0.0                A1E4WG8HRWWK4R 1390867200
0528881469 5.0     Back in the old d... 0.5                A2AOEW5UGXFOOQ 1294790400
Grouping is equivalent to groupByKey in the core RDD API. You can then aggregate the grouped values with a summary function such as count:
grouped = filteredDF.groupBy("overall").count()
grouped.show()
overall count
3.0     2128
5.0     18503
4.0     5324
You can join two DataFrames together by using a common key column.
product_filepaths = 'Data/Products/*'
productRDD = sc.textFile(product_filepaths)
productRDD.first()
u'{"asin": "0000037214", "title": "Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory", "price": 6.9900000000000002, "imUrl": "http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg", "related": {"also_viewed": ["B00JO8II76", "B00DGN4R1Q", "B00E1YRI4C"]}, "salesRank": {"Clothing": 1233557}, "brand": "Big Dreams", "categories": [["Clothing, Shoes & Jewelry", "Girls"], ["Clothing, Shoes & Jewelry", "Novelty, Costumes & More", "Costumes & Accessories", "More Accessories", "Kids & Baby"]]}'
# Load Dataset2 : Amazon Product information
# First, define Schema for second Dataset
PRODUCTS_SCHEMA_DEF = StructType([
    StructField('asin', StringType(), True),
    StructField('title', StringType(), True),
    StructField('price', DoubleType(), True),
    StructField('categories', ArrayType(ArrayType(StringType(), True), True), True)
])
# Load the dataset
productDF = sqc.jsonFile(product_filepaths,PRODUCTS_SCHEMA_DEF)
productDF.show()
# productDF.first()
asin       title                price categories
0000037214 Purple Sequin Tin... 6.99  ArrayBuffer(Array...
0000032069 Adult Ballet Tutu... 7.89  ArrayBuffer(Array...
0000031909 Girls Ballet Tutu... 7.0   ArrayBuffer(Array...
0000032034 Adult Ballet Tutu... 7.87  ArrayBuffer(Array...
0000031852 Girls Ballet Tutu... 3.17  ArrayBuffer(Array...
0000032050 Adult Ballet Tutu... 12.85 ArrayBuffer(Array...
0000031887 Ballet Dress-Up F... 6.79  ArrayBuffer(Array...
0000031895 Girls Ballet Tutu... 2.99  ArrayBuffer(Array...
0123456479 SHINING IMAGE HUG... 64.98 ArrayBuffer(Array...
0132793040 Kelby Training DV... null  ArrayBuffer(Array...
0188477284 Klean Kanteen Cla... null  ArrayBuffer(Array...
0321732944 Kelby Training DV... null  ArrayBuffer(Array...
0439886341 Digital Organizer... 8.15  ArrayBuffer(Array...
0456844570 RiZ Women's Beaut... null  ArrayBuffer(Array...
0456808574 Lantin White Viso... null  ArrayBuffer(Array...
0456830197 NVC Unisex Light ... null  ArrayBuffer(Array...
0456856293 Kismeth Eyewear C... null  ArrayBuffer(Array...
0456840532 Max-MPH Black - L... null  ArrayBuffer(Array...
0456787283 FX1 Small Adult A... null  ArrayBuffer(Array...
0456838384 Riz Small Unisex ... null  ArrayBuffer(Array...
QUESTION: What do you think will happen if we remove some fields from this schema?
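ANSWER: Fields that are not listed in the schema are not loaded. For example, PRODUCTS_SCHEMA_DEF above leaves out imUrl, related, salesRank and brand, and productDF contains only the four listed columns.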
Now let's join the two datasets.
enrichedReviews = filteredDF.join(productDF, productDF.asin==filteredDF.asin).dropna(subset="title")
enrichedReviews.count()
25566L
When you join two RDDs, you have to restructure the data into (K, V) pairs where the key is the join key. This may involve two additional map transformations. This is not necessary with DataFrames.
enrichedReviews
DataFrame[asin: string, overall: double, reviewText: string, helpful: double, reviewerID: string, unixReviewTime: bigint, asin: string, title: string, price: double, categories: array<array<string>>]
enrichedReviews.show()
asin       overall reviewText           helpful reviewerID     unixReviewTime asin       title                price categories
9983782030 5.0     I purchased this ... 0.0     A2G6OTE5JOZHOM 1320192000     9983782030 PREMIUM USB Adapt... null  ArrayBuffer(Array...
9983782030 5.0     Item arrived as d... 1.0     A2AEZK0K9CFVE6 1290211200     9983782030 PREMIUM USB Adapt... null  ArrayBuffer(Array...
998498480X 5.0     The charger that ... 0.0     ABB3GDPNS9TVW  1365292800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     I got this charge... 0.0     A32NKAW0Q7A9IQ 1290729600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     Garmin Nuvi 800 s... 0.0     A1GPVV2XWNGSDX 1303776000     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     I plan on using m... 0.0     AGZ5E33UY5L7S  1290038400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     Sometimes I need ... 0.0     A2XOVBAMQ8J0YV 1325808000     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     Turns on the GPS ... 0.0     A1VCXIYTPCE6W0 1395964800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     This plug works w... 0.0     A13KD5E4V2VPTO 1363996800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     My GPS device did... 1.0     A3AGNJNG7CFVZK 1275782400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     It worked perfect... 0.0     A1B48D0I1M9HBI 1388534400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     Did not seem to m... 0.0     A1996ALUR26ETP 1388275200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     We bought this fo... 0.0     A24JJYOP041KMM 1383436800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     It's a wall wart.... 1.0     A1DOFBSJT3SSKZ 1269561600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     The charger was r... 0.0     ADTXKJ76N9S6W  1272326400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     This GPS is great... 0.0     A3D1ZRIDPWDM1V 1287100800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     I used it once. W... 0.0     A3R2M5ABAYL63P 1291507200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     I really liked th... 0.0     A1QT8GM0WW02Z5 1287878400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     The thing I liked... 0.0     A3RYHP2OYFJ3YC 1397779200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     I really don't ha... 0.0     A261321MIS0YPB 1290729600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
Now that we have done some operations on the data, we can save the file for later use. Standard data formats are a great way of opening up valuable data to your entire organization. Spark DataFrames can be saved in many different formats, including but not limited to JSON, Parquet and Hive tables.
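try:
    columnDF.saveAsParquetFile('Data/Outputs/reviews_filtered.parquet')
    print "Saved as parquet successfully"
except Exception as e:
    # The save fails if, for example, the output path already exists from an earlier run
    print "Parquet file was not saved: {0}".format(e)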
Saved as parquet successfully
Spark DataFrames also allow you to use Spark SQL to query petabytes of data. Spark comes with a SQL-like query language that can be used to query distributed DataFrames. A key advantage of using Spark SQL is that the Catalyst query optimizer, under the hood, transforms your SQL query so that it runs as efficiently as possible.
Spark SQL can leverage the same functionality that the DataFrame API provides. In fact, it provides more, through the SQL and HQL capabilities that are available in the Spark SQL environment.
Because of time constraints, I will explain the different functions available in the Spark SQL environment through examples that each combine several of them.
# Read the reviews parquet file
reviewsDF = sqc.parquetFile('Data/Outputs/reviews_filtered.parquet')
# Register the DataFrames to be used in sql
reviewsDF.registerAsTable("reviews")
productDF.registerAsTable("products")
print 'There are {0} reviews about {1} products'.format(reviewsDF.count(),productDF.count())
There are 30000 reviews about 2469 products
sql_query = """SELECT reviews.asin, overall, reviewText, price
FROM reviews JOIN products ON reviews.asin=products.asin
WHERE price > 50.00
"""
result = sqc.sql(sql_query)
result.show()
asin       overall reviewText           price
0123456479 5.0     Very simple...I l... 64.98
0123456479 5.0     This was my gran...  64.98
0123456479 4.0     I was looking for... 64.98
0123456479 5.0     I absolutely LOVE... 64.98
0123456479 5.0     This was a gift f... 64.98
0123456479 4.0     I love the produc... 64.98
0123456479 5.0     This was everythi... 64.98
0123456479 5.0     This jewelry box ... 64.98
0123456479 5.0     I have ordered th... 64.98
0123456479 4.0     This is a nice je... 64.98
0123456479 5.0     The minute I saw ... 64.98
0123456479 5.0     This jewelry box ... 64.98
0123456479 4.0     I love pink, and ... 64.98
0123456479 5.0     This was a great ... 64.98
0123456479 4.0     This is probably ... 64.98
0123456479 5.0     Love this Jewelry... 64.98
0123456479 5.0     I got this jewelr... 64.98
0123456479 5.0     This products is ... 64.98
0123456479 3.0     I already own thi... 64.98
0123456479 5.0     I love this jewel... 64.98
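The join-and-filter pattern in this query can be tried on a few rows without a cluster. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL, with made-up rows and a reduced set of columns, so it only illustrates the SQL semantics and says nothing about how Spark executes the query. One detail it makes visible: a product whose price is null never passes the `price > 50.00` filter.

```python
import sqlite3

# In-memory SQLite database standing in for the registered Spark SQL tables
# (an assumption for illustration; this is not Spark)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL, reviewText TEXT)")
conn.execute("CREATE TABLE products (asin TEXT, title TEXT, price REAL)")

# Hypothetical toy rows; the values are invented for this example
conn.executemany("INSERT INTO reviews VALUES (?, ?, ?)", [
    ("A1", 5.0, "great box"),
    ("A1", 4.0, "nice box"),
    ("B2", 3.0, "cheap cable"),
    ("C3", 5.0, "no price listed"),
])
conn.executemany("INSERT INTO products VALUES (?, ?, ?)", [
    ("A1", "Jewelry box", 64.98),
    ("B2", "Cable", 3.77),
    ("C3", "Adapter", None),  # null price
])

# Same shape as the query above, with an ORDER BY added to make the output stable
sql_query = """SELECT reviews.asin, overall, reviewText, price
FROM reviews JOIN products ON reviews.asin = products.asin
WHERE price > 50.00
ORDER BY overall DESC
"""
rows = conn.execute(sql_query).fetchall()
for row in rows:
    print(row)
```

Only the two reviews of product A1 are returned. B2 is dropped because its price is below the threshold, and C3 is dropped because comparing a null price with 50.00 does not evaluate to true.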
Spark SQL also provides functionality similar to the User Defined Functions (UDFs) offered in Hive. Spark uses the registerFunction() method to register Python functions in the SQLContext.
import re
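def transform_review(review):
    # Remove everything except letters, digits and whitespace, then lowercase.
    # The result is wrapped in a list to match the ArrayType return type.
    x1 = re.sub(r'[^0-9a-zA-Z\s]+', '', review)
    return [x1.lower()]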
result.registerAsTable("result")
sqc.registerFunction("to_lowercase", lambda x:transform_review(x),returnType=ArrayType(StringType(), True))
sql_query_transform = """SELECT asin, reviewText, to_lowercase(reviewText) as cleaned
FROM result
"""
result_transform = sqc.sql(sql_query_transform)
result_transform.show()
asin       reviewText           cleaned
0123456479 Very simple...I l... ArrayBuffer(very ...
0123456479 This was my gran...  ArrayBuffer(this ...
0123456479 I was looking for... ArrayBuffer(i was...
0123456479 I absolutely LOVE... ArrayBuffer(i abs...
0123456479 This was a gift f... ArrayBuffer(this ...
0123456479 I love the produc... ArrayBuffer(i lov...
0123456479 This was everythi... ArrayBuffer(this ...
0123456479 This jewelry box ... ArrayBuffer(this ...
0123456479 I have ordered th... ArrayBuffer(i hav...
0123456479 This is a nice je... ArrayBuffer(this ...
0123456479 The minute I saw ... ArrayBuffer(the m...
0123456479 This jewelry box ... ArrayBuffer(this ...
0123456479 I love pink, and ... ArrayBuffer(i lov...
0123456479 This was a great ... ArrayBuffer(this ...
0123456479 This is probably ... ArrayBuffer(this ...
0123456479 Love this Jewelry... ArrayBuffer(love ...
0123456479 I got this jewelr... ArrayBuffer(i got...
0123456479 This products is ... ArrayBuffer(this ...
0123456479 I already own thi... ArrayBuffer(i alr...
0123456479 I love this jewel... ArrayBuffer(i lov...
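Because the function behind the UDF is plain Python, its behaviour can be checked locally before it is registered. The sketch below repeats transform_review and runs it on a few made-up strings. The transform_review_safe wrapper is a hypothetical addition, not part of the original code, for the case where a review text is missing and the function receives None.

```python
import re

def transform_review(review):
    # Same logic as the function registered as a UDF above
    x1 = re.sub(r'[^0-9a-zA-Z\s]+', '', review)
    return [x1.lower()]

def transform_review_safe(review):
    # Hypothetical null-safe variant: a missing review becomes an empty list
    if review is None:
        return []
    return transform_review(review)

# Made-up sample inputs for illustration
samples = ["Very simple...I love it!", "Nice tutu", "5 stars, 100% happy"]
cleaned = [transform_review(s) for s in samples]
for original, result in zip(samples, cleaned):
    print((original, result))
```

The first sample shows a side effect of deleting punctuation outright: "simple...I" becomes "simplei", so words separated only by punctuation are merged. Replacing punctuation with a space instead would avoid that, at the cost of changing the cleaned text the classifier sees.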
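You can also mix DataFrames, RDDs and Spark SQL to make it work for you.
We want to investigate the average rating of reviews by the category they belong to. To do this, we load a pre-trained classifier, broadcast it to the workers, use it to predict a category for each cleaned review, build a DataFrame from the predictions, and query it together with the reviews and products tables in Spark SQL.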
import sklearn
import pickle
from pyspark.sql import Row
# Pickle files are binary, so open the file in 'rb' mode
model = pickle.load(open('Data/classifiers/classifier.pkl', 'rb'))
classifier_b = sc.broadcast(model)
# fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()
classifiedRDD = result_transform.map(lambda row:
    (row.asin, row.reviewText, str(classifier_b.value.predict(row.cleaned)[0]))
)
classifiedRDD.first()
CLASSIFIED_SCHEMA = StructType([
    StructField('asin', StringType(), True),
    StructField('review', StringType(), True),
    StructField('category', StringType(), True)
])
classifiedDF = sqc.createDataFrame(classifiedRDD,CLASSIFIED_SCHEMA)
classifiedDF.show()
asin       review               category
0123456479 Very simple...I l... fashion
0123456479 This was my gran...  fashion
0123456479 I was looking for... fashion
0123456479 I absolutely LOVE... fashion
0123456479 This was a gift f... fashion
0123456479 I love the produc... fashion
0123456479 This was everythi... fashion
0123456479 This jewelry box ... fashion
0123456479 I have ordered th... fashion
0123456479 This is a nice je... fashion
0123456479 The minute I saw ... fashion
0123456479 This jewelry box ... fashion
0123456479 I love pink, and ... fashion
0123456479 This was a great ... sports
0123456479 This is probably ... fashion
0123456479 Love this Jewelry... fashion
0123456479 I got this jewelr... fashion
0123456479 This products is ... fashion
0123456479 I already own thi... fashion
0123456479 I love this jewel... fashion
classifiedDF.registerAsTable('enrichedReviews')
sql_query_test = """SELECT category, avg(overall) as avgRating
FROM reviews
JOIN products ON reviews.asin=products.asin
JOIN enrichedReviews ON products.asin=enrichedReviews.asin
WHERE price > 50.0
GROUP BY enrichedReviews.category
"""
resultTest = sqc.sql(sql_query_test)
resultTest.show()
category    avgRating
fashion     4.004614104227351
sports      4.284457321452837
electronics 3.7669524119042914
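One thing worth checking in a query of this shape is join fan-out. When a product has several reviews, joining on asin alone pairs every row of reviews with every row of enrichedReviews for that product, so each rating is counted under every predicted category. The sketch below reproduces the query on made-up rows, again using sqlite3 as a stand-in for Spark SQL, and compares it with a hypothetical variant that also matches on the review text. Whether this changes the averages above depends on the data. The variant is an assumption for illustration, not the method used in this notebook.

```python
import sqlite3

# In-memory SQLite tables standing in for the registered Spark SQL tables
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (asin TEXT, overall REAL, reviewText TEXT)")
conn.execute("CREATE TABLE products (asin TEXT, price REAL)")
conn.execute("CREATE TABLE enrichedReviews (asin TEXT, review TEXT, category TEXT)")

# Hypothetical toy rows: one product with two reviews in different categories
conn.executemany("INSERT INTO reviews VALUES (?, ?, ?)", [
    ("A1", 5.0, "love it"),
    ("A1", 1.0, "broke fast"),
])
conn.execute("INSERT INTO products VALUES ('A1', 60.0)")
conn.executemany("INSERT INTO enrichedReviews VALUES (?, ?, ?)", [
    ("A1", "love it", "fashion"),
    ("A1", "broke fast", "sports"),
])

# Same joins as the query above (ORDER BY added for a stable result)
fan_out_query = """SELECT category, avg(overall) AS avgRating
FROM reviews
JOIN products ON reviews.asin = products.asin
JOIN enrichedReviews ON products.asin = enrichedReviews.asin
WHERE price > 50.0
GROUP BY enrichedReviews.category
ORDER BY category
"""

# Hypothetical variant: also match each review with its own classified row
per_review_query = """SELECT category, avg(overall) AS avgRating
FROM reviews
JOIN products ON reviews.asin = products.asin
JOIN enrichedReviews ON products.asin = enrichedReviews.asin
    AND reviews.reviewText = enrichedReviews.review
WHERE price > 50.0
GROUP BY enrichedReviews.category
ORDER BY category
"""

fan_out = conn.execute(fan_out_query).fetchall()
per_review = conn.execute(per_review_query).fetchall()
print(fan_out)
print(per_review)
```

With the asin-only join, both categories average to 3.0 because each of the two ratings is counted once under each category. With the extra match on review text, the 5.0 rating is attributed only to fashion and the 1.0 rating only to sports.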