Lab 8: MapReduce, mrjob, and EC2

In this week's lab, we will mostly ignore statistics and instead focus on some practical issues that you will encounter on Homework 4. Section 4 of that homework includes new python techniques (classes, inheritance), an unfamiliar approach to breaking up large computing problems (MapReduce), and code that has to be run outside the friendly confines of an ipython notebook; you are then asked to put it all to use on Amazon's Elastic Compute Cloud (EC2). This sounds very complicated, but the end result is a simpler algorithm for the problem of calculating similarity scores, as well as the ability to scale to arbitrarily large data sets.

1. Classes and generators in python

On previous homeworks, nearly all of the coding has been done by writing python functions plus a small amount of code that calls the functions you have written. Included below is the code for the mrjob word_count example that was covered in lecture (the canonical MapReduce example). There are a lot of new features here!

Below is the code for a simple MapReduce algorithm that counts the number of characters, words, and lines in a text file. This is one of the simplest examples of a problem that can be solved using MapReduce (I even took it from the section "Writing your first job" in the mrjob documentation). If you try to run the cell in this notebook, it will not work! We will get to running programs with mrjob soon, but for now it will just serve as a reference for some topics we want to cover.

In [ ]:
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()

1.1 Classes

Classes are the basis of object-oriented programming in python. For all of the problems on previous homework assignments, we have written functions to do calculations, draw figures, etc. To use mrjob, we have to switch gears and use a different style of programming.

As you can see in the example above, the MRWordFrequencyCount class is defined with an indented block similar to a function definition, except with class instead of def. Instead of a list of arguments, the item in parentheses (MRJob) is a base class that our newly defined class will inherit most of its features from. Even though there is very little code written above for MRWordFrequencyCount, it knows how to do many complex operations (running a mapper and a reducer, submitting jobs to EC2, etc.) because it inherited these abilities from the base class.
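To see inheritance in isolation, here is a minimal sketch that does not use mrjob at all. BaseJob and LineLengthJob are hypothetical names invented for this illustration; BaseJob only stands in for a base class such as MRJob and does none of its real work.

```python
class BaseJob(object):
    """Hypothetical stand-in for a base class such as MRJob."""

    def mapper(self, key, value):
        # Default behavior: pass the pair through unchanged.
        yield key, value

    def describe(self):
        # Subclasses get this method for free.
        return "{0} job".format(type(self).__name__)


class LineLengthJob(BaseJob):
    """Subclass that overrides mapper but inherits describe."""

    def mapper(self, key, value):
        # Replaces the inherited mapper for this class only.
        yield "chars", len(value)


job = LineLengthJob()
print(job.describe())                          # inherited from BaseJob
print(list(job.mapper(None, "hello world")))   # overridden in LineLengthJob
```

The subclass contains only the code that differs from the base class, which is the same reason MRWordFrequencyCount can be so short.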

There are two methods, mapper and reducer, that have been written specifically for MRWordFrequencyCount. These methods are also defined for the MRJob base class, but the methods defined here supersede the inherited ones. A method is similar to a function (as you might guess, since it is also defined with a def statement), but the first argument to a method will always be self, a reference back to the object to which the method belongs. The always-present self argument allows the method to access other members of the same object (both data and methods). However, when you actually call a method on an object, you don't have to supply anything for the self argument -- it is implicit. For example, to call the reducer method defined above on an MRWordFrequencyCount object named job, you would use:

In [ ]:
# Call the reducer method of an MRWordFrequencyCount object using some key and values.
# Here job is assumed to be an existing instance of the class.
job.reducer(my_key, my_values) # Did not specify 'self' argument
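As a self-contained illustration of the implicit self argument, the sketch below uses a hypothetical Counter class (not part of mrjob). It shows that calling a method on an object is shorthand for calling the function stored on the class and passing the object as the first argument.

```python
class Counter(object):
    """Hypothetical class used only to illustrate the self argument."""

    def __init__(self):
        self.total = 0

    def add(self, amount):
        # self gives the method access to the object's own data.
        self.total += amount
        return self.total


counter = Counter()
counter.add(5)            # self is filled in automatically
Counter.add(counter, 2)   # the same call with self passed explicitly
print(counter.total)
```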

The next mrjob example -- Writing your second job -- processes text to find the most commonly used word. That algorithm involves two MapReduce steps, so it is necessary to write an MRMostUsedWord.steps method to override the inherited method. Notice that self is used repeatedly to specify the method references inside the list returned by the steps method.

In [ ]:
import re

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

    def steps(self):
        return [
            self.mr(mapper=self.mapper_get_words,
                    combiner=self.combiner_count_words,
                    reducer=self.reducer_count_words),
            self.mr(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()

1.2 Generators

To make sense of all of those yield statements popping up in the mapper and reducer methods, you need to understand generators. The main issue, in the case of industrial-strength MapReduce, is that you don't have enough memory to store all of your data at once. This is true even after you have split your data between many compute nodes. So instead of getting an enormous list of data, the mapper and reducer functions both receive and emit generators.

When you run a function, it chugs along until it hits a return statement, at which point it returns some results and then it is done. A generator does its specified calculations until it hits a yield statement. It passes along whatever values it was supposed to yield and then it pauses and waits for someone to tell it to continue. It continues until it reaches another yield, and so on.
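The pause-and-resume behavior can be observed directly by stepping through a generator with the built-in next(). The countdown function below is a hypothetical example written for this purpose.

```python
def countdown(start):
    print("generator started")
    while start > 0:
        yield start   # hand back a value, then pause here
        print("resumed after yielding {0}".format(start))
        start -= 1


gen = countdown(2)      # nothing runs yet; no message is printed
first = next(gen)       # runs up to the first yield
second = next(gen)      # resumes, then pauses at the next yield
remaining = list(gen)   # resumes once more and runs to completion
print("{0} {1} {2}".format(first, second, remaining))
```

Creating the generator does no work at all; each call to next() runs only as far as the following yield.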

Not only are mapper and reducer generators, their (key, value) inputs are also generators. This means that for each step of the mapper, it pulls in one (key, value) pair, does some processing, and then emits one or more key value pairs, which move along to a combiner or a shuffler or whatever. This is how MapReduce avoids ever having to load huge datasets into limited memory.
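The streaming idea can be sketched in plain python by chaining generators. This is only a toy model under simplifying assumptions: the function names are made up for this sketch, and the shuffle step uses sorted(), which does hold all pairs in memory, whereas a real framework sorts and groups across machines.

```python
from itertools import groupby
from operator import itemgetter


def mapper(lines):
    # Pull in one line at a time and emit (word, 1) pairs.
    for line in lines:
        for word in line.split():
            yield word.lower(), 1


def shuffle(pairs):
    # Toy stand-in for the framework's sort-and-group step.
    ordered = sorted(pairs, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        # The values for each key are handed over as a generator.
        yield key, (value for _, value in group)


def reducer(grouped):
    for key, values in grouped:
        yield key, sum(values)


lines = iter(["the cat", "the hat"])
word_counts = dict(reducer(shuffle(mapper(lines))))
print(word_counts)
```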

A common stumbling block with generators is the fact that once you have iterated through an entire generator, it is done. You can see an example of this mistake by trying to run the code block below.

In [ ]:
# This function converts a list into a generator.
def example_generator(items):
    for item in items:
        yield item

# Create a generator.
my_generator = example_generator([0, 1, 2, 3, 4])

# Iterating over the generator works great the first time.
print "generator iteration 1"
print "---------------------"
for value in my_generator:
    print value
    
# ...but it doesn't work the second time.
print "\n"
print "generator iteration 2"
print "---------------------"
for value in my_generator:
    print value
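This matters in a reducer whenever the values are needed more than once, for example to compute both a mean and a maximum. One possible workaround, sketched below with a hypothetical mean_and_max helper, is to copy the values into a list first. That is only reasonable when the values for a single key fit in memory.

```python
def mean_and_max(values):
    # Materialize the generator once so it can be traversed twice.
    values = list(values)
    return sum(values) / float(len(values)), max(values)


# Consuming a generator twice: the second pass sees nothing.
gen = (x for x in [1, 2, 3, 4])
first_pass = sum(gen)
second_pass = list(gen)

# Copying into a list first avoids the problem.
stats = mean_and_max(x for x in [1, 2, 3, 4])
print("{0} {1} {2}".format(first_pass, second_pass, stats))
```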

1.3 What does \_\_name\_\_ == '\_\_main\_\_' mean?

Python is really into namespaces (see, for example, The Zen of Python). The \_\_name\_\_ variable holds the name of the module (namespace) that the code belongs to. For example, if we import numpy, then all of the numpy features are in the numpy namespace.

In [ ]:
import numpy as np
print np.__name__

import matplotlib.pyplot as plt
print plt.__name__

If you import the above file containing the definition for MRMostUsedWord, then python will interpret the file all the way down until it hits that last if statement. \_\_name\_\_ will evaluate to 'MRMostUsedWord' (or whatever the name was of the file we imported), so the line inside the if statement will be skipped. On the other hand, if you run this code from the command line, python will interpret it without importing it and \_\_name\_\_ will be the name of the python top level namespace, which is '\_\_main\_\_', so MRMostUsedWord.run() gets called.

In (many) fewer words: it tells you to run the job only when invoked from the command line.
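Both cases can be checked with a small experiment. The sketch below writes a throwaway module (the file name name_demo.py is hypothetical), imports it, and then executes it as a script using runpy.run_path from the standard library, which mimics running the file from the command line.

```python
import importlib
import os
import runpy
import shutil
import sys
import tempfile

# Source of a tiny hypothetical module, name_demo.py.
SOURCE = (
    'if __name__ == "__main__":\n'
    '    RESULT = "ran as a script"\n'
    'else:\n'
    '    RESULT = "imported as " + __name__\n'
)

tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "name_demo.py")
with open(path, "w") as handle:
    handle.write(SOURCE)

try:
    # Case 1: import the file, so __name__ is the module name.
    sys.path.insert(0, tmp_dir)
    imported_result = importlib.import_module("name_demo").RESULT

    # Case 2: execute the file as the top level script.
    script_result = runpy.run_path(path, run_name="__main__")["RESULT"]
finally:
    sys.path.remove(tmp_dir)
    shutil.rmtree(tmp_dir)

print(imported_result)
print(script_result)
```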

Try copying the code for MRMostUsedWord to a file named MRMostUsedWord.py, and then running it on any old text file you might have lying around. The invocation will be something like this (modify based on your particular python installation):

In [ ]:
python MRMostUsedWord.py some_file.txt > most_used_word.out

2. Setting up your Amazon Web Services account

There is quite a bit of overhead involved in setting up an AWS account and keeping an eye on the jobs that you end up running. In lab, we will run through an example account activation including:

  • Account creation
  • Signing up for Elastic MapReduce
  • Storing security credentials in your mrjob.conf file
  • Redeeming account credits
  • Billing alerts
  • Checking on running jobs using the console

These documents (also linked from HW4) are very useful: Instructions for Amazon Setup notebook, Elastic MapReduce Quickstart

Once you have this all set up and working, mrjob makes it very easy to run a MapReduce job with EMR. Using the same MRMostUsedWord example as above, the command line invocation to run with EMR is:

In [ ]:
python MRMostUsedWord.py -r emr some_file.txt > most_used_word.out

3. MapReduce exercises

MapReduce schematic
[Image from [https://developers.google.com/appengine/docs/python/dataprocessing/](https://developers.google.com/appengine/docs/python/dataprocessing/)]

Below are two practice problems to get the hang of writing MapReduce algorithms. Remember, you will be writing these programs in separate files that you run from the command line. You are welcome to try out EC2, but these are small datasets and it will generally be much faster to run locally.

3.1 Anagram finder

First, grab the file word_list.txt. This contains a list of six-letter words that I dumped from my spellchecker. To keep things simple, all of the words consist of lower-case letters only.

In [ ]:
word_list = [word.strip() for word in open("word_list.txt").readlines()]
print "{0} words in list".format(len(word_list))
print "First ten words: {0}".format(", ".join(word_list[0:10]))

Use mrjob to write a class that finds all anagrams in word_list.txt.
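If you want a hint before writing the mrjob class, one possible approach is sketched below in plain python, without mrjob. It rests on the observation that two words are anagrams exactly when their sorted letters match, so the sorted letters make a natural key. The sample words and the manual grouping loop are stand-ins invented for this sketch; in a real job, mrjob does the grouping between the mapper and the reducer.

```python
from collections import defaultdict


def mapper(word):
    # Anagrams share the same letters, so they share this key.
    yield "".join(sorted(word)), word


def reducer(key, words):
    words = sorted(words)
    # Only keys with more than one word are anagram groups.
    if len(words) > 1:
        yield key, words


# Hypothetical sample input; the lab uses word_list.txt instead.
sample_words = ["listen", "silent", "enlist", "banana", "tinsel"]

# Stand-in for the grouping that the framework performs.
groups = defaultdict(list)
for word in sample_words:
    for key, value in mapper(word):
        groups[key].append(value)

anagrams = [pair for key in sorted(groups) for pair in reducer(key, groups[key])]
print(anagrams)
```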

UPDATE: My solution to exercise 3.1

3.2 Friends don't let friends root for the Cardinals

Cardinals v. Red Sox

For the next problem, download the file baseball_friends.csv. Each row of this csv file contains the following:

  • A person's name
  • The team that person is rooting for -- either "Cardinals" or "Red Sox"
  • A list of that person's friends, which could have arbitrary length

Let's take a look at one line:

In [ ]:
friends = open("baseball_friends.csv").readlines()
print friends[0].strip()
print len(friends[0].split(",")) - 2

This line tells us that Aaden is a Red Sox fan and he has 65 friends, who are all listed here. For this problem, it's safe to assume that all of the names are unique and that the friendship structure is symmetric (i.e. if Alannah shows up in Aaden's friends list, then Aaden will show up in Alannah's friends list).

Write an mrjob class that lists each person's name, their favorite team, the number of Red Sox fans they are friends with, and the number of Cardinals fans they are friends with.
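One way to think about this problem, sketched here in plain python rather than mrjob, is to have each person announce their team to every friend. Because friendships are symmetric, each person then receives exactly one announcement per friend. The three-person input, the "self" and "friend" tags, and the manual grouping loop are all assumptions made for this sketch.

```python
from collections import defaultdict


def mapper(line):
    fields = line.strip().split(",")
    name, team, friends = fields[0], fields[1], fields[2:]
    # Record this person's own team under their own name.
    yield name, ("self", team)
    # Tell every friend that one of their friends roots for this team.
    for friend in friends:
        yield friend, ("friend", team)


def reducer(name, records):
    own_team = None
    counts = {"Red Sox": 0, "Cardinals": 0}
    for kind, team in records:
        if kind == "self":
            own_team = team
        else:
            counts[team] += 1
    yield name, (own_team, counts["Red Sox"], counts["Cardinals"])


# Hypothetical input in the same layout as baseball_friends.csv.
sample_lines = [
    "Ann,Red Sox,Bob,Cy",
    "Bob,Cardinals,Ann",
    "Cy,Red Sox,Ann",
]

# Stand-in for the grouping that the framework performs.
grouped = defaultdict(list)
for line in sample_lines:
    for key, value in mapper(line):
        grouped[key].append(value)

results = dict(pair for name in grouped for pair in reducer(name, grouped[name]))
print(results)
```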

After running that program, we can look at the results to get an idea of the absurdly simple model that I used to generate the input csv file. You might need to modify the code below if the format of your output file doesn't quite match mine.
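For reference, the reading cell that follows assumes each output line is a JSON-encoded key, a tab, and a JSON-encoded list of the form [team, redsox_count, cardinals_count]. The sample line below is made up to show that layout (the counts are hypothetical), and the snippet parses it the same way.

```python
import json

# Hypothetical output line: JSON key, a tab, then a JSON value.
sample_line = '"Aaden"\t["Red Sox", 30, 35]\n'

name, value = [json.loads(field) for field in sample_line.strip().split("\t")]
team, redsox_count, cardinals_count = value
print("{0} {1} {2} {3}".format(name, team, redsox_count, cardinals_count))
```

If your job yields a different value structure, adjust the indexing in the reading cell to match.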

In [ ]:
import pandas as pd
import json

# Read results.
result_file = "baseball_friends.out"
result = [[json.loads(field) for field in line.strip().split('\t')] for line in open(result_file)]

# Break out columns.
names = [x[0] for x in result]
teams = [x[1][0] for x in result]
redsox_count = [x[1][1] for x in result]
cardinals_count = [x[1][2] for x in result]

# Combine in data frame.
result = pd.DataFrame(index=names, data={'teams': teams, 'redsox_count': redsox_count, 
                                         'cardinals_count': cardinals_count})

In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import rcParams
rcParams['figure.figsize'] = (10, 6)
rcParams['font.size'] = 14

# Average number of friends by affiliation.
print result.groupby('teams').mean()

# Histogram the affiliations of people who are friends of Red Sox fans.
plt.hist(result.redsox_count[result.teams == "Red Sox"], label="Red Sox friend Red Sox")
plt.hist(result.cardinals_count[result.teams == "Red Sox"], label="Red Sox friend Cardinals")
plt.xlabel('number of friends')
plt.ylabel('count')
plt.legend(loc=0)