This demonstrates how to read a Parquet file and construct a DataFrame
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setMaster("local").set("spark.driver.memory", "1g").set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.parquet('hadoop-tutorials-data/UN_Pop_Stats.parquet')
df.show(2)
df.printSchema()
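DataFrames can also be written back out as Parquet; a minimal sketch (the output path here is hypothetical):
# Write the DataFrame back out as Parquet; 'overwrite' replaces any existing output
df.write.mode('overwrite').parquet('hadoop-tutorials-data/UN_Pop_Stats_copy.parquet')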
This shows how to query DataFrames and how to display the explain plan
import pyspark.sql.functions as func
You can see how to filter, group, aggregate, and sort the DataFrame
y_df = df.filter(df.Sex == '"Both"') \
.groupBy(df.Location,df.Time) \
.agg(func.sum(df.Value*1000) \
.alias("Sum")) \
.orderBy(df.Time)
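The same aggregation can be expressed in SQL once the DataFrame is registered as a temporary table; a minimal sketch (the table name pop_stats is just for illustration):
# Register the DataFrame under a hypothetical table name and run the equivalent SQL
df.registerTempTable("pop_stats")
sqlContext.sql("select Location, Time, sum(Value*1000) as Sum from pop_stats \
where Sex = '\"Both\"' \
group by Location, Time \
order by Time").show(2)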
You can see how select can be used on a DataFrame to project just the columns you need
c_df = y_df.filter(df.Location == '"Switzerland"') \
.select(df.Time,"Sum") \
.collect()
print(c_df)
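For small result sets, toPandas() is an alternative to collect() that returns a pandas DataFrame, which is often more convenient for plotting; a minimal sketch, assuming pandas is installed:
# Pull the filtered result into a local pandas DataFrame
c_pdf = y_df.filter(df.Location == '"Switzerland"') \
.select(df.Time, "Sum") \
.toPandas()
print(c_pdf.head())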
%matplotlib notebook
import matplotlib.pyplot as plt
plt.figure(figsize=(14,6))
x_val = [x[0] for x in c_df]  # the Time column of each Row
y_val = [x[1] for x in c_df]
plt.plot(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()
y_df.explain()
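Passing True prints the parsed, analyzed, and optimized logical plans in addition to the physical plan:
y_df.explain(True)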
This demonstrates how JSON data can be manipulated using DataFrames
df_json = sqlContext.read.json('hadoop-tutorials-data/meetup-final.json')
df_json.printSchema()
df_json.groupBy(df_json.event.event_name,df_json.group.group_city,df_json.venue.venue_name) \
.count() \
.select("event[event_name]","group[group_city]","venue[venue_name]","count") \
.orderBy("count", ascending = False) \
.show()
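Nested fields can also be used in filters just like top-level columns; a minimal sketch (the city value 'Zurich' is hypothetical):
# Filter on a nested field and project another nested field
df_json.filter(df_json.group.group_city == 'Zurich') \
.select(df_json.event.event_name) \
.show()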
from pyspark.sql.functions import explode
df_json.select(df_json.event.event_name,explode("group.group_topics")).show()
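The exploded column can be aliased so its nested fields can be selected in a follow-up step; a minimal sketch, assuming each element of group_topics has a topic_name field:
# Alias the exploded struct, then reach into its (assumed) topic_name field
df_json.select(explode("group.group_topics").alias("topic")) \
.select("topic.topic_name") \
.show()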
wh_df = sqlContext.read.parquet("hadoop-tutorials-data/WH_VR.parquet")
wh_df.printSchema()
wh_df.columns
wh_df.select("NAMELAST","APPT_START_DATE").show(10)
wh_df.registerTempTable("Vistor_Records")
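You can confirm the temporary table was registered by listing the tables known to the SQLContext:
sqlContext.tableNames()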
Count the visitors by last name
count_by_name = sqlContext.sql("select NAMELAST, count(1) as count from Vistor_Records group by NAMELAST order by count desc")
count_by_name.show()
Count the number of visitors by year
count_by_year = sqlContext.sql("select year(APPT_START_DATE), count(1) as count from Vistor_Records \
where APPT_START_DATE is not null \
group by year(APPT_START_DATE) \
order by year(APPT_START_DATE)").collect()
plt.figure(figsize=(14,6))
x_val = [x[0] for x in sorted(count_by_year)]
y_val = [x[1] for x in sorted(count_by_year)]
plt.bar(range(len(y_val)), y_val)
plt.xticks(range(len(x_val)), x_val, size='small')
plt.show()
This demonstrates how to create a UDF (user-defined function)
from datetime import datetime
from pyspark.sql.types import StringType
def get_weekday(dt):
    # Return the weekday name (e.g. 'Monday') for a datetime value
    return dt.strftime("%A")
sqlContext.registerFunction("get_weekday", get_weekday, StringType())
count_by_wday = sqlContext.sql("select get_weekday(APPT_START_DATE), count(1) as count from Vistor_Records \
where APPT_START_DATE is not null \
group by get_weekday(APPT_START_DATE) \
order by count desc")
count_by_wday.show()
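The same function can be used from the DataFrame API by wrapping it with pyspark.sql.functions.udf; a minimal sketch:
from pyspark.sql.functions import udf
# Wrap the Python function as a column-level UDF returning a string
weekday_udf = udf(get_weekday, StringType())
wh_df.filter(wh_df.APPT_START_DATE.isNotNull()) \
.select(weekday_udf(wh_df.APPT_START_DATE).alias("weekday")) \
.groupBy("weekday") \
.count() \
.show()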
group_size = sqlContext.sql("select distinct UIN, Total_People from Vistor_Records \
where Total_People > 30 \
and Total_People < 200").collect()
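The same selection can be written with the DataFrame API instead of SQL; a minimal sketch:
# Project the two columns, apply both range conditions, and deduplicate
wh_df.select("UIN", "Total_People") \
.where((wh_df.Total_People > 30) & (wh_df.Total_People < 200)) \
.distinct() \
.show(5)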
import numpy as np
plt.figure(figsize=(14,6))
x_val = [int(x[1]) for x in group_size]
print(min(x_val), max(x_val))
bins = np.arange(min(x_val), max(x_val), 10)
n, bins, patches = plt.hist(x_val, bins=bins, facecolor='green')
plt.show()
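Finally, stop the SparkContext to release its resources when you are done:
sc.stop()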