Prepared by: Gary A. Stafford
Associated article: https://wp.me/p1RD28-61V
Run the PostgreSQL SQL script to create and seed the 'bakery_basket' table
# Install/upgrade the PostgreSQL client library used by the helper script.
! pip install psycopg2-binary --upgrade --quiet
# Run the helper script in the notebook's namespace (-i); presumably it
# executes the SQL shown below against PostgreSQL — verify script contents.
%run -i '03_load_sql.py'
-- Rebuild the 'bakery_basket' table from scratch: drop any previous objects,
-- create the id sequence and table, then seed three sample rows.
-- FIX: statement-terminating semicolons added — the original ran all
-- statements together with no separators, which is not valid as a script.
DROP TABLE IF EXISTS "bakery_basket";

DROP SEQUENCE IF EXISTS bakery_basket_id_seq;

CREATE SEQUENCE bakery_basket_id_seq
    INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1;

CREATE TABLE "public"."bakery_basket" (
    -- surrogate key, auto-populated from the sequence when omitted on INSERT
    "id" integer DEFAULT nextval('bakery_basket_id_seq') NOT NULL,
    "date" character varying(10) NOT NULL,
    "time" character varying(8) NOT NULL,
    "transaction" integer NOT NULL,
    "item" character varying(50) NOT NULL
) WITH (oids = false);

INSERT INTO "bakery_basket" ("date", "time", "transaction", "item", "id")
VALUES
    ('2016-10-30', '09:58:11', 1, 'Bread', 1),
    ('2016-10-30', '10:05:34', 2, 'Scandinavian', 2),
    ('2016-10-30', '10:07:57', 3, 'Hot chocolate', 3);
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Create (or reuse) a local SparkSession with the PostgreSQL JDBC driver
# JAR on the driver classpath so the 'jdbc' data source can load it.
spark = (
    SparkSession.builder
    .appName('pyspark_demo_app')
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar')
    .master("local[*]")
    .getOrCreate()
)
Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame
# Connection settings for the PostgreSQL 'demo' database, shared by every
# JDBC read/write below.
# NOTE(review): credentials are hard-coded; consider env vars or a secrets
# store outside of demo code.
properties = dict(
    driver='org.postgresql.Driver',
    url='jdbc:postgresql://postgres:5432/demo',
    user='postgres',
    password='postgres1234',
    dbtable='bakery_basket',
)
# Load the whole 'bakery_basket' table over JDBC into a DataFrame.
# options(**properties) passes the exact same key/value pairs the original
# set one .option() call at a time.
df1 = (
    spark.read
    .format('jdbc')
    .options(**properties)
    .load()
)
%%time
# Time the JDBC read: preview the first 10 rows, then count all rows.
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+ | id| date| time|transaction| item| +---+----------+--------+-----------+-------------+ | 1|2016-10-30|09:58:11| 1| Bread| | 2|2016-10-30|10:05:34| 2| Scandinavian| | 3|2016-10-30|10:07:57| 3|Hot chocolate| +---+----------+--------+-----------+-------------+ CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 3.98 s
Create a new bakery record and load into a DataFrame
# A single new basket record; no 'id' value — the table supplies it.
data = [('2016-10-30', '10:13:27', 2, 'Pastry')]

# Explicit schema mirroring the table's non-key columns (all nullable).
_columns = [
    ('date', StringType()),
    ('time', StringType()),
    ('transaction', IntegerType()),
    ('item', StringType()),
]
bakery_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in _columns]
)
# Build a one-row DataFrame from the record using the explicit schema.
df2 = spark.createDataFrame(data, bakery_schema)
df2.show()
df2.count()
+----------+--------+-----------+------+ | date| time|transaction| item| +----------+--------+-----------+------+ |2016-10-30|10:13:27| 2|Pastry| +----------+--------+-----------+------+
1
Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table
# Append the one-row DataFrame to the PostgreSQL table. The DataFrame has
# no 'id' column; the table's sequence default populates it.
# options(**properties) supplies the same connection settings the original
# passed via individual .option() calls.
df2.write \
    .format('jdbc') \
    .options(**properties) \
    .mode('append') \
    .save()
