CS194-16: Introduction to Data Science

Name: Please put your name

Student ID: Please put your student id

Homework 3

Part 1: Predicting Movie Ratings

One of the most common uses of data is to predict what users want. This allows Google to show you relevant ads, Amazon to recommend relevant products, and Netflix to recommend movies that you might like. In this assignment, you'll explore how to recommend movies to a user. We'll start with some basic methods, and then use machine learning to make more sophisticated predictions.

We'll use Spark for this assignment. In part 1 of the assignment, you'll run Spark on your local machine and on a smaller dataset. The purpose of this part of the assignment is to get everything working before adding the complexities of running on many machines. The interface for running local Spark jobs is exactly the same as the interface for running jobs on a cluster, so you'll be using the same functions we used in lab, and all of the code you write locally can be executed on a cluster. In part 2, which will be released after the midterm, you'll run Spark on a cluster that we have running for you (like in the lab). You'll use the cluster to run your code on a larger dataset, and to predict which movies to recommend to yourself!

As mentioned during the lab, think carefully before calling collect() on any datasets. When you're using a small, local dataset, calling collect() and then using Python to analyze the data locally will work fine, but this will not work when you're using a large dataset that doesn't fit in memory on one machine. Solutions that call collect() and do local analysis that could have been done with Spark will not receive full credit.

We have created a FAQ at the bottom of this page (which is an expanded version of the FAQ from the lab) to help with common problems you may run into. If you run into a problem, please check the FAQ before posting on Piazza!

Exercise 0: Setup

a) As mentioned above, for this part of the assignment, you'll run Spark locally rather than on a cluster, for easier debugging. Begin by downloading Spark from this link. Unzip and untar the file so you have a spark-0.9.1-bin-cdh4 folder; this folder contains all of the code needed to run Spark. We need to do a little bit of setup to tell iPython how to find Spark (we set this up for you on the cluster machines, but you need to do it yourself when running in your own VM). We also need to start your own SparkContext (which we also did for you in the lab; the SparkContext was saved as sc in the lab). The SparkContext is like a master for just your application. It requests some resources from the cluster master, and it also breaks down jobs that you submit into stages of tasks. For example, when you call map() on a resilient distributed dataset (RDD; Spark's name for datasets stored in memory), the SparkContext decides how many map tasks to run, and launches the map tasks on the executors allocated by the cluster master.

Fill in the path to the spark folder you just downloaded in the code below, and then execute it to create a SparkContext to use to run jobs.

In [ ]:
# Configure the necessary Spark environment.  pyspark needs SPARK_HOME set up
# so it knows how to start the Spark master and some local workers for you to use.
import os
# Fill this in with the path to the spark-0.9.1-bin-cdh4 folder you just downloaded
# (e.g., /home/saasbook/spark-0.9.1-bin-cdh4)
path_to_spark = # YOUR CODE HERE
os.environ['SPARK_HOME'] = path_to_spark

# Set the python path so that we know where to find the pyspark files.
import sys
path_to_pyspark = os.path.join(path_to_spark, "python")
sys.path.insert(0, path_to_pyspark)

from pyspark import SparkContext
# You can set the app name to whatever you want; this just affects what
# will show up in the UI.
app_name = "i<3datascience"
sc = SparkContext("local", app_name)

Having trouble? Check out the section in the FAQ that covers issues creating a SparkContext.

Even though you're running Spark locally, Spark still starts the application web UI where you can see your application and what tasks it's running. In a browser in the VM, go to http://localhost:4040 to see the UI for your application. There's no Master UI running here (the UI we saw at port 8080 during the lab) because Spark doesn't use a master when you run in local mode.

b) Next, download the datafiles that you'll need for the assignment from https://github.com/amplab/datascience-sp14/raw/master/hw3/part1files.tar.gz. You'll do all of your analysis on the ratings.dat and movies.dat datasets located in the part1files folder that you just downloaded. These are smaller versions of the datasets we used in lab 8. As in the lab, each entry in the ratings dataset is formatted as UserID::MovieID::Rating::Timestamp and each entry in the movies dataset is formatted as MovieID::Title::Genres. Read these two datasets into memory. You can count the number of entries in each dataset to ensure that you've loaded them correctly; the ratings dataset should have 100K entries and the movies dataset should have 1682 entries.

Note that when you create a new dataset using sc.textFile, you can give an absolute path to the dataset on your filesystem, e.g. '/Users/kay/part1files/ratings.dat'.
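As a rough sketch of how a single entry in each file could be turned into a Python tuple, the helpers below split a line on the :: separator described above. The names parse_rating and parse_movie are hypothetical, and the two sample lines are made up for illustration rather than taken from the datasets. Functions like these could be passed to map() on the RDD returned by sc.textFile.

```python
def parse_rating(line):
    """Parse 'UserID::MovieID::Rating::Timestamp' into a tuple of numbers."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return (int(user_id), int(movie_id), float(rating), int(timestamp))


def parse_movie(line):
    """Parse 'MovieID::Title::Genres' into (movie_id, title, genres).

    The genres field is kept as the raw string, since its internal
    format is not described in this assignment.
    """
    movie_id, title, genres = line.strip().split("::")
    return (int(movie_id), title, genres)


# Made-up sample lines in the two formats (not real entries from the files).
sample_rating = "196::242::3::881250949"
sample_movie = "1::Toy Story (1995)::Animation|Children's|Comedy"

print(parse_rating(sample_rating))
print(parse_movie(sample_movie))
```

Converting the fields to numbers up front means later steps can average ratings or take the timestamp modulo 10 without re-parsing strings.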

In [ ]:
### YOUR CODE HERE

ratings_count = # YOUR CODE HERE
movies_count = # YOUR CODE HERE
print "%s ratings and %s movies in the datasets" % (ratings_count, movies_count)

Exercise 1: Basic Recommendations

a) One way to recommend movies is to always recommend the movies with the highest average rating. Use Spark to find the name and the average rating of the 5 movies with the highest average rating.

In [ ]:
### YOUR CODE HERE

highest_rated_movies = # YOUR CODE HERE
print "5 highest rated movies (and associated average ratings): ", highest_rated_movies

b) The movies you found may seem a bit suspicious. How many ratings does each of those movies have?

In [ ]:
### YOUR CODE HERE

highest_rated_movies_rating_counts = # YOUR CODE HERE
print "Number of ratings for each of the 5 highest rated movies: ", highest_rated_movies_rating_counts

c) How can you improve your recommendations? Improve upon your recommendations in part (a) to recommend 5 movies that you expect to be well-liked. You are not expected to use any sophisticated machine learning techniques here; using just the Spark operations we learned in lab is sufficient.

In [ ]:
### YOUR CODE HERE

recommended_movies = # YOUR CODE HERE.
print "5 recommended movies are: ", recommended_movies

Describe how you improved on the recommendations in part (a) in at most 4 sentences. Your answer here

Exercise 2: Collaborative Filtering

You've learned about many of the basic transformations and actions that Spark allows us to do on distributed datasets. Spark also exposes some higher level functionality; in particular, machine learning using a component of Spark called MLlib. In this assignment, you'll use MLlib to make personalized movie recommendations for you using the movie data we've been analyzing.

We're going to use a technique called collaborative filtering. The basic idea of collaborative filtering is that we start with a matrix whose entries are movie ratings. Each row represents a user and each column represents a particular movie (shown in red in the diagram below).

We don't know all of the entries in this matrix, which is precisely why we need collaborative filtering. For each user, we have ratings for only a subset of the movies. With collaborative filtering, the idea is to approximate the ratings matrix by factorizing it as the product of two matrices: one that describes properties of each user (shown in green), and one that describes properties of each movie (shown in blue).

[Figure: factorization of the ratings matrix into a users matrix and a movies matrix]

We want to select these two matrices such that the error for the user/movie pairs where we know the correct ratings is minimized. The ALS (alternating least squares) algorithm does this by first randomly filling the users matrix with values and then optimizing the values of the movies matrix such that the error is minimized. Then, it holds the movies matrix constant and optimizes the values of the users matrix. This alternation between which matrix to optimize is the reason for the "alternating" in the name.

This optimization is what's being shown on the right in the image above. Given a fixed set of user factors (i.e., values in the users matrix), we use the known ratings to find the best values for the movie factors using the optimization written at the bottom of the figure. Then we "alternate" and pick the best user factors given fixed movie factors.
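To make the alternation concrete, here is a minimal pure-Python sketch of the idea for rank 1, where each user and each movie is described by a single number. It is not MLlib's implementation: it has no regularization, the ratings are made up, the helper names (known, rmse, solve) are hypothetical, and the users vector starts at all ones instead of random values so the run is repeatable. With the other side held fixed, the least-squares value for one factor is sum(rating * fixed) / sum(fixed^2) over the known ratings that involve it.

```python
import math

# Made-up known ratings: (user, movie) -> rating. Missing pairs are unknown.
known = {
    (0, 0): 5.0, (0, 1): 3.0,
    (1, 0): 4.0, (1, 2): 1.0,
    (2, 1): 2.0, (2, 2): 1.0,
}
num_users, num_movies = 3, 3


def rmse(users, movies):
    """Root mean squared error over the known ratings only."""
    squared = [(r - users[u] * movies[m]) ** 2 for (u, m), r in known.items()]
    return math.sqrt(sum(squared) / len(squared))


def solve(fixed, free_pos, fixed_pos, size):
    """Least-squares update of one side (rank 1) while the other side is fixed.

    free_pos / fixed_pos say which position of the (user, movie) key
    belongs to the side being updated and to the side held constant.
    """
    result = []
    for k in range(size):
        num = den = 0.0
        for key, rating in known.items():
            if key[free_pos] == k:
                f = fixed[key[fixed_pos]]
                num += rating * f
                den += f * f
        result.append(num / den)
    return result


users = [1.0] * num_users  # real ALS starts from random values
errors = []
for _ in range(5):
    movies = solve(users, 1, 0, num_movies)  # users fixed, optimize movies
    users = solve(movies, 0, 1, num_users)   # movies fixed, optimize users
    errors.append(rmse(users, movies))

print(errors)
```

Because each half-step exactly minimizes the squared error over one side, the error recorded after each full round never increases from one round to the next.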

For a simple example of what the users and movies matrices might look like, check out the slides from lecture 9.

a) Before jumping into the machine learning, you need to break up the dataset into a training set (which we'll use to train models), a validation set (which we'll use to choose the best model), and a test set. One way that people often partition data is using the time stamp: using the least significant digit of the timestamp is an essentially random way to split the dataset into multiple groups. Use the least significant digit of the rating timestamp to separate 60% of the data into a training set, 20% into a validation set, and the remaining 20% into a test set.
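The sketch below illustrates, in plain Python, why the last digit gives a 60/20/20 split: each of the ten digits is roughly equally likely, so six digits cover about 60% of the entries and two digits cover about 20%. The function name assign_split and the particular assignment of digits to sets are assumptions made for illustration; any assignment of six, two, and two digits works, and the timestamps are made up.

```python
def assign_split(timestamp):
    """Map a timestamp to a split name using its least significant digit."""
    last_digit = timestamp % 10
    if last_digit < 6:       # digits 0-5: six of ten digits, about 60%
        return "training"
    elif last_digit < 8:     # digits 6-7: two of ten digits, about 20%
        return "validation"
    return "test"            # digits 8-9: two of ten digits, about 20%


# 100 consecutive made-up timestamps, so every last digit appears 10 times.
counts = {"training": 0, "validation": 0, "test": 0}
for ts in range(881250900, 881251000):
    counts[assign_split(ts)] += 1

print(counts)
```

On the real dataset the proportions will only be approximately 60/20/20, because the last digits of actual timestamps are not perfectly uniform.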

In [ ]:
### YOUR CODE HERE
training = # YOUR CODE HERE 
validation = # YOUR CODE HERE
test = # YOUR CODE HERE

print "Training: %s, validation: %s, test: %s" % (training.count(), validation.count(), test.count())

After splitting the dataset, your training set should have about 60K entries and the validation and test sets should have about 20K entries (the exact number of entries in each dataset will vary slightly depending on the method you used to split the data into the 3 sets).

b) In the next part, you'll generate a few different models, and will need a way to decide which model is best. We'll use the root mean squared error (RMSE) to compute the error of each model. The root mean squared error is the square root of the average value of (actual rating - predicted rating)^2 for all users and movies for which we have the actual rating.
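Written as a formula, the definition above looks as follows. The symbols are notation chosen here for illustration: K is the set of (user, movie) pairs for which the actual rating is known, r is the actual rating, and r-hat is the predicted rating.

```latex
\mathrm{RMSE} = \sqrt{\frac{1}{|K|} \sum_{(u, m) \in K} \left( r_{u,m} - \hat{r}_{u,m} \right)^2}
```

Predictions for pairs outside K do not enter the sum, since there is no actual rating to compare them against.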

If your model perfectly predicts the user ratings, what will the root mean squared error be? Your answer here

If all of the predicted ratings are off by one (they're 1 higher or lower than the actual ratings), what will the RMSE be? Your answer here

c) Write a function to compute the root mean squared error given a predicted and actual RDD.

In [ ]:
# Hint: you may want to use the math module to compute the square root!
import math

def compute_error(predicted, actual):
    """ Compute the root mean squared error between predicted and actual.
    
    Params:
      predicted: An RDD of predicted ratings for each movie and each user where each entry is in the form (user, movie, rating).
      actual: An RDD of actual ratings where each entry is in the form (user, movie, rating).
    """
    ### YOUR CODE HERE

# sc.parallelize turns a Python list into a Spark RDD.
test_predicted = sc.parallelize([
    (1, 1, 5),
    (1, 2, 3),
    (1, 3, 4),
    (2, 1, 3),
    (2, 2, 2),
    (2, 3, 4)])
test_actual = sc.parallelize([
     (1, 2, 3),
     (1, 3, 5),
     (2, 1, 5),
     (2, 2, 1)])
# The error for the test datasets should be 1.2247
print "Error for test datasets: %s" % compute_error(test_predicted, test_actual)

d) In this part, we'll use ALS.train to train a bunch of models, and we'll select the best model. The most important parameter to ALS is the rank, which is the number of columns in the Users matrix (green in the diagram above) or the number of rows in the Movies matrix. In general, a lower rank will mean higher error on the training dataset, but a high rank may lead to overfitting. Train models with ranks of 4, 8, 12, and 16 using the training dataset, predict the ratings for the validation dataset, and use the compute_error function you wrote in part (c) to compute the error. Which rank produces the best model, based on the error on the validation dataset? Note that the values here will be more meaningful when we run on a cluster with a larger dataset.

To create the model, use ALS.train(training_rdd, rank), which takes two parameters: an RDD in the format (user, movie, rating) to use to train the model, and an integer rank. To predict rating values, call model.predictAll with the validation dataset, where model is the model generated with ALS.train. predictAll accepts an RDD with each entry in the format (user, movie) and outputs an RDD with each entry in the format (user, movie, rating).

In [ ]:
from pyspark.mllib.recommendation import ALS

best_rank = # YOUR CODE HERE

print "The best model was trained with rank %s" % best_rank

e) So far, we used the training and validation datasets to select the best model. Since we used these two datasets to determine what model is best, we can't use them to test how good the model is (otherwise we'd be vulnerable to overfitting). To decide how good our model is, we need to use the test dataset. Use the model you created in part (d) to predict the ratings for the test dataset and compute the RMSE.

In [ ]:
test_rmse = # YOUR CODE HERE

print "The model had a RMSE on the test set of %s" % test_rmse

You now have code to predict how users will rate movies! In the next part of the assignment, you'll run this code on a larger dataset using a cluster of machines, like we did in the lab. You'll use the larger dataset to generate a better model, and then will use that model to predict what movies to recommend to yourself. Until then, good luck on the midterm!

Frequently Asked Questions

I'm not sure where to start. Some parts of this homework seem impossible to do with the functions we learned about in lab!

With the exception of the ML functions that we introduce in this assignment, you should be able to complete all parts of this homework using only the functions we learned in lab (although you're welcome to use more features of Spark if you like!). You may need to use the functions introduced at the end of the lab (which we didn't get to in class); if you're having trouble, try walking through the last sections of the lab to see if they help.

How do I get to the UI?

The UI for your application is on port 4040 on your local machine (so go to http://localhost:4040). There is no master web UI when you run locally because there is no Spark master -- just a single worker process to run your tasks.

What are all of the operations I can do on a Spark dataset (RDD)?

If you click the "RDD" link on this page, it lists all of the operations you can do on a Spark RDD. Spark also has a Scala API (Scala is a programming language similar to Java); the documentation for the Scala functions is sometimes more helpful, and the Python functions work in the same way.

How do I use matplotlib?

There are lots of good examples on the matplotlib website. For example, this page shows how to plot a single line.

Why am I getting an OutOfMemoryError?

If you get an error that looks like: org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space, it probably means that you've tried to collect too much data on the machine where Python is running. This is likely to happen if you do collect() on a large dataset. The best way to remedy this problem is to restart your iPython notebook (go to the main server, at port 8888 of the machine you were assigned, click "Shutdown" on your notebook, and then open it again) and don't do collect() on a large dataset.

Curious why you're getting a Java error when your program is written in Python? Spark is mostly written in Java (and Scala, a language built on top of Java). We're using pyspark here, which uses a translation layer to translate between Python and Java. Your Python SparkContext object is backed by a Java SparkContext object; all operations you run on Spark datasets are passed through this Java object. So, if you try to collect a result that's too large, the Java Virtual Machine that's running the Java SparkContext runs out of memory.

Python / Spark is giving me a crazy weird error!

Spark is mostly written in Scala and Java, and the Python version of the code ("pyspark") hooks into the Java implementation in a way that can make error messages very difficult to understand. If you get a hard-to-understand error when you run a Spark operation, we recommend first narrowing down the error so that you know exactly which operation caused the error. For example, if rdd.groupByKey().map(lambda x: x[1]) fails with an error, separate the groupByKey() and map() calls onto separate lines so you know which one is causing the error. Next, double check the function signature to make sure you're passing the right arguments. Pyspark can fail with a weird error if an RDD operation is given the wrong number or type of arguments. If you're still stumped, try using take(10) to print out the first 10 entries in the dataset you're calling the RDD operation on. Make sure the function you're calling and the arguments you're passing in make sense given the format of the input dataset.

I ran some code and nothing happened!

Are you sure? Some of the Spark operations will take a minute or so to run; look at the top of the iPython notebook to see if it says "Kernel busy". If so, it's busy running your code. Go check out the Spark UI to see more about what's going on.

My code is taking forever to run. Did I do something wrong?

Probably. In our solution code, none of the Spark jobs take more than a minute to run. If you ran something and it's taking forever, double check that you passed in the datasets you intended to. If all else fails, create a small sample dataset and try your code on that to make sure things are working.

If you end up with a job that's taking forever, you can kill the job by shutting down this notebook (which will destroy your SparkContext and the associated worker process that's doing work) and then re-launching it. Be sure to hit save first!

I'm having trouble creating SparkContext...

I'm getting an error that says "Exception AttributeError: "'SparkContext' object has no attribute '_jsc'"".

When you try to create a SparkContext, you may get an error that ends with a red box with text that looks like: Exception AttributeError: "'SparkContext' object has no attribute '_jsc'" in <bound method SparkContext.__del__ of <pyspark.context.SparkContext object at 0x10c876890>> ignored. This is a benign error that happens when the Spark Context tries to shut down, but it signals that there was an error when creating the SparkContext. Look at the error messages above this one to see what the real problem is.

I'm getting an error that says "ImportError: No module named pyspark"

This means that you didn't give the correct path to Spark when setting the path_to_spark variable. Ensure that the path listed here matches the path to the Spark folder you downloaded. When you change to the correct path, you may need to shut down and restart your notebook for all of the path setup to work correctly again.
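One quick way to sanity-check the path before restarting is to confirm that it contains a python/pyspark folder, since the setup code adds the python subfolder to sys.path and then imports pyspark from it. The helper name pyspark_dir_exists is hypothetical, and the example path is the one from the setup comments; substitute your own.

```python
import os


def pyspark_dir_exists(path_to_spark):
    """Return True if <path_to_spark>/python/pyspark is an existing directory."""
    return os.path.isdir(os.path.join(path_to_spark, "python", "pyspark"))


# Example path taken from the setup comments; replace it with your own folder.
print(pyspark_dir_exists("/home/saasbook/spark-0.9.1-bin-cdh4"))
```

If this prints False, the path most likely points at the wrong folder, for example the parent directory of the unpacked Spark folder.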

I'm getting an error that says "ValueError: Cannot run multiple SparkContexts at once"

You've created multiple SparkContexts, likely by executing the code to create a new SparkContext multiple times. Either (a) use the SparkContext that you created earlier or (b) shut down your notebook, restart it, and then re-run the relevant code only a single time.

I'm getting an error that says "ValueError: invalid literal for int() with base 10: ''"

This error typically occurs because something went wrong when trying to launch the Java backend for your SparkContext. One way this can occur is if JAVA_HOME isn't set up properly (we set this up earlier in this class, but this may be an issue if you're using a new VM). If you look in the terminal where you launched ipython notebook, there is typically more log output that will tell you what went wrong.
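As a first check, you can look at JAVA_HOME from inside the notebook. The sketch below is only a rough diagnostic under the assumption that JAVA_HOME should name an existing directory; the helper name check_java_home is hypothetical, and a passing check does not guarantee that the Java backend will launch.

```python
import os


def check_java_home(environ=os.environ):
    """Describe whether JAVA_HOME is set and points to an existing directory."""
    java_home = environ.get("JAVA_HOME")
    if not java_home:
        return "JAVA_HOME is not set"
    if not os.path.isdir(java_home):
        return "JAVA_HOME points to a missing directory: %s" % java_home
    return "JAVA_HOME looks OK: %s" % java_home


print(check_java_home())
```

The environ parameter defaults to the notebook's own environment; passing a plain dictionary instead makes it easy to try the helper on other values.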

Can you explain more about how flatMap() works?

Let's look at an example: suppose you have an RDD where each entry is a line of text in a book, and you want to make a new RDD where each entry is a single word. You could use flatMap() to do this as follows:

In [ ]:
lines_in_book = [
  "I am Sam",
  "I am Sam",
  "Sam I am",
  "Do you like",
  "green eggs and ham?"]
# sc.parallelize turns a Python list into an RDD.
lines_in_book_rdd = sc.parallelize(lines_in_book)

# Notice that here, the function passed to flat map will return a list.
words_rdd = lines_in_book_rdd.flatMap(lambda x: x.split(" "))

print words_rdd.collect()

The resulting RDD will have a list of words. The function we passed into flatMap returned a list of words for each entry in the original RDD, and flatMap combines all of these lists of words into a single list. Let's do this same thing with map to see what's different.

In [ ]:
list_of_words_rdd = lines_in_book_rdd.map(lambda x: x.split(" "))
print list_of_words_rdd.collect()

Notice that now the resulting RDD has a list of lists.

Another way to think about this is that map() always returns a new RDD with the same number of entries as the original RDD: each entry in the original RDD is mapped to one entry in the new RDD. With flatMap(), each entry in the original RDD maps to a list of 0 or more entries, so the new RDD isn't necessarily the same size as the old RDD (it might be larger or smaller).
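The same distinction can be reproduced in plain Python without Spark. The sketch below is only an analogy using list comprehensions, not how Spark implements these operations, and the helper name split_words is hypothetical. The input includes an empty line to show an entry that maps to zero words.

```python
lines_in_book = ["I am Sam", "Sam I am", ""]


def split_words(line):
    # str.split() with no arguments returns [] for an empty line,
    # so an entry can contribute zero words.
    return line.split()


# Like map(): exactly one output entry (here, a list) per input entry.
mapped = [split_words(line) for line in lines_in_book]

# Like flatMap(): the per-entry lists are concatenated into one flat list.
flat_mapped = [word for line in lines_in_book for word in split_words(line)]

print(mapped)       # [['I', 'am', 'Sam'], ['Sam', 'I', 'am'], []]
print(flat_mapped)  # ['I', 'am', 'Sam', 'Sam', 'I', 'am']
```

The mapped result always has three entries, one per input line, while the flattened result has six, and the empty line contributes nothing to it.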