Notebook
from mrjob.job import MRJob
import string


class MRWordCounts(MRJob):

    def mapper(self, _, line):
        # Yield (word, 1) for each word in the line, stripped of punctuation and lower-cased.
        for word in line.split():
            yield (word.strip(string.punctuation).lower(), 1)

    def combiner(self, word, counts):
        # Optimization: sum the counts for the words this mapper has seen so far.
        yield (word, sum(counts))

    def reducer(self, word, counts):
        # Sum the (partial) counts for each word across all mappers.
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordCounts.run()
# Command line: python mr_word_count.py simple.txt > simple_word_counts.txt
# Command line (in Flux): python Stats_507/mr_word_count.py -r hadoop hdfs:///var/stats507w19/darwin.txt > Stats_507/darwin_word_counts.txt
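Purely as an illustration of what the job above computes, here is a minimal, framework-free sketch of the same map / shuffle / reduce flow in plain Python. The sample lines and the shuffle_and_sort helper are made up for this sketch and are not part of mrjob or the assignment.

# Hypothetical sketch of the word-count data flow, without Hadoop or mrjob.
import string
from collections import defaultdict

def shuffle_and_sort(pairs):
    # Group (key, value) pairs by key, as the framework does between map and reduce.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped.items()

lines = ["On the Origin of Species", "the origin of species, by Charles Darwin"]

# Map: emit (word, 1) for every word, stripped of punctuation and lower-cased.
mapped = [(word.strip(string.punctuation).lower(), 1)
          for line in lines for word in line.split()]

# Shuffle + reduce: sum the counts for each word.
counts = {word: sum(ones) for word, ones in shuffle_and_sort(mapped)}
print(counts)  # e.g. {'on': 1, 'the': 2, 'origin': 2, 'of': 2, 'species': 2, ...}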
from mrjob.job import MRJob
import functools


class MRSummaryStats(MRJob):

    def mapper(self, _, line):
        # Yield the label as the key and a tuple (count, sum, sum of squares),
        # which is all that is needed to compute the mean and sample variance.
        label, value = line.split()
        value = float(value)
        yield (label, (1, value, value ** 2))

    def combiner(self, label, values):
        # Sum the partial (count, sum, sum of squares) tuples element-wise.
        # The combiner must not compute the mean or variance here, because its
        # output is fed to the reducer as if it were more partial sums.
        n, sumX, sumX2 = functools.reduce(
            lambda x, y: [x[i] + y[i] for i in range(len(y))], values)
        yield label, (n, sumX, sumX2)

    def reducer(self, label, values):
        # Combine the partial sums, then compute the sample mean and sample variance.
        n, sumX, sumX2 = functools.reduce(
            lambda x, y: [x[i] + y[i] for i in range(len(y))], values)
        yield label, (n, sumX / n, (sumX2 / n) - (sumX / n) ** 2)


if __name__ == '__main__':
    MRSummaryStats.run()
# Command Line: python mr_summary_stats.py populations_small.txt > summary_small.txt
# Command Line: python Stats_507/mr_summary_stats.py -r hadoop hdfs:///var/stats507w19/populations_large.txt > Stats_507/summary_large.txt
[israeldi@flux-hadoop-login2 ~]$ python Stats_507/mr_summary_stats.py -r hadoop hdfs:///var/stats507w19/populations_large.txt > Stats_507/summary_large.txt
Using configs in /etc/mrjob.conf
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 2.7.3.2.6.3.0
Looking for Hadoop streaming jar in /home/hadoop/contrib...
Looking for Hadoop streaming jar in /usr/lib/hadoop-mapreduce...
Found Hadoop streaming jar: /usr/lib/hadoop-mapreduce/hadoop-streaming.jar
Creating temp directory /tmp/mr_summary_stats.israeldi.20190405.205834.347413
Copying local files to hdfs:///user/israeldi/tmp/mrjob/mr_summary_stats.israeldi.20190405.205834.347413/files/...
Running step 1 of 1...
  packageJobJar: [] [/usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.3.0-235.jar] /tmp/streamjob7344297405908316377.jar tmpDir=null
  Connecting to ResourceManager at fladoop-rm01.arc-ts.umich.edu/10.164.5.157:8050
  Connecting to Application History server at fladoop-rm01.arc-ts.umich.edu/10.164.5.157:10200
  Connecting to ResourceManager at fladoop-rm01.arc-ts.umich.edu/10.164.5.157:8050
  Connecting to Application History server at fladoop-rm01.arc-ts.umich.edu/10.164.5.157:10200
  Created HDFS_DELEGATION_TOKEN token 139614 for israeldi on 10.164.5.158:8020
  Got dt for hdfs://fladoop-nn02.arc-ts.umich.edu:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 10.164.5.158:8020, Ident: (HDFS_DELEGATION_TOKEN token 139614 for israeldi)
  Total input paths to process : 1
  Adding a new node: /default-rack/10.164.1.144:1019
  Adding a new node: /default-rack/10.164.1.140:1019
  Adding a new node: /default-rack/10.164.1.143:1019
  Adding a new node: /default-rack/10.164.1.141:1019
  Adding a new node: /default-rack/10.164.1.142:1019
  Adding a new node: /default-rack/10.164.1.145:1019
  number of splits:2
  Submitting tokens for job: job_1547074859606_16460
  Kind: HDFS_DELEGATION_TOKEN, Service: 10.164.5.158:8020, Ident: (HDFS_DELEGATION_TOKEN token 139614 for israeldi)
  Timeline service address: http://fladoop-rm01.arc-ts.umich.edu:8188/ws/v1/timeline/
  Submitted application application_1547074859606_16460
  The url to track the job: http://fladoop-rm01.arc-ts.umich.edu:8088/proxy/application_1547074859606_16460/
  Running job: job_1547074859606_16460
  Job job_1547074859606_16460 running in uber mode : false
  map 0% reduce 0%
  map 8% reduce 0%
  map 13% reduce 0%
  map 19% reduce 0%
  map 24% reduce 0%
  map 29% reduce 0%
  map 35% reduce 0%
  map 40% reduce 0%
  map 45% reduce 0%
  map 51% reduce 0%
  map 56% reduce 0%
  map 61% reduce 0%
  map 66% reduce 0%
  map 67% reduce 0%
  map 83% reduce 0%
  map 100% reduce 0%
  map 100% reduce 100%
  Job job_1547074859606_16460 completed successfully
  Output directory: hdfs:///user/israeldi/tmp/mrjob/mr_summary_stats.israeldi.20190405.205834.347413/output
Counters: 49
    File Input Format Counters
        Bytes Read=153028684
    File Output Format Counters
        Bytes Written=1056
    File System Counters
        FILE: Number of bytes read=2476
        FILE: Number of bytes written=489934
        FILE: Number of large read operations=0
        FILE: Number of read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=153028946
        HDFS: Number of bytes written=1056
        HDFS: Number of large read operations=0
        HDFS: Number of read operations=9
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Rack-local map tasks=2
        Total megabyte-milliseconds taken by all map tasks=277889024
        Total megabyte-milliseconds taken by all reduce tasks=22872064
        Total time spent by all map tasks (ms)=135688
        Total time spent by all maps in occupied slots (ms)=271376
        Total time spent by all reduce tasks (ms)=5584
        Total time spent by all reduces in occupied slots (ms)=22336
        Total vcore-milliseconds taken by all map tasks=135688
        Total vcore-milliseconds taken by all reduce tasks=5584
    Map-Reduce Framework
        CPU time spent (ms)=150370
        Combine input records=10000000
        Combine output records=50
        Failed Shuffles=0
        GC time elapsed (ms)=731
        Input split bytes=262
        Map input records=10000000
        Map output bytes=409729324
        Map output materialized bytes=2482
        Map output records=10000000
        Merged Map outputs=2
        Physical memory (bytes) snapshot=3372437504
        Reduce input groups=25
        Reduce input records=50
        Reduce output records=25
        Reduce shuffle bytes=2482
        Shuffled Maps =2
        Spilled Records=100
        Total committed heap usage (bytes)=3726114816
        Virtual memory (bytes) snapshot=13179109376
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
job output is in hdfs:///user/israeldi/tmp/mrjob/mr_summary_stats.israeldi.20190405.205834.347413/output
Streaming final output from hdfs:///user/israeldi/tmp/mrjob/mr_summary_stats.israeldi.20190405.205834.347413/output...
Removing HDFS temp directory hdfs:///user/israeldi/tmp/mrjob/mr_summary_stats.israeldi.20190405.205834.347413...
Removing temp directory /tmp/mr_summary_stats.israeldi.20190405.205834.347413...
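To see why the combiner emits partial (count, sum, sum of squares) tuples rather than per-chunk means and variances, here is a quick check in plain Python that summing the partials element-wise reproduces the statistics of the full data set. The values and the three-way chunking are made up for this sketch.

# Hypothetical check: combining (count, sum, sum of squares) partials gives
# the same mean and variance as computing them on the full data directly.
import functools

values = [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]
chunks = [values[:3], values[3:6], values[6:]]   # pretend each chunk is one mapper's input

# Each "combiner" emits one partial tuple per chunk.
partials = [(len(c), sum(c), sum(x ** 2 for x in c)) for c in chunks]

# The "reducer" sums the partials element-wise, then computes the statistics.
n, sumX, sumX2 = functools.reduce(lambda x, y: [a + b for a, b in zip(x, y)], partials)
mean = sumX / n
var = sumX2 / n - mean ** 2

# Direct computation on the full list for comparison.
direct_mean = sum(values) / len(values)
direct_var = sum(x ** 2 for x in values) / len(values) - direct_mean ** 2
print(abs(mean - direct_mean) < 1e-12, abs(var - direct_var) < 1e-12)   # True True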
from pyspark import SparkConf, SparkContext
import sys
import itertools

# Parse the command-line arguments so the job can be launched with spark-submit:
# sys.argv[0] is the .py file, sys.argv[1] the input file, sys.argv[2] the output location.
if len(sys.argv) != 3:
    print('Usage: ' + sys.argv[0] + ' <in> <out>')
    sys.exit(1)
inputlocation = sys.argv[1]    # input file (the txt files here)
outputlocation = sys.argv[2]   # where we save the output of the program

# Set up the configuration and job context
conf = SparkConf().setAppName('Triangles')
sc = SparkContext(conf=conf)

# Read files ----------------------------------------------------------------------------------------------------
data = sc.textFile(inputlocation)

# Split each line, convert to ints, and format as (key, value) pairs where node n is the key
# and its friends are the values.
data = (data.map(lambda line: [num for num in line.split()])
            .map(lambda numList: [int(num) for num in numList])
            .map(lambda numList: (numList[0], numList[1:])))

# Get all combinations of pairs of friends for each key (n choose 2).
combs = data.map(lambda w: [w[0], list(itertools.combinations(w[1], 2))])

# Create all possible (candidate) triangles.
triangles = combs.flatMapValues(lambda x: x).map(lambda w: (w[0], w[1][0], w[1][1]))

# Sort each tuple so the same triangle always has the same representation.
triangles = triangles.map(lambda w: tuple(sorted(w)))

# Use the sorted tuple as a key, count by key, and filter out the "fake" triangles
# that only show up once.
triangles = triangles.map(lambda w: (w, 0))
triangles = triangles.countByKey()
triangles = list(filter(lambda x: x[1] > 1, triangles.items()))

# Convert the list back to an RDD in order to sort the output and convert the data to strings.
triangles = sc.parallelize(triangles)
data_final = triangles.map(lambda w: w[0]).collect()
data_final = sorted(data_final)

# Convert each triangle to a space-separated string.
data_final = sc.parallelize(data_final)
data_final = data_final.map(str).map(lambda w: w.strip('(),'))
data_final = data_final.map(lambda w: w.replace(',', ''))

# Save output ---------------------------------------------------------------------------------------------------
data_final.saveAsTextFile(outputlocation)
sc.stop()  # Let Spark know that the job is done.
# COMMAND LINE: # spark-submit --master yarn --queue stats507w19 Stats_507/ps_fof.py /var/stats507w19/fof/friends.simple Stats_507/small_triangle_list
# COMMAND LINE: # spark-submit --master yarn --queue stats507w19 Stats_507/ps_fof.py /var/stats507w19/fof/friends1000 Stats_507/big_triangle_list
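The filtering step above relies on the fact that a real triangle is generated once by each of its three vertices, while a spurious candidate appears only once. Here is a framework-free sketch of the same logic on a tiny, made-up friend list (the dictionary below is hypothetical), mirroring the Spark pipeline so the counting and filtering can be checked without a cluster.

# Hypothetical mini example of the candidate-triangle counting logic, in plain Python.
import itertools
from collections import Counter

# node: list of friends (symmetric friendships); edges 1-2, 1-3, 2-3, 3-4
friends = {1: [2, 3], 2: [1, 3], 3: [1, 2, 4], 4: [3]}

# For each node, every pair of its friends is a candidate triangle, sorted so that
# the same triangle always has the same representation.
candidates = [tuple(sorted((node, a, b)))
              for node, flist in friends.items()
              for a, b in itertools.combinations(flist, 2)]

# A real triangle is generated by each of its vertices, so it appears more than once;
# candidates seen only once are discarded.
counts = Counter(candidates)
triangles = sorted(t for t, c in counts.items() if c > 1)
print(triangles)   # [(1, 2, 3)]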