Here are some examples of using Spark to examine successful logins from a simple log. The goal is to give a light, introductory sense of what Spark can do.
Prerequisite: prior knowledge of what a Resilient Distributed Dataset (RDD) is. A good reference article by Vishnu Viswanath is available at http://datalakes.com/blogs/rdds-simplified/
Since Spark is being used, let's start by parallelizing our data. This means the data is submitted, as a list, to a Spark context (sc) for manipulation. More specifically, the data is distributed across the Spark nodes as an RDD.
Note that a case class helps identify each login as a data point; a data point is a single login item.
case class Login (loginId: Long, userId: Long, loginDate: String, name: String)
val allLogins = sc.parallelize(List(
Login( 0, 50, "2016-02-01", "Joe Apple"),
Login( 1, 50, "2016-02-01", "Joe Apple"),
Login( 2, 55, "2016-02-01", "Jane Apple"),
Login( 3, 60, "2016-02-01", "Bob Banana"),
Login( 4, 50, "2016-02-02", "Joe Apple"),
Login( 5, 55, "2016-02-02", "Jane Apple"),
Login( 6, 65, "2016-02-02", "Pete Pear"),
Login( 7, 50, "2016-02-03", "Joe Apple"),
Login( 8, 55, "2016-02-03", "Jane Apple"),
Login( 9, 70, "2016-02-03", "Candy Coconut"),
Login(10, 55, "2016-02-03", "Jane Apple"),
Login(11, 60, "2016-02-03", "Bob Banana"),
Login(12, 65, "2016-02-03", "Pete Pear"),
Login(13, 70, "2016-02-03", "Candy Coconut"),
Login(14, 70, "2016-02-04", "Candy Coconut")
))
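As a side note, parallelize accepts an optional second argument that sets the number of partitions (slices) the data is split into, and partitions.size reports that count. A minimal sketch, using a throwaway list of numbers rather than the login data:
val numbersIn3Slices = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
numbersIn3Slices.partitions.size
3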
It can be noted that there are several other ways to get parallelized RDDs. The Spark context can load a directory of text files line by line with sc.textFile(), or whole files at a time (as filename/content pairs) with sc.wholeTextFiles(). Hadoop SequenceFile data can be loaded into an RDD with sc.sequenceFile().
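A minimal sketch of those loading methods, assuming the spark-shell's sc; the paths and the SequenceFile key/value types are illustrative, not part of this example:
// One String element per line, across all files matching the glob.
val lineRdd = sc.textFile("/tmp/logins/*.txt")
// One (fileName, fileContents) pair per file in the directory.
val fileRdd = sc.wholeTextFiles("/tmp/logins")
// (key, value) pairs from a Hadoop SequenceFile; the types must match what was written.
val seqRdd = sc.sequenceFile[Long, String]("/tmp/logins.seq")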
Spark provides a few simple methods for exploring data; here are some of them.
The count method shows how many login items are in the RDD. The sanity check passes: all 15 items were loaded.
allLogins.count
15
The take method pulls the first 3 items from this list.
allLogins.take(3)
Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple))
The collect method pulls all the items in this list back to the driver. The Spark shell may only display the first 20 items, so it's not always possible to see all the data.
allLogins.collect
Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple), Login(3,60,2016-02-01,Bob Banana), Login(4,50,2016-02-02,Joe Apple), Login(5,55,2016-02-02,Jane Apple), Login(6,65,2016-02-02,Pete Pear), Login(7,50,2016-02-03,Joe Apple), Login(8,55,2016-02-03,Jane Apple), Login(9,70,2016-02-03,Candy Coconut), Login(10,55,2016-02-03,Jane Apple), Login(11,60,2016-02-03,Bob Banana), Login(12,65,2016-02-03,Pete Pear), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut))
The sample method pulls a sampling of the login items in this list, and previously sampled items may be picked again. The first parameter, true, means sample with replacement, so an item can appear more than once; the second parameter is the fraction of the full data size to sample. Note: to be a bit easier on the eyes, chained method calls can be spread across multiple lines.
allLogins.
sample(true, 0.3).
collect
Array(Login(1,50,2016-02-01,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(8,55,2016-02-03,Jane Apple))
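For comparison, a quick sketch of sampling without replacement (no duplicates), with an explicit seed so the sample is reproducible; the seed value of 42 is arbitrary:
allLogins.
sample(false, 0.3, 42).
collect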
The println method helps print the list of logins. (println is ordinary Scala, not a Spark method; here it runs on the driver, over the array returned by collect.)
allLogins.
collect.
foreach(println)
Login(0,50,2016-02-01,Joe Apple)
Login(1,50,2016-02-01,Joe Apple)
Login(2,55,2016-02-01,Jane Apple)
Login(3,60,2016-02-01,Bob Banana)
Login(4,50,2016-02-02,Joe Apple)
Login(5,55,2016-02-02,Jane Apple)
Login(6,65,2016-02-02,Pete Pear)
Login(7,50,2016-02-03,Joe Apple)
Login(8,55,2016-02-03,Jane Apple)
Login(9,70,2016-02-03,Candy Coconut)
Login(10,55,2016-02-03,Jane Apple)
Login(11,60,2016-02-03,Bob Banana)
Login(12,65,2016-02-03,Pete Pear)
Login(13,70,2016-02-03,Candy Coconut)
Login(14,70,2016-02-04,Candy Coconut)
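When the data is too large to collect comfortably, a small sketch that prints only a few items on the driver (calling foreach(println) directly on the RDD would run on the executors, so on a cluster the output may not appear in the shell):
allLogins.
take(3).
foreach(println)
Login(0,50,2016-02-01,Joe Apple)
Login(1,50,2016-02-01,Joe Apple)
Login(2,55,2016-02-01,Jane Apple)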
The sortBy method sorts the logins. The underscore character is Scala shorthand for the lambda parameter, so _.name means "each login's name".
allLogins.
sortBy(_.name, true).
collect
Array(Login(3,60,2016-02-01,Bob Banana), Login(11,60,2016-02-03,Bob Banana), Login(9,70,2016-02-03,Candy Coconut), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut), Login(2,55,2016-02-01,Jane Apple), Login(5,55,2016-02-02,Jane Apple), Login(8,55,2016-02-03,Jane Apple), Login(10,55,2016-02-03,Jane Apple), Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(4,50,2016-02-02,Joe Apple), Login(7,50,2016-02-03,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(12,65,2016-02-03,Pete Pear))
Sidebar: The following is the same code written with the longer, explicit lambda syntax (no underscore character).
allLogins.
sortBy(login => login.name, true).
collect
Array(Login(3,60,2016-02-01,Bob Banana), Login(11,60,2016-02-03,Bob Banana), Login(9,70,2016-02-03,Candy Coconut), Login(13,70,2016-02-03,Candy Coconut), Login(14,70,2016-02-04,Candy Coconut), Login(2,55,2016-02-01,Jane Apple), Login(5,55,2016-02-02,Jane Apple), Login(8,55,2016-02-03,Jane Apple), Login(10,55,2016-02-03,Jane Apple), Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(4,50,2016-02-02,Joe Apple), Login(7,50,2016-02-03,Joe Apple), Login(6,65,2016-02-02,Pete Pear), Login(12,65,2016-02-03,Pete Pear))
Sidebar (continued): Which way is better? The real goal is code that is easy to understand, and neither the shorthand nor the explicit syntax settles that by itself; pick whichever reads more clearly in context.
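To round out the sortBy discussion, the second argument controls sort order; a minimal sketch of a descending sort by name (the named argument is optional, but it makes the intent clear):
allLogins.
sortBy(_.name, ascending = false).
map(_.name).
collect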
The map method transforms each data point, adding or removing fields. Combining map with println is useful for formatting the data.
allLogins.
sortBy(_.name, true).
map(_.name).
collect.
foreach(println)
Bob Banana
Bob Banana
Candy Coconut
Candy Coconut
Candy Coconut
Jane Apple
Jane Apple
Jane Apple
Jane Apple
Joe Apple
Joe Apple
Joe Apple
Joe Apple
Pete Pear
Pete Pear
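For more control over the output, map can build a display string per login; a small sketch using Scala string interpolation (the date/name layout is just an illustration):
allLogins.
sortBy(_.name, true).
map(login => s"${login.loginDate}  ${login.name}").
collect.
foreach(println)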
The distinct method removes duplicated data. Here is a slightly cleaner version of the listing above.
allLogins.
sortBy(_.name, true).
map(_.name).
distinct.
collect.
foreach(println)
Candy Coconut
Joe Apple
Pete Pear
Bob Banana
Jane Apple
Now that some preliminary items have been covered, it's time to take a deeper look at processing the example login data.
The filter method selects wanted (or drops unwanted) items from the RDD. Here, filter keeps only the logins on the date of interest. Because RDD transformations are lazy, collect forces the work to run and gathers the results.
allLogins.
filter(_.loginDate == "2016-02-01").
collect
Array(Login(0,50,2016-02-01,Joe Apple), Login(1,50,2016-02-01,Joe Apple), Login(2,55,2016-02-01,Jane Apple), Login(3,60,2016-02-01,Bob Banana))
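Conversely, to drop rather than keep a date, negate the predicate; a quick sketch that counts the logins not on that date:
allLogins.
filter(_.loginDate != "2016-02-01").
count
11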
The map method, as seen before, changes, or transforms, each data point. In the following, fields such as loginId, userId, and loginDate are dropped from each data point and only name is kept. Adding distinct keeps only the unique names; duplicates are dropped.
allLogins.
filter(_.loginDate == "2016-02-01").
map(_.name).
distinct.
collect
Array(Jane Apple, Bob Banana, Joe Apple)
The same technique can be used to find user IDs. In this example, the user names cannot be guaranteed to be unique, so using the user IDs makes more sense than using the names. Surely there are several persons named 'Bob Banana' logging in. :)
allLogins.
filter(_.loginDate == "2016-02-01").
map(_.userId).
distinct.
collect
Array(50, 60, 55)
A simple change to the filter method keeps three dates' worth of logins in the RDD. Fortunately, the yyyy-MM-dd date format sorts the same way as the dates themselves, so plain string comparison works here.
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(_.userId).
distinct.
collect
Array(65, 50, 60, 70, 55)
It would be nice to know the dates as well. Changing the map method to emit a tuple shows the user together with the date.
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
collect
Array((50,2016-02-03), (60,2016-02-03), (65,2016-02-03), (55,2016-02-01), (65,2016-02-02), (50,2016-02-01), (70,2016-02-03), (60,2016-02-01), (55,2016-02-03), (50,2016-02-02), (55,2016-02-02))
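When the actual dates matter rather than just how many there are, groupByKey gathers the distinct dates for each user; a sketch (each user ID ends up paired with an iterable of its dates):
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
groupByKey.
collect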
Changing the map to (1) drop the date, leaving just the user ID, and (2) emit a '1' produces one entry per user (by ID) for each day that user logged in at least once. (The '1' is a flag meaning the user logged in that day, not a count of how many times they logged in.) The result looks like the previous step with the dates replaced by '1's. This is just an interim step; see the step that follows.
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
map(x => (x._1, 1)).
collect
Array((50,1), (60,1), (65,1), (55,1), (65,1), (50,1), (70,1), (60,1), (55,1), (50,1), (55,1))
Building on the step above, reduceByKey adds up the 1s, giving the number of days each user (by ID) logged in.
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
map(x => (x._1, 1)).
reduceByKey(_+_).
collect
Array((65,2), (50,3), (60,2), (70,1), (55,3))
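As an aside, the same per-user day counts can be had without the explicit (userId, 1) step: countByValue is an action that counts occurrences and returns a local Map on the driver (fine at this size, but the whole Map must fit in driver memory). A sketch:
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
map(_._1).
countByValue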
OK ... since the requirement is logging in on all 3 days, use the filter method to keep only the counts equal to 3, then count how many users met that criterion.
allLogins.
filter(login => ("2016-02-01" <= login.loginDate) && (login.loginDate <= "2016-02-03")).
map(login => (login.userId, login.loginDate)).
distinct.
map(x => (x._1, 1)).
reduceByKey(_+_).
filter(_._2 == 3).
count
2
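Finally, a minimal sketch that wraps the whole pipeline in a reusable function; the function name and parameters are illustrative, not part of the original example:
// Counts users who logged in on every one of the required number of days in the range.
def usersLoggedInEveryDay(start: String, end: String, requiredDays: Int): Long = {
  allLogins.
  filter(login => (start <= login.loginDate) && (login.loginDate <= end)).
  map(login => (login.userId, login.loginDate)).
  distinct.
  map(x => (x._1, 1)).
  reduceByKey(_ + _).
  filter(_._2 == requiredDays).
  count
}

usersLoggedInEveryDay("2016-02-01", "2016-02-03", 3)
2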
I hope this example and information are a helpful introduction to Spark. Spark has many more features, such as streaming, machine learning, DataFrames (an alternative to Hive and Pig), and GraphX.
Part 2 of Vishnu's RDD blog provides insight into how some of the methods shown above are carried out on RDDs. It can be found at: https://www.linkedin.com/pulse/spark-rdds-simplified-part-2-vishnu-viswanath
Various articles exist on the web. One book that is rather easy to follow is Big Data Analytics with Spark by Mohammed Guller.
Courses on Spark and Big Data exist, but their number and disparate content make finding a good one difficult. I found the Galvanize (formerly Zipfian Academy) Data Engineering course helpful in my quest.
I used Asim Jalis's quickstart guide, https://github.com/asimjalis/apache-toree-quickstart, to install Apache Toree and write this article.
You can contact me at stephanwarren on ymail.com