In [3]:
spark.version
Out[3]:
2.4.3
In [4]:
import java.sql.Date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
In [5]:
val devicesDf = Seq(
    (Date.valueOf("2019-01-01"), "notebook", 600.00),
    (Date.valueOf("2019-05-10"), "notebook", 1200.00),
    (Date.valueOf("2019-03-05"), "small phone", 100.00),
    (Date.valueOf("2019-02-20"), "camera",150.00),
    (Date.valueOf("2019-01-20"), "small phone", 300.00),
    (Date.valueOf("2019-02-15"), "large phone", 700.00),
    (Date.valueOf("2019-07-01"), "camera", 300.00),
    (Date.valueOf("2019-04-01"), "small phone", 50.00)
).toDF("purchase_date", "device", "price")
devicesDf = [purchase_date: date, device: string ... 1 more field]
Out[5]:
[purchase_date: date, device: string ... 1 more field]
In [6]:
%%dataframe
devicesDf.sort("purchase_date")
Out[6]:
purchase_date | device      | price
2019-01-01    | notebook    |  600.0
2019-01-20    | small phone |  300.0
2019-02-15    | large phone |  700.0
2019-02-20    | camera      |  150.0
2019-03-05    | small phone |  100.0
2019-04-01    | small phone |   50.0
2019-05-10    | notebook    | 1200.0
2019-07-01    | camera      |  300.0

average value per group

In [7]:
%%dataframe
:paste

devicesDf
    .withColumn("average_price_in_group", mean("price") over Window.partitionBy("device"))
Out[7]:
purchase_date | device      | price  | average_price_in_group
2019-02-15    | large phone |  700.0 | 700.0
2019-03-05    | small phone |  100.0 | 150.0
2019-01-20    | small phone |  300.0 | 150.0
2019-04-01    | small phone |   50.0 | 150.0
2019-01-01    | notebook    |  600.0 | 900.0
2019-05-10    | notebook    | 1200.0 | 900.0
2019-02-20    | camera      |  150.0 | 225.0
2019-07-01    | camera      |  300.0 | 225.0
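
for contrast, a plain groupBy collapses each group to a single row, while the window version above keeps every original row and only annotates it. a minimal sketch, reusing the imports from above (the output column name is just illustrative):

// groupBy produces one row per device, unlike the window version
devicesDf
    .groupBy("device")
    .agg(mean("price").as("average_price_in_group"))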

rows with the largest value in each group

In [22]:
%%dataframe
:paste

devicesDf.withColumn("max_price_in_group", max("price") over Window.partitionBy("device"))
Out[22]:
purchase_date | device      | price  | max_price_in_group
2019-02-15    | large phone |  700.0 |  700.0
2019-03-05    | small phone |  100.0 |  300.0
2019-01-20    | small phone |  300.0 |  300.0
2019-04-01    | small phone |   50.0 |  300.0
2019-01-01    | notebook    |  600.0 | 1200.0
2019-05-10    | notebook    | 1200.0 | 1200.0
2019-02-20    | camera      |  150.0 |  300.0
2019-07-01    | camera      |  300.0 |  300.0
In [19]:
%%dataframe
:paste

devicesDf
    .withColumn("max_price_in_group", max("price") over Window.partitionBy("device"))
    .filter($"price" === $"max_price_in_group")
Out[19]:
purchase_date | device      | price  | max_price_in_group
2019-02-15    | large phone |  700.0 |  700.0
2019-01-20    | small phone |  300.0 |  300.0
2019-05-10    | notebook    | 1200.0 | 1200.0
2019-07-01    | camera      |  300.0 |  300.0
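
if two purchases in the same group tie at the maximum price, the max-plus-filter approach keeps all tied rows. an alternative sketch that keeps exactly one row per group, ranking by descending price with row_number (the rank_in_group column name is just illustrative):

// rank rows within each device by descending price, keep the top one
devicesDf
    .withColumn("rank_in_group", row_number() over Window.partitionBy("device").orderBy($"price".desc))
    .filter($"rank_in_group" === 1)
    .drop("rank_in_group")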

rows with the most recent purchase in each group

In [16]:
%%dataframe
:paste

devicesDf
    .withColumn("most_recent_purchase_in_group", max("purchase_date") over Window.partitionBy("device"))
Out[16]:
purchase_date | device      | price  | most_recent_purchase_in_group
2019-02-15    | large phone |  700.0 | 2019-02-15
2019-03-05    | small phone |  100.0 | 2019-04-01
2019-01-20    | small phone |  300.0 | 2019-04-01
2019-04-01    | small phone |   50.0 | 2019-04-01
2019-01-01    | notebook    |  600.0 | 2019-05-10
2019-05-10    | notebook    | 1200.0 | 2019-05-10
2019-02-20    | camera      |  150.0 | 2019-07-01
2019-07-01    | camera      |  300.0 | 2019-07-01
In [17]:
%%dataframe
:paste

devicesDf
    .withColumn("most_recent_purchase_in_group", max("purchase_date") over Window.partitionBy("device"))
    .filter($"purchase_date" === $"most_recent_purchase_in_group")
Out[17]:
purchase_date | device      | price  | most_recent_purchase_in_group
2019-02-15    | large phone |  700.0 | 2019-02-15
2019-04-01    | small phone |   50.0 | 2019-04-01
2019-05-10    | notebook    | 1200.0 | 2019-05-10
2019-07-01    | camera      |  300.0 | 2019-07-01
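
the most recent purchase can also be resolved without filtering: widen the frame of an ordered window to the whole partition, and last() picks the price of the latest purchase for every row. a sketch (the wholeGroup and most_recent_price_in_group names are just illustrative):

// illustrative helper: an ordered window whose frame spans the whole partition
val wholeGroup = Window
    .partitionBy("device")
    .orderBy("purchase_date")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// last() now sees the final (most recent) row of every partition
devicesDf.withColumn("most_recent_price_in_group", last("price") over wholeGroup)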

get the median value

the median is the 50th percentile

what's the lowest price at or above the 50th percentile?

In [10]:
%%dataframe
:paste
devicesDf
    .withColumn("percentile", percent_rank() over Window.orderBy("price"))
Out[10]:
purchase_date | device      | price  | percentile
2019-04-01    | small phone |   50.0 | 0.0
2019-03-05    | small phone |  100.0 | 0.14285714285714285
2019-02-20    | camera      |  150.0 | 0.2857142857142857
2019-01-20    | small phone |  300.0 | 0.42857142857142855
2019-07-01    | camera      |  300.0 | 0.42857142857142855
2019-01-01    | notebook    |  600.0 | 0.7142857142857143
2019-02-15    | large phone |  700.0 | 0.8571428571428571
2019-05-10    | notebook    | 1200.0 | 1.0
In [11]:
%%dataframe
:paste
devicesDf
    .withColumn("percentile", percent_rank() over Window.orderBy("price"))
    .filter($"percentile" >= 0.5)
    .limit(1)
Out[11]:
purchase_date | device   | price | percentile
2019-01-01    | notebook | 600.0 | 0.7142857142857143
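
Spark also has aggregate alternatives that skip the window entirely: the percentile_approx SQL aggregate (reachable through expr, since 2.4 has no DataFrame function for it) and approxQuantile, where a relative error of 0.0 asks for an exact answer. a sketch, noting that both compute the conventional median rather than "the lowest price whose percent_rank is at least 0.5", so they can disagree with the result above:

// percentile_approx is a SQL aggregate, so it is called through expr()
devicesDf.agg(expr("percentile_approx(price, 0.5)").as("median_price"))

// approxQuantile returns plain Scala values; relativeError 0.0 means exact
val Array(medianPrice) = devicesDf.stat.approxQuantile("price", Array(0.5), 0.0)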

get a percentile

what's the lowest price at or above the 85th percentile?

In [12]:
%%dataframe
:paste
devicesDf
    .withColumn("percentile", percent_rank() over Window.orderBy("price"))
Out[12]:
purchase_date | device      | price  | percentile
2019-04-01    | small phone |   50.0 | 0.0
2019-03-05    | small phone |  100.0 | 0.14285714285714285
2019-02-20    | camera      |  150.0 | 0.2857142857142857
2019-01-20    | small phone |  300.0 | 0.42857142857142855
2019-07-01    | camera      |  300.0 | 0.42857142857142855
2019-01-01    | notebook    |  600.0 | 0.7142857142857143
2019-02-15    | large phone |  700.0 | 0.8571428571428571
2019-05-10    | notebook    | 1200.0 | 1.0
In [13]:
%%dataframe
:paste
devicesDf
    .withColumn("percentile", percent_rank() over Window.orderBy("price"))
    .filter($"percentile" >= 0.85)
    .limit(1)
Out[13]:
purchase_date | device      | price | percentile
2019-02-15    | large phone | 700.0 | 0.8571428571428571
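
one caveat: limit(1) without an explicit sort relies on the rows still arriving in the window's price order, which Spark does not guarantee after a filter. a more defensive sketch pins down the "lowest price at or above the percentile" semantics with an orderBy:

// the extra orderBy makes limit(1) deterministic
devicesDf
    .withColumn("percentile", percent_rank() over Window.orderBy("price"))
    .filter($"percentile" >= 0.85)
    .orderBy("price")
    .limit(1)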

cumulative sum

a cumulative sum requires an ordered window: once the window is ordered, the default frame runs from the start of the partition to the current row, which is what makes the sum cumulative

In [14]:
%%dataframe
:paste
devicesDf
    .withColumn("cumulative_sum", sum("price") over Window.orderBy("purchase_date"))
Out[14]:
purchase_date | device      | price  | cumulative_sum
2019-01-01    | notebook    |  600.0 |  600.0
2019-01-20    | small phone |  300.0 |  900.0
2019-02-15    | large phone |  700.0 | 1600.0
2019-02-20    | camera      |  150.0 | 1750.0
2019-03-05    | small phone |  100.0 | 1850.0
2019-04-01    | small phone |   50.0 | 1900.0
2019-05-10    | notebook    | 1200.0 | 3100.0
2019-07-01    | camera      |  300.0 | 3400.0
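
the running total restarts per group as soon as a partition is added alongside the ordering. a minimal sketch (the column name is just illustrative):

// partitionBy restarts the running total for every device
devicesDf
    .withColumn("cumulative_sum_per_group", sum("price") over Window.partitionBy("device").orderBy("purchase_date"))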

get row number

row_number requires an ordered window

In [15]:
%%dataframe
:paste
devicesDf
    .withColumn("row_number", row_number() over Window.orderBy("purchase_date"))
Out[15]:
purchase_date | device      | price  | row_number
2019-01-01    | notebook    |  600.0 | 1
2019-01-20    | small phone |  300.0 | 2
2019-02-15    | large phone |  700.0 | 3
2019-02-20    | camera      |  150.0 | 4
2019-03-05    | small phone |  100.0 | 5
2019-04-01    | small phone |   50.0 | 6
2019-05-10    | notebook    | 1200.0 | 7
2019-07-01    | camera      |  300.0 | 8
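
row_number always yields distinct consecutive numbers; rank and dense_rank differ only when rows tie, as the two 300.0 prices do here. a sketch over the same ordered window (byPrice is just an illustrative name):

// the same ordered window, reused for all three ranking functions
val byPrice = Window.orderBy("price")

devicesDf
    .withColumn("row_number", row_number() over byPrice)
    .withColumn("rank", rank() over byPrice)
    .withColumn("dense_rank", dense_rank() over byPrice)

row_number gives the tied rows 4 and 5, rank gives both 4 and then jumps to 6, and dense_rank gives both 4 and continues with 5.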