© Copyright 2018 IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing permissions and
limitations under the License.
Obtain the IP address of the host that you want to connect to by running the appropriate command for your operating system:
ifconfig
ipconfig
hostname -i
Edit the val Host = "XXX.XXX.XXX.XXX"
value in the next cell to provide the IP address of that host.
// IP address of the host to connect to (replace the placeholder before running).
val Host: String = "XXX.XXX.XXX.XXX"
// Connection port: 1100 for version 1.1.2 or later, 5555 for version 1.1.1.
val Port: String = "1100"
Use cell magic to install the Brunel integration for Apache Toree (Scala).
%AddJar -magic https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar -f
Starting download from https://brunelvis.org/jar/spark-kernel-brunel-all-2.3.jar Finished download of spark-kernel-brunel-all-2.3.jar
Import packages for Scala, Spark, and IBM Db2 Event Store.
import sys.process._
import java.io.File
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.ibm.event.EventSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.ibm.event.catalog.TableSchema
import com.ibm.event.common.ConfigurationReader
import com.ibm.event.example.DataGenerator
import com.ibm.event.oltp.EventContext
import com.ibm.event.oltp.InsertResult
// Tell the Event Store client which endpoint ("host:port") to connect to.
ConfigurationReader.setConnectionEndpoints(s"$Host:$Port")
// Create the Event Store session ("TESTDB" is presumably the database name — confirm)
// and import its implicits, which provide the $"column" syntax used in later cells.
val sqlContext = new EventSession(spark.sparkContext, "TESTDB")
import sqlContext.implicits._
// Load the click-stream table and expose it to SQL under the same name.
// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
val table = sqlContext.loadEventTable("ClickStreamTable")
table.createOrReplaceTempView("ClickStreamTable")
// Read everything back through SQL and preview the first five rows.
val clickStreamDF = sqlContext.sql("select * from ClickStreamTable")
clickStreamDF.show(5)
+-----------+---------+----------+------------+-----------------+------+--------------------+-------+ | eventId|eventType| timestamp| ipaddress| sessionId|userId| pageUrl|browser| +-----------+---------+----------+------------+-----------------+------+--------------------+-------+ |20170522901| pageView|1496311260|169.34.56.78|y20170522a4499u21|ceaton| /www.cybershop.com| Chrome| |20170522902| pageView|1496311320|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| |20170522903| pageView|1496311440|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| |20170522904| pageView|1496311500|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| |20170522905| pageView|1496311560|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| +-----------+---------+----------+------------+-----------------+------+--------------------+-------+ only showing top 5 rows
// Calculate the time spent on each web page: the timestamp of the following event
// minus the timestamp of the current event.
// NOTE(review): the window is ordered by timestamp only (no partitionBy), so the
// "following event" is taken across all sessions and users — confirm this is intended.
val timestamp = clickStreamDF("timestamp")
val eventOrder = Window.orderBy(timestamp)
val next_timestamp = lead(timestamp, 1).over(eventOrder)
val timeOnPage = next_timestamp.cast(LongType) - timestamp.cast(LongType)
val clickStreamWithTimeDF = clickStreamDF.withColumn("time", timeOnPage)
clickStreamWithTimeDF.show(5)
+-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+ | eventId|eventType| timestamp| ipaddress| sessionId|userId| pageUrl|browser|time| +-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+ |20170522901| pageView|1496311260|169.34.56.78|y20170522a4499u21|ceaton| /www.cybershop.com| Chrome| 60| |20170522902| pageView|1496311320|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 120| |20170522903| pageView|1496311440|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60| |20170522904| pageView|1496311500|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60| |20170522905| pageView|1496311560|169.34.56.78|y20170522a4499u21|ceaton|/estore?product_l...| Chrome| 60| +-----------+---------+----------+------------+-----------------+------+--------------------+-------+----+ only showing top 5 rows
// From this cell on, `sqlContext` is a plain Spark SQLContext instead of the EventSession.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
clickStreamWithTimeDF.createOrReplaceTempView("tempData")
// Convert the timestamp column into a DATE column named "date".
// The alias is declared in the SQL itself. Renaming the auto-generated column name
// afterwards is a silent no-op whenever Spark generates a different name, which would
// leave the view without the "date" column that later queries rely on.
val clickStreamWithDateTimeDF = sqlContext.sql(
  "select eventId, eventType, cast(from_unixtime(timestamp) as date) as `date`, " +
  "ipaddress, sessionId, userId, pageUrl, browser, time " +
  "from tempData")
