Read CSV-Format File

Read CSV-format data file into a Spark DataFrame.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
In [2]:
spark = SparkSession \
    .builder \
    .appName('04_notebook') \
    .config('spark.driver.extraClassPath', 'postgresql-42.2.8.jar') \
    .getOrCreate()
In [3]:
bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])
In [4]:
df1 = spark.read \
        .format('csv') \
        .option('header', 'true') \
        .load('BreadBasket_DMS.csv', schema=bakery_schema)
In [5]:
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df1.show(10, False)
DataFrame rows: 21293
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+-------------+
|date      |time    |transaction|item         |
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|1          |Bread        |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:05:34|2          |Scandinavian |
|2016-10-30|10:07:57|3          |Hot chocolate|
|2016-10-30|10:07:57|3          |Jam          |
|2016-10-30|10:07:57|3          |Cookies      |
|2016-10-30|10:08:41|4          |Muffin       |
|2016-10-30|10:13:03|5          |Coffee       |
|2016-10-30|10:13:03|5          |Pastry       |
|2016-10-30|10:13:03|5          |Bread        |
+----------+--------+-----------+-------------+
only showing top 10 rows

Run PostgreSQL Script

Run the sql script to create the database schema and import data from CSV file.

In [6]:
%run -i '03_load_sql.py'
DROP TABLE IF EXISTS "transactions"

DROP SEQUENCE IF EXISTS transactions_id_seq

CREATE SEQUENCE transactions_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1


CREATE TABLE "public"."transactions"
(
    "id"          integer DEFAULT nextval('transactions_id_seq') NOT NULL,
    "date"        character varying(10)                           NOT NULL,
    "time"        character varying(8)                            NOT NULL,
    "transaction" integer                                         NOT NULL,
    "item"        character varying(50)                           NOT NULL
) WITH (oids = false)

Row count: 21293

Load PostgreSQL Data

Load the PostgreSQL 'transactions' table's contents into a Spark DataFrame.

In [7]:
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'transactions',
}
In [8]:
df2 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()
In [9]:
print('DataFrame rows: %d' % df1.count())
print('DataFrame schema: %s' % df1)
df2.show(10, False)
DataFrame rows: 21293
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+---+----------+--------+-----------+-------------+
|id |date      |time    |transaction|item         |
+---+----------+--------+-----------+-------------+
|1  |2016-10-30|09:58:11|1          |Bread        |
|2  |2016-10-30|10:05:34|2          |Scandinavian |
|3  |2016-10-30|10:05:34|2          |Scandinavian |
|4  |2016-10-30|10:07:57|3          |Hot chocolate|
|5  |2016-10-30|10:07:57|3          |Jam          |
|6  |2016-10-30|10:07:57|3          |Cookies      |
|7  |2016-10-30|10:08:41|4          |Muffin       |
|8  |2016-10-30|10:13:03|5          |Coffee       |
|9  |2016-10-30|10:13:03|5          |Pastry       |
|10 |2016-10-30|10:13:03|5          |Bread        |
+---+----------+--------+-----------+-------------+
only showing top 10 rows

Create a New Record

Create a new bakery record and load into a Spark DataFrame.

In [10]:
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]
df3 = spark.createDataFrame(data, bakery_schema)
In [11]:
print('DataFrame rows: %d' % df3.count())
print('DataFrame schema: %s' % df3)
df3.show(10, False)
DataFrame rows: 1
DataFrame schema: DataFrame[date: string, time: string, transaction: int, item: string]
+----------+--------+-----------+------+
|date      |time    |transaction|item  |
+----------+--------+-----------+------+
|2016-10-30|10:13:27|2          |Pastry|
+----------+--------+-----------+------+

Append Record to Database Table

Append the contents of the DataFrame to the bakery PostgreSQL database's 'transactions' table.

In [12]:
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
In [13]:
# should now contain one additional row of data
print('DataFrame rows: %d' % df2.count())
DataFrame rows: 21294

Overwrite Data to Database Table

Overwrite the contents of the CSV file-based DataFrame to the 'transactions' table.

In [14]:
df1.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .option('truncate', 'true') \
    .mode('overwrite') \
    .save()

Analyze and Graph Data with BokehJS

Perform some simple analysis of the bakery data and plot the results with BokehJS.

Business Questions

  1. What are the busiest days of the week?
  2. What are the busiest times of the day?
  3. What are the top selling bakery items?
  4. How many items do customers usually buy?
In [15]:
from math import pi
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12

output_notebook()
Loading BokehJS ...

Pie Chart

What are the busiest days of the week?

