Almond comes with a Spark integration module called almond-spark, which allows you to connect to a Spark cluster and to run Spark calculations interactively from a Jupyter notebook.
It is based on ammonite-spark, adding Jupyter-specific features such as progress bars and cancellation for running Spark computations.
ammonite-spark handles loading Spark in a clever way, and does not rely on a specific Spark distribution. Because of that, you can use it with any Spark 2.x version. The only limitation is that the Scala version of Spark and the running Almond kernel must match, so make sure your kernel uses the same Scala version as your Spark cluster. Spark 2.0.x - 2.3.x requires Scala 2.11. Spark 2.4.x supports both Scala 2.11 and 2.12.
For more information, see the README of ammonite-spark.
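Since the kernel's Scala version must match the cluster's, it can be handy to check it from a cell. A minimal sketch using the standard library (the variable names here are just for illustration):

```scala
// Print the Scala version the kernel runs on; its binary version
// (e.g. "2.12") must match the one your Spark distribution was built for.
val kernelScalaVersion = scala.util.Properties.versionNumberString
val binaryVersion = kernelScalaVersion.split('.').take(2).mkString(".")
println(s"Kernel Scala version: $kernelScalaVersion (binary: $binaryVersion)")
```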
To use it, just import Spark 2.x; the almond-spark dependency will be added automatically.
```scala
import $ivy.`org.apache.spark::spark-sql:2.4.3` // Or use any other 2.x version here
// import $ivy.`sh.almond::almond-spark:_` // Added automatically on importing Spark

import org.apache.spark.sql._
```
Usually you want to disable logging in order to avoid polluting your cell outputs.
```scala
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org").setLevel(Level.OFF)
```
Then create a `SparkSession` using the `NotebookSparkSession` builder provided by almond-spark. This will run Spark in the same JVM as your kernel.
```scala
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}
```
```
Loading spark-stubs
Getting spark JARs
Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
spark: SparkSession = org.apache.spark.sql.SparkSession@6d754fd1
```
When running this, you should see that the cell output contains a link to the Spark UI.
Note the use of `NotebookSparkSession.builder()`, instead of the `SparkSession.builder()` you would use when e.g. writing a Spark job. The builder returned by `NotebookSparkSession.builder()` extends the one of `SparkSession.builder()`, so you can call `.appName("foo")`, `.config("key", "value")`, etc. on it.
Of course, you can also connect to a real Spark cluster. ammonite-spark currently supports standalone and YARN clusters; Mesos and Kubernetes aren't supported yet. See the ammonite-spark README for details.
Simply set the master to `spark://…` when building the session, e.g.
```scala
// val spark = {
//   NotebookSparkSession.builder()
//     .master("spark://localhost:7077")
//     .config("spark.executor.instances", "4")
//     .config("spark.executor.memory", "2g")
//     .getOrCreate()
// }
```
Make sure the version of Spark used to start the master and executors matches the one loaded in the notebook session (via e.g. ``import $ivy.`org.apache.spark::spark-sql:X.Y.Z` ``), and that the machine running the kernel can access, and is accessible from, all nodes of the standalone cluster.
Set the master to `"yarn"` when building the session, e.g.
```scala
// val spark = {
//   NotebookSparkSession.builder()
//     .master("yarn")
//     .config("spark.executor.instances", "4")
//     .config("spark.executor.memory", "2g")
//     .getOrCreate()
// }
```
Ensure the configuration directory of the cluster is set in `HADOOP_CONF_DIR` or `YARN_CONF_DIR` in the environment, or is available at `/etc/hadoop/conf`. This directory should contain files like `core-site.xml`, `hdfs-site.xml`, … Ensure also that the machine you run Ammonite on can indeed act as the driver (it should have access to and be accessible from the YARN nodes, etc.).
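As a quick sanity check, the lookup described above can be sketched from a cell. This is an assumption based on the description (environment variables first, then `/etc/hadoop/conf`), not something almond itself runs:

```scala
// Resolve the Hadoop/YARN configuration directory as described above:
// HADOOP_CONF_DIR, then YARN_CONF_DIR, then /etc/hadoop/conf.
val confDir = sys.env.get("HADOOP_CONF_DIR")
  .orElse(sys.env.get("YARN_CONF_DIR"))
  .getOrElse("/etc/hadoop/conf")
println(s"Using Hadoop conf dir: $confDir")

// The directory should contain the cluster's site files.
for (f <- Seq("core-site.xml", "hdfs-site.xml"))
  if (!new java.io.File(confDir, f).exists())
    println(s"warning: $confDir/$f not found")
```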
Now that we have a `SparkSession`, we can get a `SparkContext` from it to run Spark calculations.
```scala
def sc = spark.sparkContext
```
And then create an `RDD` and run some calculations.
```scala
val rdd = sc.parallelize(1 to 100000000, 100)
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at cmd4.sc:1

val n = rdd.map(_ + 1).sum()
// n: Double = 5.00000015E15
```
When you execute a Spark action like `sum`, you should see a progress bar showing the progress of the running Spark job. If you're using the Jupyter classic UI, you can also click on (kill) to cancel the job.
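Besides the (kill) link, jobs can also be cancelled programmatically through the plain Spark `SparkContext` API (this is standard Spark, not almond-specific; the group name here is just a placeholder):

```scala
// Tag subsequent work with a job group, so it can be cancelled
// from another cell while it is still running.
sc.setJobGroup("slow-job", "demo of programmatic cancellation")
// ... start a long-running action in this group ...

// From another cell, cancel everything tagged with that group:
sc.cancelJobGroup("slow-job")
// Or cancel every running job on the context:
// sc.cancelAllJobs()
```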
```scala
val n = rdd.map(n => (n % 10, n)).reduceByKey(_ + _).collect()
// n: Array[(Int, Int)] = Array((0, 1432236160), (1, 1342236160), (2, 1352236160),
//   (3, 1362236160), (4, 1372236160), (5, 1382236160), (6, 1392236160),
//   (7, 1402236160), (8, 1412236160), (9, 1422236160))
```
If extra dependencies are loaded, via ``import $ivy.`…` ``, after the `SparkSession` has been created, you should call `NotebookSparkSession.sync()` for the newly added JARs to be passed to the Spark executors.
```scala
import $ivy.`org.typelevel::cats-core:1.6.0`

NotebookSparkSession.sync() // cats should be available on workers
```
If you try to create a `Dataset` or a `DataFrame` from some data structure containing a case class, and you're getting an `org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class ...` when calling `.toDS`/`.toDF`, try the following workaround: add `org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)` in the same cell where you define the case classes involved.
```scala
import spark.implicits._

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Person(id: String, value: Int)

val ds = List(Person("Alice", 42), Person("Bob", 43), Person("Charlie", 44)).toDS
// ds: Dataset[Person] = [id: string, value: int]
```
This workaround won't be necessary anymore in future Spark versions.
As of now, almond-spark doesn't include native rich display capabilities for `Dataset`s and `DataFrame`s, so by default you only get ASCII rendering of tables.
```scala
ds.show()
```

```
+-------+-----+
|     id|value|
+-------+-----+
|  Alice|   42|
|    Bob|   43|
|Charlie|   44|
+-------+-----+
```
It's not too hard to add your own displayer though. Here's an example:
```scala
// based on a snippet by Ivan Zaitsev
// https://github.com/almond-sh/almond/issues/180#issuecomment-364711999
implicit class RichDF(val df: DataFrame) {
  def showHTML(limit: Int = 20, truncate: Int = 20) = {
    import xml.Utility.escape
    val data = df.take(limit)
    val header = df.schema.fieldNames.toSeq
    val rows: Seq[Seq[String]] = data.map { row =>
      row.toSeq.map { cell =>
        val str = cell match {
          case null => "null"
          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
          case array: Array[_] => array.mkString("[", ", ", "]")
          case seq: Seq[_] => seq.mkString("[", ", ", "]")
          case _ => cell.toString
        }
        if (truncate > 0 && str.length > truncate) {
          // do not show ellipses for strings shorter than 4 characters
          if (truncate < 4) str.substring(0, truncate)
          else str.substring(0, truncate - 3) + "..."
        } else {
          str
        }
      }: Seq[String]
    }
    publish.html(s"""
      <table class="table">
        <tr>
          ${header.map(h => s"<th>${escape(h)}</th>").mkString}
        </tr>
        ${rows.map { row =>
          s"<tr>${row.map(c => s"<td>${escape(c)}</td>").mkString}</tr>"
        }.mkString}
      </table>""")
  }
}
```
```scala
ds.toDF.showHTML()
```

| id | value |
| --- | --- |
| Alice | 42 |
| Bob | 43 |
| Charlie | 44 |