#!/usr/bin/env python
# coding: utf-8

# # PySpark Demo Notebook 4
#
# ## Contents
#
# 1. [Read CSV-Format File](#Read-CSV-Format-File)
# 2. [Run PostgreSQL Script](#Run-PostgreSQL-Script)
# 3. [Load PostgreSQL Data](#Load-PostgreSQL-Data)
# 4. [Create a New Record](#Create-a-New-Record)
# 5. [Append Record to Database Table](#Append-Record-to-Database-Table)
# 6. [Overwrite Data to Database Table](#Overwrite-Data-to-Database-Table)
# 7. [Analyze and Graph Data with BokehJS](#Analyze-and-Graph-Data-with-BokehJS)
# 8. [Read and Write Data to Parquet](#Read-and-Write-Data-to-Parquet)
#
# ## Background
#
# _Prepared by: [Gary A. Stafford](https://twitter.com/GaryStafford)
# Associated article: [Getting Started with Data Analytics using Jupyter Notebooks, PySpark, and Docker](https://wp.me/p1RD28-6Fj)_

# ## Read CSV-Format File
# Read the CSV-format data file into a Spark DataFrame.

# In[1]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# In[2]:

spark = SparkSession \
    .builder \
    .appName('04_notebook') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar') \
    .getOrCreate()

# In[3]:

bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

# In[4]:

df1 = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .load('BreadBasket_DMS.csv', schema=bakery_schema)

# In[5]:

print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)

# ## Run PostgreSQL Script
# Run the SQL script to create the database schema and import the data from the CSV file.

# In[6]:

get_ipython().run_line_magic('run', "-i '03_load_sql.py'")

# ## Load PostgreSQL Data
# Load the contents of the PostgreSQL 'transactions' table into a Spark DataFrame.

# In[7]:

properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'transactions',
}

# In[8]:

df2 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

# In[9]:

print('DataFrame rows: %d' % df2.count())
print('DataFrame schema: %s' % df2)
df2.show(10, False)

# ## Create a New Record
# Create a new bakery record and load it into a Spark DataFrame.

# In[10]:

data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
df3 = spark.createDataFrame(data, bakery_schema)

# In[11]:

print('DataFrame rows: %d' % df3.count())
print('DataFrame schema: %s' % df3)
df3.show(10, False)

# ## Append Record to Database Table
# Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table.

# In[12]:

df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

# In[13]:

# the table should now contain one additional row of data
print('DataFrame rows: %d' % df2.count())
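# In[ ]:

# A minimal sanity-check sketch, assuming the append above succeeded and that the
# 'transactions' table uses the same column names as bakery_schema: filter the
# JDBC-backed DataFrame for the new record. Spark can push this simple predicate
# down to PostgreSQL, so only matching rows are transferred.
df2.filter("transaction = 2 AND item = 'Pastry'") \
    .show(5, False)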
# ## Overwrite Data to Database Table
# Overwrite the contents of the 'transactions' table with the contents of the CSV file-based DataFrame.

# In[14]:

df1.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('truncate', 'true') \
    .mode('overwrite') \
    .save()

# ## Analyze and Graph Data with BokehJS
# Perform some simple analysis of the bakery data and plot the results with [BokehJS](https://docs.bokeh.org/en/latest/index.html).

# ### Business Questions
# 1. What are the busiest days of the week?
# 2. What are the busiest times of the day?
# 3. What are the top-selling bakery items?
# 4. How many items do customers usually buy?

# In[15]:

from math import pi

from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12

output_notebook()

# ### Pie Chart
# What are the busiest days of the week?

# In[16]:

df1.createOrReplaceTempView('tmp_bakery')

# In[17]:

sql_query = "SELECT date_format(date, 'EEEE') as day, count(*) as count " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "GROUP BY day " \
            "ORDER BY count ASC " \
            "LIMIT 10"
df4 = spark.sql(sql_query)
df4.show(10, False)

# In[18]:

data = df4.toPandas()
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = data['day'].tolist()
color_map = factor_cmap(field_name='day', palette=Paired12, factors=days)
data['angle'] = data['count'] / data['count'].sum() * 2 * pi

plot = figure(plot_height=450, plot_width=700,
              title='Items Sold/Day',
              tooltips=tooltips, x_range=(-0.5, 1.0))
plot.wedge(x=0, y=1, radius=0.4,
           start_angle=cumsum('angle', include_zero=True),
           end_angle=cumsum('angle'),
           line_color='white', fill_color=color_map,
           legend_field='day', source=data)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None

show(plot)

# ### Vertical Bar Chart
# What are the busiest times of the day?

# In[19]:

def time_increment(h, m):
    """Calculates a 30-minute time increment

    Parameters:
        h (str): hours, '0' or '00' to '23'
        m (str): minutes, '0' or '00' to '59'

    Returns:
        str: 30-minute time increment, e.g. '07:30', '23:00', or '12:00'
    """
    increment = (int(m) * (100 / 60)) / 100  # 0.0000 - 0.9833
    increment = round(increment, 0)  # 0.0 or 1.0
    increment = int(increment) * 30  # 0 or 30
    increment = str(h).rjust(2, '0') + ':' + str(increment).rjust(2, '0')
    return increment  # e.g. '07:30' or '23:00'


spark.udf.register("udfTimeIncrement", time_increment, StringType())

sql_query = "WITH tmp_table AS (" \
            " SELECT udfTimeIncrement(date_format(time, 'HH'), date_format(time, 'mm')) as period, count(*) as count " \
            " FROM tmp_bakery " \
            " WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            " GROUP BY period " \
            " ORDER BY period ASC" \
            ") " \
            "SELECT period, count " \
            "FROM tmp_table " \
            "WHERE period BETWEEN '07:00' AND '19:00'"
df5 = spark.sql(sql_query)
df5.show(10, False)

# In[20]:

source = ColumnDataSource(data=df5.toPandas())
tooltips = [('period', '@period'), ('count', '@{count}{,}')]
periods = source.data['period'].tolist()

plot = figure(x_range=periods, plot_width=900, plot_height=450,
              min_border=0, tooltips=tooltips)
plot.vbar(x='period', bottom=0, top='count', source=source, width=0.9)
plot.title.text = 'Items Sold/Hour'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Items Sold'

show(plot)
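# In[ ]:

# A quick follow-up sketch, assuming df5 from the cell above: pull out the single
# busiest 30-minute period to answer the business question directly, alongside the chart.
busiest = df5.orderBy('count', ascending=False).first()
print('Busiest 30-minute period: %s (%d items)' % (busiest['period'], busiest['count']))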
# ### Horizontal Bar Chart
# What are the top-selling bakery items?

# In[21]:

sql_query = "SELECT item, count(*) as count " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "GROUP BY item " \
            "ORDER BY count DESC " \
            "LIMIT 10"
df6 = spark.sql(sql_query)
df6.show(10, False)

# In[22]:

source = ColumnDataSource(data=df6.toPandas())
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = source.data['item'].tolist()
items.reverse()

plot = figure(y_range=items, plot_width=750, plot_height=375,
              min_border=0, tooltips=tooltips)
plot.hbar(y='item', right='count', height=.9, source=source)
plot.title.text = 'Top 10 Bakery Items'
plot.yaxis.axis_label = 'Items'
plot.xaxis.axis_label = 'Total Items Sold'

show(plot)

# ### Vertical Bar Chart
# How many items do customers usually buy?

# In[23]:

sql_query = "WITH tmp_table AS (" \
            " SELECT transaction, count(*) as order_size " \
            " FROM tmp_bakery " \
            " WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            " GROUP BY transaction " \
            " ORDER BY order_size DESC" \
            ") " \
            "SELECT order_size, count(*) as count " \
            "FROM tmp_table " \
            "GROUP BY order_size " \
            "ORDER BY order_size ASC"
df7 = spark.sql(sql_query)
df7.show(24, False)

# In[24]:

source = ColumnDataSource(data=df7.toPandas())
tooltips = [('order_size', '@order_size'), ('count', '@count')]
items = source.data['order_size'].tolist()
items = list(map(str, items))

plot = figure(x_range=items, plot_width=750, plot_height=375,
              min_border=0, tooltips=tooltips)
plot.vbar(x='order_size', bottom=0, top='count', source=source, width=0.9)
plot.line(x='order_size', y='count', source=source,
          line_color='red', line_width=2)
plot.title.text = 'Transaction Size'
plot.xaxis.axis_label = 'Items/Transaction'
plot.yaxis.axis_label = 'Total Transactions'

show(plot)

# ## Read and Write Data to Parquet
# Perform basic analysis of the bakery data using Spark SQL. Write the resulting DataFrame contents to [Apache Parquet](https://parquet.apache.org/) format, then read them back.

# In[25]:

sql_query = "SELECT transaction, CAST(CONCAT(date,' ',time) as timestamp) as timestamp, item " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "ORDER BY transaction ASC, item ASC"
df8 = spark.sql(sql_query)
print('DataFrame rows: %d' % df8.count())
print('DataFrame schema: %s' % df8)
df8.show(10, False)

# In[26]:

df8.write.parquet('output/bakery_parquet', mode='overwrite')

# In[27]:

get_ipython().system(' ls 2>&1 -lh output/bakery_parquet | head -10')
get_ipython().system(" echo 'Parquet Files:' $(ls output/bakery_parquet | wc -l)")

# In[28]:

df9 = spark.read.parquet('output/bakery_parquet')
print('DataFrame rows: %d' % df9.count())
print('DataFrame schema: %s' % df9)
df9.select('transaction', 'timestamp', 'item') \
    .sort('transaction', 'item') \
    .show(10, False)
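# In[ ]:

# A minimal sketch of partitioned Parquet output, assuming you also want to see
# partition pruning: write the same DataFrame partitioned by 'item', so reads that
# filter on 'item' only scan the matching directories. The output path and the
# 'df10' name are illustrative choices, not part of the original analysis.
df8.write.parquet('output/bakery_parquet_by_item', mode='overwrite', partitionBy='item')

df10 = spark.read.parquet('output/bakery_parquet_by_item')
df10.filter("item = 'Coffee'").show(5, False)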