#!/usr/bin/env python # coding: utf-8 # ## This notebook is part of Hadoop tutorials delivered by IT-DB group # ### SPARK RDD Hands-On Lab # ### Hands-On 1 - Load and inspect the data from HDFS # #### Please execute the following steps if you are running in Spark local mode # *SPARK context must be explicity created in local mode* # In[ ]: from pyspark import SparkContext # In[ ]: sc = SparkContext() # #### Run the following cell to create an rdd containing the UN Pop Stats data # In[ ]: rdd = sc.textFile("hadoop-tutorials-data/UN_Pop_Stats.csv") # #### Lets see if the rdd contains any data; take(n) - returns the first n elements of the RDD # In[ ]: rdd.take(5) # #### oops, there is a header, lets remove it first # In[ ]: headless_rdd = rdd.filter(lambda line: 'LocID' not in line) # #### inspect the data to see if the header is still present # In[ ]: headless_rdd.take(5) # ### Hands-On 2 - Convert headless_rdd to an RDD containing python namedtuple objects # #### Lets import couple of things we need # In[ ]: from collections import namedtuple from pprint import pprint # #### schema for the namedtuple # *Named tuples are tuples that allow their elements to be accessed by name instead of just index* # In[ ]: pData = namedtuple('pData',['LocID','Location','VarID','Variant','Time','MidPeriod','SexID','Sex','AgeGrp','AgeGrpStart','AgeGrpSpan','Value']) # #### Function to map the data # In[ ]: def map_record(record): columns = record.split(",")[:12] return pData(*columns) # #### The following map created new python namedtuple rdd # In[ ]: ntuple_rdd = headless_rdd.map(map_record) # #### Inspect the data # In[ ]: ntuple_rdd.take(5) # ### Hands-On 3 - Aggregate the population by Age Group for Switzerland for 2015 # *This introduces filter, map, reduceByKey transformations and collect actions* # In[ ]: plot_rdd = ntuple_rdd.filter(lambda record: record.Location =='"Switzerland"' and record.Time == '"2015"' and record.Sex in ['"Male"','"Female"']) \ .map(lambda record: (int(record.AgeGrpStart),int(float(record.Value)*1000))) \ .reduceByKey(lambda x,y: x+y) \ .sortByKey() \ .collect() # #### Lets draw the population distribution histogram # In[ ]: get_ipython().run_line_magic('matplotlib', 'notebook') import matplotlib.pyplot as plt # In[ ]: plt.figure(figsize=(14,6)) x_val = [x[0] for x in sorted(plot_rdd)] y_val = [x[1] for x in sorted(plot_rdd)] print plot_rdd plt.bar(range(len(y_val)), y_val) plt.xticks(range(len(x_val)), x_val, size='small') plt.show() # ### Exercise 1 - show the centenarian populations by country for 2015 ordered by values in the decending order # *Hint - this requires use of filter, map, reduceByKey and sortByKey* # In[ ]: get_ipython().run_line_magic('load', 'key/solution1.py') # ### Hands-On 4 - Calculate the male to female ratio across Locations (countries) and Time (years) # *This introduces transformations join rdd, combineByKey, groupByKey and reduceByKey* # #### First, lets import couple of bits we need # In[ ]: from operator import add # #### calculate the total male population for each Location (country) and Time (year) # In[ ]: m_rdd = ntuple_rdd.filter(lambda record: record.Sex == '"Male"') \ .map(lambda record: ((record.Location,record.Time,record.Sex),float(record.Value))) \ .reduceByKey(add) \ .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1]))) # #### inspect the rdd # In[ ]: m_rdd.take(5) # #### calculate the total female population for each Location (country) and Time (year) # In[ ]: f_rdd = ntuple_rdd.filter(lambda record: record.Sex == '"Female"') \ .map(lambda record: ((record.Location,record.Time,record.Sex),float(record.Value))) \ .reduceByKey(add) \ .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1]))) # #### inspect how the data looks in the rdd # In[ ]: f_rdd.take(5) # #### join the rdd's # In[ ]: join_rdd = m_rdd.join(f_rdd) # #### inspect the data in the join_rdd # In[ ]: join_rdd.take(5) # #### final rdd containing the male to female ratio for each country and year # In[ ]: fn_rdd = join_rdd.map(lambda record: (record[1][0][1]/record[1][1][1],(record[0][0],record[0][1]))) # #### since the output is going to be huge, lets just filer for Estonia # In[ ]: ratio_rdd = fn_rdd.filter(lambda record: record[1][0] == '"Estonia"').map(lambda (x,y): (y,x)).sortByKey().collect() # In[ ]: plt.figure(figsize=(14,6)) x_val = [x[0][1] for x in sorted(ratio_rdd)] y_val = [x[1] for x in sorted(ratio_rdd)] print plot_rdd plt.plot(range(len(y_val)), y_val) plt.xticks(range(len(x_val)), x_val, size='small') plt.show() # #### And finally countries with extreme ratios! # In[ ]: fn_rdd.filter(lambda record: record[0] > 2.5 or record[0] < 0.8).sortByKey().collect() # #### Exercise 2 - how to achieve the same with groupByKey # In[ ]: get_ipython().run_line_magic('load', 'key/solution2.py') # #### Exercise 3 - how to achieve the same with reduceByKey # In[ ]: get_ipython().run_line_magic('load', 'key/solution3.py') # #### Exercise 4 - how to achieve the same with combineByKey # In[ ]: get_ipython().run_line_magic('load', 'key/solution4.py') # ### HandsOn 5 - Calculate the old-age dependency ratio (ratio of pop between 65+ to 25-64) # *This introduces brodcast variables* # #### create broadcast variable, good for performance for map-side join (for lookup table or feature vector) # In[ ]: broadcastWorkingAge = sc.broadcast([25,30,35,40,45,50,55,60]) # #### Function to map the record as WORKING or RETIRED based on age # In[ ]: def map_agegrp(record): if int(record.AgeGrpStart) in broadcastWorkingAge.value: AgeGroup = 'WORKING' else: AgeGroup = 'RETIRED' return ((record.Location,record.Time,AgeGroup),float(record.Value)) # #### Function to calculate the ratio # In[ ]: def cal_ratio(record): if record[1][0] == 'WORKING': ratio = record[1][3] / record[1][1] else: ratio = record[1][1] / record[1][3] return (ratio,(record[0][0],record[0][1])) # #### rdd holding the old-age dependency ratio for each country and for each year # In[ ]: ntuple_rdd.filter(lambda record: record.Sex == '"Both"' and int(record.AgeGrpStart) not in [0,5,10,15,20]) \ .map(map_agegrp) \ .reduceByKey(add) \ .map(lambda record: ((record[0][0],record[0][1]),(record[0][2],record[1]))) \ .reduceByKey(lambda a, b: a + b) \ .map(cal_ratio) \ .take(10) # #### HandsOn 5 - some misc. rdd functions # #### check the number of partitions in RDD # In[ ]: ntuple_rdd.getNumPartitions() # #### repartition the RDD # In[ ]: rep_rdd = ntuple_rdd.repartition(5) # #### check again the number of partitions # In[ ]: rep_rdd.getNumPartitions() # #### which rdd to cache? # In[ ]: ntuple_rdd.cache() # #### each rdd has an id and can be given a friendly name # In[ ]: rdd.id() # In[ ]: rdd.name() # #### get distinct values of a column # In[ ]: ntuple_rdd.map(lambda record: record.Location).distinct().collect() # ### Hands-On 7 - Convert the ntuple_rdd to DataFrame and save the DF as parquet file # #### Create sqlContext # *This is only required if you are running in SPARK local mode* # In[ ]: from pyspark import SQLContext # In[ ]: sqlContext = SQLContext(sc) # #### Convert RDD to DF and inspect the data # In[ ]: df = ntuple_rdd.toDF() # In[ ]: df.show()