PySpark Demo Notebook

Demo

  1. Run PostgreSQL Script
  2. Load PostgreSQL Data
  3. Create New Record
  4. Write New Record to PostgreSQL Table
  5. Load CSV Data File
  6. Write Data to PostgreSQL
  7. Analyze Data with Spark SQL
  8. Graph Data with BokehJS
  9. Read and Write Data to Parquet Format

Run PostgreSQL Script

Run the PostgreSQL SQL script to drop, recreate, and seed the 'bakery_basket' table

In [1]:
! pip install psycopg2 psycopg2-binary
Requirement already satisfied: psycopg2 in /opt/conda/lib/python3.6/site-packages (2.7.6.1)
Requirement already satisfied: psycopg2-binary in /opt/conda/lib/python3.6/site-packages (2.7.6.1)
In [2]:
%run -i '03_load_sql.py'
DROP TABLE IF EXISTS "bakery_basket"

DROP SEQUENCE IF EXISTS bakery_basket_id_seq

CREATE SEQUENCE bakery_basket_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1


CREATE TABLE "public"."bakery_basket" (
    "id" integer DEFAULT nextval('bakery_basket_id_seq') NOT NULL,
    "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,
    "item" character varying(50) NOT NULL
) WITH (oids = false)


INSERT INTO "bakery_basket" ("date", "time", "transaction", "item", "id") VALUES
('2016-10-30',	'09:58:11',	1,	'Bread',	1),
('2016-10-30',	'10:05:34',	2,	'Scandinavian',	2),
('2016-10-30',	'10:07:57',	3,	'Hot chocolate',	3)
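
The contents of 03_load_sql.py are not shown in the notebook. A minimal sketch of such a loader, assuming the statements live in a semicolon-delimited file named '03_bakery.sql' (a hypothetical name) and using psycopg2:

import psycopg2

# connection details match the JDBC properties used later in the notebook
conn = psycopg2.connect(host='postgres', port=5432, dbname='demo',
                        user='postgres', password='postgres1234')

with conn, conn.cursor() as cursor:
    # '03_bakery.sql' is a hypothetical file name
    with open('03_bakery.sql', 'r') as sql_file:
        statements = sql_file.read().split(';')
    for statement in statements:
        if statement.strip():
            print(statement)          # echo each statement, as in the output above
            cursor.execute(statement)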


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_timestamp
In [4]:
working_directory = '/home/garystafford/work/'

spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath',
            working_directory + 'postgresql-42.2.5.jar') \
    .master("local[*]") \
    .getOrCreate()
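
Note that spark.driver.extraClassPath puts the JDBC driver on the driver's classpath only, which is sufficient for local[*] mode. On a real cluster, the spark.jars option would also ship the JAR to the executors; a sketch:

spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.jars', working_directory + 'postgresql-42.2.5.jar') \
    .master('local[*]') \
    .getOrCreate()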

Load PostgreSQL Data

Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame

In [5]:
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/demo',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'bakery_basket',
}

df1 = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()
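The same read can be written more compactly with DataFrameReader.jdbc(), which accepts the remaining connection settings as a dictionary:

df1 = spark.read.jdbc(url=properties['url'],
                      table=properties['dbtable'],
                      properties={'driver': properties['driver'],
                                  'user': properties['user'],
                                  'password': properties['password']})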
In [6]:
%%time
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+
| id|      date|    time|transaction|         item|
+---+----------+--------+-----------+-------------+
|  1|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  3|2016-10-30|10:07:57|          3|Hot chocolate|
+---+----------+--------+-----------+-------------+

CPU times: user 0 ns, sys: 10 ms, total: 10 ms
Wall time: 4.3 s

Create New Record

Create a new bakery record and load it into a DataFrame

In [7]:
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]

bakery_schema = StructType([
    StructField('date', StringType(), True),
    StructField('time', StringType(), True),
    StructField('transaction', IntegerType(), True),
    StructField('item', StringType(), True)
])

df2 = spark.createDataFrame(data, bakery_schema)
In [8]:
df2.show()
df2.count()
+----------+--------+-----------+------+
|      date|    time|transaction|  item|
+----------+--------+-----------+------+
|2016-10-30|10:13:27|          2|Pastry|
+----------+--------+-----------+------+

Out[8]:
1

Write New Record to PostgreSQL Table

Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table. Since df2 has no 'id' column, PostgreSQL assigns one from bakery_basket_id_seq; the seed script inserted explicit ids without advancing the sequence, which is why the appended row below also receives id 1.

In [9]:
df2.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
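As with the read, DataFrameWriter.jdbc() offers a more compact equivalent:

df2.write.jdbc(url=properties['url'],
               table=properties['dbtable'],
               mode='append',
               properties={'driver': properties['driver'],
                           'user': properties['user'],
                           'password': properties['password']})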
In [10]:
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+
| id|      date|    time|transaction|         item|
+---+----------+--------+-----------+-------------+
|  1|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  3|2016-10-30|10:07:57|          3|Hot chocolate|
|  1|2016-10-30|10:13:27|          2|       Pastry|
+---+----------+--------+-----------+-------------+

Out[10]:
4

Load CSV Data File

Load the Kaggle dataset from the CSV file, which contains ~21K records, into a DataFrame

In [11]:
df3 = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("BreadBasket_DMS.csv", schema=bakery_schema)
In [12]:
df3.show(10)
df3.count()
+----------+--------+-----------+-------------+
|      date|    time|transaction|         item|
+----------+--------+-----------+-------------+
|2016-10-30|09:58:11|          1|        Bread|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:05:34|          2| Scandinavian|
|2016-10-30|10:07:57|          3|Hot chocolate|
|2016-10-30|10:07:57|          3|          Jam|
|2016-10-30|10:07:57|          3|      Cookies|
|2016-10-30|10:08:41|          4|       Muffin|
|2016-10-30|10:13:03|          5|       Coffee|
|2016-10-30|10:13:03|          5|       Pastry|
|2016-10-30|10:13:03|          5|        Bread|
+----------+--------+-----------+-------------+
only showing top 10 rows

Out[12]:
21293

Write Data to PostgreSQL

Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table

In [13]:
df3.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()
In [14]:
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+
| id|      date|    time|transaction|         item|
+---+----------+--------+-----------+-------------+
|  1|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  3|2016-10-30|10:07:57|          3|Hot chocolate|
|  1|2016-10-30|10:13:27|          2|       Pastry|
|  2|2016-10-30|09:58:11|          1|        Bread|
|  3|2016-10-30|10:05:34|          2| Scandinavian|
|  4|2016-10-30|10:05:34|          2| Scandinavian|
|  5|2016-10-30|10:07:57|          3|Hot chocolate|
|  6|2016-10-30|10:07:57|          3|          Jam|
|  7|2016-10-30|10:07:57|          3|      Cookies|
+---+----------+--------+-----------+-------------+
only showing top 10 rows

Out[14]:
21297

Analyze Data with Spark SQL

Analyze the DataFrame's bakery data using Spark SQL

In [15]:
df1.createOrReplaceTempView("bakery_table")
df4 = spark.sql("SELECT * FROM bakery_table " +
                "ORDER BY transaction, date, time")
df4.show(10)
df4.count()
+---+----------+--------+-----------+-------------+
| id|      date|    time|transaction|         item|
+---+----------+--------+-----------+-------------+
|  1|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|09:58:11|          1|        Bread|
|  2|2016-10-30|10:05:34|          2| Scandinavian|
|  3|2016-10-30|10:05:34|          2| Scandinavian|
|  4|2016-10-30|10:05:34|          2| Scandinavian|
|  1|2016-10-30|10:13:27|          2|       Pastry|
|  3|2016-10-30|10:07:57|          3|Hot chocolate|
|  6|2016-10-30|10:07:57|          3|          Jam|
|  5|2016-10-30|10:07:57|          3|Hot chocolate|
|  7|2016-10-30|10:07:57|          3|      Cookies|
+---+----------+--------+-----------+-------------+
only showing top 10 rows

Out[15]:
21297
In [16]:
df5 = spark.sql("SELECT COUNT(DISTINCT item) AS item_count FROM bakery_table")
df5.show()

df5 = spark.sql("SELECT item, count(*) as count " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "GROUP BY item ORDER BY count DESC " +
                "LIMIT 10")
df5.show()
+----------+
|item_count|
+----------+
|        95|
+----------+

+-------------+-----+
|         item|count|
+-------------+-----+
|       Coffee| 5471|
|        Bread| 3326|
|          Tea| 1435|
|         Cake| 1025|
|       Pastry|  857|
|     Sandwich|  771|
|    Medialuna|  616|
|Hot chocolate|  591|
|      Cookies|  540|
|      Brownie|  379|
+-------------+-----+
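
The same top-10 aggregation can be expressed with the DataFrame API instead of a SQL string:

from pyspark.sql.functions import desc

df5_api = df1.filter(df1.item != 'NONE') \
    .groupBy('item') \
    .count() \
    .orderBy(desc('count')) \
    .limit(10)
df5_api.show()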

Graph Data with BokehJS

Create a vertical bar chart displaying the top-10 item counts from the DataFrame

In [17]:
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap
from bokeh.palettes import Paired12

output_notebook()

source = ColumnDataSource(data=df5.toPandas())

tooltips = [('item', '@item'), ('count', '@{count}{,}')]

items = source.data['item'].tolist()
color_map = factor_cmap(field_name='item', palette=Paired12, factors=items)
plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips)
plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map)
plot.title.text = 'Top 10 Bakery Items'
plot.xaxis.axis_label = 'Bakery Items'
plot.yaxis.axis_label = 'Total Items Sold'

show(plot)
Loading BokehJS ...
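To keep a copy of the chart outside the notebook, bokeh.io also provides output_file() and save(); the file name here is illustrative:

from bokeh.io import output_file, save

# write the same figure to a standalone HTML file
output_file('top_10_bakery_items.html', title='Top 10 Bakery Items')
save(plot)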
In [18]:
df6 = spark.sql("SELECT CONCAT(date,' ',time) as timestamp, transaction, item " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE'" +
                "ORDER BY transaction"
               )
df6.show(10)
df6.count()
+-------------------+-----------+-------------+
|          timestamp|transaction|         item|
+-------------------+-----------+-------------+
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:13:27|          2|       Pastry|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|          Jam|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|      Cookies|
+-------------------+-----------+-------------+
only showing top 10 rows

Out[18]:
20511
In [19]:
df7 = df6.withColumn('timestamp', to_timestamp(df6.timestamp, 'yyyy-MM-dd HH:mm:ss'))
df7.printSchema()
df7.show(10)
df7.count()
root
 |-- timestamp: timestamp (nullable = true)
 |-- transaction: integer (nullable = true)
 |-- item: string (nullable = true)

+-------------------+-----------+-------------+
|          timestamp|transaction|         item|
+-------------------+-----------+-------------+
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 09:58:11|          1|        Bread|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:13:27|          2|       Pastry|
|2016-10-30 10:05:34|          2| Scandinavian|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|          Jam|
|2016-10-30 10:07:57|          3|Hot chocolate|
|2016-10-30 10:07:57|          3|      Cookies|
+-------------------+-----------+-------------+
only showing top 10 rows

Out[19]:
20511
In [20]:
df7.createOrReplaceTempView("bakery_table")
df8 = spark.sql("SELECT DISTINCT * " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE'" +
                "ORDER BY transaction DESC"
                )
df8.show(10)
df8.count()
+-------------------+-----------+----------------+
|          timestamp|transaction|            item|
+-------------------+-----------+----------------+
|2017-04-09 15:04:24|       9684|       Smoothies|
|2017-04-09 14:57:06|       9683|          Pastry|
|2017-04-09 14:57:06|       9683|          Coffee|
|2017-04-09 14:32:58|       9682|          Muffin|
|2017-04-09 14:32:58|       9682|          Coffee|
|2017-04-09 14:32:58|       9682|    Tacos/Fajita|
|2017-04-09 14:32:58|       9682|             Tea|
|2017-04-09 14:30:09|       9681|        Truffles|
|2017-04-09 14:30:09|       9681|Christmas common|
|2017-04-09 14:30:09|       9681|             Tea|
+-------------------+-----------+----------------+
only showing top 10 rows

Out[20]:
18888
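
The DISTINCT query drops duplicate rows, reducing the 20511 filtered rows to 18888. A DataFrame API equivalent uses dropDuplicates():

from pyspark.sql.functions import desc

df8_api = df7.dropDuplicates() \
    .orderBy(desc('transaction'))
df8_api.count()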

Read and Write Data to Parquet Format

Read and write DataFrame data to Parquet format files

In [21]:
df8.write.parquet('output/bakery_parquet', mode='overwrite')
In [22]:
df9 = spark.read.parquet('output/bakery_parquet')
df9.show(10)
df9.count()
+-------------------+-----------+---------+
|          timestamp|transaction|     item|
+-------------------+-----------+---------+
|2017-02-15 14:54:25|       6620|     Cake|
|2017-02-15 14:41:27|       6619|    Bread|
|2017-02-15 14:40:41|       6618|   Coffee|
|2017-02-15 14:40:41|       6618|    Bread|
|2017-02-15 14:23:16|       6617| Baguette|
|2017-02-15 14:23:16|       6617|   Coffee|
|2017-02-15 14:23:16|       6617|    Salad|
|2017-02-15 14:23:16|       6617| Art Tray|
|2017-02-15 14:23:16|       6617|Alfajores|
|2017-02-15 14:16:26|       6616|    Bread|
+-------------------+-----------+---------+
only showing top 10 rows

Out[22]:
18888
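
For larger datasets, the Parquet output can also be partitioned on a column so that filtered reads touch only the matching files. A sketch partitioning by item (the output path and df10 are illustrative names):

# each distinct item value gets its own item=<value> subdirectory
df8.write.parquet('output/bakery_parquet_by_item',
                  mode='overwrite',
                  partitionBy=['item'])

df10 = spark.read.parquet('output/bakery_parquet_by_item')
df10.filter(df10.item == 'Coffee').count()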