// clickStreamWithDateTimeDF.show(5)
clickStreamWithDateTimeDF.createOrReplaceTempView("ClickData")
// Per-page metrics over pageView events: number of hits and summed time per URL.
val pageMetricsSql =
  "select pageURL, count(*) as page_hits, sum(time) as total_time " +
  "from ClickData where eventType='pageView' group by pageURL"
val clicksDF = sqlContext.sql(pageMetricsSql)
// Print the first five rows without truncating the long URLs.
clicksDF.show(5, truncate = false)
+-----------------------------------------------------------------------------+---------+----------+ |pageURL |page_hits|total_time| +-----------------------------------------------------------------------------+---------+----------+ |/www.cybershop.com |9 |600 | |/estore?product_line=smartphones&action=catalog |13 |1260 | |/estore?product_line=smartphones&product=A-phone&action=details |7 |540 | |/estore?product_line=smartphones&product=A-phone&feature=color&action=details|5 |660 | |/estore?product_line=smartphones&product=S-phone&action=details |5 |600 | +-----------------------------------------------------------------------------+---------+----------+ only showing top 5 rows
// Expose the per-page metrics to SQL as "WebMetricsData".
// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
clicksDF.createOrReplaceTempView("WebMetricsData")
// Read the view back and preview the first five rows.
val webMetricsDF = sqlContext.sql("select * from WebMetricsData")
webMetricsDF.show(5)
+--------------------+---------+----------+ | pageURL|page_hits|total_time| +--------------------+---------+----------+ | /www.cybershop.com| 9| 600| |/estore?product_l...| 13| 1260| |/estore?product_l...| 7| 540| |/estore?product_l...| 5| 660| |/estore?product_l...| 5| 600| +--------------------+---------+----------+ only showing top 5 rows
// Second view over the same per-page metrics; queried by metricsQuery3 below.
// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
clicksDF.createOrReplaceTempView("WebMetricsDataTest")
// Split each page URL's query string into product_line / action / product / feature.
// A missing action, product or feature parameter becomes '' instead of NULL.
val metricsQuery = """
select
parse_URL(pageURL,'QUERY','product_line') as product_line,
Coalesce(parse_URL(pageURL,'QUERY','action'),'') as action,
Coalesce(parse_URL(pageURL,'QUERY','product'),'') as product,
Coalesce(parse_URL(pageURL,'QUERY','feature'),'') as feature, page_hits, total_time
from WebMetricsData"""
// Variant without Coalesce, so missing parameters stay NULL. Not executed in this cell.
// The last select item must not be followed by a comma, otherwise the SQL does not parse.
val metricsQuery2 = """
select
parse_URL(pageURL,'QUERY','product_line') as product_line,
parse_URL(pageURL,'QUERY','action') as action,
parse_URL(pageURL,'QUERY','product') as product
from WebMetricsData"""
// Minimal variant: only the product_line parameter, read from the test view.
val metricsQuery3 = """
select parse_URL(pageURL,'QUERY','product_line') as product_line
from WebMetricsDataTest"""
// Drop pages whose URL carries no product_line, then sort by product_line descending.
val webMetricsDF3 = sqlContext.sql(metricsQuery3).filter($"product_line".isNotNull).sort($"product_line".desc)
webMetricsDF3.show(5)
// Same filtering and ordering for the full metrics query; this is the DataFrame the
// later aggregation cells read.
val webMetricsDF = sqlContext.sql(metricsQuery).filter($"product_line".isNotNull).sort($"product_line".desc)
webMetricsDF.show(5)
+------------+ |product_line| +------------+ | videogames| | videogames| | smartphones| | smartphones| | smartphones| +------------+ only showing top 5 rows +------------+-------+-------+---------+---------+----------+ |product_line| action|product| feature|page_hits|total_time| +------------+-------+-------+---------+---------+----------+ | videogames|details| W-game| | 1| 120| | videogames|catalog| | | 6| 4680| | smartphones|details|A-phone|processor| 2| 120| | smartphones|details|A-phone| | 7| 540| | smartphones|details|A-phone| color| 5| 660| +------------+-------+-------+---------+---------+----------+ only showing top 5 rows
// Roll the page metrics up to one row per product line, most-viewed line first.
// The sums are aliased inside agg so the result keeps the page_hits / total_time names.
val productlineMetrics = webMetricsDF.
  select("product_line", "page_hits", "total_time").
  groupBy("product_line").
  agg(sum("page_hits").as("page_hits"), sum("total_time").as("total_time")).
  sort($"page_hits".desc)
