Next-Word Predictor Development via Amazon Elastic MapReduce (EMR)

This script takes as input a pointer to an S3 bucket containing a set of text files and outputs a set of JSON files. Each JSON file uses prior strings (e.g. 1, 2, 3, or 4 prior words separated by spaces) as keys, each pointing to an array of the most common next words in the sequence along with their occurrence counts. The JSON files can be used in next-word prediction applications, as demonstrated in this blog post.

Amazon Elastic MapReduce (EMR) is used to manage the Hadoop cluster for the calculations. Two MapReduce steps are carried out in sequence:

  1. The input text is mapped to a series of N-grams (filtered against a vocabulary list so that tokens not on the list are replaced with the value "##unkn##"), and these N-grams are then aggregated by frequency
  2. The aggregated N-grams are filtered and summarized so that each (N-1)-gram is paired with an array of its 'numKeep' most frequent following words

These two steps are run for each value of N considered. (In the configuration below, N = [2,3,4,5].)
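
As a concrete illustration of the two record formats (using the N = 2 level, with counts taken from the sample output near the end of this post, and whitespace shown schematically), phase_1 emits aggregated lines of the form "N-gram <tab> count":

according a       13
according the     61
according to      24479

and phase_2 collapses these into one summary line per (N-1)-gram preface, of the form "preface <tab> total count <tab> top following words":

according         24553    [('a', 13), ('to', 24479), ('the', 61)]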

The following are the main configuration parameters for this script. Note that the variables 'vocabSize' and 'occCutoff' can be tuned to trade off model size against prediction performance.

Also note that AWS credentials are required, as described below, along with a previously computed list of 1-grams, sorted by frequency, from which the vocabulary list is produced. (This 1-gram list can itself be computed with this script by using the same text input folder, setting the variable 'Nlist' to [1], and running only the phase_1 mapper and reducer.)

In [1]:
inputfoldername = 's3://wordpredictor1/input/'

bucketname = 'wordpredictor2'

vocabSize = 1000
occCutoff = 10
numKeep = 5

Nlist = [2,3,4,5]

masterType = "m3.xlarge"
workerType = "m3.xlarge"
numWorkers = 2

maxTime = 5 * 60 * 60
checkTime = 30

Libraries, Keys

Libraries: boto is used for AWS interaction, Paramiko for ssh, and Pandas is used here only for nice table output in IPython (and could thus be cut out if desired).

In [2]:
import sys, os, time
import ast, json
import pandas as pd
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.emr.connection import EmrConnection
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep

Access keys and passwords are stored in external files, formatted as lines of "[name]=[value]", as per the AWS rootkey.csv download.
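
For example, rootkey.csv as downloaded from AWS should look something like the following (placeholder values shown), and passwords.csv follows the same name=value pattern for the four values unpacked below:

AWSAccessKeyId=AKIAXXXXXXXXXXXXXXXX
AWSSecretKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx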

In [3]:
AWSAccessKeyId, AWSSecretKey = ( line.strip().split('=')[1] for line in open('/Users/brian/rootkey.csv','r') )
sshKeyName, instancePass, mysqlPass, myIP = ( line.strip().split('=')[1] for line in open('/Users/brian/passwords.csv','r') )

Vocabulary List

We will pull our vocabulary list from the 'vocabSize' most frequent words in a previously computed list of 1-grams.

In [4]:
vocabList_df = pd.read_table("ngrams1.tsv", nrows=vocabSize, names=["word","occ"])
vocabList_df.head()
Out[4]:
word occ
0 ##s## 9299843
1 ##es## 9299843
2 the 4751890
3 to 2753081
4 and 2411141
In [5]:
vocabList = set(vocabList_df.word.tolist())

MapReduce Scripts

For each N (i.e. for each N-gram level), two MapReduce steps are required, hereafter referred to as 'phase_1' and 'phase_2'. The mapper and reducer Python scripts (Hadoop streaming is used to allow Python) are copied below; they should be saved as files in the working directory, from which they will be copied (with modifications as necessary) to S3 for use by EMR.

For local testing, note that you can test a mapper outside of Hadoop with the following:

head -50 text.sample.txt > testfile
cat testfile | ./phase1_mapper2.py

Or test both a mapper and a reducer:

cat testfile | ./phase1_mapper2.py | sort | ./phase1_reducer.py

phase1_mapper_template.py

#!/usr/bin/python
import sys, re

n = $n$

vocabSet = $vocabList$

for line in sys.stdin:

    # tokenize string
    s = re.sub("[.!?;]+", " ##s## ", line)
    s = "##s## " + s
    s = re.sub("##s##[\s]+##s##", "", s)
    regex = "[^\s,:=<>/\\)\\(\"]+"
    tokens = re.findall(regex, s.lower())

    # replace non-vocab tokens with "##unkn##"
    for i,t in enumerate(tokens):
        if not t in vocabSet:
            tokens[i] = "##unkn##"

    # find n-grams
    ngrams = []
    for i in range(n-1,len(tokens)):
        new = ""
        for j in range(n-1,-1,-1):
            new += tokens[i-j] + " "
        ngrams.append(new.strip())

    # output n-grams
    for l in ngrams:
        print "{0}\t{1}".format(str(l).strip(), 1)

phase1_reducer.py

#!/usr/bin/python
# Sum the counts for each N-gram key emitted by the mapper. Hadoop streaming
# sorts records by key, so all counts for a given key arrive consecutively.
# (This script is used as both the combiner and the reducer in phase_1.)
import sys

oldKey = None
s = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 2:
        continue

    thisKey, thisVal = data_mapped

    # when the key changes, emit the total for the previous key
    if oldKey and oldKey != thisKey:
        print str(oldKey).strip(), "\t", str(s)
        s = 0

    oldKey = thisKey
    s += int(thisVal)

# emit the total for the final key
if oldKey != None:
    print str(oldKey).strip(), "\t", str(s)

phase2_mapper_template.py

#!/usr/bin/python
import sys, re

n = $n$
occCutoff = $occCutoff$

for line in sys.stdin:

    l = line.strip().split()
    if len(l) != n + 1:
        continue

    # keep only N-grams with more than occCutoff occurrences whose final word
    # is a real vocabulary word (i.e. not a "##s##" or "##unkn##" marker)
    if int(l[n]) > occCutoff and not "#" in l[n-1]:

        # output tab-delimited fields: preface ((n-1)-gram), following word, occurrence count
        preface = l[0]
        for i in range(1,n-1):
            preface += " " + l[i]
        print preface + "\t" + l[n-1] + "\t" + l[n]

phase2_reducer_template.py

#!/usr/bin/python
# For each (N-1)-gram preface, keep the numKeep most frequent following words
# along with the total count across all following words that passed the filter.
import sys

numKeep = $numKeep$

oldKey = None
topX = []
total = 0

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        continue

    thisKey, word, occ = data_mapped

    # when the preface changes, emit the summary line for the previous preface
    if oldKey and oldKey != thisKey:
        print str(oldKey).strip() + "\t" + str(total) + "\t" + str(topX)
        topX = []
        total = 0

    # keep only the numKeep most frequent (word, count) pairs seen so far
    topX.append((word, int(occ)))
    if len(topX) > numKeep:
        topX = sorted(topX, key=lambda x: x[1], reverse=True)
        topX.pop()

    oldKey = thisKey
    total += int(occ)

# emit the summary line for the final preface
if oldKey != None:
    print str(oldKey).strip() + "\t" + str(total) + "\t" + str(topX)
In [6]:
phase1_mapper_template_file = 'phase1_mapper_template.py'
phase1_reducer_file = 'phase1_reducer.py'
phase2_mapper_template_file = 'phase2_mapper_template.py'
phase2_reducer_template_file = 'phase2_reducer_template.py'

Setting up S3 Folders and Files

In this section, we set up the necessary files and folders on S3 that will be referenced and used by EMR in the next section. (General notes on the use of boto for S3 can be found here.)

In [7]:
s3_conn = S3Connection(AWSAccessKeyId, AWSSecretKey)
bucket = s3_conn.get_bucket(bucketname)
In [8]:
# create a local 'mappers' directory if it does not exist
if not os.path.exists('mappers'):
    os.makedirs('mappers')

# write a separate phase1 mapper function for each value of n
for n in Nlist:
    mf = "mappers/phase1_mapper" + str(n) + ".py"
    with open(mf,'w') as mff:
        with open(phase1_mapper_template_file,'r') as mtf:
            mff.write(mtf.read().replace("$n$", str(n)).replace("$vocabList$", str(vocabList)))
    k = bucket.new_key("phase1_mapper" + str(n) + ".py")
    o = k.set_contents_from_filename(mf)
In [9]:
k = bucket.new_key('phase1_reducer.py')
o = k.set_contents_from_filename(phase1_reducer_file)
In [10]:
# write a separate phase2 mapper function for each value of n
for n in Nlist:
    mf = "mappers/phase2_mapper" + str(n) + ".py"
    with open(mf,'w') as mff:
        with open(phase2_mapper_template_file,'r') as mtf:
            mff.write(mtf.read().replace("$n$", str(n)).replace("$occCutoff$", str(occCutoff)))
    k = bucket.new_key("phase2_mapper" + str(n) + ".py")
    o = k.set_contents_from_filename(mf)
In [11]:
with open("phase2_reducer.py",'w') as mff:
    with open(phase2_reducer_template_file,'r') as mtf:
        mff.write(mtf.read().replace("$numKeep$", str(numKeep)))
k = bucket.new_key("phase2_reducer.py")
o = k.set_contents_from_filename("phase2_reducer.py")
In [12]:
k = bucket.new_key("phase1_output/")
o = k.set_contents_from_string('')
k = bucket.new_key("phase2_output/")
o = k.set_contents_from_string('')

Connecting to EMR, Configuring Jobs, Launching Cluster

This section configures and launches the computations on EMR. (See notes on using boto to configure jobs and connecting to EMR and launching jobs.)

In [13]:
emr_conn = EmrConnection(AWSAccessKeyId, AWSSecretKey)
In [14]:
instance_groups = []
instance_groups.append(InstanceGroup(
                num_instances = 1,
                role = "MASTER",
                type = masterType,
                market = "ON_DEMAND",
                name = "Main node"))
instance_groups.append(InstanceGroup(
                num_instances = numWorkers,
                role = "CORE",
                type = workerType,
                market = "ON_DEMAND",
                name = "Worker nodes"))
In [15]:
steps = []
for n in Nlist:
    steps.append( StreamingStep(
                    name = "phase1_" + str(n),
                    mapper = "s3://" + bucketname + "/phase1_mapper" + str(n) + ".py",
                    combiner = "s3://" + bucketname + "/phase1_reducer.py",
                    reducer = "s3://" + bucketname + "/phase1_reducer.py",
                    input = inputfoldername,
                    output = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/") )
    steps.append( StreamingStep(
                    name = "phase2_" + str(n),
                    mapper = "s3://" + bucketname + "/phase2_mapper" + str(n) + ".py",
                    reducer = "s3://" + bucketname + "/phase2_reducer.py",
                    input = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/",
                    output = "s3://" + bucketname + "/phase2_output/n" + str(n) + "/") )
In [16]:
cluster_id = emr_conn.run_jobflow(
                name = "ngramcalc",
                instance_groups = instance_groups,
                log_uri = "s3://" + bucketname + "/logs/",
                steps = steps,
                ec2_keyname = sshKeyName,
                ami_version = "latest")

Periodically Checking for Job Completion

The following periodically checks the status of the EMR job and waits for completion or failure before moving on. Note that with the current configuration of this script, the EMR job requires about 39 minutes.

In [17]:
count = 0
current_state = ""
while count < maxTime:
    time.sleep(checkTime)
    job_desc = emr_conn.describe_jobflow(cluster_id)
    if job_desc.state != current_state:
        current_state = job_desc.state
        print current_state
    if current_state == 'COMPLETED' or current_state == 'FAILED':
        count = maxTime
    else:
        count += checkTime
STARTING
BOOTSTRAPPING
RUNNING
SHUTTING_DOWN
COMPLETED

Reading Results from S3

The results are then downloaded from S3.

In [18]:
# make output directory if not already there
if not os.path.exists('output'):
    os.makedirs('output')

# download all of the results to that directory
for n in Nlist:
    outfilelist = bucket.list("phase2_output/n" + str(n) + "/")
    for key in outfilelist:
        key.get_contents_to_filename("output/" + key.name.replace("/","."))

The part files are then combined to produce a single file for each N-gram level.

In [19]:
filenames = os.listdir('output')
for n in Nlist:
    catlist = "cat"
    for f in filenames:
        if ("output.n" + str(n) + ".part") in f:
            catlist += " output/" + f
    catlist += " > ngrams" + str(n) + ".tsv"
    os.system(catlist)
In [20]:
pd.read_table("ngrams" + str(Nlist[0]) + ".tsv", nrows=10, names=["preface","sum","output"])
Out[20]:
preface sum output
0 ' 5612 [('i', 656), ('and', 633), ('he', 464), ('said...
1 -- 46879 [('and', 4705), ('the', 4027), ('a', 3080), ('...
2 000 20160 [('in', 2426), ('to', 1862), ('people', 1619),...
3 16 5032 [('years', 464), ('and', 431), ('percent', 278...
4 2010 6638 [('and', 1028), ('the', 512), ('to', 334), ('w...
5 30 21261 [('p', 4837), ('a', 2476), ('minutes', 2093), ...
6 5 26158 [('million', 2562), ('percent', 2303), ('minut...
7 according 24553 [('a', 13), ('to', 24479), ('the', 61)]
8 added 10294 [('to', 1834), ('a', 1600), ('that', 1352), ('...
9 alone 6199 [('in', 996), ('and', 620), ('with', 370), ('i...

Exporting to JSON

The results are then output to JSON for use in this blog post or in similar applications.

In [23]:
for n in Nlist:
    outobj = {}
    with open("ngrams" + str(n) + ".tsv","r") as f:
        for line in f:
            preface, total, top = line.split("\t")
            outobj[preface] = [ast.literal_eval(top.strip()), int(total.strip())]
    with open("ngrams" + str(n) + ".json","w") as f:
        f.write(json.dumps(outobj))
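
As a rough sketch of how an application might consume one of these files (here the 2-gram file 'ngrams2.json' produced above, using 'according' as an example lookup key; the variable names are just illustrative), predicting the next word amounts to a dictionary lookup followed by a sort by count:

import json

# load the exported 2-gram model: each key is a preface string, and each value
# is [ list of (word, count) pairs, total count for that preface ]
with open("ngrams2.json","r") as f:
    model = json.load(f)

preface = "according"
candidates, total = model.get(preface, [[], 0])

# order the candidate next words by frequency, most common first
predictions = [w for w, c in sorted(candidates, key=lambda x: x[1], reverse=True)]
print predictions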