Basic Spark Example

Almond comes with a Spark integration module called almond-spark, which allows you to connect to a Spark cluster and to run Spark calculations interactively from a Jupyter notebook.

It is based on ammonite-spark, adding Jupyter-specific features such as progress bars and cancellation for running Spark computations.

ammonite-spark handles loading Spark in a clever way and does not rely on a specific Spark distribution. Because of that, you can use it with any Spark 2.x version. The only limitation is that the Scala versions of Spark and of the running Almond kernel must match, so make sure your kernel uses the same Scala version as your Spark cluster. Spark 2.0.x - 2.3.x requires Scala 2.11, while Spark 2.4.x supports both Scala 2.11 and 2.12.
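For instance, a quick way to check which Scala version the kernel is running on (it must match the Scala version your Spark build was compiled for) is:

scala.util.Properties.versionNumberString // e.g. "2.12.8"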

For more information, see the README of ammonite-spark.

To use it, just import Spark 2.x; the almond-spark dependency will be added automatically.

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.3` // Or use any other 2.x version here
// import $ivy.`sh.almond::almond-spark:_` // Added automatically on importing Spark

import org.apache.spark.sql._
Out[1]:
import $ivy.$                                   // Or use any other 2.x version here

import $ivy.$                              


import org.apache.spark.sql._

Usually you want to disable logging in order to avoid polluting your cell outputs.

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
Out[2]:
import org.apache.log4j.{Level, Logger}

Then create a SparkSession using the NotebookSparkSessionBuilder provided by almond-spark.

Running in local mode

This will run Spark in the same JVM as your kernel.

In [3]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}
Loading spark-stubs
Getting spark JARs
Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Out[3]:
spark: SparkSession = org.apache.spark.sql.SparkSession@...

When running this, you should see that the cell output contains a link to the Spark UI.

Note the use of NotebookSparkSession.builder(), instead of the SparkSession.builder() you would use when e.g. writing a regular Spark job.

The builder returned by NotebookSparkSession.builder() extends the one returned by SparkSession.builder(), so you can call .appName("foo"), .config("key", "value"), etc. on it.
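For example, a minimal sketch (the app name and configuration value below are arbitrary, for illustration only):

val spark = NotebookSparkSession.builder()
  .appName("almond-notebook")                   // illustrative app name
  .config("spark.sql.shuffle.partitions", "8")  // illustrative config value
  .master("local[*]")
  .getOrCreate()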

Connecting to a Real Cluster

Of course, you can also connect to a real Spark cluster. ammonite-spark currently supports standalone and YARN clusters; Mesos and Kubernetes aren't supported yet. See the ammonite-spark README for details.

Using with a Standalone Cluster

Simply set the master to spark://… when building the session, e.g.

In [3]:
// val spark = {
//   NotebookSparkSession.builder()
//     .master("spark://localhost:7077")
//     .config("spark.executor.instances", "4")
//     .config("spark.executor.memory", "2g")
//     .getOrCreate()
// }

Make sure the version of Spark used to start the master and executors matches the one loaded in the notebook session (via e.g. import $ivy.`org.apache.spark::spark-sql:X.Y.Z`), and that the machine running the kernel can access / is accessible from all nodes of the standalone cluster.
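Once a session is up, you can double-check which Spark version the notebook loaded and compare it with what the master's web UI reports:

org.apache.spark.SPARK_VERSION // should match the version running on the cluster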

Using with a YARN Cluster

Set the master to "yarn" when building the session, e.g.

In [3]:
// val spark = {
//   NotebookSparkSession.builder()
//     .master("yarn")
//     .config("spark.executor.instances", "4")
//     .config("spark.executor.memory", "2g")
//     .getOrCreate()
// }

Ensure the configuration directory of the cluster is set in HADOOP_CONF_DIR or YARN_CONF_DIR in the environment, or is available at /etc/hadoop/conf. This directory should contain files like core-site.xml, hdfs-site.xml, … Also ensure that the machine running the kernel can indeed act as the driver (it should have access to and be accessible from the YARN nodes, etc.).
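As a quick sanity check from the notebook, you can verify that the kernel sees one of these variables (a simple sketch; the fallback string below is just the conventional default location, not read from the environment):

sys.env.get("HADOOP_CONF_DIR")
  .orElse(sys.env.get("YARN_CONF_DIR"))
  .getOrElse("/etc/hadoop/conf") // conventional default location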

Now that we have a SparkSession, we can get a SparkContext from it to run Spark calculations.

In [4]:
def sc = spark.sparkContext
Out[4]:
defined function sc

And then create an RDD and run some calculations.

In [5]:
val rdd = sc.parallelize(1 to 100000000, 100)
Out[5]:
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at cmd4.sc:1
In [6]:
val n = rdd.map(_ + 1).sum()
sum at cmd5.sc:1
100 / 100
Out[6]:
n: Double = 5.00000015E15

When you execute a Spark action like sum you should see a progress bar, showing the progress of the running Spark job. If you're using the Jupyter classic UI, you can also click on (kill) to cancel the job.

In [7]:
val n = rdd.map(n => (n % 10, n)).reduceByKey(_ + _).collect()
map at cmd6.sc:1
100 / 100
collect at cmd6.sc:1
100 / 100
Out[7]:
n: Array[(Int, Int)] = Array(
  (0, 1432236160),
  (1, 1342236160),
  (2, 1352236160),
  (3, 1362236160),
  (4, 1372236160),
  (5, 1382236160),
  (6, 1392236160),
  (7, 1402236160),
  (8, 1412236160),
  (9, 1422236160)
)

Syncing Dependencies

If extra dependencies are loaded via import $ivy.`…` after the SparkSession has been created, you should call NotebookSparkSession.sync() so that the newly added JARs are passed to the Spark executors.

In [8]:
import $ivy.`org.typelevel::cats-core:1.6.0`

NotebookSparkSession.sync() // cats should be available on workers
Out[8]:
import $ivy.$                               


res7_1: SparkSession = org.apache.spark.sql.SparkSession@...

Datasets and Dataframes

If you try to create a Dataset or a Dataframe from some data structure containing a case class and you get an org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class ... when calling .toDS/.toDF, try the following workaround:

Add org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) in the same cell where you define case classes involved.

In [9]:
import spark.implicits._

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);

case class Person(id: String, value: Int)

val ds = List(Person("Alice", 42), Person("Bob", 43), Person("Charlie", 44)).toDS
Out[9]:
import spark.implicits._


defined class Person
ds: Dataset[Person] = [id: string, value: int]

This workaround won't be necessary anymore in future Spark versions.

Rich Display of Datasets and Dataframes

As of now, almond-spark doesn't include native rich display capabilities for Datasets and Dataframes, so by default we only get ASCII rendering of tables.

In [10]:
ds.show()
+-------+-----+
|     id|value|
+-------+-----+
|  Alice|   42|
|    Bob|   43|
|Charlie|   44|
+-------+-----+

It's not too hard to add your own displayer though. Here's an example:

In [11]:
// based on a snippet by Ivan Zaitsev
// https://github.com/almond-sh/almond/issues/180#issuecomment-364711999
implicit class RichDF(val df: DataFrame) {
  def showHTML(limit:Int = 20, truncate: Int = 20) = {
    import xml.Utility.escape
    val data = df.take(limit)
    val header = df.schema.fieldNames.toSeq
    val rows: Seq[Seq[String]] = data.map { row =>
      row.toSeq.map { cell =>
        val str = cell match {
          case null => "null"
          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
          case array: Array[_] => array.mkString("[", ", ", "]")
          case seq: Seq[_] => seq.mkString("[", ", ", "]")
          case _ => cell.toString
        }
        if (truncate > 0 && str.length > truncate) {
          // do not add an ellipsis when truncating to fewer than 4 characters
          if (truncate < 4) str.substring(0, truncate)
          else str.substring(0, truncate - 3) + "..."
        } else {
          str
        }
      }: Seq[String]
    }

    publish.html(s"""
      <table class="table">
        <tr>
        ${header.map(h => s"<th>${escape(h)}</th>").mkString}
        </tr>
        ${rows.map { row =>
          s"<tr>${row.map { c => s"<td>${escape(c)}</td>" }.mkString}</tr>"
        }.mkString
        }
      </table>""")
  }
}
Out[11]:
defined class RichDF
In [12]:
ds.toDF.showHTML()
id      value
Alice   42
Bob     43
Charlie 44
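
The limit and truncate parameters can be adjusted as needed; for example ds.toDF.showHTML(limit = 10, truncate = 0) shows at most 10 rows and, since truncate is not positive, skips the truncation logic above entirely.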