productlineMetrics.show()
+--------------+---------+----------+ | product_line|page_hits|total_time| +--------------+---------+----------+ | smartphones| 58| 6360| | computers| 16| 3720| | videogames| 7| 4800| | appliances| 5| 2220| | hometheater| 4| 840| | headphones| 4| 960| |carelectronics| 2| 420| | cameras| 2| 360| +--------------+---------+----------+
%%brunel data('productlineMetrics')
bar at(0,0,50,50) title("Page Views by Product Line")
x(product_line) y(page_hits)
tooltip(#all) color(product_line) legends(none)
axes(x:'product lines',y:'page views') sort(page_hits) interaction(select)|
treemap at(60,5,100,45)
sort(page_hits) size(page_hits) color(product_line) label(product_line) legends(none)
tooltip("page views: ",page_hits) opacity(#selection) |
bar at(0,50,50,100) title("Total Time by Product Line")
x(product_line) y(total_time)
tooltip(#all) color(product_line) legends(none)
axes(x:'product lines',y:'total time') sort(page_hits) interaction(select)|
treemap at(60,55,100,95)
sort(page_hits) size(total_time) color(product_line) label(product_line) legends(none)
tooltip("time on page (sec): ",total_time) opacity(#selection)
:: width=1000, height=600
// Page hits and total time per smartphone product, counting "details" pages only.
// The filters run before the select because they read "action", a column that the
// select does not keep; filtering first avoids referencing a dropped column.
val productMetrics = webMetricsDF.
  filter($"action" === "details").
  filter($"product_line" === "smartphones").
  select("product_line", "product", "page_hits", "total_time").
  groupBy("product_line", "product").
  agg(sum("page_hits").as("page_hits"), sum("total_time").as("total_time"))
productMetrics.show()
+------------+-------+---------+----------+ |product_line|product|page_hits|total_time| +------------+-------+---------+----------+ | smartphones|A-phone| 21| 1920| | smartphones|S-phone| 12| 1380| | smartphones|M-phone| 5| 540| | smartphones|L-phone| 3| 180| | smartphones|H-phone| 2| 120| | smartphones|X-phone| 1| 720| +------------+-------+---------+----------+
%%brunel data('productMetrics')
bar at(0,0,50,50) title("Page Views by Smart Phone")
x(product) y(page_hits)
tooltip(page_hits,product) color(product) legends(none)
axes(x:'smart phones',y:'page views') sort(page_hits) interaction (select)|
treemap at(60,5,100,45)
sort(page_hits) size(page_hits) color(product) label(product) legends(none)
tooltip("page views: ",page_hits) opacity(#selection) |
bar at(0,50,50,100)
title("Total Time by Smart Phone")
x(product) y(total_time)
color(product) label(product) tooltip("time on page (sec): ",total_time)
legends(none) sort(page_hits) interaction(select)|
treemap at(60,55,100,95)
sort(page_hits) size(total_time) color(product) label(product) legends(none)
tooltip("time on page (sec): ",total_time) opacity(#selection)
:: width=1000, height=600
// Page hits and total time per A-phone feature, counting "details" pages only and
// skipping rows whose feature is the empty string.
// The filters run before the select because they read "action", a column that the
// select does not keep; filtering first avoids referencing a dropped column.
val featureMetrics = webMetricsDF.
  filter($"action" === "details").
  filter($"product" === "A-phone").
  filter("feature != ''").
  select("product", "feature", "page_hits", "total_time").
  groupBy("product", "feature").
  agg(sum("page_hits").as("page_hits"), sum("total_time").as("total_time"))
featureMetrics.show()
+-------+---------------+---------+----------+ |product| feature|page_hits|total_time| +-------+---------------+---------+----------+ |A-phone| color| 5| 660| |A-phone| battery| 1| 120| |A-phone| processor| 2| 120| |A-phone|voice_assistant| 2| 120| |A-phone| camera| 3| 180| +-------+---------------+---------+----------+
%%brunel data('featureMetrics')
bar title("Web Metrics by Feature")
x(feature) y(page_hits)
tooltip(feature,page_hits) color(feature) legends(none)
axes(x:'A-phone features',y:'page views') sort(page_hits) interaction(select)|
stack polar bar
y(total_time) color(feature) label(feature)
tooltip("time on page (sec): ",total_time)
legends(none) sort(page_hits) opacity(#selection)
:: width=1000, height=300
// Page-view metrics for a single user (userId 'datkins'), read from the ClickData view.
// One row per (pageURL, date): the date is broken out into year, month, week of year,
// day and abbreviated day-of-week name, alongside the hit count and summed time.
val userClicksQuery ="""
select pageURL, year(date) as year, month(date) as month, weekofyear(date) as week,
day(date) as day, date_format(date, 'E') as dayofweek,
count(*) as page_hits, sum(time) as total_time
from ClickData
where eventType='pageView' and userId='datkins' group by pageURL, date"""
// Run the query and preview the first five rows.
val userClicksDF = sqlContext.sql(userClicksQuery)
userClicksDF.show(5)
+--------------------+----+-----+----+---+---------+---------+----------+ | pageURL|year|month|week|day|dayofweek|page_hits|total_time| +--------------------+----+-----+----+---+---------+---------+----------+ | /www.cybershop.com|2017| 6| 24| 16| Fri| 1| 60| |/estore?product_l...|2017| 6| 24| 16| Fri| 1| 240| |/estore?product_l...|2017| 6| 24| 16| Fri| 1| 240| |/estore?product_l...|2017| 6| 24| 16| Fri| 1| 180| |/estore?product_l...|2017| 6| 24| 16| Fri| 1| 120| +--------------------+----+-----+----+---+---------+---------+----------+ only showing top 5 rows
// Build user web metrics by product line, product and feature.
// createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
userClicksDF.createOrReplaceTempView("UserWebMetricsData")
// Split each page URL's query string into product_line / action / product / feature,
// keeping the calendar columns, and restrict the rows to the year 2017.
// A missing action, product or feature parameter becomes '' instead of NULL.
// This re-declares metricsQuery from an earlier cell for the per-user data.
val metricsQuery = """
select month, week, day, dayofweek,
parse_URL(pageURL,'QUERY','product_line') as product_line,
Coalesce(parse_URL(pageURL,'QUERY','action'),'') as action,
Coalesce(parse_URL(pageURL,'QUERY','product'),'') as product,
Coalesce(parse_URL(pageURL,'QUERY','feature'),'') as feature,
page_hits, total_time
from UserWebMetricsData
where year = '2017'"""
// Drop pages whose URL carries no product_line and preview the first five rows.
val userWebMetricsDF = sqlContext.sql(metricsQuery).filter($"product_line".isNotNull)
userWebMetricsDF.show(5)
+-----+----+---+---------+------------+-------+-------+---------------+---------+----------+ |month|week|day|dayofweek|product_line| action|product| feature|page_hits|total_time| +-----+----+---+---------+------------+-------+-------+---------------+---------+----------+ | 6| 24| 16| Fri| headphones|details| A-head| | 1| 240| | 6| 24| 16| Fri| headphones|details| B-head| | 1| 240| | 6| 24| 16| Fri| smartphones|details|M-phone| | 1| 180| | 6| 24| 16| Fri| smartphones|details|S-phone| | 1| 120| | 6| 24| 16| Fri| smartphones|details|S-phone|voice_assistant| 1| 60| +-----+----+---+---------+------------+-------+-------+---------------+---------+----------+ only showing top 5 rows
// Metrics for the most recent week.
// Grouping by every other column and taking max("week") only attaches the week number
// to each row; it does not drop older weeks. The rows are therefore first restricted
// to the latest week present in the data, then grouped as before, so the output
// columns (including "max(week)") are unchanged.
val latestWeekDF = userWebMetricsDF.agg(max("week").as("latest_week"))
val weekMetricsDF = userWebMetricsDF.
  join(broadcast(latestWeekDF), $"week" === $"latest_week").
  groupBy("dayofweek", "day", "product_line", "action", "product", "feature", "page_hits", "total_time").
  max("week")
weekMetricsDF.show(5)
+---------+---+------------+-------+-------+---------------+---------+----------+---------+ |dayofweek|day|product_line| action|product| feature|page_hits|total_time|max(week)| +---------+---+------------+-------+-------+---------------+---------+----------+---------+ | Fri| 16| headphones|details| A-head| | 1| 240| 24| | Fri| 16| headphones|details| B-head| | 1| 240| 24| | Fri| 16| smartphones|details|M-phone| | 1| 180| 24| | Fri| 16| smartphones|details|S-phone| | 1| 120| 24| | Fri| 16| smartphones|details|S-phone|voice_assistant| 1| 60| 24| +---------+---+------------+-------+-------+---------------+---------+----------+---------+ only showing top 5 rows
%%brunel data('weekMetricsDF')
title("David's Browsing by Day")
x(day) y(page_hits)
stack bar
sum(page_hits) color(product_line) tooltip(#all)
axes(x:7,y:'page views') legends(none) interaction(select)|
stack polar bar
y(total_time) color(product_line) label(product)
tooltip("<b>day of week: ", dayofweek,
"<p><u>day of month: ", day,
"</u></b><p><i>product line: ", product_line,
"</i><p>product: ", product)
opacity(#selection)
:: width=1000, height=300