This notebook is part of the Hadoop tutorials delivered by the IT-DB group

Spark DataFrame Hands-On Lab

Hands-On 1 - Construct a DataFrame from a Parquet file

This demonstrates how to read a Parquet file and construct a DataFrame

First, let's create a SparkContext (sc) and an SQLContext (sqlContext)

In [ ]:
from pyspark import SparkContext, SQLContext, SparkConf
In [ ]:
conf = SparkConf().setMaster("local").set("spark.driver.memory", "1g").set("spark.executor.memory", "1g")
In [ ]:
sc = SparkContext(conf=conf)
In [ ]:
sqlContext = SQLContext(sc)
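
This lab uses the Spark 1.x API. On Spark 2.x and later the equivalent entry point is SparkSession, which replaces SQLContext; a minimal sketch, kept commented out so the notebook still runs as-is:

In [ ]:
# Spark 2.x+ alternative (sketch only):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.config(conf=conf).getOrCreate()
# df = spark.read.parquet('hadoop-tutorials-data/UN_Pop_Stats.parquet')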

Read the Parquet file into a DataFrame

In [ ]:
df = sqlContext.read.parquet('hadoop-tutorials-data/UN_Pop_Stats.parquet')

Inspect the data

In [ ]:
df.show(2)
In [ ]:
df.printSchema()
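
Two more standard DataFrame inspection calls are worth knowing: count() returns the number of rows, and describe() computes basic statistics for the numeric columns:

In [ ]:
df.count()            # total number of rows
df.describe().show()  # count/mean/stddev/min/max of the numeric columns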

Hands-On 2 - Calculate the year-wise population of Switzerland

This shows how to query DataFrames and how to display the explain plan

First, let's import the functions we need

In [ ]:
import pyspark.sql.functions as func

Year-wise population for all countries

Here you can see how to filter, groupBy, aggregate, and sort the DataFrame

In [ ]:
# the source data stores values as quoted strings, hence the literal '"Both"'
y_df = df.filter(df.Sex == '"Both"') \
    .groupBy(df.Location, df.Time) \
    .agg(func.sum(df.Value * 1000) \
        .alias("Sum")) \
    .orderBy(df.Time)
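
Note that nothing has executed yet: DataFrame transformations are lazy, and Spark only runs a job when an action such as show() or collect() is called. You can peek at the first rows:

In [ ]:
y_df.show(5)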

Filter for Switzerland (or, for that matter, your country of choice)

Here you can see how select can be used on DataFrames to pick only the columns you need

In [ ]:
c_df = y_df.filter(df.Location == '"Switzerland"') \
    .select(df.Time, "Sum") \
    .collect()
In [ ]:
print(c_df)

Plot the results using matplotlib

In [ ]:
%matplotlib notebook
import matplotlib.pyplot as plt
In [ ]:
plt.figure(figsize=(14,6))
# Time values are quoted strings (e.g. '"1950"'); strip the quotes for the axis labels
x_val = [x[0].strip('"') for x in c_df]
y_val = [x[1] for x in c_df]
plt.plot(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()

Finally, you can view the explain plan generated by the Catalyst optimizer

In [ ]:
y_df.explain()
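
Passing True prints the extended plan: the parsed, analyzed, and optimized logical plans in addition to the physical plan:

In [ ]:
y_df.explain(True)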

Hands-On 3 - Construct DataFrames from JSON

This demonstrates how JSON data can be manipulated using DataFrames

Read a JSON file into a DataFrame

In [ ]:
df_json = sqlContext.read.json('hadoop-tutorials-data/meetup-final.json')

The DataFrame reader can infer the schema from the JSON file

In [ ]:
df_json.printSchema()
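
With the inferred schema, nested fields can be reached with dot notation; for example, using the event struct shown in the schema above:

In [ ]:
df_json.select(df_json.event.event_name).show(5)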

Top events by RSVPs

In [ ]:
df_json.groupBy(df_json.event.event_name, df_json.group.group_city, df_json.venue.venue_name) \
    .count() \
    .select("event[event_name]", "group[group_city]", "venue[venue_name]", "count") \
    .orderBy("count", ascending=False) \
    .show()
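
The bracketed names used in the select above (event[event_name] and friends) are the column names Spark auto-generates for nested grouping keys, and they can differ between Spark versions. If the select fails on your version, a safer pattern is to alias the keys before grouping; a sketch:

In [ ]:
df_json.select(df_json.event.event_name.alias("event_name"),
               df_json.group.group_city.alias("group_city"),
               df_json.venue.venue_name.alias("venue_name")) \
    .groupBy("event_name", "group_city", "venue_name") \
    .count() \
    .orderBy("count", ascending=False) \
    .show()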

What if the JSON contains an array? No problem: we can explode it

In [ ]:
from pyspark.sql.functions import explode
In [ ]:
df_json.select(df_json.event.event_name, explode("group.group_topics")).show()
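
explode() emits one output row per array element; you can alias the generated column to give it a friendlier name:

In [ ]:
df_json.select(df_json.event.event_name, explode("group.group_topics").alias("topic")).show(5)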

Hands-On 4 - This demonstrates how a DataFrame can be persisted as a table and queried with SQL

Load the White House visitor records Parquet file into a DataFrame

In [ ]:
wh_df = sqlContext.read.parquet("hadoop-tutorials-data/WH_VR.parquet")
In [ ]:
wh_df.printSchema()
In [ ]:
wh_df.columns

Inspect the data

In [ ]:
wh_df.select("NAMELAST","APPT_START_DATE").show(10)

Persist the DataFrame as a temporary table

In [ ]:
wh_df.registerTempTable("Vistor_Records")
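
A quick sanity check that the SQL engine can see the new table:

In [ ]:
sqlContext.sql("select count(*) from Vistor_Records").show()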

You can now issue queries against this table using the sqlContext.sql interface

Count the visitors by last name

In [ ]:
count_by_name = sqlContext.sql("select NAMELAST, count(1) as count from Vistor_Records group by NAMELAST order by count desc")
In [ ]:
count_by_name.show()

Count the number of visitors by year

In [ ]:
count_by_year = sqlContext.sql("select year(APPT_START_DATE) as year, count(1) as count from Vistor_Records \
    where APPT_START_DATE is not null \
    group by year(APPT_START_DATE) \
    order by year(APPT_START_DATE)").collect()
In [ ]:
plt.figure(figsize=(14,6))
x_val = [x[0] for x in count_by_year]  # rows are already ordered by year
y_val = [x[1] for x in count_by_year]
plt.bar(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()

Count the number of visitors by weekday

This demonstrates how you can create a UDF (user-defined function)

In [ ]:
from pyspark.sql.types import StringType
def get_weekday(d):
    # format a date as its weekday name, e.g. "Monday"
    return d.strftime("%A")
sqlContext.registerFunction("get_weekday", get_weekday, StringType())
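
registerFunction exposes the UDF to SQL queries. If you also want to call it from the DataFrame API, you can wrap the same Python function with pyspark.sql.functions.udf; a minimal sketch reusing the wh_df loaded above:

In [ ]:
from pyspark.sql.functions import udf
weekday_udf = udf(get_weekday, StringType())
wh_df.filter(wh_df.APPT_START_DATE.isNotNull()) \
    .select(weekday_udf(wh_df.APPT_START_DATE).alias("weekday")) \
    .show(5)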
In [ ]:
count_by_wday = sqlContext.sql("select get_weekday(APPT_START_DATE) as weekday, count(1) as count from Vistor_Records \
    where APPT_START_DATE is not null \
    group by get_weekday(APPT_START_DATE) \
    order by count desc")
In [ ]:
count_by_wday.show()

Finally, let's produce a histogram of the group sizes

In [ ]:
group_size = sqlContext.sql("select distinct UIN, Total_People from Vistor_Records \
    where Total_People > 30 \
    and Total_People < 200").collect()
In [ ]:
import numpy as np
plt.figure(figsize=(14,6))
x_val = [int(x[1]) for x in group_size]
print(min(x_val), max(x_val))
bins = np.arange(min(x_val), max(x_val), 10)
n, bins, patches = plt.hist(x_val, bins=bins, facecolor='green')
plt.show()