Prepared by: Gary A. Stafford
Associated article: Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker
Set up the SparkSession, the entry point to programming Spark with the Dataset and DataFrame API.
from pyspark.sql import SparkSession
# reference: https://spark.apache.org/docs/latest/configuration.html#viewing-spark-properties
# Build (or reuse) the SparkSession -- the unified entry point to the
# DataFrame and SQL APIs. getOrCreate() returns an existing session if one
# is already running in this process.
spark = SparkSession.builder.appName('05_notebook').getOrCreate()
# List the effective Spark configuration as (key, value) tuples so the
# runtime settings (master, deploy mode, ports, ...) are visible inline.
spark.sparkContext.getConf().getAll()
[('spark.driver.host', '87ee13b37142'), ('spark.app.id', 'local-1577921437466'), ('spark.app.name', '05_notebook'), ('spark.driver.port', '33709'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true')]
# Read the bakery transactions CSV into a DataFrame. The header row supplies
# the column names; inferSchema makes Spark sample the file to pick column
# types (per the printed schema, Date is inferred as a timestamp).
df1 = (spark.read
       .format('csv')
       .option('header', 'true')
       .option('delimiter', ',')
       .option('inferSchema', True)
       .load('BreadBasket_DMS.csv'))
# count() forces a full pass over the file; the DataFrame repr shows the
# inferred schema. show(10, False) prints 10 rows without truncating values.
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)
DataFrame rows: 21293 DataFrame schema: DataFrame[Date: timestamp, Time: string, Transaction: int, Item: string] +-------------------+--------+-----------+-------------+ |Date |Time |Transaction|Item | +-------------------+--------+-----------+-------------+ |2016-10-30 00:00:00|09:58:11|1 |Bread | |2016-10-30 00:00:00|10:05:34|2 |Scandinavian | |2016-10-30 00:00:00|10:05:34|2 |Scandinavian | |2016-10-30 00:00:00|10:07:57|3 |Hot chocolate| |2016-10-30 00:00:00|10:07:57|3 |Jam | |2016-10-30 00:00:00|10:07:57|3 |Cookies | |2016-10-30 00:00:00|10:08:41|4 |Muffin | |2016-10-30 00:00:00|10:13:03|5 |Coffee | |2016-10-30 00:00:00|10:13:03|5 |Pastry | |2016-10-30 00:00:00|10:13:03|5 |Bread | +-------------------+--------+-----------+-------------+ only showing top 10 rows
Analyze the DataFrame's bakery data using Spark SQL.
# Expose the DataFrame as a temporary view so it can be queried with SQL.
df1.createOrReplaceTempView('tmp_bakery')
# Daily transaction-line counts, ordered chronologically.
df2 = spark.sql(
    'SELECT date, count(*) as count FROM tmp_bakery GROUP BY date ORDER BY date')
print('DataFrame rows: %d' % df2.count())
# NOTE(review): substr(1, 2) takes the first two characters of the date's
# string form, which is '20' (the century) for every row -- the output below
# confirms 'hourly_period' is constant. Presumably this was meant to slice
# the Time column instead, but Time is not present in df2; confirm against
# the source article before changing.
df3 = df2.withColumn('hourly_period', df2['date'].substr(1, 2))
# show() prints the rows and returns None, so this also prints 'None'.
print(df3.show(10))
DataFrame rows: 159 +-------------------+-----+-------------+ | date|count|hourly_period| +-------------------+-----+-------------+ |2016-10-30 00:00:00| 180| 20| |2016-10-31 00:00:00| 205| 20| |2016-11-01 00:00:00| 154| 20| |2016-11-02 00:00:00| 169| 20| |2016-11-03 00:00:00| 195| 20| |2016-11-04 00:00:00| 192| 20| |2016-11-05 00:00:00| 283| 20| |2016-11-06 00:00:00| 203| 20| |2016-11-07 00:00:00| 149| 20| |2016-11-08 00:00:00| 147| 20| +-------------------+-----+-------------+ only showing top 10 rows None
Visualize the daily sales totals with plotly and Chart Studio, overlaying a linear least-squares fit and a Savitzky-Golay smoothed trend line.
import os
from dotenv import load_dotenv
import chart_studio.tools
import chart_studio.plotly as py
import plotly.graph_objs as go
from numpy import arange
from scipy import stats, signal
import warnings
# Suppress all warnings to keep the notebook output clean. (Broad, but
# tolerable in a demo notebook; avoid in library code.)
warnings.filterwarnings('ignore')
# Load Chart Studio credentials from a .env file so the API key stays out
# of the notebook source.
load_dotenv()
chart_studio.tools.set_credentials_file(
    username=os.getenv('PLOTLY_USERNAME'),
    api_key=os.getenv('PLOTLY_API_KEY'))
# Pull the daily-count results down to the driver as a pandas DataFrame
# for plotting and statistics.
pdf = df2.toPandas()
# Ordinary least-squares line through the daily counts, using the row
# position 0..n-1 as the x axis (one point per day).
x_vals = arange(0, len(pdf.index))
fit = stats.linregress(x_vals, pdf['count'])
slope, intercept, r_value, p_value, std_err = fit
# Evaluate the fitted line at every x position for the trend overlay.
line = slope * x_vals + intercept
# Axis styling shared by both axes; the x axis additionally rotates its
# tick labels 45 degrees.
axis_style = dict(showgrid=True,
                  zeroline=True,
                  showline=True,
                  ticks='outside',
                  showticklabels=True)
layout = dict(title='Bakery Sales',
              xaxis=dict(title='Month', tickangle=45, **axis_style),
              yaxis=dict(title='Items Sold/Day', **axis_style))
# Raw daily totals as bars, plus two trend overlays: the least-squares fit
# computed above and a Savitzky-Golay smoothed curve (53-point window,
# cubic polynomial).
trace1 = go.Bar(x=pdf['date'], y=pdf['count'], name='Items Sold')
trace2 = go.Scatter(x=pdf['date'], y=line, mode='lines', name='Linear Fit')
smoothed = signal.savgol_filter(pdf['count'], 53, 3)
trace3 = go.Scatter(x=pdf['date'],
                    y=smoothed,
                    mode='lines',
                    name='Savitzky-Golay')
# Publish the combined figure to Chart Studio under the given filename.
fig = dict(data=[trace1, trace2, trace3], layout=layout)
py.iplot(fig, filename='jupyter-basic_bar.html')