In [16]:
df1.createOrReplaceTempView('tmp_bakery')
In [17]:
sql_query = "SELECT date_format(date, 'EEEE') as day, count(*) as count " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "GROUP BY day " \
            "ORDER BY count ASC " \
            "LIMIT 10"
df4 = spark.sql(sql_query)
df4.show(10, False)
+---------+-----+
|day      |count|
+---------+-----+
|Wednesday|2320 |
|Monday   |2324 |
|Tuesday  |2392 |
|Thursday |2646 |
|Sunday   |3095 |
|Friday   |3124 |
|Saturday |4605 |
+---------+-----+

In [18]:
data = df4.toPandas()
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = data['day'].tolist()
color_map = factor_cmap(field_name='day', palette=Paired12, factors=days)

data['angle'] = data['count'] / data['count'].sum() * 2 * pi
plot = figure(plot_height=450,
              plot_width=700,
              title='Items Sold/Day',
              tooltips=tooltips,
              x_range=(-0.5, 1.0))
plot.wedge(x=0,
           y=1,
           radius=0.4,
           start_angle=cumsum('angle', include_zero=True),
           end_angle=cumsum('angle'),
           line_color='white',
           fill_color=color_map,
           legend_field='day',
           source=data)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None

show(plot)

Vertical Bar Chart

What are the busiest times of the day?

In [19]:
def time_increment(h, m):
    """Calculates a 30-minute time increment

    Parameters:
    h (str): hours, '0' or '00' to '23'
    m (str): minutes, '0' or '00' to '59'

    Returns:
    str: 30-minute time increment, i.e. '07:30', '23:00', or '12:00'

    """

    increment = (int(m) * (100 / 60)) / 100  # 0.0000 - 0.9833
    increment = round(increment, 0)  # 0.0 or 1.0
    increment = int(increment) * 30  # 0 or 30
    increment = str(h).rjust(2, '0') + ':' + str(increment).rjust(2, '0')

    return increment  # i.e. '07:30' or '23:00'


spark.udf.register("udfTimeIncrement", time_increment, StringType())


sql_query = "WITH tmp_table AS (" \
            "  SELECT udfTimeIncrement(date_format(time, 'HH'), date_format(time, 'mm')) as period, count(*) as count " \
            "  FROM tmp_bakery " \
            "  WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "  GROUP BY period " \
            "  ORDER BY period ASC" \
            ") " \
            "SELECT period, count " \
            "FROM tmp_table " \
            "WHERE period BETWEEN '07:00' AND '19:00'"

df5 = spark.sql(sql_query)
df5.show(10, False)
+------+-----+
|period|count|
+------+-----+
|07:00 |1    |
|07:30 |23   |
|08:00 |209  |
|08:30 |436  |
|09:00 |960  |
|09:30 |1006 |
|10:00 |1238 |
|10:30 |1428 |
|11:00 |1628 |
|11:30 |1474 |
+------+-----+
only showing top 10 rows

In [20]:
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('period', '@period'), ('count', '@{count}{,}')]
periods = source.data['period'].tolist()
plot = figure(x_range=periods,
              plot_width=900,
              plot_height=450,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='period', bottom=0, top='count', source=source, width=0.9)
plot.title.text = 'Items Sold/Hour'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Items Sold'

show(plot)

Horizontal Bar Chart

What are the top selling bakery items?

In [21]:
sql_query = "SELECT item, count(*) as count " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "GROUP BY item " \
            "ORDER BY count DESC " \
            "LIMIT 10"

df6 = spark.sql(sql_query)
df6.show(10, False)
+-------------+-----+
|item         |count|
+-------------+-----+
|Coffee       |5471 |
|Bread        |3325 |
|Tea          |1435 |
|Cake         |1025 |
|Pastry       |856  |
|Sandwich     |771  |
|Medialuna    |616  |
|Hot chocolate|590  |
|Cookies      |540  |
|Brownie      |379  |
+-------------+-----+

In [22]:
source = ColumnDataSource(data=df6.toPandas())
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = source.data['item'].tolist()
items.reverse()
plot = figure(y_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.hbar(y='item', right='count', height=.9, source=source)
plot.title.text = 'Top 10 Bakery Items'
plot.yaxis.axis_label = 'Items'
plot.xaxis.axis_label = 'Total Items Sold'

show(plot)

Vertical Bar Chart

How many items do customers usually buy?

In [23]:
sql_query = "WITH tmp_table AS (" \
            "  SELECT transaction, count(*) as order_size " \
            "  FROM tmp_bakery " \
            "  WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "  GROUP BY transaction " \
            "  ORDER BY order_size DESC" \
            ") " \
            "SELECT order_size, count(*) as count " \
            "FROM tmp_table " \
            "GROUP BY order_size " \
            "ORDER BY order_size ASC" \

