1. Creating an RDD

We start by creating three RDDs from the different Amazon product review datasets. The data has been kindly provided to us by Dr Julian McAuley; the original papers are referenced below:

  • Image-based recommendations on styles and substitutes, J. McAuley, C. Targett, J. Shi, A. van den Hengel, SIGIR, 2015
  • Inferring networks of substitutable and complementary products, J. McAuley, R. Pandey, J. Leskovec, Knowledge Discovery and Data Mining, 2015

In [1]:
fashion = sc.textFile('Data/Reviews/fashion.json')
electronics = sc.textFile('Data/Reviews/electronics.json')
sports = sc.textFile('Data/Reviews/sports.json')

Note that sc.textFile does not move any data at this stage because of Spark's lazy evaluation. Let's do some data exploration.

In [2]:
print "fashion has {0} rows, electronics {1} rows and sports {2} rows".format(fashion.count(), electronics.count(), sports.count())
print "fashion first row:"
fashion.first()
fashion has 10000 rows, electronics 10000 rows and sports 10000 rows
fashion first row:
Out[2]:
u'{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}'

We can union them.

In [3]:
union_of_rdds = fashion.union(electronics).union(sports)
print union_of_rdds.first()
{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}

We can now parse each line using the json library.

In [4]:
import json
parsed_fashion = fashion.map(lambda x: json.loads(x))
parsed_fashion.first()
Out[4]:
{u'asin': u'0000031887',
 u'helpful': [0, 0],
 u'overall': 5.0,
 u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',
 u'reviewTime': u'11 4, 2013',
 u'reviewerID': u'A2XVJBSRI3SWDI',
 u'reviewerName': u'abigail',
 u'summary': u'Nice tutu',
 u'unixReviewTime': 1383523200}

Another way of loading files is to pass a comma-separated list of file paths or a wildcard.

In [5]:
data = sc.textFile('Data/Reviews/fashion.json,Data/Reviews/electronics.json,Data/Reviews/sports.json').map(lambda x: json.loads(x))

# QUESTION: How many partitions does the rdd have?
data.getNumPartitions()
Out[5]:
3
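
For reference, the wildcard form mentioned above would look like this (a quick sketch, assuming the three review files are the only files in Data/Reviews):

data_wildcard = sc.textFile('Data/Reviews/*').map(lambda x: json.loads(x))
print data_wildcard.count()   # 30000, the same as the comma-separated version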

Now let's imagine we want to know the number of lines in each partition. For that, we need to operate on each partition as a whole rather than on individual rows.

For this, we will use mapPartitionsWithIndex, which applies a function that receives the partition index and an iterator over that partition's data. Each function in the API is documented at: https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD

In [6]:
indexed_data = data.mapPartitionsWithIndex(lambda splitIndex, it: [(splitIndex, len([x for x in it]))])
indexed_data.collect()
Out[6]:
[(0, 10000), (1, 10000), (2, 10000)]
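
An alternative sketch for the same per-partition counts uses glom(), which turns each partition into a list (fine here, since each partition fits in memory):

# glom() groups every partition into a single list; len() then counts its rows
print data.glom().map(len).collect()   # [10000, 10000, 10000]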

2. Reducers

The next thing we have been tasked to do is to get the total number of reviews per product.

In [7]:
product_num = data.map(lambda x: (x['asin'], 1)).reduceByKey(lambda x,y: x+y)
# The rdd product_num will contain (product_asin, total_number_reviews)

# What are the maximum and minimum number of reviews?
max_num = product_num.map(lambda x: x[1]).max()
min_num = product_num.map(lambda x: x[1]).min()

print "Max number of reviews is {0}, min number of reviews is {1}".format(max_num, min_num)
Max number of reviews is 2033, min number of reviews is 1

EXERCISE: What is the maximum score for each product?
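
One possible solution sketch (try it yourself first); since 'overall' holds the score, the built-in max works as the reduce function:

max_score_per_product = data.map(lambda x: (x['asin'], x['overall'])).reduceByKey(max)
print max_score_per_product.take(5)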

3. Joining multiple sources

We want to join the product reviews by users to the product metadata.

In [8]:
product_metadata = sc.textFile('Data/Products/sample_metadata.json').map(lambda x: json.loads(x))
print product_metadata.first()
{u'asin': u'0000037214', u'title': u'Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory', u'price': 6.99, u'imUrl': u'http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg', u'related': {u'also_viewed': [u'B00JO8II76', u'B00DGN4R1Q', u'B00E1YRI4C']}, u'salesRank': {u'Clothing': 1233557}, u'brand': u'Big Dreams', u'categories': [[u'Clothing, Shoes & Jewelry', u'Girls'], [u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'More Accessories', u'Kids & Baby']]}
In [9]:
def flatten_categories(line):
    old_cats = line['categories']
    line['categories'] = [item for sublist in old_cats for item in sublist]
    return line

product_metadata = product_metadata.map(lambda x: flatten_categories(x))

We want to join the review data to the metadata about the product. We can use the 'asin' for that, which is a unique identifier for each product. In order to do a join, we need to turn each structure into key-value pairs.

In [10]:
key_val_data = data.map(lambda x: (x['asin'], x))
key_val_metadata = product_metadata.map(lambda x: (x['asin'], x))

print "We are joining {0} product reviews to {1} rows of metadata information about the products".format(key_val_data.count(),key_val_metadata.count())
print "First row of key_val_data:"
print key_val_data.first()
We are joining 30000 product reviews to 2469 rows of metadata information about the products
First row of key_val_data:
(u'0000031887', {u'reviewerID': u'A2XVJBSRI3SWDI', u'asin': u'0000031887', u'reviewerName': u'abigail', u'helpful': [0, 0], u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', u'overall': 5.0, u'summary': u'Nice tutu', u'unixReviewTime': 1383523200, u'reviewTime': u'11 4, 2013'})
In [11]:
print "number partitions key_val_data: ", 
print key_val_data.getNumPartitions()
print "number partitions key_val_metadata: ", 
print key_val_metadata.getNumPartitions()

joined = key_val_data.join(key_val_metadata)
joined.first()
number partitions key_val_data:  3
number partitions key_val_metadata:  2
Out[11]:
(u'8179050874',
 ({u'asin': u'8179050874',
   u'helpful': [0, 0],
   u'overall': 1.0,
   u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..",
   u'reviewTime': u'05 17, 2009',
   u'reviewerID': u'A1IQJSHCMW69O5',
   u'reviewerName': u'Jose Perez',
   u'summary': u'This is not for Bold is for Curve',
   u'unixReviewTime': 1242518400},
  {u'asin': u'8179050874',
   u'categories': [u'Electronics',
    u'Computers & Accessories',
    u'Laptop & Netbook Computer Accessories',
    u'Batteries'],
   u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',
   u'salesRank': {u'Electronics': 324466},
   u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))

What is the number of output partitions of the join? To understand this, it is best to refer to the PySpark source code: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/join.py

In [12]:
# QUESTION: what is the number of partitions of the joined dataset?

print "There are {0} partitions".format(joined.getNumPartitions())

joined.take(2)
There are 5 partitions
Out[12]:
[(u'8179050874',
  ({u'asin': u'8179050874',
    u'helpful': [0, 0],
    u'overall': 1.0,
    u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..",
    u'reviewTime': u'05 17, 2009',
    u'reviewerID': u'A1IQJSHCMW69O5',
    u'reviewerName': u'Jose Perez',
    u'summary': u'This is not for Bold is for Curve',
    u'unixReviewTime': 1242518400},
   {u'asin': u'8179050874',
    u'categories': [u'Electronics',
     u'Computers & Accessories',
     u'Laptop & Netbook Computer Accessories',
     u'Batteries'],
    u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',
    u'salesRank': {u'Electronics': 324466},
    u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'})),
 (u'8179050874',
  ({u'asin': u'8179050874',
    u'helpful': [0, 0],
    u'overall': 1.0,
    u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of "air margin" down either one or both sides.The problems are therefore:1.  It looks ugly2.  It will fill with dust3.  Case-mate support have been messing me around for over a month now and I\'m beginning to suspect they are just hoping that I\'ll go away and stop annoying them.  In other words, the tech support is as useless as the product...',
    u'reviewTime': u'03 26, 2009',
    u'reviewerID': u'A2HC8YQVZ4HMF5',
    u'reviewerName': u'Wowbagger the Infinitely Prolonged',
    u'summary': u"Doesn't even fit the screen...",
    u'unixReviewTime': 1238025600},
   {u'asin': u'8179050874',
    u'categories': [u'Electronics',
     u'Computers & Accessories',
     u'Laptop & Netbook Computer Accessories',
     u'Batteries'],
    u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',
    u'salesRank': {u'Electronics': 324466},
    u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'}))]
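
In short, the Python join() unions the two parent RDDs (3 + 2 = 5 partitions here) and then groups by key, keeping that many partitions unless you pass numPartitions explicitly (or have set spark.default.parallelism). A quick sketch of the explicit form:

# join() takes an optional numPartitions argument to control the output partitioning
joined_explicit = key_val_data.join(key_val_metadata, numPartitions=4)
print joined_explicit.getNumPartitions()   # 4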

To make the joined RDD easier to manipulate, we will merge each (review, metadata) pair into a single dictionary.

In [13]:
def merge_dictionaries(review_line, metadata_line):
    # start from the metadata dictionary and add the review fields to it
    new_dict = metadata_line
    new_dict.update(review_line)
    return new_dict

nice_joined = joined.map(lambda x: merge_dictionaries(x[1][0], x[1][1]))
nice_joined.take(2)
Out[13]:
[{u'asin': u'8179050874',
  u'categories': [u'Electronics',
   u'Computers & Accessories',
   u'Laptop & Netbook Computer Accessories',
   u'Batteries'],
  u'helpful': [0, 0],
  u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',
  u'overall': 1.0,
  u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..",
  u'reviewTime': u'05 17, 2009',
  u'reviewerID': u'A1IQJSHCMW69O5',
  u'reviewerName': u'Jose Perez',
  u'salesRank': {u'Electronics': 324466},
  u'summary': u'This is not for Bold is for Curve',
  u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!',
  u'unixReviewTime': 1242518400},
 {u'asin': u'8179050874',
  u'categories': [u'Electronics',
   u'Computers & Accessories',
   u'Laptop & Netbook Computer Accessories',
   u'Batteries'],
  u'helpful': [0, 0],
  u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg',
  u'overall': 1.0,
  u'reviewText': u'Despite being sold specifically for the Blackberry Bold 9000, it simply doesn\'t fit a Blackberry Bold.The screen protector is about a third of a millimetre too wide. As a result, the chrome trim around the outside of the Blackberry prevents it from lying flat on the edges of the screen so it does not attach to the screen properly: there is always a 2-3 millimetres of "air margin" down either one or both sides.The problems are therefore:1.  It looks ugly2.  It will fill with dust3.  Case-mate support have been messing me around for over a month now and I\'m beginning to suspect they are just hoping that I\'ll go away and stop annoying them.  In other words, the tech support is as useless as the product...',
  u'reviewTime': u'03 26, 2009',
  u'reviewerID': u'A2HC8YQVZ4HMF5',
  u'reviewerName': u'Wowbagger the Infinitely Prolonged',
  u'salesRank': {u'Electronics': 324466},
  u'summary': u"Doesn't even fit the screen...",
  u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!',
  u'unixReviewTime': 1238025600}]

A couple of questions to probe your understanding of Spark

In [14]:
# Testing Spark understanding

# QUESTION: if I run this, what will it print?
def change_title(line):
    line['title'] = 'this is the title'
    return line

categories = nice_joined.map(lambda x: change_title(x))
In [15]:
# ANSWER:
print categories.map(lambda x: x['title']).first()
this is the title
In [16]:
# QUESTION: if I run this, what will be the title of the first row?
nice_joined.map(lambda x: x['title']).first()
Out[16]:
u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!'
In [17]:
# QUESTION: if I run this, what will it print?
def get_first_category(line):
    line['categories'] = line['categories'][0]
    return line

print "BEFORE"
print "the categories in the first 2 fields are: "
nice_joined.map(lambda x: x['categories']).take(2)
BEFORE
the categories in the first 2 fields are: 
Out[17]:
[[u'Electronics',
  u'Computers & Accessories',
  u'Laptop & Netbook Computer Accessories',
  u'Batteries'],
 [u'Electronics',
  u'Computers & Accessories',
  u'Laptop & Netbook Computer Accessories',
  u'Batteries']]
In [18]:
# QUESTION: if I run this, what will it print?
print "A x['title']).first()FTER"
nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)
AFTER
Out[18]:
[u'Electronics', u'E']

What if we cache nice_joined first?

In [19]:
nice_joined.cache()
nice_joined.count()

print "AFTER CACHING"
nice_joined.map(lambda x: get_first_category(x)).map(lambda x: x['categories']).take(2)
AFTER CACHING
Out[19]:
[u'Electronics', u'Electronics']

4. GroupByKey

Now that we have joined the two data sources, we can start doing some ad-hoc analysis of the data! Let's start by counting the number of reviews per category. Each product carries a list of categories, so we count 1 for each 'sub-category'.

In [20]:
nice_joined.first()
nice_joined.cache()
nice_joined.count()
Out[20]:
30000
In [21]:
# We want to get the distinct number of categories
all_categories = nice_joined.flatMap(lambda x: x['categories'])
print "all_categories.take(5): ",
print all_categories.take(5)
num_categories = all_categories.distinct().count()
print

print "There are {0} categories.".format(num_categories)
all_categories.take(5):  [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries', u'Electronics']

There are 925 categories.

We take the categories attached to each review and count each of them as having been reviewed once.

In [22]:
category_count = nice_joined.flatMap(lambda x: [(y,1) for y in x['categories']])
category_total_count = category_count.reduceByKey(lambda x,y: x+y)
print category_total_count.take(10)
[(u'Screen Protectors', 10), (u'Jazz', 166), (u'Stands', 6), (u'Touch Screen Tablet Accessories', 320), (u'Bike Baskets', 1), (u'Thongs', 1), (u'Rink Equipment', 199), (u'Gun Safes & Cabinets', 3), (u'Soft Boxes', 2), (u'Video Games & Accessories', 17)]
In [23]:
sorted_categories = sorted(category_total_count.collect(), key=lambda x: x[1], reverse=True)
print "The top 5 categories are:"
print sorted_categories[:5]
The top 5 categories are:
[(u'Clothing, Shoes & Jewelry', 23983), (u'Sports & Outdoors', 10443), (u'Electronics', 10367), (u'Novelty, Costumes & More', 6384), (u'Men', 4880)]
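
Since we only want the top few, takeOrdered() gives the same answer without collecting every category to the driver (a minimal sketch):

# a negative key sorts by descending count
print category_total_count.takeOrdered(5, key=lambda x: -x[1])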

Next, we have been tasked to get the average product review length for each category. We can solve this using groupByKey!

In [24]:
category_review = nice_joined.flatMap(lambda x: [(y, len(x['reviewText'])) for y in x['categories']])
print "After the flatMap: " + str(category_review.first())
print "After the groupByKey: " + str(category_review.groupByKey().map(lambda x: (x[0], list(x[1]))).first())
print

grouped_category_review = category_review.groupByKey().map(lambda x: (x[0], sum(x[1])/float(len(x[1]))))
print "grouped_category_review.first(): " + str(grouped_category_review.first())

### Now we can sort the categories by average product review length
print "The top 10 categories are: " + str(sorted(grouped_category_review.collect(), key=lambda x: x[1], reverse=True)[:10])
After the flatMap: (u'Electronics', 293)
After the groupByKey: (u'Screen Protectors', [191, 135, 135, 782, 782, 782, 446, 446, 446, 85])

grouped_category_review.first(): (u'Screen Protectors', 423.0)
The top 10 categories are: [(u'Photos', 7570.0), (u'Bags, Packs & Accessories', 6411.0), (u'Rifles', 5079.888888888889), (u'Motets', 3404.0), (u'Free-Weight Racks', 3404.0), (u'Weight Racks', 3404.0), (u'Magnificats', 3404.0), (u'Sonatinas', 3239.2), (u'Sonatas', 3239.2), (u'Rugby', 3156.0)]

EXERCISE: Do the same thing, but this time you are not allowed to use groupByKey()!
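
If you need a hint, the usual trick is to reduce each category to a (sum, count) pair; here is a sketch with reduceByKey (peek only after trying it yourself):

# (category, length) -> (category, (sum_of_lengths, number_of_reviews))
sum_count = category_review.mapValues(lambda length: (length, 1)) \
                           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
avg_length = sum_count.mapValues(lambda s: s[0] / float(s[1]))
print avg_length.first()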

Optional: Data skewness

In [25]:
from math import exp
from datetime import datetime

def get_part_index(splitIndex, iterator):
    for it in iterator:
        yield (splitIndex, it)
       
def count_elements(splitIndex, iterator):
    n = sum(1 for _ in iterator)
    yield (splitIndex, n)
        
print "***Creating the large rdd***"
num_parts = 16
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x)))).mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x)).cache()
print "first 5 items:" + str(skewed_large_rdd.take(5))
print "num rows: " + str(skewed_large_rdd.count())
print "num partitions: " + str(skewed_large_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())
print

print "***Creating the small rdd***"
small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
print "first 5 items:" + str(small_rdd.take(5))
print "num rows: " + str(small_rdd.count())
print "num partitions: " + str(small_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(small_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())

print

print "Joining them"
t0 = datetime.now()
result = skewed_large_rdd.leftOuterJoin(small_rdd)
result.count() 
print "The direct join takes %s"%(str(datetime.now() - t0))
print "The joined rdd has {0} partitions and {1} rows".format(result.getNumPartitions(), result.count())
***Creating the large rdd***
first 5 items:[(0, 0), (1, 0), (1, 1), (2, 0), (2, 1)]
num rows: 5171502
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 2), (2, 7), (3, 20), (4, 54), (5, 148), (6, 403), (7, 1096), (8, 2980), (9, 8103), (10, 22026), (11, 59874), (12, 162754), (13, 442413), (14, 1202604), (15, 3269017)]

***Creating the small rdd***
first 5 items:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
num rows: 16
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1)]

Joining them
The direct join takes 0:00:30.362042
The joined rdd has 32 partitions and 5171502 rows

Optional: Integrating Spark with popular Python libraries

In [26]:
import sklearn
import pickle

model = pickle.load(open('Data/classifiers/classifier.pkl', 'r'))
model_b = sc.broadcast(model)
fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()
Out[26]:
('Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',
 'fashion')

Spark.. A View from the trenches: Part 2

Introduction

This is the latter part of the tutorial. The main focus will be on Spark DataFrames and Spark SQL.

In [27]:
review_filepaths = 'Data/Reviews/*'
textRDD = sc.textFile(review_filepaths)

print 'number of reviews : {0}'.format(textRDD.count())

print 'sample row : \n{0}'.format(textRDD.first())
number of reviews : 30000
sample row : 
{"reviewerID": "AKM1MP6P0OYPR", "asin": "0132793040", "reviewerName": "Vicki Gibson \"momo4\"", "helpful": [1, 1], "reviewText": "Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.", "overall": 5.0, "summary": "Very thorough", "unixReviewTime": 1365811200, "reviewTime": "04 13, 2013"}

5. Loading Data into a DataFrame

A DataFrame requires a schema. There are two main ways to assign a schema to an RDD:

  • Inferring Schema : This function infers the schema of the RDD by observing it
  • Applying Schema : This function applies a manually defined schema to an RDD
In [28]:
# You need an SQLContext to work with DataFrames
from pyspark.sql import SQLContext

# Instantiate SQL Context
sqc = SQLContext(sc)

print sqc
<pyspark.sql.context.SQLContext object at 0x10951d590>

Inferring the Schema Using Reflection

In [29]:
inferredDF = sqc.jsonFile(review_filepaths)
inferredDF.first()
Out[29]:
Row(asin=u'0132793040', helpful=[1, 1], overall=5.0, reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', reviewerID=u'AKM1MP6P0OYPR', reviewerName=u'Vicki Gibson "momo4"', summary=u'Very thorough', unixReviewTime=1365811200)
In [30]:
inferredDF.printSchema()
root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

Manually Specifying the Schema

The documentation for the different data types can be found in the Spark SQL DataTypes section.

EXERCISE (3 mins)

  • Let's add the fields "summary" (string) and "unixReviewTime" (long) to the schema.
In [31]:
# Import the Spark SQL type definitions
from pyspark.sql.types import *

# Define Schema
REVIEWS_SCHEMA_DEF = StructType([
        StructField('reviewerID', StringType(), True),
        StructField('asin', StringType(), True),
        StructField('reviewerName', StringType(), True),
        StructField('helpful', ArrayType(
                IntegerType(), True), 
            True),
        StructField('reviewText', StringType(), True),
        StructField('reviewTime', StringType(), True),
        StructField('overall', DoubleType(), True),
        StructField('summary', StringType(), True),
        StructField('unixReviewTime', LongType(), True)
    ])

print REVIEWS_SCHEMA_DEF
StructType(List(StructField(reviewerID,StringType,true),StructField(asin,StringType,true),StructField(reviewerName,StringType,true),StructField(helpful,ArrayType(IntegerType,true),true),StructField(reviewText,StringType,true),StructField(reviewTime,StringType,true),StructField(overall,DoubleType,true),StructField(summary,StringType,true),StructField(unixReviewTime,LongType,true)))
In [32]:
# Using a handcrafted schema to create a DataFrame
appliedDF = sqc.jsonFile(review_filepaths,schema=REVIEWS_SCHEMA_DEF)
appliedDF.first()
Out[32]:
Row(reviewerID=u'AKM1MP6P0OYPR', asin=u'0132793040', reviewerName=u'Vicki Gibson "momo4"', helpful=[1, 1], reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', overall=5.0, summary=u'Very thorough', unixReviewTime=1365811200)

6. DataFrame operations

The Spark DataFrame API allows you to perform multiple operations on the data. Its primary advantage is that you can express data transformations through the high-level API without having to write Python functions; the benefits of this will be explained later in the tutorial.

The DataFrame API has functionality similar to that of the core RDD API. For example (a quick sketch follows the list):

  • map : foreach, Select
  • mapPartition : foreachPartition
  • filter : filter
  • groupByKey, reduceByKey : groupBy
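
For instance, projecting a single column can be written either way (a quick sketch using the appliedDF defined above):

# RDD style: map over Row objects
print appliedDF.map(lambda row: row.asin).take(3)

# DataFrame style: the same projection expressed as a select
print appliedDF.select(appliedDF.asin).take(3)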

6.1. Selecting Columns

You can use a select statement to pick columns from your DataFrame.

EXERCISE (3 mins)

  • Divide the numerator of the 'helpful' field by its denominator to get the helpfulness fraction
In [33]:
columnDF = appliedDF.select(appliedDF.asin,
                            appliedDF.overall,
                            appliedDF.reviewText,
                            appliedDF.helpful[0]/appliedDF.helpful[1],
                            appliedDF.reviewerID,
                            appliedDF.unixReviewTime).\
                    withColumnRenamed('(helpful[0] / helpful[1])','helpful')
columnDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200    
0321732944 5.0     While many beginn... null               A2CX7LUOHB2NDG 1341100800    
0439886341 1.0     It never worked. ... 1.0                A2NWSAGRHCP8N5 1367193600    
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200    
0439886341 1.0     Do not waste your... 1.0                A1GI0U4ZRJA8WN 1334707200    
0511189877 5.0     Dog got the old r... null               A1QGNMC6O1VW39 1397433600    
0511189877 2.0     This remote, for ... 1.0                A3J3BRHTDRFJ2G 1397433600    
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400    
0511189877 5.0     This unit works j... null               A34ATBPOK6HCHY 1395532800    
0511189877 5.0     It is an exact du... null               A89DO69P0XZ27  1395446400    
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600    
0528881469 5.0     Love it has every... null               A1DA3W4GTFXP6O 1405641600    
0528881469 1.0     I have owned two ... null               A29LPQQDG7LD5J 1352073600    
0528881469 5.0     We got this GPS f... null               AO94DHGC771SJ  1370131200    
0528881469 1.0     I'm a professiona... 0.8                AMO214LNFCEI4  1290643200    
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000    
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400    
0528881469 2.0     Not going to writ... 0.9                A1H8PY3QHMQQA0 1290556800    
0528881469 2.0     My brother is a t... 0.71875            A2CPBQ5W4OGBX  1277078400    
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400    

6.2. Missing Values

Similar to Pandas, DataFrames come equipped with functions to address missing data.

  • dropna function: can be used to remove observations with missing values
  • fillna function: can be used to fill missing values with a default value
In [34]:
# get null observations out
densedDF=columnDF.dropna(subset=["overall"]).fillna(0.0,subset=["helpful"]) 
densedDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200    
0321732944 5.0     While many beginn... 0.0                A2CX7LUOHB2NDG 1341100800    
0439886341 1.0     It never worked. ... 1.0                A2NWSAGRHCP8N5 1367193600    
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200    
0439886341 1.0     Do not waste your... 1.0                A1GI0U4ZRJA8WN 1334707200    
0511189877 5.0     Dog got the old r... 0.0                A1QGNMC6O1VW39 1397433600    
0511189877 2.0     This remote, for ... 1.0                A3J3BRHTDRFJ2G 1397433600    
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400    
0511189877 5.0     This unit works j... 0.0                A34ATBPOK6HCHY 1395532800    
0511189877 5.0     It is an exact du... 0.0                A89DO69P0XZ27  1395446400    
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600    
0528881469 5.0     Love it has every... 0.0                A1DA3W4GTFXP6O 1405641600    
0528881469 1.0     I have owned two ... 0.0                A29LPQQDG7LD5J 1352073600    
0528881469 5.0     We got this GPS f... 0.0                AO94DHGC771SJ  1370131200    
0528881469 1.0     I'm a professiona... 0.8                AMO214LNFCEI4  1290643200    
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000    
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400    
0528881469 2.0     Not going to writ... 0.9                A1H8PY3QHMQQA0 1290556800    
0528881469 2.0     My brother is a t... 0.71875            A2CPBQ5W4OGBX  1277078400    
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400    

6.3. Filtering rows

Filtering lets you select rows based on a condition. The pattern is similar to filtering RDDs, but simpler.

EXERCISE (3 mins)

  • Let's filter all the reviews that have an overall score greater than or equal to 3.0
In [35]:
filteredDF=densedDF.filter(densedDF.overall>=3)
filteredDF.show()
asin       overall reviewText           helpful            reviewerID     unixReviewTime
0132793040 5.0     Corey Barker does... 1.0                AKM1MP6P0OYPR  1365811200    
0321732944 5.0     While many beginn... 0.0                A2CX7LUOHB2NDG 1341100800    
0439886341 3.0     Some of the funct... 1.0                A2WNBOD3WNDNKT 1374451200    
0511189877 5.0     Dog got the old r... 0.0                A1QGNMC6O1VW39 1397433600    
0511189877 5.0     We had an old Tim... 0.0                A2TY0BTJOTENPG 1395878400    
0511189877 5.0     This unit works j... 0.0                A34ATBPOK6HCHY 1395532800    
0511189877 5.0     It is an exact du... 0.0                A89DO69P0XZ27  1395446400    
0511189877 5.0     Works on my t.v. ... 0.0                AZYNQZ94U6VDB  1401321600    
0528881469 5.0     Love it has every... 0.0                A1DA3W4GTFXP6O 1405641600    
0528881469 5.0     We got this GPS f... 0.0                AO94DHGC771SJ  1370131200    
0528881469 4.0     This is a great t... 0.9545454545454546 A28B1G1MSJ6OO1 1280016000    
0528881469 3.0     Well, what can I ... 0.9555555555555556 A3N7T0DY83Y4IG 1283990400    
0528881469 4.0     This unit is a fa... 1.0                A265MKAR2WEH3Y 1294790400    
0528881469 5.0     I did a lot of co... 1.0                A37K02NKUIT68K 1293235200    
0528881469 4.0     I purchased this ... 0.5                A2AW1SSVUIYV9Y 1289001600    
0528881469 5.0     EXCELLENT. BEST T... 0.7142857142857143 A2AEHUKOV014BP 1284249600    
0528881469 4.0     Well as one of th... 1.0                A2O8FIJR9EBU56 1278547200    
0528881469 4.0     Was fast and what... 0.0                AYTBGUX49LF3W  1398470400    
0528881469 5.0     We had the GPS fo... 0.0                A1E4WG8HRWWK4R 1390867200    
0528881469 5.0     Back in the old d... 0.5                A2AOEW5UGXFOOQ 1294790400    

6.4. Grouping by overall scores

Grouping is the equivalent of groupByKey in the core RDD API. You can aggregate the grouped values using a summary function such as:

  • count
  • sum
  • average
  • max and so on ...
In [36]:
grouped = filteredDF.groupBy("overall").count()
grouped.show()
overall count
3.0     2128 
5.0     18503
4.0     5324 
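
Other aggregations follow the same pattern; for example, the average helpfulness fraction per score (a sketch using the renamed 'helpful' column):

filteredDF.groupBy("overall").avg("helpful").show()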

6.5. Joining DataFrames together

You can join two DataFrames together by using a common key column.

In [37]:
product_filepaths = 'Data/Products/*'
productRDD = sc.textFile(product_filepaths)
productRDD.first()
Out[37]:
u'{"asin": "0000037214", "title": "Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory", "price": 6.9900000000000002, "imUrl": "http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg", "related": {"also_viewed": ["B00JO8II76", "B00DGN4R1Q", "B00E1YRI4C"]}, "salesRank": {"Clothing": 1233557}, "brand": "Big Dreams", "categories": [["Clothing, Shoes & Jewelry", "Girls"], ["Clothing, Shoes & Jewelry", "Novelty, Costumes & More", "Costumes & Accessories", "More Accessories", "Kids & Baby"]]}'
In [38]:
# Load Dataset2 : Amazon Product information
# First, define Schema for second Dataset
PRODUCTS_SCHEMA_DEF = StructType([
        StructField('asin', StringType(), True),
        StructField('title', StringType(), True),
        StructField('price', DoubleType(), True),
        StructField('categories', ArrayType(ArrayType(
            StringType(), True),True),True)
    ])

# Load the dataset
productDF = sqc.jsonFile(product_filepaths,PRODUCTS_SCHEMA_DEF)
productDF.show()
# productDF.first()
asin       title                price categories          
0000037214 Purple Sequin Tin... 6.99  ArrayBuffer(Array...
0000032069 Adult Ballet Tutu... 7.89  ArrayBuffer(Array...
0000031909 Girls Ballet Tutu... 7.0   ArrayBuffer(Array...
0000032034 Adult Ballet Tutu... 7.87  ArrayBuffer(Array...
0000031852 Girls Ballet Tutu... 3.17  ArrayBuffer(Array...
0000032050 Adult Ballet Tutu... 12.85 ArrayBuffer(Array...
0000031887 Ballet Dress-Up F... 6.79  ArrayBuffer(Array...
0000031895 Girls Ballet Tutu... 2.99  ArrayBuffer(Array...
0123456479 SHINING IMAGE HUG... 64.98 ArrayBuffer(Array...
0132793040 Kelby Training DV... null  ArrayBuffer(Array...
0188477284 Klean Kanteen Cla... null  ArrayBuffer(Array...
0321732944 Kelby Training DV... null  ArrayBuffer(Array...
0439886341 Digital Organizer... 8.15  ArrayBuffer(Array...
0456844570 RiZ Women's Beaut... null  ArrayBuffer(Array...
0456808574 Lantin White Viso... null  ArrayBuffer(Array...
0456830197 NVC Unisex Light ... null  ArrayBuffer(Array...
0456856293 Kismeth Eyewear C... null  ArrayBuffer(Array...
0456840532 Max-MPH Black - L... null  ArrayBuffer(Array...
0456787283 FX1 Small Adult A... null  ArrayBuffer(Array...
0456838384 Riz Small Unisex ... null  ArrayBuffer(Array...

QUESTION: What do you think will happen if we remove some fields from this schema?

  1. The schema fails
  2. The schema works fine

ANSWER???
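
One way to find out is simply to try it; here is a quick sketch with a reduced schema (REDUCED_SCHEMA_DEF is just an illustrative name):

# keep only two of the fields and see whether loading still works
REDUCED_SCHEMA_DEF = StructType([
        StructField('asin', StringType(), True),
        StructField('price', DoubleType(), True)
    ])

sqc.jsonFile(product_filepaths, REDUCED_SCHEMA_DEF).show()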

Now let's join the two datasets.

In [39]:
enrichedReviews = filteredDF.join(productDF, productDF.asin==filteredDF.asin).dropna(subset="title")
enrichedReviews.count()
Out[39]:
25566L

When you join two RDDs, you have to restructure the data into (k,V) pairs where the key is the join key. This may involve two additional map transformations. This is not necessary in DataFrames.

In [40]:
enrichedReviews
Out[40]:
DataFrame[asin: string, overall: double, reviewText: string, helpful: double, reviewerID: string, unixReviewTime: bigint, asin: string, title: string, price: double, categories: array<array<string>>]
In [41]:
enrichedReviews.show()
asin       overall reviewText           helpful reviewerID     unixReviewTime asin       title                price categories          
9983782030 5.0     I purchased this ... 0.0     A2G6OTE5JOZHOM 1320192000     9983782030 PREMIUM USB Adapt... null  ArrayBuffer(Array...
9983782030 5.0     Item arrived as d... 1.0     A2AEZK0K9CFVE6 1290211200     9983782030 PREMIUM USB Adapt... null  ArrayBuffer(Array...
998498480X 5.0     The charger that ... 0.0     ABB3GDPNS9TVW  1365292800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     I got this charge... 0.0     A32NKAW0Q7A9IQ 1290729600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     Garmin Nuvi 800 s... 0.0     A1GPVV2XWNGSDX 1303776000     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     I plan on using m... 0.0     AGZ5E33UY5L7S  1290038400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     Sometimes I need ... 0.0     A2XOVBAMQ8J0YV 1325808000     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     Turns on the GPS ... 0.0     A1VCXIYTPCE6W0 1395964800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     This plug works w... 0.0     A13KD5E4V2VPTO 1363996800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     My GPS device did... 1.0     A3AGNJNG7CFVZK 1275782400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     It worked perfect... 0.0     A1B48D0I1M9HBI 1388534400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     Did not seem to m... 0.0     A1996ALUR26ETP 1388275200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     We bought this fo... 0.0     A24JJYOP041KMM 1383436800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     It's a wall wart.... 1.0     A1DOFBSJT3SSKZ 1269561600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     The charger was r... 0.0     ADTXKJ76N9S6W  1272326400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     This GPS is great... 0.0     A3D1ZRIDPWDM1V 1287100800     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     I used it once. W... 0.0     A3R2M5ABAYL63P 1291507200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 3.0     I really liked th... 0.0     A1QT8GM0WW02Z5 1287878400     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 5.0     The thing I liked... 0.0     A3RYHP2OYFJ3YC 1397779200     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...
998498480X 4.0     I really don't ha... 0.0     A261321MIS0YPB 1290729600     998498480X Garmin Nuvi 855 S... 3.77  ArrayBuffer(Array...

7. Saving your DataFrame

Now that we have done some operations on the data, we can save the results for later use. Standard data formats are a great way of opening up valuable data to your entire organization. Spark DataFrames can be saved in many different formats, including but not limited to JSON, Parquet, and Hive tables.

In [42]:
try:
    columnDF.saveAsParquetFile('Data/Outputs/reviews_filtered.parquet')
except:
    # ignore errors here (for example, if the output directory already exists from a previous run)
    pass

print "Saved as parquet successfully"
Saved as parquet successfully
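
The same DataFrame could also be written out as JSON text, for instance via toJSON() (a sketch; the output path is just an example):

# toJSON() returns an RDD of JSON strings, which can be saved like any text RDD
columnDF.toJSON().saveAsTextFile('Data/Outputs/reviews_filtered_json')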

8. Using Spark SQL

Spark DataFrames also allow you to use Spark SQL to query petabytes of data. Spark comes with a SQL-like query language that can be used to query distributed DataFrames. A key advantage of using Spark SQL is that the Catalyst query optimizer under the hood transforms your SQL query so that it runs as efficiently as possible.

8.1. Example Queries

Spark SQL offers the same functionality as the DataFrame API and, in fact, more, through the SQL and HQL capabilities available in the Spark SQL environment.

Due to time constraints, I will explain the different functions available in the Spark SQL environment using examples that combine several of them. This has two benefits:

  • Covering many of the functions available via Spark SQL
  • Showing how to pipe multiple functions together
In [43]:
# Read the reviews parquet file
reviewsDF = sqc.parquetFile('Data/Outputs/reviews_filtered.parquet')

# Register the DataFrames to be used in sql
reviewsDF.registerAsTable("reviews")
productDF.registerAsTable("products")

print 'There are {0} reviews about {1} products'.format(reviewsDF.count(),productDF.count())
There are 30000 reviews about 2469 products
In [44]:
sql_query = """SELECT reviews.asin, overall, reviewText, price
            FROM reviews JOIN products ON  reviews.asin=products.asin
            WHERE price > 50.00
"""

result = sqc.sql(sql_query)
result.show()
asin       overall reviewText           price
0123456479 5.0     Very simple...I l... 64.98
0123456479 5.0     This was  my gran... 64.98
0123456479 4.0     I was looking for... 64.98
0123456479 5.0     I absolutely LOVE... 64.98
0123456479 5.0     This was a gift f... 64.98
0123456479 4.0     I love the produc... 64.98
0123456479 5.0     This was everythi... 64.98
0123456479 5.0     This jewelry box ... 64.98
0123456479 5.0     I have ordered th... 64.98
0123456479 4.0     This is a nice je... 64.98
0123456479 5.0     The minute I saw ... 64.98
0123456479 5.0     This jewelry box ... 64.98
0123456479 4.0     I love pink, and ... 64.98
0123456479 5.0     This was a great ... 64.98
0123456479 4.0     This is probably ... 64.98
0123456479 5.0     Love this Jewelry... 64.98
0123456479 5.0     I got this jewelr... 64.98
0123456479 5.0     This products is ... 64.98
0123456479 3.0     I already own thi... 64.98
0123456479 5.0     I love this jewel... 64.98

Optional: User Defined Functions

Spark SQL also provides functionality similar to Hive's User Defined Functions (UDFs). Python functions are registered with the SQLContext using registerFunction().

In [45]:
import re

def transform_review(review):
    x1 = re.sub('[^0-9a-zA-Z\s]+','',review)
    return [x1.lower()]

result.registerAsTable("result")
sqc.registerFunction("to_lowercase", lambda x:transform_review(x),returnType=ArrayType(StringType(), True))

sql_query_transform = """SELECT asin, reviewText, to_lowercase(reviewText) as cleaned
            FROM result
"""

result_transform = sqc.sql(sql_query_transform)
result_transform.show()
asin       reviewText           cleaned             
0123456479 Very simple...I l... ArrayBuffer(very ...
0123456479 This was  my gran... ArrayBuffer(this ...
0123456479 I was looking for... ArrayBuffer(i was...
0123456479 I absolutely LOVE... ArrayBuffer(i abs...
0123456479 This was a gift f... ArrayBuffer(this ...
0123456479 I love the produc... ArrayBuffer(i lov...
0123456479 This was everythi... ArrayBuffer(this ...
0123456479 This jewelry box ... ArrayBuffer(this ...
0123456479 I have ordered th... ArrayBuffer(i hav...
0123456479 This is a nice je... ArrayBuffer(this ...
0123456479 The minute I saw ... ArrayBuffer(the m...
0123456479 This jewelry box ... ArrayBuffer(this ...
0123456479 I love pink, and ... ArrayBuffer(i lov...
0123456479 This was a great ... ArrayBuffer(this ...
0123456479 This is probably ... ArrayBuffer(this ...
0123456479 Love this Jewelry... ArrayBuffer(love ...
0123456479 I got this jewelr... ArrayBuffer(i got...
0123456479 This products is ... ArrayBuffer(this ...
0123456479 I already own thi... ArrayBuffer(i alr...
0123456479 I love this jewel... ArrayBuffer(i lov...

Optional : Mix and Match!!

You can also mix DataFrames, RDDs and SparkSQL to make it work for you.

Scenario

We want to investigate the average rating of reviews in terms of the categories they belong to. In order to do this, we:

  • query the needed data using DataFrames API
  • classify the reviews into different categories using core RDD API
  • query the average rating for each category using Spark SQL
In [46]:
import sklearn
import pickle

from pyspark.sql import Row

model = pickle.load(open('Data/classifiers/classifier.pkl', 'r'))
classifier_b = sc.broadcast(model)

# fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()
classifiedRDD = result_transform.map(lambda row: 
                                     (row.asin,row.reviewText,str(classifier_b.value.predict(row.cleaned)[0]))
                                    )

classifiedRDD.first()

CLASSIFIED_SCHEMA = StructType([
        StructField('asin', StringType(), True),
        StructField('review', StringType(), True),
        StructField('category', StringType(), True)
    ])

classifiedDF = sqc.createDataFrame(classifiedRDD,CLASSIFIED_SCHEMA)

classifiedDF.show()
asin       review               category
0123456479 Very simple...I l... fashion 
0123456479 This was  my gran... fashion 
0123456479 I was looking for... fashion 
0123456479 I absolutely LOVE... fashion 
0123456479 This was a gift f... fashion 
0123456479 I love the produc... fashion 
0123456479 This was everythi... fashion 
0123456479 This jewelry box ... fashion 
0123456479 I have ordered th... fashion 
0123456479 This is a nice je... fashion 
0123456479 The minute I saw ... fashion 
0123456479 This jewelry box ... fashion 
0123456479 I love pink, and ... fashion 
0123456479 This was a great ... sports  
0123456479 This is probably ... fashion 
0123456479 Love this Jewelry... fashion 
0123456479 I got this jewelr... fashion 
0123456479 This products is ... fashion 
0123456479 I already own thi... fashion 
0123456479 I love this jewel... fashion 
In [47]:
classifiedDF.registerAsTable('enrichedReviews')

sql_query_test = """SELECT category, avg(overall) as avgRating
            FROM reviews 
            JOIN products ON reviews.asin=products.asin 
            JOIN enrichedReviews ON products.asin=enrichedReviews.asin
            WHERE price > 50.0
            GROUP BY enrichedReviews.category
"""

resultTest = sqc.sql(sql_query_test)
resultTest.show()
category    avgRating         
fashion     4.004614104227351 
sports      4.284457321452837 
electronics 3.7669524119042914