%classpath config resolver maven-public1 http://nuc.local:8081/repository/maven-public/
%%classpath add mvn
ch.pschatzmann:news-digest:LATEST
org.apache.spark:spark-sql_2.11:2.4.0
org.vegas-viz:vegas_2.11:LATEST
org.vegas-viz:vegas-spark_2.11:LATEST
Added new repo: maven-public1
... and we import the relevant packages and classes.
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import vegas._
import vegas.render.StaticHTMLRenderer
import vegas.DSL.ExtendedUnitSpecBuilder
import vegas.sparkExt._
import jupyter.Displayer
import jupyter.Displayers
import ch.pschatzmann.news._
We define a minimum height for the charts via CSS:
%%html
<style>
#frame-myID {
min-height: 300pt;
}
</style>
... and we define and register a Displayer for the Vegas charts. Note that the frame id "myID" used by the Displayer matches the CSS selector #frame-myID above.
import scala.collection.JavaConverters._

// Convert a Vegas chart spec into HTML so that BeakerX can render it inline.
class VegasDisplayer extends Displayer[ExtendedUnitSpecBuilder] {
    def display(plot: ExtendedUnitSpecBuilder): java.util.Map[String, String] = {
        // Render the chart spec as a self-contained HTML frame
        val json = plot.toJson
        val html = StaticHTMLRenderer(json).frameHTML("myID")
        Map("text/html" -> html).asJava
    }
}
Displayers.register(classOf[ExtendedUnitSpecBuilder], new VegasDisplayer())
null // return null so the cell does not echo the registration result
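With the Displayer registered, any cell whose last expression is an ExtendedUnitSpecBuilder is rendered as an inline chart. As a quick smoke test we could evaluate a tiny chart with made-up inline data (a sketch, not part of the original analysis):

// If the registration worked, this renders as HTML instead of echoing the object.
Vegas("Displayer smoke test")
    .withData(Seq(Map("x" -> "a", "y" -> 1), Map("x" -> "b", "y" -> 2)))
    .mark(Bar)
    .encodeX("x", Nominal)
    .encodeY("y", Quantitative)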
... and we create a Spark session.
val spark = SparkSession.builder()
.appName("SolrQuery")
.master("local[*]")
.config("spark.ui.enabled", "false")
.getOrCreate()
val sc = spark.sparkContext
org.apache.spark.SparkContext@39d5d2e
We can load the data from Solr into a Spark DataFrame. We display the first 10 records to confirm that the functionality is working.
%%time
import spark.implicits._
import scala.collection.JavaConverters._
// Load the matching documents from Solr page by page and flatten the
// pages into a single RDD of Document beans.
val store = new SolrDocumentStore()
val rdd = sc.parallelize(store.pagedDocuments("publisher_t:(nytimes, guardian)").asScala)
    .map(page => page.values.asScala)
    .flatMap(l => l)
val df = spark.createDataFrame(rdd, classOf[Document])
df.show(10)
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
|author_t|           content_t|                  id|  keys|              link_t|      publishDate_t|publisher_t|  type_t|
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
|    null|            In brief|20000101025359-17...|  null|https://www.thegu...|2000-01-01 02:53:59|   guardian|document|
|    null|Now we are gettin...|20000101025359-18...|[BB.L]|https://www.thegu...|2000-01-01 02:53:59|   guardian|document|
|    null|Y2K force outstri...|20000101025400+13...|  null|https://www.thegu...|2000-01-01 02:54:00|   guardian|document|
|    null|Big Macs, small h...|20000101041848+91...|  null|https://www.thegu...|2000-01-01 04:18:48|   guardian|document|
|    null|New York ends mil...|20000101120000-10...|  null|https://www.thegu...|2000-01-01 12:00:00|   guardian|document|
|    null|Can the dot.com d...|20000102120000+12...|  null|https://www.thegu...|2000-01-02 12:00:00|   guardian|document|
|    null|Only two cheers f...|20000102125007+19...|  null|https://www.thegu...|2000-01-02 12:50:07|   guardian|document|
|    null|Telecoms hostilit...|20000103010644-13...|  null|https://www.thegu...|2000-01-03 01:06:44|   guardian|document|
|    null|Europeans press f...|20000103010645+14...|  null|https://www.thegu...|2000-01-03 01:06:45|   guardian|document|
|    null|Union seeks Kitem...|20000103010645-58...|  null|https://www.thegu...|2000-01-03 01:06:45|   guardian|document|
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
only showing top 10 rows

CPU times: user 0 ns, sys: 343 µs, total: 343 µs
Wall Time: 7 s
org.apache.spark.sql.SparkSession$implicits$@36d7fd4a
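To double-check the schema that Spark derived from the Document bean, an optional sanity check can print it; the columns should match the Solr fields shown above (author_t, content_t, id, keys, link_t, publishDate_t, publisher_t, type_t):

// Optional sanity check of the inferred schema.
df.printSchema()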
We use the DataFrame to group the data by year-month and publisher and to count the records per group:
%%time
val byMonth = df.withColumn("month", expr("substring(publishDate_t, 1, 7)"))
.groupBy("month","publisher_t").count()
.orderBy("month","publisher_t")
byMonth.persist.show(10)
+-------+-----------+-----+
|  month|publisher_t|count|
+-------+-----------+-----+
|2000-01|   guardian|  428|
|2000-01|    nytimes| 7865|
|2000-02|   guardian|  521|
|2000-02|    nytimes| 7367|
|2000-03|   guardian|  588|
|2000-03|    nytimes| 7863|
|2000-04|   guardian|  493|
|2000-04|    nytimes| 7621|
|2000-05|   guardian|  509|
|2000-05|    nytimes| 7686|
+-------+-----------+-----+
only showing top 10 rows

CPU times: user 0 ns, sys: 297 µs, total: 297 µs
Wall Time: 1234 s
null
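For readers who prefer SQL, the same aggregation can be expressed against a temporary view; this sketch (the view name "documents" is our choice) should yield the same result as the DataFrame API above:

// Register the raw DataFrame as a temporary view and aggregate in SQL.
df.createOrReplaceTempView("documents")
spark.sql("""
    SELECT substring(publishDate_t, 1, 7) AS month, publisher_t, count(*) AS count
    FROM documents
    GROUP BY substring(publishDate_t, 1, 7), publisher_t
    ORDER BY month, publisher_t
""").show(10)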
The DataFrame can be fed directly into Vegas to create a bar chart:
%%time
val barChart = Vegas()
.withDataFrame(byMonth)
.mark(Bar)
.configCell(height = 300)
.encodeX("month", Ordinal)
.encodeY("count", Quantitative, AggOps.Sum, enableBin=false)
.encodeColor(field="publisher_t", dataType=Nominal)
CPU times: user 0 ns, sys: 280 µs, total: 280 µs Wall Time: 2 s
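If a chart is needed outside of the registered Displayer, the HTML can also be produced by hand; this is essentially what VegasDisplayer does internally:

// Manual rendering path: chart spec -> JSON -> self-contained HTML frame.
val html = StaticHTMLRenderer(barChart.toJson).frameHTML("myID")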
We can produce the bar chart by year with the same approach:
%%time
val byYear = byMonth.withColumn("year", expr("substring(month, 1, 4)"))
.groupBy("year","publisher_t").sum("count")
.orderBy("year","publisher_t")
val barChart = Vegas()
.withDataFrame(byYear)
.mark(Bar)
.configCell(height = 300)
.encodeX("year", Ordinal)
.encodeY("sum(count)", Quantitative, AggOps.Sum, enableBin=false)
.encodeColor(field="publisher_t", dataType=Nominal)
CPU times: user 0 ns, sys: 329 µs, total: 329 µs Wall Time: 5 s
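As a possible follow-up, the yearly counts can be pivoted so that each publisher becomes its own column, which makes a tabular year-over-year comparison easier (a sketch; the column name "sum(count)" comes from the aggregation above):

// One row per year, one column per publisher.
byYear.groupBy("year")
    .pivot("publisher_t")
    .sum("sum(count)")
    .orderBy("year")
    .show(5)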