df7 = spark.sql(sql_query)
df7.show(24, False)
+----------+-----+
|order_size|count|
+----------+-----+
|1         |3630 |
|2         |2908 |
|3         |1528 |
|4         |850  |
|5         |341  |
|6         |135  |
|7         |38   |
|8         |21   |
|9         |7    |
|10        |2    |
|11        |4    |
+----------+-----+

In [24]:
source = ColumnDataSource(data=df7.toPandas())
tooltips = [('order_size', '@order_size'), ('count', '@count')]
items = source.data['order_size'].tolist()
items = list(map(str, items))
plot = figure(x_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='order_size', bottom=0, top='count', source=source, width=0.9)
plot.line(x='order_size',
          y='count',
          source=source,
          line_color='red',
          line_width=2)
plot.title.text = 'Transaction Size'
plot.xaxis.axis_label = 'Items/Transaction'
plot.yaxis.axis_label = 'Total Transactions'

show(plot)

Read and Write Data to Parquet

Perform basic analysis of the bakery data using Spark SQL. Read and write resulting DataFrame contents to Apache Parquet format.

In [25]:
sql_query = "SELECT transaction, CAST(CONCAT(date,' ',time) as timestamp) as timestamp, item " \
            "FROM tmp_bakery " \
            "WHERE item NOT LIKE 'NONE' AND item NOT LIKE 'Adjustment' " \
            "ORDER BY transaction ASC, item ASC"

df8 = spark.sql(sql_query)
print('DataFrame rows: %d' % df8.count())
print('DataFrame schema: %s' % df8)
df8.show(10, False)
DataFrame rows: 20506
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
+-----------+-------------------+-------------+
|transaction|timestamp          |item         |
+-----------+-------------------+-------------+
|1          |2016-10-30 09:58:11|Bread        |
|2          |2016-10-30 10:05:34|Scandinavian |
|2          |2016-10-30 10:05:34|Scandinavian |
|3          |2016-10-30 10:07:57|Cookies      |
|3          |2016-10-30 10:07:57|Hot chocolate|
|3          |2016-10-30 10:07:57|Jam          |
|4          |2016-10-30 10:08:41|Muffin       |
|5          |2016-10-30 10:13:03|Bread        |
|5          |2016-10-30 10:13:03|Coffee       |
|5          |2016-10-30 10:13:03|Pastry       |
+-----------+-------------------+-------------+
only showing top 10 rows

In [26]:
df8.write.parquet('output/bakery_parquet', mode='overwrite')
In [27]:
! ls 2>&1 -lh output/bakery_parquet | head -10
! echo 'Parquet Files:' $(ls | wc -l)
total 800K
-rw-r--r-- 1 garystaf users 1.9K Dec  6 03:46 part-00000-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  6 03:46 part-00001-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.8K Dec  6 03:46 part-00002-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  6 03:46 part-00003-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  6 03:46 part-00004-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  6 03:46 part-00005-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.0K Dec  6 03:46 part-00006-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 1.9K Dec  6 03:46 part-00007-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
-rw-r--r-- 1 garystaf users 2.1K Dec  6 03:46 part-00008-50c8ea60-bdf4-4213-a6cd-78c9df626246-c000.snappy.parquet
Parquet Files: 13
In [28]:
df9 = spark.read.parquet('output/bakery_parquet')
print('DataFrame rows: %d' % df9.count())
print('DataFrame schema: %s' % df9)
df9.select('transaction', 'timestamp', 'item') \
    .sort('transaction', 'item') \
    .show(10, False)
DataFrame rows: 20506
DataFrame schema: DataFrame[transaction: int, timestamp: timestamp, item: string]
+-----------+-------------------+-------------+
|transaction|timestamp          |item         |
+-----------+-------------------+-------------+
|1          |2016-10-30 09:58:11|Bread        |
|2          |2016-10-30 10:05:34|Scandinavian |
|2          |2016-10-30 10:05:34|Scandinavian |
|3          |2016-10-30 10:07:57|Cookies      |
|3          |2016-10-30 10:07:57|Hot chocolate|
|3          |2016-10-30 10:07:57|Jam          |
|4          |2016-10-30 10:08:41|Muffin       |
|5          |2016-10-30 10:13:03|Bread        |
|5          |2016-10-30 10:13:03|Coffee       |
|5          |2016-10-30 10:13:03|Pastry       |
+-----------+-------------------+-------------+
only showing top 10 rows