CS194-16: Introduction to Data Science
Name: Please put your name
Student ID: Please put your student id
In the first part of this assignment, you explored how to recommend movies to a user using a small version of the movies and ratings datasets. In this part of the assignment, you'll first explore how to tune the parallelism of an application running on a cluster. Then, you'll use your findings to run your code from the first part on a cluster of machines and using a larger dataset. You'll use the cluster and larger dataset to predict what movies to recommend to yourself. For many parts of this assignment, you should be able to use the exact code that you wrote in part 1!
__This version of the homework is for reference only!__ Each student has been emailed the URL and password for an iPython notebook server running on the cluster we've set up for you. You should use the iPython notebook at that URL to complete your homework. If you have not received login information yet or if your login information doesn't work, please post a private note on Piazza ASAP so that we can resolve the issue.
As mentioned during the lab and in part 1, think carefully before calling
collect() on any datasets. When you're using a small, local dataset, calling
collect() and then using Python to analyze the data locally will work fine, but this will not work when you're using a large dataset that doesn't fit in memory on one machine. Solutions that call
collect() and do local analysis that could have been done with Spark will not receive full credit.
As in the first part, we have created a FAQ at the bottom of this page to help with common problems you run into. If you run into a problem, please check the FAQ before posting on Piazza!
As in the lab (and unlike part 1 of the homework), we've already created a
SparkContext for you that's available with the
sc variable name. The code below prints out the URLs of the master UI and the UI for your application.
# We've set up the notebook so that the hostname of the master is saved
# as CLUSTER_URL.
master_ui_address = CLUSTER_URL.split("//")[1].split(":")[0]
print "Master UI located at http://%s:8080" % master_ui_address
application_ui_address = "http://" + sc.appName + ":4040"
print "Application UI located at %s" % application_ui_address
a) Use the
SparkContext to create an RDD for each of the ratings and movies datasets, which are located at
/movielens/large/movies.dat, respectively (as in the lab). Recall that each entry in the ratings dataset is formatted as UserID::MovieID::Rating::Timestamp and each entry in the movies dataset is formatted as MovieID::Title::Genres. Count the entries in each dataset to ensure you've correctly created the RDD; there should be 10000054 entries in the ratings dataset and 10681 entries in the movies dataset.
ratings = # YOUR CODE HERE
movies = # YOUR CODE HERE
ratings_count = ratings.count()
movies_count = movies.count()
print "%s ratings and %s movies in the datasets" % (ratings_count, movies_count)
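As a minimal sketch of the parsing step, assuming every line follows the `::`-separated formats described above, the helpers below turn one line of text into a tuple. The names `parse_rating` and `parse_movie` are illustrative and not part of the assignment; on the cluster, functions like these would typically be passed to an RDD's map().

```python
def parse_rating(line):
    """Parse 'UserID::MovieID::Rating::Timestamp' into a 4-tuple."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    # Ratings are parsed as floats so that fractional ratings are preserved.
    return (int(user_id), int(movie_id), float(rating), int(timestamp))


def parse_movie(line):
    """Parse 'MovieID::Title::Genres' into a 3-tuple."""
    movie_id, title, genres = line.strip().split("::")
    return (int(movie_id), title, genres)
```

Keeping the parsing in small named functions makes it easy to check them on a single line locally before running them over the full dataset.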
b) As you did in part 1, use the least significant digit of the rating timestamp to separate 60% of the data into a training set, 20% into a validation set, and the remaining 20% into a test set.
### YOUR CODE HERE
training = # YOUR CODE HERE
validation = # YOUR CODE HERE
test = # YOUR CODE HERE
print "Training: %s, validation: %s, test: %s" % (training.count(), validation.count(), test.count())
Now the datasets are 10x bigger, so the training set should have about 6 million entries, the validation set should have about 2 million entries, and the test set should have about 2 million entries.
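One way to express the 60/20/20 split is a small function that maps a timestamp to the name of its split. This is only a sketch: the assignment does not say which digits go to which split, so the choice of 0-5 for training, 6-7 for validation, and 8-9 for test is an assumption, and `split_name` is a hypothetical helper name.

```python
def split_name(timestamp):
    """Map a rating timestamp to 'training', 'validation', or 'test'."""
    last_digit = timestamp % 10
    if last_digit < 6:       # digits 0-5: roughly 60% of the data (assumed mapping)
        return "training"
    if last_digit < 8:       # digits 6-7: roughly 20%
        return "validation"
    return "test"            # digits 8-9: roughly 20%
```

On the cluster, a function like this could be used inside filter(), for example keeping only the entries whose timestamp maps to "training", so the split is computed by Spark rather than on collected data.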
Before analyzing the movies datasets at scale, let's learn a little about how to optimize performance. In the lab, we saw that the number of partitions that a dataset is broken into can have a big effect on the response time of jobs run on that dataset. In this part of the assignment, you'll determine the optimal number of partitions to break datasets into.
a) In the next part of this exercise, you'll break the ratings dataset into different numbers of partitions, and then call
count(). First, answer some questions to understand how this will work.
How does the number of tasks relate to the number of partitions? Your answer here
When you call
count(), what does each task do? Your answer here
How do you expect the time to count the entries in the
ratings dataset to vary with the number of partitions? Your answer here (the goal here is to make a meaningful hypothesis -- it doesn't necessarily need to be correct)
d) Create an RDD for the
ratings dataset with different numbers of partitions, and call
count() on the resulting datasets. Using Spark and matplotlib, make a graph showing the response time of
count() as a function of the number of partitions. You're responsible for choosing the number of partitions to use to produce a meaningful graph of how
count()'s runtime depends on the number of partitions.
sc.textFile will never allow you to use fewer than 2 partitions for the
ratings dataset, even if you pass 1 as the number of partitions. This is because the underlying dataset is stored in HDFS (Hadoop Distributed Filesystem) as two files. As a result, you don't need to measure the time for
count() using just 1 partition.
import matplotlib.pyplot as plot
# Magic command to make matplotlib and ipython play nicely together.
%matplotlib inline
### YOUR CODE HERE
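The measurement loop for this exercise can be sketched as below. This is an illustration under stated assumptions, not the course's solution: `measure` is a hypothetical helper, and a local computation stands in for the Spark job so that the sketch runs without a cluster.

```python
from timeit import default_timer


def measure(run, partition_counts):
    """Call run(n) once per partition count; return [(n, seconds), ...]."""
    timings = []
    for n in partition_counts:
        start = default_timer()
        run(n)
        timings.append((n, default_timer() - start))
    return timings


# Local stand-in so the sketch runs anywhere. On the cluster, run(n) would
# instead load the ratings file with n partitions and call count(), e.g.
# sc.textFile(ratings_path, n).count(), where ratings_path is a hypothetical
# variable holding the path to the ratings file.
timings = measure(lambda n: sum(range(10000 * n)), [2, 4, 8, 16])
partition_counts = [n for n, _ in timings]
seconds = [t for _, t in timings]
```

The two lists can then be passed to matplotlib (for example, as the x and y values of a line plot). Because a single measurement can be noisy, repeating each run a few times and averaging may give a smoother curve.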
Explain the shape of the plot. How does this differ from the hypothesis you provided in part (a)? Can you explain the difference? Your answer here
What is the optimal number of partitions for this dataset and why? Your answer here
The ultimate goal of this assignment is to predict what movies to recommend to yourself. In order to do that, you'll first need to add ratings for yourself to the
a) First, generate a unique user ID for yourself. Find the highest user ID in the
ratings dataset and use 1 + this ID as your user ID.
highest_existing_user_id = # YOUR CODE HERE
my_user_id = highest_existing_user_id + 1
print "My user ID is %s" % my_user_id
b) Next, you need to create an RDD with some ratings for yourself. Create a new RDD
my_ratings_RDD with at least 10 movie ratings. Each entry should be formatted as
(my_user_id, movieID, rating) (i.e., each entry should be formatted in the same way as the
training RDD). As in the original dataset, ratings should be between 1 and 5 (inclusive).
To help you provide ratings for yourself, we've included some code below to list the names and movie IDs of the 50 most-rated movies (we did this in the lab; the code below is replicated from the lab). Select 10 movies that you've seen from this list and use those (along with your own ratings for those movies) to produce the
my_ratings_RDD. If you haven't seen at least 10 of these movies, you can increase the parameter passed to
take() until there are 10 movies that you have seen (or you can also guess what your rating would be for movies you haven't seen).
# The indices below assume that ratings entries are
# (userID, movieID, rating, timestamp) tuples and that movies entries are
# (movieID, title, genres) tuples; adjust them if your RDDs are formatted differently.
# Create a dataset of (movieID, number of ratings) pairs.
ratings_per_movie = ratings.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)
# Join ratings_per_movie with movies to get a dataset with movie names and rating counts.
ratings_with_names = movies.map(lambda x: (x[0], x[1])).join(ratings_per_movie)
# map transforms ratings_with_names into an RDD where the key is the number of ratings
# and the value is a 2-item tuple with (movie name, number of ratings). We reformat the
# dataset in this way so that we can use sortByKey to get the most-rated movies.
sorted_ratings = ratings_with_names.map(lambda x: (x[1][1], (x[1][0], x[1][1]))).sortByKey(False)
print "Most rated movies:"
for ratings_tuple in sorted_ratings.take(50):
    print ratings_tuple
my_ratings_RDD = # YOUR CODE HERE
# Remember that in general, you shouldn't use collect()!
# We use collect here because we know that the RDD with your
# ratings is small.
print "My movie ratings: ", my_ratings_RDD.collect()
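A small helper can build the rating entries in the required (my_user_id, movieID, rating) format and catch out-of-range ratings early. This is a sketch only: `make_my_ratings` is a hypothetical name, and the user ID and movie IDs in the usage lines are placeholders, not real choices from the dataset.

```python
def make_my_ratings(my_user_id, movie_ratings):
    """Build (user_id, movie_id, rating) tuples, checking the rating range."""
    entries = []
    for movie_id, rating in movie_ratings:
        if not 1 <= rating <= 5:
            raise ValueError("rating for movie %s must be between 1 and 5" % movie_id)
        entries.append((my_user_id, movie_id, float(rating)))
    return entries


# Placeholder user ID and movie IDs for illustration; the assignment asks
# for at least 10 ratings of movies you have actually seen.
example_entries = make_my_ratings(99999, [(1, 4), (2, 5)])
```

On the cluster, a local list like this could be turned into an RDD with sc.parallelize(), which is reasonable here because the list is tiny.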
c) Now that you have ratings for yourself, you need to add your ratings to the
training dataset so that the model you train will incorporate your preferences. Spark's
union method combines two RDDs (see documentation here); use
union to create a new training dataset that includes your ratings and the data in the original training dataset.
training_with_my_ratings = # YOUR CODE HERE
print ("The training dataset now has %s more entries than the original training dataset" %
       (training_with_my_ratings.count() - training.count()))
The new training dataset should have at least 10 more entries than it had originally.
a) In this exercise, you'll run your code from part 1 on the larger dataset. First, we need to think a bit more carefully about how we do things to ensure using the larger dataset doesn't take too long on the cluster. Use Spark's
repartition(numPartitions) function (which re-partitions the dataset into the specified number of partitions) to break the
validation datasets into the optimal number of partitions that you found in part 1.
### YOUR CODE HERE
b) As you did in part 1, train models with ranks of 4, 8, 12, and 16 using the
training_with_my_ratings dataset (remember that you should use this dataset, which includes your
ratings, instead of the original
training dataset). Predict the ratings for the
validation dataset, and use the
compute_error function you wrote in part 1 to compute the error. Which rank produces the best model, based on the error on the validation dataset?
Now that you're running on a cluster, you'll want to think carefully about what datasets you cache in memory. Remember that Spark only saves datasets in memory if you explicitly call
cache() on the dataset, like we did in the lab. You can use Spark's UI (located at port 4040 on the machine where this notebook is running) to see what datasets are cached in memory (you may want to use the
setName() function to set a name for RDDs so they can easily be identified in the UI). If you accidentally cache a dataset that you don't need, you can call
unpersist() on the RDD to drop it from the cache.
from pyspark.mllib.recommendation import ALS
best_rank = # YOUR CODE HERE
print "The best model was trained with rank %s" % best_rank
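For reference, the arithmetic behind an RMSE-style error and the rank selection can be sketched in plain Python. This is not the compute_error function from part 1: it works on small local dictionaries purely to show the calculation, whereas on the cluster the matching of predictions to actual ratings and the averaging should be done with RDD operations rather than collect(). The names `rmse` and `pick_best_rank` are hypothetical.

```python
import math


def rmse(predicted, actual):
    """RMSE over the (user, movie) keys present in both dictionaries."""
    shared = [key for key in actual if key in predicted]
    if not shared:
        raise ValueError("no overlapping (user, movie) pairs")
    total = sum((predicted[key] - actual[key]) ** 2 for key in shared)
    return math.sqrt(total / float(len(shared)))


def pick_best_rank(errors_by_rank):
    """Return the rank whose validation error is smallest."""
    return min(errors_by_rank, key=errors_by_rank.get)
```

Restricting the calculation to shared keys matters because a model may not return a prediction for every (user, movie) pair in the validation set.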
Which datasets did you decide to cache in memory and why? Your answer here
d) Like you did in part 1, use the best model to predict the ratings on the
test dataset, and compute the RMSE.
test_rmse = # YOUR CODE HERE
print "The model had a RMSE on the test set of %s" % test_rmse
e) Our solution code for part 1 got an RMSE of about 1.1 (subject to a small amount of variance, due to randomness in the ALS implementation) on the test set (you may want to use this to verify that your part 1 code was correct!). How does this compare to the RMSE you got here?
Is the test RMSE larger, smaller, or about the same as the RMSE (about 1.1) from part 1? Why? Your answer here
a) So far, we've only used the
predictAll method to compute the error of the model. Here, use
predictAll to predict what ratings you'd give to the movies that you didn't already provide ratings for. Print out the 10 movies with the highest predicted ratings.
predicted_10_highest_rated_movies = # YOUR CODE HERE
# This should print a list of 10 movie names and the associated
# predicted ratings.
print "My highest rated movies: ", predicted_10_highest_rated_movies
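The selection logic (drop movies you already rated, keep the highest predictions, attach titles) can be sketched on local data as follows. The function name `top_unrated` and its input shapes are assumptions made for illustration; on the cluster, the filtering and ordering would be done on RDDs, for example with the same sortByKey and take pattern used for the most-rated movies.

```python
import heapq


def top_unrated(predictions, already_rated, titles, n=10):
    """Return the n best (title, predicted_rating) pairs for unrated movies.

    predictions: iterable of (movie_id, predicted_rating) pairs.
    already_rated: set of movie IDs the user has rated.
    titles: dict mapping movie ID to movie title.
    """
    candidates = [(rating, movie_id)
                  for movie_id, rating in predictions
                  if movie_id not in already_rated]
    best = heapq.nlargest(n, candidates)
    # Fall back to the movie ID if a title is missing.
    return [(titles.get(movie_id, movie_id), rating) for rating, movie_id in best]
```

Only the final 10 results need to reach the driver, so the full set of predictions never has to be collected.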
Are the predictions any good? Do you think (or know!) you'd like these movies? Your answer here
Please answer the questions in this form to help us improve the assignments for this class in the future.
__Don't forget to submit your assignment using glookup!__ We will not automatically collect the notebooks on the cluster; you need to copy the notebook back to your local machine and submit it yourself when you're finished with the homework. To save your iPython notebook to your machine from the cluster, go to "File" > "Download as" > "iPython".
The UI for the Spark master is on port 8080, and the hostname is stored as part of the
CLUSTER_URL variable pre-loaded into this notebook. From the master UI, you can find the link to your application's UI.
This page lists all of the operations you can do on a Spark RDD. Spark also has a Scala API (Scala is a programming language similar to Java); the documentation for the Scala functions is sometimes more helpful, and the Python functions work in the same way.
If you get an error that looks like:
org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space, it probably means that you've tried to collect too much data on the machine where Python is running. This is likely to happen if you do
collect() on a large dataset. To remedy this problem, you'll need to restart your iPython notebook (go to the main server, at port 8888 of the machine you were assigned, click "Shutdown" on your notebook, and then open it again) and don't do
collect() on a large dataset.
Curious why you're getting a Java error when your program is written in Python? Spark is mostly written in Scala (a language that runs on the Java Virtual Machine) and Java. We're using
pyspark here, which uses a translation layer to translate between Python and Java. Your Python
SparkContext object is backed by a Java
SparkContext object; all operations you run on Spark datasets are passed through this Java object. So, if you try to collect a result that's too large, the Java Virtual Machine that's running the Java
SparkContext runs out of memory.
Spark is mostly written in Scala and Java, and the Python version of the code ("pyspark") hooks into the Java implementation in a way that can make error messages very difficult to understand. If you get a hard-to-understand error when you run a Spark operation, we recommend first narrowing down the error so that you know exactly which operation caused the error. For example, if
rdd.groupByKey().map(lambda x: x) fails with an error, separate the groupByKey() and
map() calls onto separate lines so you know which one is causing the error. Next, double check the function signature to make sure you're passing the right arguments. Pyspark can fail with a weird error if an RDD operation is given the wrong number or type of arguments. If you're still stumped, try using
take(10) to print out the first 10 entries in the dataset you're calling the RDD operation on. Make sure the function you're calling and the arguments you're passing in make sense given the format of the input dataset.
Are you sure? Some of the Spark operations will take a minute or so to run; look at the top of the iPython notebook to see if it says "Kernel busy". If so, it's busy running your code. Go check out the Spark UI to see more about what's going on.
Yes. In our solution code, none of the Spark stages take more than about a minute to run (remember that you can see a job's stages by looking at the Spark UI). If you ran something and it's taking forever, double check that you passed in the datasets you intended to. One common problem from part 1 was that people passed RDDs in the wrong format (e.g., with 3 items instead of just a key and a value) to
take() to look at the datasets passed into the offending function to make sure they're in the right format.
If you end up with a job that's taking forever, you'll need to kill the job; otherwise it will hog all of your cluster resources such that you won't be able to run anything else. Kill the job by shutting down this notebook (which will destroy your
SparkContext and the associated worker process that's doing work) and then re-launching it. You can shut down your notebook by going to "File" > "Close and Halt" (but be sure to hit save first!).
The model will only provide predictions for users who were included in the training set. Make sure you included your own data when you trained the model.
The ALS algorithm has no notion of bounds on expected ratings, so you may get predicted ratings that are larger than 5. This does not necessarily signify a bug in your code.
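If out-of-range predictions are distracting when you print your recommendations, one option is to clamp them for display. This is an optional sketch and not something the assignment asks for; `clamp_rating` is a hypothetical helper, and clamping before computing the error would change the reported RMSE, so it is best kept to presentation.

```python
def clamp_rating(value, low=1.0, high=5.0):
    """Limit a predicted rating to the range [low, high] for display."""
    return max(low, min(high, value))
```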