Nessie Spark 3 Demo with Delta Lake

This demo showcases how to use the Nessie Python API along with Spark 3 and Delta Lake.

Initialize PySpark + Nessie environment

To get started, we first have to run a few setup steps that give us everything we need to work with Nessie. The nessiedemo lib configures all required dependencies and downloads and configures Spark. In case you're interested in the detailed setup steps for Spark, you can check out the docs or have a look directly at the source code of the nessiedemo lib here.

In [ ]:
# install the nessiedemo lib, which configures all required dependencies
!pip install nessiedemo
In [ ]:
# Setup the Demo: installs the required Python dependencies, downloads the sample datasets and
# downloads + starts the Nessie-Quarkus-Runner.
from nessiedemo.demo import setup_demo
demo = setup_demo("nessie-0.6-delta-spark3.yml", ["nba"])

# This is a separate step, because NessieDemo.prepare() (invoked via .start()) implicitly installs the required dependencies.
# Downloads Spark and sets up the SparkSession, SparkContext and JVM gateway
from nessiedemo.delta_spark import delta_spark_for_demo
spark, sc, jvm, demo_delta_spark = delta_spark_for_demo(demo, spark_version=3)

Set up Nessie branches (via Nessie CLI)

Once all dependencies are configured, we can start using Nessie via the Nessie CLI, which allows interaction with Nessie branches and tables. We'll begin with the following steps:

  • Create a new branch named dev
  • List all branches

It is worth mentioning that we don't have to explicitly create a main branch, since it's the default branch.
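Conceptually, a Nessie branch is just a named pointer to a commit hash, much like a Git branch, which is why creating one is cheap and why main exists from the start. The following toy sketch is an illustration only, in plain Python, and not how Nessie is actually implemented:

```python
# Toy model of Nessie branches: a branch is a named pointer to a commit hash.
# Illustration only, not Nessie's internal implementation.

branches = {"main": "2e1cfa82"}  # Nessie creates `main` by default

def create_branch(name, from_ref="main"):
    """Creating a branch only copies a pointer; no table data is copied."""
    branches[name] = branches[from_ref]

create_branch("dev")

# Both branches now point at the same commit, just like after `nessie branch dev`.
print(branches)  # {'main': '2e1cfa82', 'dev': '2e1cfa82'}
```

This is why `nessie --verbose branch` initially shows `main` and `dev` at the same revision.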

In [ ]:
# Create a new `dev` branch
!nessie branch dev

# Switch Spark session to the `dev` branch, which will be used later on
demo_delta_spark.change_ref("dev")
In [ ]:
# List all branches with their revisions. We should see the `main` & `dev` branches.
!nessie --verbose branch

Create tables under dev branch

Once we have created the dev branch and verified that it exists, we can use the `spark` session (which we switched to the dev branch via `change_ref` above) to create some tables and add some data.

We create two tables under the dev branch:

  • salaries
  • totals_stats
In [ ]:
# Load the dataset
dataset = demo.fetch_dataset("nba")

# Creating `salaries` table
spark.sql("CREATE TABLE IF NOT EXISTS nessie_nba_salaries (Season STRING, Team STRING, Salary STRING, Player STRING) "
          "USING delta LOCATION '{}'".format(demo_delta_spark.table_path("nessie_nba_salaries")))
salaries_df = spark.read.csv(dataset["salaries.csv"], header=True)
salaries_df.write.option('hadoop.nessie.ref', 'dev').format("delta")\
    .mode("overwrite").save(demo_delta_spark.table_path("nessie_nba_salaries"))

# Creating `totals_stats` table
spark.sql("CREATE TABLE IF NOT EXISTS nessie_nba_totals_stats ("
          "Season STRING, Age STRING, Team STRING, ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, "
          "TOV STRING, PTS STRING, Player STRING, RSorPO STRING) "
          "USING delta LOCATION '{}'".format(demo_delta_spark.table_path("nessie_nba_totals_stats")))
totals_stats_df = spark.read.csv(dataset["totals_stats.csv"], header=True)
totals_stats_df.write.option('hadoop.nessie.ref', 'dev').format("delta")\
    .mode("overwrite").save(demo_delta_spark.table_path("nessie_nba_totals_stats"))
In [ ]:
# The `spark` session points to the `dev` branch.
# Unlike Iceberg, Delta does not support referencing other Nessie references (branches, tags) using the `@reference` syntax.

spark.sql("select * from nessie_nba_salaries").toPandas()

Check generated tables

Since we have been working solely on the dev branch, where we created two tables and added some data, let's verify that the main branch was not altered by our changes.

In [ ]:
# There are no tables on the `main` branch
!nessie contents --list
In [ ]:
# We should see the `salaries` & `totals_stats` tables on the `dev` branch
!nessie contents --list --ref dev

We can also verify that the dev and main branches point to different commits.

In [ ]:
# List all branches with their revisions, where the revision of `main` should be different from `dev`
!nessie --verbose branch

Dev promotion into main

Once we are done with our changes on the dev branch, we would like to merge those changes into main. We merge dev into main via the Nessie CLI. Both branches should be at the same revision after merging/promotion.
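In this demo the promotion amounts to a fast-forward: main has not moved since dev was created, so merging simply points main at dev's commit. A toy sketch of that idea (illustration only, not Nessie's actual merge logic, which also checks that the changes do not conflict):

```python
# Toy model of promotion: after the merge, `main` points at the same
# commit as `dev`. Illustration only; Nessie's real merge also validates
# that the source changes do not conflict with the target branch.

branches = {"main": "2e1cfa82", "dev": "9b4fa713"}  # dev has new commits

def merge(source, target):
    # Fast-forward the target branch to the source branch's commit.
    branches[target] = branches[source]

merge("dev", "main")
print(branches)  # {'main': '9b4fa713', 'dev': '9b4fa713'}
```

After the real merge below, `nessie --verbose branch` shows the same effect: both branches at one revision.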

In [ ]:
# Merge `dev` into `main`
!nessie merge dev -b main --force
In [ ]:
# List all branches with their revisions; the revision of `main` should now equal that of `dev`
!nessie --verbose branch

Create etl branch

In this section we'll simulate the changes a nightly ETL job might make.

  • Create a branch etl out of main
  • Add data to salaries
  • Alter the schema of totals_stats
  • Create the table allstar_games_stats
  • Query the tables in etl
  • Query the tables in main
  • Promote the etl branch to main
In [ ]:
# Create the `etl` branch based on `main`
!nessie branch etl main

# Switch Spark session to the `etl` branch, which will be used later on
demo_delta_spark.change_ref("etl")
In [ ]:
# add some salaries for Kevin Durant
from pyspark.sql import Row
Salary = Row("Season", "Team", "Salary", "Player")
kevin_durant = spark.createDataFrame([
    Salary("2017-18", "Golden State Warriors", "$25000000", "Kevin Durant"),
    Salary("2018-19", "Golden State Warriors", "$30000000", "Kevin Durant"),
    Salary("2019-20", "Brooklyn Nets", "$37199000", "Kevin Durant"),
    Salary("2020-21", "Brooklyn Nets", "$39058950", "Kevin Durant")])
kevin_durant.write.option('hadoop.nessie.ref', 'etl').format("delta")\
    .mode("append").save(demo_delta_spark.table_path("nessie_nba_salaries"))
In [ ]:
# Creating `allstar_games_stats` table and viewing the contents
spark.sql("CREATE TABLE IF NOT EXISTS nessie_nba_allstar_games_stats ("
          "Season STRING, Age STRING, Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING, "
          "PF STRING, PTS STRING, Player STRING) "
          "USING delta LOCATION '{}'".format(demo_delta_spark.table_path("nessie_nba_allstar_games_stats")))
allstar_games_stats_df = spark.read.csv(dataset["allstar_games_stats.csv"], header=True)
allstar_games_stats_df.write.option('hadoop.nessie.ref', 'etl').format("delta")\
    .mode("overwrite").save(demo_delta_spark.table_path("nessie_nba_allstar_games_stats"))

spark.sql("select * from nessie_nba_allstar_games_stats").toPandas()
In [ ]:
# Since we have been working on the `etl` branch, the `allstar_games_stats` table is not on the `main` branch
!nessie contents --list
In [ ]:
# We should see `allstar_games_stats` on the `etl` branch
!nessie contents --list --ref etl
In [ ]:
# Now merge the `etl` branch into `main`
!nessie merge etl -b main --force
In [ ]:
# The `etl` and `main` branches should now be at the same revision
!nessie --verbose branch

Create experiment branch

As a data analyst, we might want to carry out some experiments on the data without affecting main in any way. As in the previous examples, we can simply create an experiment branch off of main and run our experiment, which could consist of the following steps:

  • Drop the totals_stats table
  • Add data to the salaries table
  • Compare the experiment and main tables
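The value of the experiment branch is isolation: every commit references its own set of tables, so dropping totals_stats on experiment only moves the experiment pointer while main keeps referencing the old commit. A toy sketch of that isolation (illustration only, not Nessie's data model):

```python
# Toy model of branch isolation: each commit carries its own table list,
# so changes committed on `experiment` never alter what `main` sees.
# Illustration only.

commits = {
    "c1": {"tables": ["salaries", "totals_stats", "allstar_games_stats"]},
}
branches = {"main": "c1", "experiment": "c1"}

def commit_on(branch, tables):
    """Record a new commit on one branch; all other branches are untouched."""
    new_id = "c{}".format(len(commits) + 1)
    commits[new_id] = {"tables": tables}
    branches[branch] = new_id

# Drop `totals_stats` on the experiment branch only
commit_on("experiment", ["salaries", "allstar_games_stats"])

print(commits[branches["experiment"]]["tables"])  # ['salaries', 'allstar_games_stats']
print(commits[branches["main"]]["tables"])        # still all three tables
```

The `nessie contents --list` calls below show exactly this behavior against the real server.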
In [ ]:
# Create the `experiment` branch from `main`
!nessie branch experiment main

# Switch Spark session to the `experiment` branch
demo_delta_spark.change_ref("experiment")
In [ ]:
# Drop the `totals_stats` table on the `experiment` branch
spark.sql("DROP TABLE IF EXISTS nessie_nba_totals_stats")
In [ ]:
# add some salaries for Dirk Nowitzki
Salary = Row("Season", "Team", "Salary", "Player")
dirk_nowitzki = spark.createDataFrame([
    Salary("2015-16", "Dallas Mavericks", "$8333333", "Dirk Nowitzki"),
    Salary("2016-17", "Dallas Mavericks", "$25000000", "Dirk Nowitzki"),
    Salary("2017-18", "Dallas Mavericks", "$5000000", "Dirk Nowitzki"),
    Salary("2018-19", "Dallas Mavericks", "$5000000", "Dirk Nowitzki")])
dirk_nowitzki.write.option('hadoop.nessie.ref', 'experiment').format("delta")\
    .mode("append").save(demo_delta_spark.table_path("nessie_nba_salaries"))
In [ ]:
# We should see only the `salaries` & `allstar_games_stats` tables (since we just dropped `totals_stats`)
!nessie contents --list --ref experiment
In [ ]:
# `main` hasn't been changed and still has the `totals_stats` table
!nessie contents --list

Let's take a look at the contents of the salaries table on the experiment branch.

In [ ]:
# Switch to the `experiment` branch.
demo_delta_spark.change_ref("experiment")
spark.sql("select count(*) from nessie_nba_salaries").toPandas()

Now compare to the contents of the salaries table on the main branch.

In [ ]:
# Switch back to the `main` branch.
demo_delta_spark.change_ref("main")
spark.sql("select count(*) from nessie_nba_salaries").toPandas()