# We've set up the notebook so that the hostname of the master is saved
# as CLUSTER_URL.
master_ui_address = CLUSTER_URL.split("//")[1].split(":")[0]
print "Master UI located at %s:8080" % master_ui_address

application_ui_address = "http://" + sc.appName + ":4040"
print "Application UI located at %s" % application_ui_address

# The first argument to textFile is a path to the data in HDFS.
# The second argument specifies how many pieces to break the file
# into; we'll talk more about this later in the tutorial.
raw_ratings = sc.textFile("/movielens/large/ratings.dat", 10)

# Give our RDD a name so it's easily identifiable in the UI.
raw_ratings.setName("raw ratings")
raw_ratings.cache()

entries = raw_ratings.count()
print "%s entries in ratings" % entries

# Look at the first 10 items in the dataset.
raw_ratings.take(10)

# Count again; now that the RDD is cached, this should be much faster.
raw_ratings.count()

# Each line has the form userID::movieID::rating::timestamp; parse it
# into a (userID, movieID, rating, timestamp) tuple.
def get_tuple(entry):
    items = entry.split("::")
    return int(items[0]), int(items[1]), float(items[2]), int(items[3])

ratings = raw_ratings.map(get_tuple)
# Set the name of the new RDD, like we did before, so that it's easily
# identifiable in the UI.
ratings.setName("ratings")
ratings.take(10)

# Cache ratings in memory and call count() to force Spark to bring it
# into memory.
ratings.cache()
ratings.count()

# Remove raw_ratings from memory, since we don't need it anymore.
raw_ratings.unpersist()

# First, create a new RDD made up of the entries of ratings that had a
# rating of at least 4.
count = ratings.filter(lambda x: x[2] >= 4).count()
print "%s entries have ratings of at least 4" % count

### YOUR CODE HERE

### YOUR CODE HERE

### YOUR CODE HERE

raw_ratings_fewer_partitions =  # Read in the ratings data and split it into 5 partitions
# Count the entries in the new dataset.
# (One possible solution is sketched at the end of this section.)

# Aggregate the counts for each rating. groupByKey produces
# (rating, iterable of counts) pairs; add_counts sums each iterable.
def add_counts(entry):
    rating = entry[0]
    counts = entry[1]
    return (rating, sum(counts))

rating_counts = ratings.map(lambda x: (x[2], 1))
aggregated_counts_rdd = rating_counts.groupByKey().map(add_counts)
# Printing the RDD shows its type and lineage, not its contents.
print aggregated_counts_rdd

# collect() brings the aggregated data back to the driver as a Python list.
aggregated_counts_list = aggregated_counts_rdd.collect()
print aggregated_counts_list

import matplotlib.pyplot as plot
# Magic command to make matplotlib and ipython play nicely together.
%matplotlib inline

width = 0.3
rating_values = [x[0] - width / 2 for x in aggregated_counts_list]
counts = [x[1] for x in aggregated_counts_list]
# The bar() function takes 2 lists: one list of x-coordinates of the left
# side of each bar, and one list of bar heights.
plot.bar(rating_values, counts, width)

### YOUR CODE HERE
count =  # Count all of the entries in the movies dataset
print "Number of movies: ", count

def plot_bars(genre_counts):
    """genre_counts should be a list of (genre_name, count) pairs."""
    x_coords = range(len(genre_counts))
    genre_names = [x[0] for x in genre_counts]
    counts = [x[1] for x in genre_counts]
    width = 0.8
    plot.bar(x_coords, counts, width)
    plot.xlabel("Genre")
    plot.ylabel("Number of Movies")
    plot.xticks([x + width / 2.0 for x in x_coords], genre_names,
                rotation='vertical')

### YOUR CODE HERE
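# One possible solution to the repartitioning exercise above: a minimal
# sketch that re-reads the same HDFS path used for raw_ratings, but asks
# textFile for 5 partitions instead of 10. (The variable name matches the
# exercise stub; this is not the notebook's official solution.)
raw_ratings_fewer_partitions = sc.textFile("/movielens/large/ratings.dat", 5)
print "%s entries in the 5-partition dataset" % raw_ratings_fewer_partitions.count()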
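# A hedged sketch for the movies exercises above. The file path and layout
# are assumptions: MovieLens movies.dat files contain lines of the form
# movieID::title::genre1|genre2|..., and we guess the file sits next to
# ratings.dat in HDFS.
raw_movies = sc.textFile("/movielens/large/movies.dat", 5)
movies = raw_movies.map(lambda line: line.split("::"))
movies.setName("movies")
movies.cache()
print "Number of movies: ", movies.count()

# Movies per genre: emit one (genre, 1) pair for each genre a movie lists,
# sum the pairs with reduceByKey, and hand the result to plot_bars.
# (flatMap, used here, is explained at the end of this section.)
genre_counts = movies.flatMap(lambda m: [(g, 1) for g in m[2].split("|")]) \
                     .reduceByKey(lambda a, b: a + b)
plot_bars(genre_counts.collect())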
# The function you give to reduceByKey should take two values and produce
# a new value. Note that the datatype of the two input values and the
# output value need to be the same.
def add_two_counts(count1, count2):
    return count1 + count2

rating_counts = ratings.map(lambda x: (x[2], 1))
aggregated_counts_rdd = rating_counts.reduceByKey(add_two_counts)
print aggregated_counts_rdd.collect()

### YOUR CODE HERE

# Compute the average rating: sum all of the ratings and divide by the
# total number of entries. Multiplying by 1.0 forces floating-point
# division in Python 2.
ratings_total = ratings.map(lambda x: x[2]).reduce(lambda x, y: x + y)
average_rating = ratings_total * 1.0 / ratings.count()
print "Average rating:", average_rating

### YOUR CODE HERE

# YOUR CODE HERE

# YOUR CODE HERE: Create a dataset of (movieID, number of ratings) pairs.
ratings_per_movie =  # YOUR CODE HERE

# YOUR CODE HERE: join average_ratings with movies to get a dataset with
# movie names and average ratings.
# (Hedged sketches for these exercises appear at the end of this section.)
### YOUR CODE HERE

### YOUR CODE HERE

lines_in_book = [
    "I am Sam",
    "I am Sam",
    "Sam I am",
    "Do you like",
    "green eggs and ham?"]

# sc.parallelize turns a Python list into an RDD.
lines_in_book_rdd = sc.parallelize(lines_in_book)

# Notice that here, the function passed to flatMap returns a list;
# flatMap concatenates all of the lists into a single flat RDD of words.
words_rdd = lines_in_book_rdd.flatMap(lambda x: x.split(" "))
print words_rdd.collect()

# map, by contrast, produces exactly one output element per input element,
# so here each element of the result is itself a list of words.
list_of_words_rdd = lines_in_book_rdd.map(lambda x: x.split(" "))
print list_of_words_rdd.collect()
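# Hedged sketches for the ratings_per_movie and join exercises above.
# These assume the movies RDD from the earlier movies sketch and use
# reduceByKey throughout; they are illustrations, not official solutions.

# (movieID, number of ratings) pairs: field x[1] of each rating tuple is
# the movieID.
ratings_per_movie = ratings.map(lambda x: (x[1], 1)) \
                           .reduceByKey(lambda a, b: a + b)
print ratings_per_movie.take(10)

# Average rating per movie: sum ratings and counts per movie, then divide.
average_ratings = ratings.map(lambda x: (x[1], (x[2], 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .map(lambda pair: (pair[0], pair[1][0] / pair[1][1]))

# Join with (movieID, title) pairs to attach names; join yields
# (movieID, (average_rating, title)) pairs.
movie_names = movies.map(lambda m: (int(m[0]), m[1]))
names_and_ratings = average_ratings.join(movie_names) \
    .map(lambda pair: (pair[1][1], pair[1][0]))
print names_and_ratings.take(10)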