This notebook is part of the Hadoop tutorials delivered by the IT-DB group

SPARK RDD Hands-On Lab

Hands-On 1 - Load and inspect the data from HDFS

Please execute the following steps if you are running in Spark local mode

A SparkContext must be explicitly created in local mode

In [ ]:
from pyspark import SparkContext
In [ ]:
sc = SparkContext()

Run the following cell to create an rdd containing the UN Pop Stats data

In [ ]:
rdd = sc.textFile("hadoop-tutorials-data/UN_Pop_Stats.csv")

Let's see if the rdd contains any data; take(n) returns the first n elements of the RDD

In [ ]:
rdd.take(5)

oops, there is a header; let's remove it first

In [ ]:
headless_rdd = rdd.filter(lambda line: 'LocID' not in line)

inspect the data to see if the header is still present

In [ ]:
headless_rdd.take(5)

Hands-On 2 - Convert headless_rdd to an RDD containing python namedtuple objects

Let's import a couple of things we need

In [ ]:
from collections import namedtuple
from pprint import pprint

schema for the namedtuple

Named tuples are tuples that allow their elements to be accessed by name instead of just by index

In [ ]:
pData = namedtuple('pData',['LocID','Location','VarID','Variant','Time','MidPeriod','SexID','Sex','AgeGrp','AgeGrpStart','AgeGrpSpan','Value'])
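
Quick check that access by name and by index both work (the field values below are made up for illustration):

In [ ]:
sample = pData('756','"Switzerland"','2','"Medium"','2015','2015.5','3','"Both"','"0-4"','0','5','414.413')
sample.Location   # access by name
sample[1]         # access by index, same value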

Function to map the data

In [ ]:
def map_record(record):
    # split the CSV line and keep the first 12 columns to match the schema
    columns = record.split(",")[:12]
    return pData(*columns)

The following map creates a new RDD of Python namedtuples

In [ ]:
ntuple_rdd = headless_rdd.map(map_record)

Inspect the data

In [ ]:
ntuple_rdd.take(5)

Hands-On 3 - Aggregate the population by Age Group for Switzerland for 2015

This introduces the filter, map and reduceByKey transformations and the collect action

In [ ]:
# fields in the CSV keep their double quotes, hence literals like '"Switzerland"';
# Value is in thousands, so multiply by 1000 to get the head count
plot_rdd = ntuple_rdd.filter(lambda record: record.Location == '"Switzerland"' and record.Time == '"2015"' and record.Sex in ['"Male"','"Female"']) \
    .map(lambda record: (int(record.AgeGrpStart), int(float(record.Value)*1000))) \
    .reduceByKey(lambda x, y: x + y) \
    .sortByKey() \
    .collect()

Let's draw the population distribution histogram

In [ ]:
%matplotlib notebook
import matplotlib.pyplot as plt
In [ ]:
plt.figure(figsize=(14,6))
x_val = [x[0] for x in sorted(plot_rdd)]
y_val = [x[1] for x in sorted(plot_rdd)]
print(plot_rdd)
plt.bar(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()

Exercise 1 - show the centenarian populations by country for 2015, ordered by value in descending order

Hint - this requires use of filter, map, reduceByKey and sortByKey

In [ ]:
%load key/solution1.py
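
For comparison, one possible shape for a solution (just a sketch; it assumes the centenarian age groups have AgeGrpStart >= 100 and uses the '"Both"' sex totals - the provided solution file may differ):

In [ ]:
ntuple_rdd.filter(lambda record: record.Time == '"2015"' and record.Sex == '"Both"'
                  and int(record.AgeGrpStart) >= 100) \
    .map(lambda record: (record.Location, float(record.Value)*1000)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda record: (record[1], record[0])) \
    .sortByKey(ascending=False) \
    .collect()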

Hands-On 4 - Calculate the male to female ratio across Locations (countries) and Time (years)

This introduces the join, combineByKey, groupByKey and reduceByKey transformations

First, let's import a couple of bits we need

In [ ]:
from operator import add

calculate the total male population for each Location (country) and Time (year)

In [ ]:
m_rdd = ntuple_rdd.filter(lambda record: record.Sex == '"Male"') \
    .map(lambda record: ((record.Location,record.Time,record.Sex),float(record.Value))) \
    .reduceByKey(add) \
    .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1])))

inspect the rdd

In [ ]:
m_rdd.take(5)

calculate the total female population for each Location (country) and Time (year)

In [ ]:
f_rdd = ntuple_rdd.filter(lambda record: record.Sex == '"Female"') \
    .map(lambda record: ((record.Location,record.Time,record.Sex),float(record.Value))) \
    .reduceByKey(add) \
    .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1])))

inspect how the data looks in the rdd

In [ ]:
f_rdd.take(5)

join the RDDs

In [ ]:
join_rdd = m_rdd.join(f_rdd)

inspect the data in the join_rdd

In [ ]:
join_rdd.take(5)

final rdd containing the male to female ratio for each country and year

In [ ]:
fn_rdd = join_rdd.map(lambda record: (record[1][0][1]/record[1][1][1],(record[0][0],record[0][1])))

since the output is going to be huge, let's just filter for Estonia

In [ ]:
ratio_rdd = fn_rdd.filter(lambda record: record[1][0] == '"Estonia"') \
    .map(lambda record: (record[1], record[0])) \
    .sortByKey() \
    .collect()
In [ ]:
plt.figure(figsize=(14,6))
x_val = [x[0][1] for x in sorted(ratio_rdd)]
y_val = [x[1] for x in sorted(ratio_rdd)]
print(ratio_rdd)
plt.plot(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()

And finally countries with extreme ratios!

In [ ]:
fn_rdd.filter(lambda record: record[0] > 2.5 or record[0] < 0.8).sortByKey().collect()

Exercise 2 - how to achieve the same with groupByKey

In [ ]:
%load key/solution2.py
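
For comparison, one possible groupByKey approach (a sketch; the provided solution file may differ) - group the per-sex totals under a (Location, Time) key and divide:

In [ ]:
ntuple_rdd.filter(lambda record: record.Sex in ['"Male"','"Female"']) \
    .map(lambda record: ((record.Location, record.Time, record.Sex), float(record.Value))) \
    .reduceByKey(add) \
    .map(lambda record: ((record[0][0], record[0][1]), (record[0][2], record[1]))) \
    .groupByKey() \
    .mapValues(dict) \
    .mapValues(lambda sexes: sexes['"Male"'] / sexes['"Female"']) \
    .take(5)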

Exercise 3 - how to achieve the same with reduceByKey

In [ ]:
%load key/solution3.py
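
For comparison, one possible reduceByKey approach (a sketch; the provided solution file may differ) - encode each record as a (male, female) pair and add the pairs element-wise:

In [ ]:
ntuple_rdd.filter(lambda record: record.Sex in ['"Male"','"Female"']) \
    .map(lambda record: ((record.Location, record.Time),
                         (float(record.Value), 0.0) if record.Sex == '"Male"'
                         else (0.0, float(record.Value)))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda mf: mf[0] / mf[1]) \
    .take(5)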

Exercise 4 - how to achieve the same with combineByKey

In [ ]:
%load key/solution4.py
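
For comparison, one possible combineByKey approach (a sketch; the provided solution file may differ). combineByKey takes three functions: createCombiner, mergeValue and mergeCombiners:

In [ ]:
def to_mf(value):
    # (Sex, Value) -> a (male_total, female_total) pair
    return (value[1], 0.0) if value[0] == '"Male"' else (0.0, value[1])

ntuple_rdd.filter(lambda record: record.Sex in ['"Male"','"Female"']) \
    .map(lambda record: ((record.Location, record.Time), (record.Sex, float(record.Value)))) \
    .combineByKey(to_mf,
                  lambda acc, value: (acc[0] + to_mf(value)[0], acc[1] + to_mf(value)[1]),
                  lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda mf: mf[0] / mf[1]) \
    .take(5)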

Hands-On 5 - Calculate the old-age dependency ratio (ratio of the population aged 65+ to the population aged 25-64)

This introduces broadcast variables

create a broadcast variable; broadcast variables improve the performance of map-side joins (e.g. for a lookup table or a feature vector)

In [ ]:
broadcastWorkingAge = sc.broadcast([25,30,35,40,45,50,55,60])

Function to map the record as WORKING or RETIRED based on age

In [ ]:
def map_agegrp(record):
    # label working-age groups using the broadcast lookup list
    if int(record.AgeGrpStart) in broadcastWorkingAge.value:
        AgeGroup = 'WORKING'
    else:
        AgeGroup = 'RETIRED'
    return ((record.Location, record.Time, AgeGroup), float(record.Value))

Function to calculate the ratio

In [ ]:
def cal_ratio(record):
    # after the second reduceByKey the value is a concatenated tuple, e.g.
    # ('WORKING', working_total, 'RETIRED', retired_total) in either order,
    # so pick the operands that always compute retired / working
    if record[1][0] == 'WORKING':
        ratio = record[1][3] / record[1][1]
    else:
        ratio = record[1][1] / record[1][3]
    return (ratio, (record[0][0], record[0][1]))

rdd holding the old-age dependency ratio for each country and for each year

In [ ]:
# keep ages 25+, total the population per (Location, Time, AgeGroup), then
# re-key by (Location, Time); the second reduceByKey concatenates the
# ('WORKING', total) and ('RETIRED', total) tuples so cal_ratio can divide them
ntuple_rdd.filter(lambda record: record.Sex == '"Both"' and int(record.AgeGrpStart) not in [0,5,10,15,20]) \
    .map(map_agegrp) \
    .reduceByKey(add) \
    .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1]))) \
    .reduceByKey(lambda a, b: a + b) \
    .map(cal_ratio) \
    .take(10)

Hands-On 6 - some misc. RDD functions

check the number of partitions in RDD

In [ ]:
ntuple_rdd.getNumPartitions()

repartition the RDD

In [ ]:
rep_rdd = ntuple_rdd.repartition(5)

check again the number of partitions

In [ ]:
rep_rdd.getNumPartitions()

which rdd to cache? cache the one that is reused: here ntuple_rdd, since every pipeline above starts from it

In [ ]:
ntuple_rdd.cache()
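
cache() is lazy - nothing is stored until an action materializes the RDD; the is_cached flag confirms the request:

In [ ]:
ntuple_rdd.is_cached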

each rdd has an id and can be given a friendly name

In [ ]:
rdd.id()
In [ ]:
rdd.name()
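
the name is empty until one is assigned with setName() (the name below is just an example):

In [ ]:
rdd.setName("un_pop_stats")
rdd.name()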

get distinct values of a column

In [ ]:
ntuple_rdd.map(lambda record: record.Location).distinct().collect()

Hands-On 7 - Convert the ntuple_rdd to a DataFrame and save the DF as a Parquet file

Create sqlContext

This is only required if you are running in SPARK local mode

In [ ]:
from pyspark.sql import SQLContext
In [ ]:
sqlContext = SQLContext(sc)

Convert RDD to DF and inspect the data

In [ ]:
df = ntuple_rdd.toDF()
In [ ]:
df.show()
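
Finally, save the DataFrame in Parquet format, as the section title promises (the output path is illustrative):

In [ ]:
df.write.parquet("hadoop-tutorials-data/UN_Pop_Stats.parquet")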