# configuration
inputfoldername = 's3://wordpredictor1/input/'
bucketname = 'wordpredictor2'
vocabSize = 1000            # number of top words kept in the restricted vocabulary
occCutoff = 10              # minimum occurrence count for an n-gram to survive phase 2
numKeep = 5                 # number of predictions retained per preface
Nlist = [2, 3, 4, 5]        # n-gram sizes to compute
masterType = "m3.xlarge"
workerType = "m3.xlarge"
numWorkers = 2
maxTime = 5 * 60 * 60       # wait at most five hours (in seconds) for the cluster
checkTime = 30              # poll the job flow every 30 seconds

import sys, os, time
import ast, json
import pandas as pd
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.emr.connection import EmrConnection
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep

# read AWS credentials and other secrets from local files of KEY=value lines
AWSAccessKeyId, AWSSecretKey = (
    line.strip().split('=')[1] for line in open('/Users/brian/rootkey.csv', 'r'))
sshKeyName, instancePass, mysqlPass, myIP = (
    line.strip().split('=')[1] for line in open('/Users/brian/passwords.csv', 'r'))

# build the restricted vocabulary from the top vocabSize unigrams
vocabList_df = pd.read_table("ngrams1.tsv", nrows=vocabSize, names=["word", "occ"])
vocabList_df.head()
vocabList = set(vocabList_df.word.tolist())

phase1_mapper_template_file = 'phase1_mapper_template.py'
phase1_reducer_file = 'phase1_reducer.py'
phase2_mapper_template_file = 'phase2_mapper_template.py'
phase2_reducer_template_file = 'phase2_reducer_template.py'

s3_conn = S3Connection(AWSAccessKeyId, AWSSecretKey)
bucket = s3_conn.get_bucket(bucketname)

# add mappers directory to local if it does not exist
if not os.path.exists('mappers'):
    os.makedirs('mappers')

# write a separate phase1 mapper function for each value of n and upload it to S3
for n in Nlist:
    mf = "mappers/phase1_mapper" + str(n) + ".py"
    with open(mf, 'w') as mff:
        with open(phase1_mapper_template_file, 'r') as mtf:
            mff.write(mtf.read().replace("$n$", str(n)).replace("$vocabList$", str(vocabList)))
    k = bucket.new_key("phase1_mapper" + str(n) + ".py")
    o = k.set_contents_from_filename(mf)

# upload the phase1 reducer (used unchanged for every n)
k = bucket.new_key('phase1_reducer.py')
o = k.set_contents_from_filename(phase1_reducer_file)

# write a separate phase2 mapper function for each value of n and upload it to S3
for n in Nlist:
    mf = "mappers/phase2_mapper" + str(n) + ".py"
    with open(mf, 'w') as mff:
        with open(phase2_mapper_template_file, 'r') as mtf:
            mff.write(mtf.read().replace("$n$", str(n)).replace("$occCutoff$", str(occCutoff)))
    k = bucket.new_key("phase2_mapper" + str(n) + ".py")
    o = k.set_contents_from_filename(mf)

# fill in numKeep in the phase2 reducer template and upload it
with open("phase2_reducer.py", 'w') as mff:
    with open(phase2_reducer_template_file, 'r') as mtf:
        mff.write(mtf.read().replace("$numKeep$", str(numKeep)))
k = bucket.new_key("phase2_reducer.py")
o = k.set_contents_from_filename("phase2_reducer.py")

# create empty placeholder keys for the output folders
k = bucket.new_key("phase1_output/")
o = k.set_contents_from_string('')
k = bucket.new_key("phase2_output/")
o = k.set_contents_from_string('')

emr_conn = EmrConnection(AWSAccessKeyId, AWSSecretKey)

# one on-demand master node plus numWorkers on-demand core nodes
instance_groups = []
instance_groups.append(InstanceGroup(
    num_instances = 1,
    role = "MASTER",
    type = masterType,
    market = "ON_DEMAND",
    name = "Main node"))
instance_groups.append(InstanceGroup(
    num_instances = numWorkers,
    role = "CORE",
    type = workerType,
    market = "ON_DEMAND",
    name = "Worker nodes"))

# one phase1 + phase2 streaming step pair per n-gram size;
# the phase1 reducer script is also used as the combiner
steps = []
for n in Nlist:
    steps.append(StreamingStep(
        name = "phase1_" + str(n),
        mapper = "s3://" + bucketname + "/phase1_mapper" + str(n) + ".py",
        combiner = "s3://" + bucketname + "/phase1_reducer.py",
        reducer = "s3://" + bucketname + "/phase1_reducer.py",
        input = inputfoldername,
        output = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/"))
    steps.append(StreamingStep(
        name = "phase2_" + str(n),
        mapper = "s3://" + bucketname + "/phase2_mapper" + str(n) + ".py",
        reducer = "s3://" + bucketname + "/phase2_reducer.py",
        input = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/",
        output = "s3://" + bucketname + "/phase2_output/n" + str(n) + "/"))
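# Hypothetical sketch only: the actual phase1_mapper_template.py is not reproduced
# in this section, and its tokenization and key format may differ. The function
# below is purely for orientation -- it illustrates the usual shape of a Hadoop
# streaming mapper once the $n$ and $vocabList$ placeholders have been filled in.
# It is never called in this script.
def _sketch_phase1_mapper(stream, n=3, vocab=frozenset(["the", "of", "and", "to"])):
    # in a real streaming mapper, `stream` would be sys.stdin and the emitted
    # key/count pairs would be aggregated by the phase1 reducer/combiner
    for line in stream:
        words = line.strip().lower().split()
        for i in range(len(words) - n + 1):
            gram = words[i:i + n]
            # keep only n-grams built entirely from the restricted vocabulary
            if all(w in vocab for w in gram):
                print("\t".join([" ".join(gram), "1"]))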
+ str(n) + "/") ) cluster_id = emr_conn.run_jobflow( name = "ngramcalc", instance_groups = instance_groups, log_uri = "s3://" + bucketname + "/logs/", steps = steps, ec2_keyname = sshKeyName, ami_version = "latest") count = 0 current_state = "" while count < maxTime: time.sleep(checkTime) job_desc = emr_conn.describe_jobflow(cluster_id) if job_desc.state != current_state: current_state = job_desc.state print current_state if current_state == 'COMPLETED' or current_state == 'FAILED': count = maxTime else: count += checkTime # make output directory if not already there if not os.path.exists('output'): os.makedirs('output') # download all of the results to that directory for n in Nlist: outfilelist = bucket.list("phase2_output/n" + str(n) + "/") for key in outfilelist: key.get_contents_to_filename("output/" + key.name.replace("/",".")) filenames = os.listdir('output') for n in Nlist: catlist = "cat" for f in filenames: if ("output.n" + str(n) + ".part") in f: catlist += " output/" + f catlist += " > ngrams" + str(n) + ".tsv" os.system(catlist) pd.read_table("ngrams" + str(Nlist[0]) + ".tsv", nrows=10, names=["preface","sum","output"]) for n in Nlist: outobj = {} with open("ngrams" + str(n) + ".tsv","r") as f: for line in f: outobj[line.split("\t")[0]] = [ast.literal_eval(line.split("\t")[2].strip()), int(line.split("\t")[1].strip())] with open("ngrams" + str(n) + ".json","w") as f: f.write(json.dumps(outobj))