# df1 is a lazy JDBC source: showing it again re-reads the table, which
# now includes the appended 'Pastry' row.
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+ | id| date| time|transaction| item| +---+----------+--------+-----------+-------------+ | 1|2016-10-30|09:58:11| 1| Bread| | 2|2016-10-30|10:05:34| 2| Scandinavian| | 3|2016-10-30|10:07:57| 3|Hot chocolate| | 1|2016-10-30|10:13:27| 2| Pastry| +---+----------+--------+-----------+-------------+
4
Load the Kaggle dataset from the CSV file, containing ~21K records, into a DataFrame
# Confirm the Kaggle CSV file is present and show its size.
! ls -lh *.csv
-rw-r--r-- 1 garystafford users 694K Nov 10 2018 BreadBasket_DMS.csv
# Load the full Kaggle CSV with the same explicit schema; 'header' makes
# the reader skip the header row instead of ingesting it as data.
# .schema(...) on the reader is equivalent to load(..., schema=...).
df3 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(bakery_schema)
    .load("BreadBasket_DMS.csv")
)
df3.show(10)
df3.count()
+----------+--------+-----------+-------------+ | date| time|transaction| item| +----------+--------+-----------+-------------+ |2016-10-30|09:58:11| 1| Bread| |2016-10-30|10:05:34| 2| Scandinavian| |2016-10-30|10:05:34| 2| Scandinavian| |2016-10-30|10:07:57| 3|Hot chocolate| |2016-10-30|10:07:57| 3| Jam| |2016-10-30|10:07:57| 3| Cookies| |2016-10-30|10:08:41| 4| Muffin| |2016-10-30|10:13:03| 5| Coffee| |2016-10-30|10:13:03| 5| Pastry| |2016-10-30|10:13:03| 5| Bread| +----------+--------+-----------+-------------+ only showing top 10 rows
21293
Append the contents of the DataFrame to the PostgreSQL 'bakery_basket' table
# Append the ~21K CSV rows to the PostgreSQL table; again no 'id' column
# is supplied, so the table fills it in.
# options(**properties) passes the same settings as the original's
# one-by-one .option() calls.
df3.write \
    .format('jdbc') \
    .options(**properties) \
    .mode('append') \
    .save()
# Re-read through the lazy JDBC source: the table now holds the original
# seed rows plus the appended CSV rows (21,297 total per the output below).
df1.show(10)
df1.count()
+---+----------+--------+-----------+-------------+ | id| date| time|transaction| item| +---+----------+--------+-----------+-------------+ | 1|2016-10-30|09:58:11| 1| Bread| | 2|2016-10-30|10:05:34| 2| Scandinavian| | 3|2016-10-30|10:07:57| 3|Hot chocolate| | 1|2016-10-30|10:13:27| 2| Pastry| | 2|2016-10-30|09:58:11| 1| Bread| | 3|2016-10-30|10:05:34| 2| Scandinavian| | 4|2016-10-30|10:05:34| 2| Scandinavian| | 5|2016-10-30|10:07:57| 3|Hot chocolate| | 6|2016-10-30|10:07:57| 3| Jam| | 7|2016-10-30|10:07:57| 3| Cookies| +---+----------+--------+-----------+-------------+ only showing top 10 rows
21297
Analyze the DataFrame's bakery data using Spark SQL
# Expose df1 to Spark SQL under the name 'bakery_table'.
df1.createOrReplaceTempView("bakery_table")
# Pull every row ordered for inspection (duplicates from the repeated
# appends are visible in the output below).
df4 = spark.sql("SELECT * FROM bakery_table " +
    "ORDER BY transaction, date, time")
