Prepared by: Gary A. Stafford
Associated article: https://wp.me/p1RD28-61V
Set up the Spark SparkSession
! pip install pip --upgrade --quiet
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create (or reuse) a SparkSession running locally on all available cores.
spark = (
    SparkSession.builder
    .appName('pyspark_demo_app')
    .master("local[*]")
    .getOrCreate()
)
Load the Kaggle bakery dataset (~21K records) from the CSV file into a DataFrame
# Explicit schema for the bakery transactions CSV (avoids schema inference).
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True),
])

# Read the Kaggle bakery dataset (~21K rows) into a DataFrame using the
# explicit schema; the file's first row is a header, not data.
df_bakery1 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(bakery_schema)
    .load("BreadBasket_DMS.csv")
)
df_bakery1.show(10)
df_bakery1.count()
+----------+--------+-----------+-------------+ | date| time|transaction| item| +----------+--------+-----------+-------------+ |2016-10-30|09:58:11| 1| Bread| |2016-10-30|10:05:34| 2| Scandinavian| |2016-10-30|10:05:34| 2| Scandinavian| |2016-10-30|10:07:57| 3|Hot chocolate| |2016-10-30|10:07:57| 3| Jam| |2016-10-30|10:07:57| 3| Cookies| |2016-10-30|10:08:41| 4| Muffin| |2016-10-30|10:13:03| 5| Coffee| |2016-10-30|10:13:03| 5| Pastry| |2016-10-30|10:13:03| 5| Bread| +----------+--------+-----------+-------------+ only showing top 10 rows
21293
Transform the DataFrame's bakery data using Spark SQL
df_bakery1.createOrReplaceTempView("bakery_table_tmp1")

# Drop the placeholder 'NONE' items and order rows by transaction id.
# FIX: the original concatenated query was missing a space between
# "'NONE'" and "ORDER BY" (producing "'NONE'ORDER BY"), which only worked
# because the SQL lexer happens to split a string literal from an adjacent
# keyword. A single multi-line string avoids that whole class of bug.
df_bakery2 = spark.sql("""
    SELECT date, transaction, item
    FROM bakery_table_tmp1
    WHERE item NOT LIKE 'NONE'
    ORDER BY transaction
""")
df_bakery2.show(5)
df_bakery2.count()
+----------+-----------+-------------+ | date|transaction| item| +----------+-----------+-------------+ |2016-10-30| 1| Bread| |2016-10-30| 2| Scandinavian| |2016-10-30| 2| Scandinavian| |2016-10-30| 3|Hot chocolate| |2016-10-30| 3| Jam| +----------+-----------+-------------+ only showing top 5 rows
20507
df_bakery2.createOrReplaceTempView("bakery_table_tmp2")

# Aggregate items sold per day, restricted to dates from 2017-01-01 on.
# Replaced fragile per-clause string concatenation (which silently broke
# in the previous query when a trailing space was dropped) with a single
# multi-line query string; the SQL itself is unchanged.
df_bakery3 = spark.sql("""
    SELECT date, count(*) AS count
    FROM bakery_table_tmp2
    WHERE date >= '2017-01-01'
    GROUP BY date
    ORDER BY date
""")
df_bakery3.show(5)
df_bakery3.count()
+----------+-----+ | date|count| +----------+-----+ |2017-01-01| 1| |2017-01-03| 87| |2017-01-04| 76| |2017-01-05| 95| |2017-01-06| 84| +----------+-----+ only showing top 5 rows
98
! pip install plotly chart-studio --upgrade --quiet
import chart_studio.tools
import chart_studio.plotly as py
import plotly.graph_objs as go
from numpy import arange
from scipy import stats, signal
import warnings
# Silence ALL warnings — a blunt, notebook-only convenience to keep
# SciPy/plotly deprecation noise out of the cell output.
warnings.filterwarnings('ignore')
# *** UPDATE WITH YOUR CREDENTIALS FOR PLOTLY ***
# Stores the Chart Studio credentials used by py.iplot() when uploading
# the figure at the end of this notebook.
chart_studio.tools.set_credentials_file(username='username_goes_here', api_key='api_key_goes_here')
# Pull the aggregated daily counts down to the driver as a pandas DataFrame.
df_bakery4 = df_bakery3.toPandas()

# Least-squares linear fit of items-sold-per-day against the day index
# (0, 1, 2, ... for each row of the aggregated frame).
day_index = arange(0, len(df_bakery4.index))
slope, intercept, r_value, p_value, std_err = stats.linregress(day_index, df_bakery4['count'])
line = intercept + slope * day_index
# Plotly layout: titled figure with matching grid/axis styling on both axes;
# x tick labels are rotated 45° so the dates stay readable.
x_axis = dict(
    title='Month',
    showgrid=True,
    zeroline=True,
    showline=True,
    ticks='outside',
    tickangle=45,
    showticklabels=True,
)
y_axis = dict(
    title='Items Sold/Day',
    showgrid=True,
    zeroline=True,
    showline=True,
    ticks='outside',
    showticklabels=True,
)
layout = dict(title='2017 Bakery Sales', xaxis=x_axis, yaxis=y_axis)
# Plot raw daily sales as bars with two overlays — the linear trend line
# and a Savitzky-Golay smoothed curve (window 53, polynomial order 3) —
# then upload the figure to Chart Studio.
dates = df_bakery4['date']
daily_counts = df_bakery4['count']
smoothed = signal.savgol_filter(daily_counts, 53, 3)

trace1 = go.Bar(x=dates, y=daily_counts, name='Items Sold')
trace2 = go.Scatter(x=dates, y=line, mode='lines', name='Linear Fit')
trace3 = go.Scatter(x=dates, y=smoothed, mode='lines', name='Savitzky-Golay')
data = [trace1, trace2, trace3]
fig = dict(data=data, layout=layout)
py.iplot(fig, filename='jupyter-basic_bar.html')