#!/usr/bin/env python
# coding: utf-8

# ## This notebook is part of the Hadoop tutorials delivered by the IT-DB group

# ### SPARK DataFrame Hands-On Lab

# ### Hands-On 1 - Construct a DataFrame from a parquet file
# *This demonstrates how to read a parquet file and construct a DataFrame*

# #### First, let's create the SparkContext (sc) and SQLContext (sqlContext)

# In[ ]:

from pyspark import SparkContext, SQLContext, SparkConf


# In[ ]:

conf = SparkConf().setMaster("local").set("spark.driver.memory", "1g").set("spark.executor.memory", "1g")


# In[ ]:

sc = SparkContext(conf=conf)


# In[ ]:

sqlContext = SQLContext(sc)


# #### Read the parquet file into a DataFrame

# In[ ]:

df = sqlContext.read.parquet('hadoop-tutorials-data/UN_Pop_Stats.parquet')


# #### Inspect the data

# In[ ]:

df.show(2)


# #### Print the schema of the DataFrame

# In[ ]:

df.printSchema()


# ### Hands-On 2 - Calculate the year-wise population of Switzerland
# *This shows how to query DataFrames and how to show the explain plan*

# #### First, let's import the bits we need

# In[ ]:

import pyspark.sql.functions as func


# #### Year-wise population for all countries
# *You can see how you can filter, groupBy, aggregate and sort the DataFrame*

# In[ ]:

y_df = df.filter(df.Sex == '"Both"') \
         .groupBy(df.Location, df.Time) \
         .agg(func.sum(df.Value * 1000) \
         .alias("Sum")) \
         .orderBy(df.Time)


# #### Filter for Switzerland (or, for that matter, your country of choice)
# *You can see how select can be used on DataFrames to pick out the columns you need*

# In[ ]:

c_df = y_df.filter(df.Location == '"Switzerland"') \
           .select(df.Time, "Sum") \
           .collect()


# In[ ]:

print(c_df)


# #### Plot the results using matplotlib

# In[ ]:

get_ipython().run_line_magic('matplotlib', 'notebook')
import matplotlib.pyplot as plt


# In[ ]:

plt.figure(figsize=(14, 6))
x_val = [x[0] for x in c_df]
y_val = [x[1] for x in c_df]
plt.plot(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()


# #### Finally, you can view the explain plan generated by the Catalyst optimizer

# In[ ]:

y_df.explain()


# ### Hands-On 3 - Construct DataFrames from JSON
# *This demonstrates how JSON can be manipulated using DataFrames*

# #### Read a JSON file into a DataFrame

# In[ ]:

df_json = sqlContext.read.json('hadoop-tutorials-data/meetup-final.json')


# #### The DataFrame can infer the schema from the JSON file

# In[ ]:

df_json.printSchema()


# #### Top events by RSVPs

# In[ ]:

df_json.groupBy(df_json.event.event_name, df_json.group.group_city, df_json.venue.venue_name) \
       .count() \
       .select("event[event_name]", "group[group_city]", "venue[venue_name]", "count") \
       .orderBy("count", ascending=False) \
       .show()


# #### What if the JSON contains an array? No problem, we can explode it

# In[ ]:

from pyspark.sql.functions import explode


# In[ ]:

df_json.select(df_json.event.event_name, explode("group.group_topics")).show()


# ### Hands-On 4 - This demonstrates how a DataFrame can be persisted as a table and queried

# #### Load the White House visitor records parquet file into a DataFrame

# In[ ]:

wh_df = sqlContext.read.parquet("hadoop-tutorials-data/WH_VR.parquet")


# #### Print the schema to understand the layout

# In[ ]:

wh_df.printSchema()


# In[ ]:

wh_df.columns


# #### Inspect the data

# In[ ]:

wh_df.select("NAMELAST", "APPT_START_DATE").show(10)


# #### Persist the DataFrame as a temporary table

# In[ ]:

wh_df.registerTempTable("Vistor_Records")
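

# #### (Optional) Verify that the temporary table is registered
# *A minimal check, not part of the original lab: `tableNames()` on the SQLContext lists the tables it can see, so the name registered above should appear here*

# In[ ]:

# list the temporary tables registered on this SQLContext
print(sqlContext.tableNames())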
sqlContext.sql("select NAMELAST, count(1) as count from Vistor_Records group by NAMELAST order by count desc") # In[ ]: count_by_name.show() # *count the number of vistors by year* # In[ ]: count_by_day = sqlContext.sql("select year(APPT_START_DATE), count(1) as count from Vistor_Records \ where APPT_START_DATE is not null \ group by year(APPT_START_DATE) \ order by year(APPT_START_DATE)").collect() # In[ ]: plt.figure(figsize=(14,6)) x_val = [x[0] for x in sorted(count_by_day)] y_val = [x[1] for x in sorted(count_by_day)] plt.bar(range(len(y_val)), y_val) plt.xticks(range(len(x_val)), x_val, size='small') plt.show() # #### Count the number of vistors by weekday # *This demonstrates how you can create UDF - user defined function* # In[ ]: from datetime import datetime from pyspark.sql.types import StringType def get_weekday(str): return str.strftime("%A") sqlContext.registerFunction("get_weekday", lambda x: \ get_weekday(x), \ StringType()) # In[ ]: count_by_wday = sqlContext.sql("select get_weekday(APPT_START_DATE), count(1) as count from Vistor_Records \ where APPT_START_DATE is not null \ group by get_weekday(APPT_START_DATE) \ order by count desc") # In[ ]: count_by_wday.show() # #### Finally lets produce histogram on the group size # In[ ]: group_size = sqlContext.sql("select distinct UIN, Total_People from Vistor_Records \ where Total_People > 30 \ and Total_People < 200").collect() # In[ ]: import numpy as np plt.figure(figsize=(14,6)) x_val = [int(x[1]) for x in group_size] print min(x_val),max(x_val) bins = np.arange(min(x_val), max(x_val), 10) n, bins, patches = plt.hist(x_val, bins=bins, facecolor='green')