df4.show(10)
df4.count()
+---+----------+--------+-----------+-------------+ | id| date| time|transaction| item| +---+----------+--------+-----------+-------------+ | 1|2016-10-30|09:58:11| 1| Bread| | 2|2016-10-30|09:58:11| 1| Bread| | 2|2016-10-30|10:05:34| 2| Scandinavian| | 3|2016-10-30|10:05:34| 2| Scandinavian| | 4|2016-10-30|10:05:34| 2| Scandinavian| | 1|2016-10-30|10:13:27| 2| Pastry| | 3|2016-10-30|10:07:57| 3|Hot chocolate| | 6|2016-10-30|10:07:57| 3| Jam| | 5|2016-10-30|10:07:57| 3|Hot chocolate| | 7|2016-10-30|10:07:57| 3| Cookies| +---+----------+--------+-----------+-------------+ only showing top 10 rows
21297
# How many distinct items appear in the data?
df5 = spark.sql(
    "SELECT COUNT(DISTINCT item) AS item_count FROM bakery_table")
df5.show()

# Top 10 best-selling items, excluding the 'NONE' placeholder rows.
# Adjacent literals concatenate to the exact same SQL text the original
# built with '+'.
df5 = spark.sql(
    "SELECT item, count(*) as count "
    "FROM bakery_table "
    "WHERE item NOT LIKE 'NONE' "
    "GROUP BY item ORDER BY count DESC "
    "LIMIT 10")
df5.show()
+----------+ |item_count| +----------+ | 95| +----------+ +-------------+-----+ | item|count| +-------------+-----+ | Coffee| 5471| | Bread| 3326| | Tea| 1435| | Cake| 1025| | Pastry| 857| | Sandwich| 771| | Medialuna| 616| |Hot chocolate| 591| | Cookies| 540| | Brownie| 379| +-------------+-----+
Create a vertical bar chart displaying DataFrame data
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap
from bokeh.palettes import Paired12
# Render Bokeh output inline in the notebook.
output_notebook()
# Convert the 10-row Spark result to pandas for Bokeh's data source.
source = ColumnDataSource(data=df5.toPandas())
# Hover tooltips: item name and thousands-separated count.
tooltips = [('item', '@item'), ('count', '@{count}{,}')]
items = source.data['item'].tolist()
# One distinct color per item (10 items fit the 12-color palette).
color_map = factor_cmap(field_name='item', palette=Paired12, factors=items)
# NOTE(review): plot_width/plot_height were renamed width/height in
# Bokeh 3.x — confirm the pinned Bokeh version if this stops working.
plot = figure(x_range=items, plot_width=750, plot_height=375, min_border=0, tooltips=tooltips)
plot.vbar(x='item', bottom=0, top='count', source=source, width=0.9, fill_color=color_map)
plot.title.text = 'Top 10 Bakery Items'
plot.xaxis.axis_label = 'Bakery Items'
plot.yaxis.axis_label = 'Total Items Sold'
show(plot)
# Combine date and time into one string timestamp and drop 'NONE' rows.
# FIX: the original concatenated "...'NONE'" directly against "ORDER BY",
# producing "'NONE'ORDER BY ..." with no separator — it happened to parse
# only because the SQL lexer tolerates a token right after a closing
# quote. A space is added so the generated SQL is well-formed.
df6 = spark.sql("SELECT CONCAT(date,' ',time) as timestamp, transaction, item " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "ORDER BY transaction"
                )
