Prepared by: Gary A. Stafford
Associated article: Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker
Read the CSV-format data file into a Spark DataFrame.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession \
    .builder \
    .appName('04_notebook') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar') \
    .getOrCreate()
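As a quick sanity check of the new session, the Spark version and master URL can be printed (a minimal, optional sketch):
# Optional sanity check: confirm the session's Spark version and master URL.
print('Spark version: %s' % spark.version)
print('Spark master: %s' % spark.sparkContext.master)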
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])
df1 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('BreadBasket_DMS.csv', schema=bakery_schema)
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)
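The same schema can also be expressed as a DDL-formatted string, which Spark 2.3+ accepts in place of a StructType. A minimal sketch of an equivalent read (the df1_alt name is only for illustration):
# Sketch: an equivalent read using a DDL-formatted schema string (Spark 2.3+).
df1_alt = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .schema('date STRING, time STRING, transaction INT, item STRING') \
    .load('BreadBasket_DMS.csv')
df1_alt.printSchema()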
Run the SQL script to create the database schema and import the data from the CSV file.
%run -i '03_load_sql.py'
Load the contents of the PostgreSQL 'transactions' table into a Spark DataFrame.
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'transactions',
}
df2 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()
print('DataFrame rows: %d' % df2.count())
print('DataFrame schema: %s' % df2)
df2.show(10, False)
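For comparison, the DataFrameReader.jdbc() shortcut accepts the URL, table name, and a properties dictionary directly. A minimal sketch using the same connection settings (df2_alt is an illustrative name):
# Sketch: the same JDBC read using the read.jdbc() shortcut.
df2_alt = spark.read.jdbc(url=properties['url'],
                          table=properties['dbtable'],
                          properties={'driver': properties['driver'],
                                      'user': properties['user'],
                                      'password': properties['password']})
df2_alt.show(5, False)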
Create a new bakery record and load it into a Spark DataFrame.
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
df3 = spark.createDataFrame(data, bakery_schema)
print('DataFrame rows: %d' % df3.count())
print('DataFrame schema: %s' % df3)
df3.show(10, False)
Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table.
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
# should now contain one additional row of data
print('DataFrame rows: %d' % df2.count())
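To spot-check the append, the newly inserted record can be filtered out of the JDBC-backed DataFrame. This sketch assumes the table's date and time columns compare cleanly against string literals; adjust the predicate to the column types created by 03_load_sql.py.
# Sketch: look for the newly appended record (assumes string-comparable date/time columns).
df2.filter("date = '2016-10-30' AND time = '10:13:27' AND item = 'Pastry'") \
    .show(5, False)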
Overwrite the 'transactions' table with the contents of the CSV file-based DataFrame.
df1.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('truncate', 'true') \
    .mode('overwrite') \
    .save()
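Since df2 is lazily evaluated against the JDBC source, re-running its count is a quick way to confirm the truncate-and-overwrite brought the table back in line with the CSV data (a minimal sketch):
# Sketch: after the overwrite, the table row count should match the CSV-based DataFrame.
print('CSV DataFrame rows: %d' % df1.count())
print('JDBC DataFrame rows: %d' % df2.count())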
from math import pi
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12
output_notebook()
What are the busiest days of the week?
df1.createOrReplaceTempView('tmp_bakery')
sql_query = "SELECT date_format(date, 'EEEE') as day, count(*) as count " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"GROUP BY day " \
"ORDER BY count ASC " \
"LIMIT 10"
df4 = spark.sql(sql_query)
df4.show(10, False)
data = df4.toPandas()
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = data['day'].tolist()
color_map = factor_cmap(field_name='day', palette=Paired12, factors=days)
data['angle'] = data['count'] / data['count'].sum() * 2 * pi
plot = figure(plot_height=450,
              plot_width=700,
              title='Items Sold/Day',
              tooltips=tooltips,
              x_range=(-0.5, 1.0))
plot.wedge(x=0,
           y=1,
           radius=0.4,
           start_angle=cumsum('angle', include_zero=True),
           end_angle=cumsum('angle'),
           line_color='white',
           fill_color=color_map,
           legend_field='day',
           source=data)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None
show(plot)
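If a standalone copy of the chart is useful outside the notebook, Bokeh's save() can write the same figure to an HTML file; a minimal sketch (the file name is arbitrary):
# Sketch: save the same figure to a standalone HTML file.
from bokeh.io import save
save(plot, filename='items_sold_per_day.html', title='Items Sold/Day')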
What are the busiest times of the day?
def time_increment(h, m):
    """Calculates a 30-minute time increment
    Parameters:
        h (str): hours, '0' or '00' to '23'
        m (str): minutes, '0' or '00' to '59'
    Returns:
        str: 30-minute time increment, e.g. '07:30', '23:00', or '12:00'
    """
    increment = (int(m) * (100 / 60)) / 100  # 0.0000 - 0.9833
    increment = round(increment, 0)  # 0.0 or 1.0
    increment = int(increment) * 30  # 0 or 30
    increment = str(h).rjust(2, '0') + ':' + str(increment).rjust(2, '0')
    return increment  # e.g. '07:30' or '23:00'
spark.udf.register("udfTimeIncrement", time_increment, StringType())
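Before leaning on the registered UDF, the helper can be exercised directly in Python as a quick sanity check (note that Python 3's round() uses banker's rounding, so an exact :30 value lands in the lower half-hour bucket):
# Quick sanity check of the helper before using it through Spark SQL.
print(time_increment('07', '29'))  # '07:00'
print(time_increment('07', '31'))  # '07:30'
print(time_increment('23', '45'))  # '23:30'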
sql_query = "WITH tmp_table AS (" \
" SELECT udfTimeIncrement(date_format(time, 'HH'), date_format(time, 'mm')) as period, count(*) as count " \
" FROM tmp_bakery " \
" WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
" GROUP BY period " \
" ORDER BY period ASC" \
") " \
"SELECT period, count " \
"FROM tmp_table " \
"WHERE period BETWEEN '07:00' AND '19:00'"
df5 = spark.sql(sql_query)
df5.show(10, False)
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('period', '@period'), ('count', '@{count}{,}')]
periods = source.data['period'].tolist()
plot = figure(x_range=periods,
              plot_width=900,
              plot_height=450,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='period', bottom=0, top='count', source=source, width=0.9)
plot.title.text = 'Items Sold/Hour'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Items Sold'
show(plot)
What are the top selling bakery items?
sql_query = "SELECT item, count(*) as count " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"GROUP BY item " \
"ORDER BY count DESC " \
"LIMIT 10"
df6 = spark.sql(sql_query)
df6.show(10, False)
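The same top-10 aggregation can be written with the DataFrame API instead of Spark SQL; a sketch equivalent to the query above (df6_alt is an illustrative name):
# Sketch: the top-10 items computed with the DataFrame API instead of SQL.
df6_alt = df1.filter("item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment'") \
    .groupBy('item') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(10)
df6_alt.show(10, False)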
source = ColumnDataSource(data=df6.toPandas())
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = source.data['item'].tolist()
items.reverse()
plot = figure(y_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.hbar(y='item', right='count', height=.9, source=source)
plot.title.text = 'Top 10 Bakery Items'
plot.yaxis.axis_label = 'Items'
plot.xaxis.axis_label = 'Total Items Sold'
show(plot)
How many items do customers usually buy?
sql_query = "WITH tmp_table AS (" \
" SELECT transaction, count(*) as order_size " \
" FROM tmp_bakery " \
" WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
" GROUP BY transaction " \
" ORDER BY order_size DESC" \
") " \
"SELECT order_size, count(*) as count " \
"FROM tmp_table " \
"GROUP BY order_size " \
"ORDER BY order_size ASC" \
df7 = spark.sql(sql_query)
df7.show(24, False)
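As a quick follow-up sketch, the distribution can also be collapsed into a single number, the weighted-average transaction size (computed in pandas after collecting the small result set):
# Sketch: weighted-average items per transaction, derived from the distribution above.
sizes = df7.toPandas()
avg_order_size = (sizes['order_size'] * sizes['count']).sum() / sizes['count'].sum()
print('Average items/transaction: %.2f' % avg_order_size)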
source = ColumnDataSource(data=df7.toPandas())
tooltips = [('order_size', '@order_size'), ('count', '@count')]
items = source.data['order_size'].tolist()
items = list(map(str, items))
plot = figure(x_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='order_size', bottom=0, top='count', source=source, width=0.9)
plot.line(x='order_size',
          y='count',
          source=source,
          line_color='red',
          line_width=2)
plot.title.text = 'Transaction Size'
plot.xaxis.axis_label = 'Items/Transaction'
plot.yaxis.axis_label = 'Total Transactions'
show(plot)
Perform basic analysis of the bakery data using Spark SQL. Write the resulting DataFrame's contents to Apache Parquet format, then read them back.
sql_query = "SELECT transaction, CAST(CONCAT(date,' ',time) as timestamp) as timestamp, item " \
"FROM tmp_bakery " \
"WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
"ORDER BY transaction ASC, item ASC"
df8 = spark.sql(sql_query)
print('DataFrame rows: %d' % df8.count())
print('DataFrame schema: %s' % df8)
df8.show(10, False)
df8.write.parquet('output/bakery_parquet', mode='overwrite')
! ls 2>&1 -lh output/bakery_parquet | head -10
! echo 'Parquet Files:' $(ls output/bakery_parquet | wc -l)
df9 = spark.read.parquet('output/bakery_parquet')
print('DataFrame rows: %d' % df9.count())
print('DataFrame schema: %s' % df9)
df9.select('transaction', 'timestamp', 'item') \
    .sort('transaction', 'item') \
    .show(10, False)
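The Parquet-backed DataFrame can be queried with Spark SQL just like the CSV-based one; a minimal sketch that registers it as a temporary view (tmp_bakery_parquet is an illustrative name):
# Sketch: query the Parquet-backed data through a temporary view.
df9.createOrReplaceTempView('tmp_bakery_parquet')
spark.sql("SELECT item, count(*) as count "
          "FROM tmp_bakery_parquet "
          "GROUP BY item "
          "ORDER BY count DESC") \
    .show(10, False)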