from mrjob.job import MRJob
import string


class MRWordCounts(MRJob):
    """MapReduce job that counts how often each word occurs in the input.

    A "word" is a whitespace-separated token with leading and trailing
    punctuation removed, converted to lower case.
    """

    def mapper(self, _, line):
        """Yield ``(word, 1)`` for every word found on ``line``.

        The input key is ignored. Tokens made up only of punctuation
        normalize to the empty string and are skipped, so they are not
        counted as a word.
        """
        for token in line.split():
            word = token.strip(string.punctuation).lower()
            if word:
                yield (word, 1)

    def combiner(self, word, counts):
        """Optimization: pre-sum the counts seen so far for ``word``.

        Emits the same ``(word, count)`` shape as the mapper, so the
        reducer gives the same totals with or without this step.
        """
        yield (word, sum(counts))

    def reducer(self, word, counts):
        """Yield ``(word, total)`` where total is the sum of all counts."""
        yield (word, sum(counts))


if __name__ == '__main__':
    # Hand control to mrjob, which parses the command line and runs the job.
    MRWordCounts.run()
# Command line: python <word_count_script>.py simple.txt > simple_word_counts.txt
# Command line (in Flux): python Stats_507/<word_count_script>.py -r hadoop hdfs:///var/stats507w19/darwin.txt > Stats_507/darwin_word_counts.txt
from mrjob.job import MRJob
import functools


def _add_triples(left, right):
    """Return the element-wise sum of two ``(n, sum_x, sum_x2)`` records."""
    return tuple(a + b for a, b in zip(left, right))


class MRSummaryStats(MRJob):
    """MapReduce job computing per-label count, mean and variance.

    Each input line is expected to hold a label and a numeric value
    separated by whitespace. For every label the job outputs
    ``(n, mean, variance)``, where variance is ``E[X^2] - E[X]^2``
    (the divide-by-n, i.e. population, form).
    """

    def mapper(self, _, line):
        """Yield ``(label, (1, x, x**2))`` for the value ``x`` on ``line``.

        The triple holds the running count, sum and sum of squares, which
        is all that is needed to compute the mean and variance later.
        """
        label, value = line.split()
        value = float(value)
        yield (label, (1, value, value ** 2))

    def combiner(self, label, values):
        """Optimization: merge the partial sums seen so far for ``label``.

        The output must keep the mapper's ``(n, sum_x, sum_x2)`` shape,
        because the reducer adds these triples together again. Emitting
        the mean and variance here would make the reducer sum those
        statistics as if they were raw sums.
        """
        yield label, functools.reduce(_add_triples, values)

    def reducer(self, label, values):
        """Yield ``(label, (n, mean, variance))`` from the merged sums."""
        n, sum_x, sum_x2 = functools.reduce(_add_triples, values)
        mean = sum_x / n
        yield label, (n, mean, (sum_x2 / n) - mean ** 2)


if __name__ == '__main__':
    # Hand control to mrjob, which parses the command line and runs the job.
    MRSummaryStats.run()
# Command line: python <summary_stats_script>.py populations_small.txt > summary_small.txt
from pyspark import SparkConf, SparkContext
import sys
import itertools
import functools

# List every triangle (three mutual friends) in a friendship graph.
# Each input line is expected to be "<node> <friend> <friend> ...", all
# integers separated by whitespace.

# Configuration settings so the job can be run from the terminal.
if len(sys.argv) != 3:
    print('Usage: ' + sys.argv[0] + ' <in> <out>')  # [0] is the .py file
    sys.exit(1)
inputlocation = sys.argv[1]   # [1] is the input file (the txt files here)
outputlocation = sys.argv[2]  # [2] is where the output is saved

# Set up the configuration and job context.
conf = SparkConf().setAppName('Triangles')
sc = SparkContext(conf=conf)

try:
    # Read files ------------------------------------------------------------
    data = sc.textFile(inputlocation)

    # Split each line, convert to ints, and format as (key, value) pairs
    # where the node is the key and its friends are the value. Blank lines
    # are dropped because they have no node to use as a key.
    data = (
        data.map(lambda line: [int(num) for num in line.split()])
        .filter(lambda numList: len(numList) > 0)
        .map(lambda numList: (numList[0], numList[1:]))
    )

    # Get all combinations of pairs of friends for each key (n choose 2).
    combs = data.map(lambda w: (w[0], list(itertools.combinations(w[1], 2))))

    # Create all possible triangles: (node, friend_a, friend_b).
    triangles = combs.flatMapValues(lambda x: x).map(
        lambda w: (w[0], w[1][0], w[1][1])
    )

    # Sort each tuple so the same three nodes always form the same key.
    triangles = triangles.map(lambda w: tuple(sorted(w)))

    # Count how often each candidate shows up and filter out the "fake"
    # triangles that only show up once.
    # Assumes friend lists are symmetric, so a real triangle is generated
    # from more than one of its nodes - TODO confirm against the input data.
    triangle_counts = triangles.map(lambda w: (w, 0)).countByKey()
    data_final = sorted(
        triple for triple, seen in triangle_counts.items() if seen > 1
    )

    # Convert back to an RDD of space-separated strings, e.g. "1 2 3".
    # NOTE(review): this gives the same text as stripping '(),' from
    # str(triple) and then removing the commas.
    data_final = sc.parallelize(data_final).map(
        lambda triple: ' '.join(str(node) for node in triple)
    )

    # Save output -----------------------------------------------------------
    data_final.saveAsTextFile(outputlocation)
finally:
    sc.stop()  # Let Spark know that the job is done, even after a failure.
# Command line (small input):
# spark-submit --master yarn --queue stats507w19 Stats_507/<triangles_script>.py /var/stats507w19/fof/friends.simple Stats_507/small_triangle_list
# Command line (large input):
# spark-submit --master yarn --queue stats507w19 Stats_507/<triangles_script>.py /var/stats507w19/fof/friends1000 Stats_507/big_triangle_list