df6.show(10)
df6.count()
+-------------------+-----------+-------------+ | timestamp|transaction| item| +-------------------+-----------+-------------+ |2016-10-30 09:58:11| 1| Bread| |2016-10-30 09:58:11| 1| Bread| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:13:27| 2| Pastry| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:07:57| 3|Hot chocolate| |2016-10-30 10:07:57| 3| Jam| |2016-10-30 10:07:57| 3|Hot chocolate| |2016-10-30 10:07:57| 3| Cookies| +-------------------+-----------+-------------+ only showing top 10 rows
20511
# Replace the string 'timestamp' column with a parsed TimestampType
# column using the matching date-time pattern.
df7 = df6.withColumn(
    'timestamp',
    to_timestamp(df6['timestamp'], 'yyyy-MM-dd HH:mm:ss'),
)
df7.printSchema()
df7.show(10)
df7.count()
root |-- timestamp: timestamp (nullable = true) |-- transaction: integer (nullable = true) |-- item: string (nullable = true) +-------------------+-----------+-------------+ | timestamp|transaction| item| +-------------------+-----------+-------------+ |2016-10-30 09:58:11| 1| Bread| |2016-10-30 09:58:11| 1| Bread| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:13:27| 2| Pastry| |2016-10-30 10:05:34| 2| Scandinavian| |2016-10-30 10:07:57| 3|Hot chocolate| |2016-10-30 10:07:57| 3| Jam| |2016-10-30 10:07:57| 3|Hot chocolate| |2016-10-30 10:07:57| 3| Cookies| +-------------------+-----------+-------------+ only showing top 10 rows
20511
# Re-register the typed DataFrame under the same view name, then keep
# only distinct rows (drops the duplicates introduced by the appends).
df7.createOrReplaceTempView("bakery_table")
# FIX: the original concatenated "...'NONE'" directly against "ORDER BY"
# with no separating space, yielding "'NONE'ORDER BY ..." — it relied on
# SQL-lexer tolerance. A space is added so the SQL is well-formed.
df8 = spark.sql("SELECT DISTINCT * " +
                "FROM bakery_table " +
                "WHERE item NOT LIKE 'NONE' " +
                "ORDER BY transaction DESC"
                )
df8.show(10)
df8.count()
+-------------------+-----------+--------------+ | timestamp|transaction| item| +-------------------+-----------+--------------+ |2017-04-09 15:04:24| 9684| Smoothies| |2017-04-09 14:57:06| 9683| Pastry| |2017-04-09 14:57:06| 9683| Coffee| |2017-04-09 14:32:58| 9682| Tea| |2017-04-09 14:32:58| 9682| Tacos/Fajita| |2017-04-09 14:32:58| 9682| Muffin| |2017-04-09 14:32:58| 9682| Coffee| |2017-04-09 14:30:09| 9681|Spanish Brunch| |2017-04-09 14:30:09| 9681| Truffles| |2017-04-09 14:30:09| 9681| Tea| +-------------------+-----------+--------------+ only showing top 10 rows
18888
Read and write DataFrame data to Parquet format files
# Persist the de-duplicated, typed rows as Parquet, replacing any output
# from a previous run.
df8.write.parquet('output/bakery_parquet', mode='overwrite')
# Confirm the Parquet directory was created.
! ls -lh output/
total 0 drwxr-xr-x 404 garystafford users 13K Jun 9 12:44 bakery_parquet
# Read the Parquet directory back into a DataFrame; the schema travels
# with the Parquet files, so none needs to be supplied.
# format('parquet').load(...) is equivalent to read.parquet(...).
df9 = spark.read.format('parquet').load('output/bakery_parquet')
df9.show(10)
df9.count()
+-------------------+-----------+---------+ | timestamp|transaction| item| +-------------------+-----------+---------+ |2017-02-15 14:54:25| 6620| Cake| |2017-02-15 14:41:27| 6619| Bread| |2017-02-15 14:40:41| 6618| Coffee| |2017-02-15 14:40:41| 6618| Bread| |2017-02-15 14:23:16| 6617| Baguette| |2017-02-15 14:23:16| 6617| Coffee| |2017-02-15 14:23:16| 6617| Salad| |2017-02-15 14:23:16| 6617| Art Tray| |2017-02-15 14:23:16| 6617|Alfajores| |2017-02-15 14:16:26| 6616| Bread| +-------------------+-----------+---------+ only showing top 10 rows
18888