I decided to build a repository of news headlines: I loaded all 'New York Times' headlines since the year 2000 and all business-related news from the 'Guardian' into a Solr search engine. More details can be found in my prior blog post.
It was never the intention to process all documents in one run; the goal was to find the relevant articles with the help of the search engine and then to process only those headlines.
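For illustration, a narrow query returns only a small, relevant subset. This is just a sketch using the SolrDocumentStore API that is introduced below; the search term 'brexit' is a placeholder:
import scala.collection.JavaConverters._
import ch.pschatzmann.news._
// a targeted query yields a manageable result set instead of all headlines
val store = new SolrDocumentStore()
val relevant = store.stream("content_t:brexit AND publisher_t:guardian")
  .iterator().asScala
  .toSeq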
Out of curiosity, however, I investigated the performance of different alternatives for accessing all the data. In the examples below, I simply try to count all entries!
I was looking at the following alternatives:
- sequential processing of the documents as a Java Stream
- parallel processing with Scala parallel collections, based on lazy pages
- processing with Apache Spark, running locally
I don't have a clustered environment: everything is containerized in Docker on a simple Intel NUC with an Intel(R) Core(TM) i3-6100U CPU @ 2.30GHz (4 cores).
I install all components that are used in this document with the help of Maven:
%classpath config resolver maven-public1 http://software.pschatzmann.ch/repository/maven-public/
%%classpath add mvn
ch.pschatzmann:news-digest:LATEST
org.apache.spark:spark-sql_2.11:2.4.0
Added new repo: maven-public1
import ch.pschatzmann.news._
First we have a look at the data and display the first 20 headlines:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val it = store.stream("publisher_t:(guardian,nytimes)")
.iterator().asScala
(it take 20).toSeq
CPU times: user 0 ns, sys: 469 µs, total: 469 µs Wall Time: 3 s
[[In brief, Now we are getting down to business, Y2K force outstrips single currency operation, Big Macs, small horizons, New York ends millennium with record-breaking figures, Can the dot.com delirium survive the new century?, Only two cheers for capitalism, I'm afraid, Telecoms hostilities heighten, Europeans press for £1bn missiles, Union seeks Kitemark for partnerships, Monday briefing, Lloyd's site falls victim to hacker, Electrolux hit by £18m trading scam, City briefing, Thus may escape telecom reckoning, Movers, Sale of the centuries but festive retail fortunes are mixed, Wickes on the mend as it adds 'calibre', Focus, Thomson sets sights on Racal]]
Next we process all available data:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val allValues = store.stream("publisher_t:(guardian,nytimes)")
.iterator().asScala
.toSeq
allValues.size
CPU times: user 0 ns, sys: 235 µs, total: 235 µs Wall Time: 1680 s
2164417
Now we map each entry to 1 and total these values up to get the number of records:
%%time
val count = store.stream("publisher_t:(guardian,nytimes)")
.iterator().asScala
.map(i => 1)
.sum
count
CPU times: user 0 ns, sys: 252 µs, total: 252 µs Wall Time: 1614 s
2164417
And we can also return the size of the materialized sequence directly as the result:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val count = store.stream("publisher_t:(guardian,nytimes)")
.iterator().asScala
.toSeq
.size
count
CPU times: user 0 ns, sys: 232 µs, total: 232 µs Wall Time: 1595 s
2164417
Solr also provides stream processing functionality (e.g. via the /export handler) which is more efficient than the regular paging.
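Here is a minimal sketch of how this could be used, assuming the collection is called 'news' and Solr runs on localhost. The /export handler streams the complete sorted result set in a single response, but requires docValues on the requested and sorted fields:
import java.net.{URL, URLEncoder}
import scala.io.Source

// stream all matching ids via the /export handler and count them
val q = URLEncoder.encode("publisher_t:(guardian,nytimes)", "UTF-8")
val url = new URL(s"http://localhost:8983/solr/news/export?q=$q&fl=id&sort=id+asc")
val response = Source.fromInputStream(url.openStream()).mkString
// each exported document contributes exactly one "id" field to the JSON response
val count = "\"id\":".r.findAllIn(response).length
count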
The sequential processing of all 2.1 million records takes around half an hour, and it hardly matters how you implement the counting!
Initially I just provided the result as a Java Stream. In order to support parallel processing, I extended the interface to return PagedDocuments. This basically just represents a page number for each page of documents, which is only evaluated when the user requests the related documents:
%%time
val store = new SolrDocumentStore()
store.pagedDocuments("publisher_t:(guardian,nytimes)")
CPU times: user 0 ns, sys: 276 µs, total: 276 µs Wall Time: 1 s
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216]
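Each number above stands for one lazy page which only knows its page number until its documents are requested. Conceptually the idea looks like this (all names in this sketch are illustrative and not the actual news-digest API):
// a page stores only its number and a fetch function; the documents are
// loaded from Solr on the first access to values
case class LazyPage[T](page: Int, fetch: Int => Seq[T]) {
  lazy val values: Seq[T] = fetch(page) // the Solr request happens only here
  override def toString: String = page.toString
}

// toy usage: three pages of two entries each, counted in parallel
val pages = (0 until 3).map(p => LazyPage(p, (n: Int) => Seq(n * 2, n * 2 + 1)))
val total = pages.par.map(_.values.size).sum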
Here is the logic which is based on pagedDocuments: we convert the returned list of 217 lazy pages into a parallel Scala collection, determine the size of each page and sum up all the sizes:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val count = store.pagedDocuments("publisher_t:(guardian,nytimes)")
.asScala.par
.map(pagedDocuments => pagedDocuments.values().size)
.sum
count
CPU times: user 0 ns, sys: 266 µs, total: 266 µs Wall Time: 1228 s
2164423
...or, as an alternative, we use flatMap before counting:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val count = store.pagedDocuments("publisher_t:(guardian,nytimes)")
.asScala.toSeq.par
.map(pagedDocuments => pagedDocuments.values().asScala.toSeq)
.flatMap(list => list)
.size
count
CPU times: user 0 ns, sys: 350 µs, total: 350 µs Wall Time: 1284 s
2164423
It is possible to process the 2.1 million documents sequentially in roughly 26 minutes. By using parallel collections we can speed the processing up to around 20 minutes, but at the cost of a much higher CPU usage.
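If the high CPU usage is a concern, the parallelism of a Scala parallel collection can be capped explicitly. A small sketch (for Scala 2.11, which the spark-sql_2.11 dependency above implies; newer Scala versions use java.util.concurrent.ForkJoinPool instead):
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// limit the parallel fetching to 2 concurrent Solr requests instead of one per core
val pages = new SolrDocumentStore().pagedDocuments("publisher_t:(guardian,nytimes)").asScala.par
pages.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(2))
val count = pages.map(_.values().size).sum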
My initial expectation was that it does not make much sense to use Spark without a cluster. But I wanted to get a feeling for how local Spark compares to local Scala.
So first we create a Spark session:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("SolrQuery")
.master("local[*]")
.config("spark.ui.enabled", "false")
.getOrCreate()
val sc = spark.sparkContext
org.apache.spark.SparkContext@429ee51c
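Since the master is local[*], Spark uses one worker thread per available core; on this machine sc.defaultParallelism should therefore report 4:
// number of partitions Spark will use by default with local[*]
sc.defaultParallelism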
We parallelize the sequence, which has been constructed outside of Spark, and perform the count in Spark:
%%time
import scala.collection.JavaConverters._
import spark.implicits._
val store = new SolrDocumentStore()
val rdd = sc.parallelize(store.stream("publisher_t:(guardian,nytimes)").iterator.asScala.toSeq)
rdd.count
CPU times: user 0 ns, sys: 481 µs, total: 481 µs Wall Time: 1807 s
org.apache.spark.sql.SparkSession$implicits$@3f8bb431
We parallelize the lazy pages and sum up the returned page sizes. This logic supports the parallel fetching of data from Solr:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val count = sc.parallelize(store.pagedDocuments("publisher_t:(guardian,nytimes)").asScala)
.map(page => page.values.size)
.sum
count
CPU times: user 0 ns, sys: 918 µs, total: 918 µs Wall Time: 1230 s
2164437.0
In the next version we use flatMap and then perform the count entirely in Spark. Note that RDD.sum above returns a Double, which explains the result 2164437.0, while count returns a Long:
%%time
import scala.collection.JavaConverters._
val store = new SolrDocumentStore()
val count = sc.parallelize(store.pagedDocuments("publisher_t:(guardian,nytimes)").asScala)
.map(page => page.values.asScala)
.flatMap(l => l)
.count
count
CPU times: user 0 ns, sys: 740 µs, total: 740 µs Wall Time: 1222 s
2164437
And finally we provide an example which returns the data as a Spark DataFrame: we parallelize the lazy pages, map the values of each page, flatten the structure to get an RDD of Document objects, and then convert the RDD to a DataFrame.
%%time
import spark.implicits._
import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
val store = new SolrDocumentStore()
val rdd = sc.parallelize(store.pagedDocuments("publisher_t:(guardian,nytimes)").asScala)
.map(page => page.values.asScala)
.flatMap(l => l)
val df = spark.createDataFrame(rdd, classOf[Document])
df.show
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
|author_t|           content_t|                  id|  keys|              link_t|      publishDate_t|publisher_t|  type_t|
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
|    null|            In brief|20000101025359-17...|  null|https://www.thegu...|2000-01-01 02:53:59|   guardian|document|
|    null|Now we are gettin...|20000101025359-18...|[BB.L]|https://www.thegu...|2000-01-01 02:53:59|   guardian|document|
|    null|Y2K force outstri...|20000101025400+13...|  null|https://www.thegu...|2000-01-01 02:54:00|   guardian|document|
|    null|Big Macs, small h...|20000101041848+91...|  null|https://www.thegu...|2000-01-01 04:18:48|   guardian|document|
|    null|New York ends mil...|20000101120000-10...|  null|https://www.thegu...|2000-01-01 12:00:00|   guardian|document|
|    null|Can the dot.com d...|20000102120000+12...|  null|https://www.thegu...|2000-01-02 12:00:00|   guardian|document|
|    null|Only two cheers f...|20000102125007+19...|  null|https://www.thegu...|2000-01-02 12:50:07|   guardian|document|
|    null|Telecoms hostilit...|20000103010644-13...|  null|https://www.thegu...|2000-01-03 01:06:44|   guardian|document|
|    null|Europeans press f...|20000103010645+14...|  null|https://www.thegu...|2000-01-03 01:06:45|   guardian|document|
|    null|Union seeks Kitem...|20000103010645-58...|  null|https://www.thegu...|2000-01-03 01:06:45|   guardian|document|
|    null|     Monday briefing|20000103022640-37...|  null|https://www.thegu...|2000-01-03 02:26:40|   guardian|document|
|    null|Lloyd's site fall...|20000103120000+18...|  null|https://www.thegu...|2000-01-03 12:00:00|   guardian|document|
|    null|Electrolux hit by...|20000104011623-18...|  null|https://www.thegu...|2000-01-04 01:16:23|   guardian|document|
|    null|       City briefing|20000104011624+38...|  null|https://www.thegu...|2000-01-04 01:16:24|   guardian|document|
|    null|Thus may escape t...|20000105012702+71...|  null|https://www.thegu...|2000-01-05 01:27:02|   guardian|document|
|    null|              Movers|20000105012702-19...|  null|https://www.thegu...|2000-01-05 01:27:02|   guardian|document|
|    null|Sale of the centu...|20000105012703+13...|  null|https://www.thegu...|2000-01-05 01:27:03|   guardian|document|
|    null|Wickes on the men...|20000105012703+16...|  null|https://www.thegu...|2000-01-05 01:27:03|   guardian|document|
|    null|               Focus|20000105012703+68...|  null|https://www.thegu...|2000-01-05 01:27:03|   guardian|document|
|    null|Thomson sets sigh...|20000105012703-13...|  null|https://www.thegu...|2000-01-05 01:27:03|   guardian|document|
+--------+--------------------+--------------------+------+--------------------+-------------------+-----------+--------+
only showing top 20 rows

CPU times: user 0 ns, sys: 201 µs, total: 201 µs Wall Time: 7 s
org.apache.spark.sql.SparkSession$implicits$@5dcc44b8
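With the data available as a DataFrame, the counting (and any further analysis) can now be expressed in Spark SQL, for example using the columns shown above:
import spark.implicits._

// total number of records and a breakdown by publisher
df.count
df.groupBy($"publisher_t").count.show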