(C) 2015 Steve Phelps
The map and reduce functions
Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.
Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.
Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.
The Map-Reduce programming model was popularised by Google (Dean and Ghemawat 2008).
The first popular open-source implementation was Apache Hadoop, which reached version 1.0 in 2011.
Apache Spark reached version 1.0 in 2014.
It was originally developed by Matei Zaharia as a class project, and later a PhD dissertation, at the University of California, Berkeley.
In contrast to Hadoop, Apache Spark:
The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce new RDDs.
Consider the following code:
def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

def quadruple_everything_in(data):
    result = []
    for i in data:
        result.append(4 * i)
    return result
double_everything_in([1, 2, 3, 4, 5])
[2, 4, 6, 8, 10]
quadruple_everything_in([1, 2, 3, 4, 5])
[4, 8, 12, 16, 20]
The above code violates the "do not repeat yourself" principle of good software engineering practice.
How can we rewrite the code so that it avoids duplication?
def multiply_by_x_everything_in(x, data):
    result = []
    for i in data:
        result.append(x * i)
    return result
multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])
[2, 4, 6, 8, 10]
multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])
[4, 8, 12, 16, 20]
def squared(x):
    return x*x

def double(x):
    return x*2

def square_everything_in(data):
    result = []
    for i in data:
        result.append(squared(i))
    return result

def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result
square_everything_in([1, 2, 3, 4, 5])
[1, 4, 9, 16, 25]
double_everything_in([1, 2, 3, 4, 5])
[2, 4, 6, 8, 10]
The above code violates the "do not repeat yourself" principle of good software engineering practice.
How can we rewrite the code so that it avoids duplication?
def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])
[1, 4, 9, 16, 25]
apply_f_to_everything_in(double, [1, 2, 3, 4, 5])
[2, 4, 6, 8, 10]
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])
[1, 4, 9, 16, 25]
The map function
Python provides a built-in map function, which is much faster than our version.
list(map(lambda x: x*x, [1, 2, 3, 4, 5]))
[1, 4, 9, 16, 25]
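One detail worth noting about the call above: in Python 3 the built-in map returns a lazy iterator rather than a list, which is why the result is wrapped in list(...). A small sketch of this behaviour follows; the variable names are only illustrative.

```python
squares = map(lambda x: x * x, [1, 2, 3, 4, 5])

# Nothing has been computed yet; map has returned an iterator object.
first = next(squares)      # computes only the first element
rest = list(squares)       # consumes the remaining elements

# An iterator can only be consumed once, so it is now exhausted.
leftover = list(squares)
```

Here first holds the first square, rest holds the other four, and leftover is empty because the iterator has already been used up.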
def foldl(f, data, z):
    if (len(data) == 0):
        print(z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print("Folding", head, "with", tail, "using", z)
        partial_result = f(z, data[0])
        print("Partial result is", partial_result)
        return foldl(f, tail, partial_result)

def add(x, y):
    return x + y
foldl(add, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15
15
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15
15
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15
-15
(((((0 - 1) - 2) - 3) - 4) - 5)
-15
(1 - (2 - (3 - (4 - (5 - 0)))))
3
def foldr(f, data, z):
    if (len(data) == 0):
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15
-15
foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
3
The reduce function
Python's reduce function is a left fold.
from functools import reduce
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
-15
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)
Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15
-15
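Since reduce is a left fold, a right fold has to be built separately. The sketch below shows one possible way of doing so, using a hypothetical helper named foldr_via_reduce that walks the data in reverse and swaps the argument order of f. It also shows that omitting the initial value makes reduce start from the first element of the data.

```python
from functools import reduce

def foldr_via_reduce(f, data, z):
    # Hypothetical helper: fold from the right by reversing the data
    # and swapping the arguments passed to f.
    return reduce(lambda acc, x: f(x, acc), reversed(data), z)

def subtract(x, y):
    return x - y

# (((((0 - 1) - 2) - 3) - 4) - 5)
left = reduce(subtract, [1, 2, 3, 4, 5], 0)

# (1 - (2 - (3 - (4 - (5 - 0)))))
right = foldr_via_reduce(subtract, [1, 2, 3, 4, 5], 0)

# ((((1 - 2) - 3) - 4) - 5), because no initial value is supplied
no_initial = reduce(subtract, [1, 2, 3, 4, 5])
```

For subtraction the three results differ, which is a reminder that the direction of the fold and the choice of initial value only stop mattering for operators such as addition.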
Functional programming lends itself to parallel programming.
The map function can easily be parallelised through data-level parallelism, because each function application is independent of the others.
We can see this by rewriting it so:
def perform_computation(f, result, data, i):
    print("Computing the ", i, "th result...")
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result
my_map(lambda x: x * x, [1, 2, 3, 4, 5])
Computing the 0 th result...
Computing the 1 th result...
Computing the 2 th result...
Computing the 3 th result...
Computing the 4 th result...
[1, 4, 9, 16, 25]
A multi-threaded map function
from threading import Thread
def schedule_computation_threaded(f, result, data, threads, i):
    # Each function evaluation is scheduled on its own thread.
    def my_job():
        print("Processing data:", data[i], "... ")
        result[i] = f(data[i])
        print("Finished job #", i)
        print("Result was", result[i])
    threads[i] = Thread(target=my_job)

def my_map_multithreaded(f, data):
    n = len(data)
    result = [None] * n
    threads = [None] * n
    print("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded(f, result, data, threads, i)
    print("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    print("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    print("All done.")
    return result
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])
Scheduling jobs.. Starting jobs.. Processing data: 1 ... Finished job # 0 Result was 1 Processing data: 2 ... Finished job # 1 Result was 4 Processing data: 3 ... Finished job # 2 Result was 9 Processing data:Processing data: 5 ... Finished job # 4 Result was 25 Waiting for jobs to finish.. 4 ... Finished job # 3 Result was 16 All done.
[1, 4, 9, 16, 25]
from numpy.random import uniform
from time import sleep
def a_function_which_takes_a_long_time(x):
    sleep(uniform(2, 10))  # Simulate some long computation
    return x*x
my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])
Scheduling jobs.. Starting jobs.. Processing data:Processing data: 2 ... 1 ... Processing data: 3 ... Processing data: 4 ... Processing data:Waiting for jobs to finish.. 5 ... Finished job # 2 Result was 9 Finished job # 4 Result was 25 Finished job # 0 Result was 1 Finished job # 3 Result was 16 Finished job # 1 Result was 4 All done.
[1, 4, 9, 16, 25]
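The standard library offers a ready-made alternative to the hand-written thread management above. The following sketch uses concurrent.futures.ThreadPoolExecutor, whose map method returns results in the same order as the inputs. The names my_map_pooled and slow_square, and the choice of four workers, are assumptions made for illustration. Note that in the standard CPython build, threads mainly help with tasks that spend their time waiting (such as sleeping or I/O), because the global interpreter lock stops Python bytecode from running on several cores at once.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_square(x):
    sleep(0.1)  # Simulate a computation that spends its time waiting
    return x * x

def my_map_pooled(f, data, max_workers=4):
    # Hypothetical helper: apply f to every element using a pool of threads.
    # Executor.map yields results in the same order as the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(f, data))

result = my_map_pooled(slow_square, [1, 2, 3, 4, 5])
```

The pool reuses a fixed number of threads instead of creating one thread per element, which matters once the data has more than a handful of items.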
The data set, and the state of each stage of the computation, is represented as a set of key-value pairs.
The programmer provides a map function:
$\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$
and a reduce function:
$\operatorname{reduce}(k', \left< k', v'\right> *) \rightarrow \; \left< k', v'' \right> *$
The $*$ refers to a collection of values.
These collections are not ordered.
In a Map-Reduce computation these collections are resilient distributed data-sets (RDDs):
The map and reduce functions can work in parallel across different keys, or different elements of the collection.
The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer.
In this simple example, the input is a set of URLs, and each record is a document.
Problem: compute how many times each word has occurred across the data set.
$\left< document1, to \; be \; or \; not \; to \; be \right>$
So our original data-set will be transformed to:
$\left< to, 1 \right>$ $\left< be, 1 \right>$ $\left< or, 1 \right>$ $\left< not, 1 \right>$ $\left< to, 1 \right>$ $\left< be, 1 \right>$
The reduce operation groups values according to their key, and then performs a reduce on each key.
The collections are partitioned across different storage units; therefore Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.
Data in different partitions are reduced separately in parallel.
The final result is a reduce of the reduced data in each partition.
Therefore it is very important that our operator is both commutative and associative.
In our case the function is the + operator.
$\left< be, 2 \right>$
$\left< not, 1 \right>$
$\left< or, 1 \right>$
$\left< to, 2 \right>$
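To see why the operator needs these properties, the sketch below reduces each partition separately and then combines the partial results, in plain Python and without any threads. The helper partitioned_reduce and the particular partitions are assumptions chosen for illustration. With + the answer does not depend on how the data is split, whereas with - it does.

```python
from functools import reduce

def partitioned_reduce(f, partitions):
    # Reduce each partition on its own, then reduce the partial results.
    partial_results = [reduce(f, p) for p in partitions]
    return reduce(f, partial_results)

def add(x, y):
    return x + y

def subtract(x, y):
    return x - y

# Two different ways of partitioning the same data
split_a = [[1, 2], [3, 4, 5]]
split_b = [[1, 2, 3], [4, 5]]

sum_a = partitioned_reduce(add, split_a)
sum_b = partitioned_reduce(add, split_b)
diff_a = partitioned_reduce(subtract, split_a)
diff_b = partitioned_reduce(subtract, split_b)
```

Associativity makes the grouping into partitions irrelevant; commutativity additionally matters if the partial results may be combined in any order.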
Notice that these functions are formulated differently from the standard Python functions of the same name.
The reduce function works with key-value pairs.
It would be more apt to call it something like reduceByKey.
To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.
This illustrates how a problem can be written in terms of map and reduce operations.
Note that these are illustrative functions; this is not how Hadoop or Apache Spark actually implement them.
##########################################################
#
# MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################
def groupByKey(data):
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result

def reduceByKey(f, data):
    key_values = groupByKey(data)
    return list(map(lambda key:
                    (key, reduce(f, key_values[key])),
                    key_values))
data = list(map(lambda x: (x, 1), "to be or not to be".split()))
data
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
groupByKey(data)
{'to': [1, 1], 'be': [1, 1], 'or': [1], 'not': [1]}
reduceByKey(lambda x, y: x + y, data)
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]
We can parallelise this implementation by using the my_map_multithreaded function we defined earlier.
def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])), [key for key in key_values])
reduceByKey_multithreaded(lambda x, y: x + y, data)
Scheduling jobs..
Starting jobs..
Processing data: to ...
Finished job # 0
Result was ('to', 2)
Processing data: be ...
Finished job # 1
Result was ('be', 2)
Processing data: or ...
Finished job # 2
Result was ('or', 1)
Processing data: not ...
Finished job # 3
Result was ('not', 1)
Waiting for jobs to finish..
All done.
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]
We can also parallelise the reduce operation.
We partition the data into approximately equal subsets.
We then reduce each subset independently on a separate core.
The results can be combined in a final reduce step.
def split_data(data, split_points):
    partitions = []
    n = 0
    for i in split_points:
        partitions.append(data[n:i])
        n = i
    partitions.append(data[n:])
    return partitions
data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data
[['a', 'b', 'c'], ['d', 'e', 'f', 'g']]
from threading import Thread
def parallel_reduce(f, partitions):
    n = len(partitions)
    results = [None] * n
    threads = [None] * n
    def job(i):
        results[i] = reduce(f, partitions[i])
    for i in range(n):
        # Pass i as an argument so that each thread receives its own value,
        # rather than reading the loop variable after it may have changed.
        threads[i] = Thread(target=job, args=(i,))
        threads[i].start()
    for i in range(n):
        threads[i].join()
    return reduce(f, results)
parallel_reduce(lambda x, y: x + y, partitioned_data)
'abcdefg'
The code we have written so far will not allow us to exploit parallelism from multiple computers in a cluster.
Developing such a framework would be a very large software engineering project.
There are existing frameworks we can use:
In this lecture we will cover Apache Spark.
Apache Spark provides an object-oriented library for processing data on the cluster.
It provides objects which represent resilient distributed datasets (RDDs).
RDDs behave a bit like Python collections (e.g. lists).
However:
We process the data by using higher-order functions to map RDDs onto new RDDs.
Each instance of an RDD has at least two methods corresponding to the Map-Reduce workflow:
map
reduceByKey
These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.
There are also additional RDD methods in the Apache Spark API.
words = "to be or not to be".split()
words
['to', 'be', 'or', 'not', 'to', 'be']
The SparkContext class
When working with Apache Spark we invoke methods on an object which is an instance of the pyspark.context.SparkContext class.
Typically, an instance of this object will be created automatically for you and assigned to the variable sc.
The parallelize method in SparkContext can be used to turn any ordinary Python collection into an RDD:
words_rdd = sc.parallelize(words)
words_rdd
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
By invoking the map or reduceByKey methods on words_rdd we can set up a parallel processing computation across the cluster.
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd
PythonRDD[1] at RDD at PythonRDD.scala:53
Notice that we do not have a result yet.
The computation is not performed until we request the final result to be collected.
We do this by invoking the collect() method:
word_tuples_rdd.collect()
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd
PythonRDD[6] at RDD at PythonRDD.scala:53
word_counts = word_counts_rdd.collect()
word_counts
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]
It is only when we invoke collect() that the processing is performed on the cluster.
Invoking collect() will cause both the map and reduceByKey operations to be performed.
If the resulting collection is very large then this can be an expensive operation.
The take method is similar to collect, but only returns the first $n$ elements.
This can be very useful for testing.
word_counts_rdd.take(2)
[('to', 2), ('be', 2)]
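The deferred execution described above can be imitated in plain Python. The following is only a toy sketch: the class LazyCollection and its methods are hypothetical, are not part of the Spark API, and do not reflect how Spark is implemented. It records the functions passed to map and applies them only when collect or take is called.

```python
from itertools import islice

class LazyCollection:
    """Toy stand-in for an RDD: transformations are recorded, not run."""

    def __init__(self, data, functions=()):
        self._data = data
        self._functions = tuple(functions)

    def map(self, f):
        # Return a new collection; no element is processed here.
        return LazyCollection(self._data, self._functions + (f,))

    def _evaluate(self):
        # Apply the recorded functions to one element at a time.
        for x in self._data:
            for f in self._functions:
                x = f(x)
            yield x

    def collect(self):
        return list(self._evaluate())

    def take(self, n):
        # Only the first n elements are ever computed.
        return list(islice(self._evaluate(), n))

calls = []

def tag(word):
    calls.append(word)  # Record each evaluation so that laziness is visible
    return (word, 1)

pairs = LazyCollection("to be or not to be".split()).map(tag)
calls_before = len(calls)        # map alone has done no work
first_two = pairs.take(2)
calls_after_take = len(calls)    # only two elements were evaluated
everything = pairs.collect()
```

Counting the calls to tag makes the point concrete: nothing runs when map is invoked, take(2) evaluates two elements, and collect evaluates all six.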
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y)
counts.collect()
[('to', 2), ('be', 2), ('or', 1), ('not', 1)]
Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:
sortByKey, sortBy, takeOrdered
flatMap
filter
count
intersection, union
In the previous example, we created an RDD from a Python collection.
This is not typically how we would work with big data.
More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.
The following example creates an RDD from a text file on the native filesystem (ext4).
Each element of the RDD corresponds to a single line of text.
genome = sc.textFile('../../../data/genome.txt')
genome.take(5)
['TTGGCCATGCTGCCCACTCACCTAGAGCGCACAGCTGACACTGAGTCCTCTTCTGAACCTCATCCATGAA', 'CATATTTATGAAATCTTTCCTGGCCCCAAGTGGAAATGCCCCCTCATTTGGGTCCTCACTGAACCCCAGT', 'ACACAACTCTTTTGTACTACTCTATTATGCTGGGGTGTTTTTTTATTGTCTCACCTGATAAACCGTAAGC', 'CCCTTGAAGACAGCAACTCGTTTTTAAGCTCTTTATAACCCCAGAGCCTCGCACAGTACCTGGACCAGAT', 'TAAGGGGTACTTAACAGATGCTTAGTGAAGGAAGGAATGGATTTCTCACCTGGTTGCTTATCTTCTAGAC']
We will use this RDD to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.
First we will define some functions to split the bases into sequences of a certain size:
def group_characters(line, n=5):
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if (i % n) == 0:
            yield result
            result = ''

def group_and_split(line):
    return [sequence for sequence in group_characters(line)]
group_and_split('abcdefghijklmno')
['abcde', 'fghij', 'klmno']
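The same grouping can be expressed more compactly with slicing. This is an alternative sketch rather than the version used in the rest of the example, and the name group_by_slicing is hypothetical. Like group_characters, it drops any incomplete group at the end of the line.

```python
def group_by_slicing(line, n=5):
    # Step through the line n characters at a time, keeping only
    # complete groups of exactly n characters.
    return [line[i:i + n] for i in range(0, len(line) - n + 1, n)]

full = group_by_slicing('abcdefghijklmno')
ragged = group_by_slicing('abcdefg')   # the trailing 'fg' is incomplete and is dropped
```

Slicing avoids building each group one character at a time, which keeps the function short and easy to check by eye.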
Now we will transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as per the word-count example.
Notice that if we simply map each line of text, we will obtain multi-dimensional data:
genome.map(group_and_split).take(2)
[['TTGGC', 'CATGC', 'TGCCC', 'ACTCA', 'CCTAG', 'AGCGC', 'ACAGC', 'TGACA', 'CTGAG', 'TCCTC', 'TTCTG', 'AACCT', 'CATCC', 'ATGAA'], ['CATAT', 'TTATG', 'AAATC', 'TTTCC', 'TGGCC', 'CCAAG', 'TGGAA', 'ATGCC', 'CCCTC', 'ATTTG', 'GGTCC', 'TCACT', 'GAACC', 'CCAGT']]
flatMap
We will need to flatten this data in order to turn it into a list of base-sequences.
We can use the flatMap method:
sequences = genome.flatMap(group_and_split)
sequences.take(3)
['TTGGC', 'CATGC', 'TGCCC']
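In plain Python terms, the difference between map and flatMap can be sketched with itertools.chain.from_iterable. The helpers flat_map and split_into_fives below are hypothetical and work on ordinary lists, not RDDs; the two short lines of bases are made-up sample data.

```python
from itertools import chain

def split_into_fives(line):
    # Split a line into consecutive groups of five characters.
    return [line[i:i + 5] for i in range(0, len(line) - 4, 5)]

def flat_map(f, data):
    # Apply f to every element, then concatenate the resulting lists.
    return list(chain.from_iterable(map(f, data)))

lines = ['TTGGCCATGC', 'CATATTTATG']

nested = list(map(split_into_fives, lines))   # one list per line
flat = flat_map(split_into_fives, lines)      # a single flat list
```

The nested result keeps one inner list per input line, whereas the flat result is a single list of sequences, which is the shape needed for counting.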
counts = \
    sequences.map(
        lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(10)
[('TTGGC', 587), ('CATGC', 647), ('TGCCC', 599), ('ACTCA', 775), ('TGACA', 831), ('TTCTG', 1257), ('AACCT', 726), ('TTATG', 819), ('AAATC', 996), ('TGGCC', 718)]
We want to rank each sequence according to its count.
Sorting is performed on the key, so the key (first element) of each tuple should be the count.
We therefore need to reverse the tuples.
def reverse_tuple(key_value_pair):
    return (key_value_pair[1], key_value_pair[0])
sequences = counts.map(reverse_tuple)
sequences.take(10)
[(587, 'TTGGC'), (647, 'CATGC'), (599, 'TGCCC'), (775, 'ACTCA'), (831, 'TGACA'), (1257, 'TTCTG'), (726, 'AACCT'), (819, 'TTATG'), (996, 'AAATC'), (718, 'TGGCC')]
sequences_sorted = sequences.sortByKey(False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences
[(37137, 'NNNNN'), (4653, 'AAAAA'), (4223, 'TTTTT'), (2788, 'AAAAT'), (2658, 'ATTTT'), (2283, 'AAATA'), (2276, 'TAAAA'), (2197, 'TTTTA'), (2196, 'TATTT'), (2185, 'AGAAA')]
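For comparison, the same kind of ranking can be sketched on an ordinary Python list of (sequence, count) pairs without reversing the tuples, by passing a key function to sorted or to heapq.nlargest. The sample pairs below are copied from the counts.take(10) output shown earlier; the variable and function names are illustrative.

```python
import heapq

sample_counts = [('TTGGC', 587), ('CATGC', 647), ('TGCCC', 599),
                 ('ACTCA', 775), ('TGACA', 831), ('TTCTG', 1257)]

def count_of(pair):
    # The count is the second element of each (sequence, count) pair.
    return pair[1]

# Sort all pairs by count, largest first, then keep the top three.
top_sorted = sorted(sample_counts, key=count_of, reverse=True)[:3]

# heapq.nlargest avoids sorting the whole list when only a few are needed.
top_heap = heapq.nlargest(3, sample_counts, key=count_of)
```

The sortBy and takeOrdered methods listed earlier accept a key function in a similar spirit, so the tuple reversal is one option among several.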
See this tutorial.
import numpy as np
def sample(p):
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0
NUM_SAMPLES = 5000000
count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
    .reduce(lambda a, b: a + b)
r = float(count) / float(NUM_SAMPLES)
print("Pi is approximately %.3f" % (4.0 * r))
Pi is approximately 3.141
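The estimate rests on the fact that a point drawn uniformly from the unit square falls inside the quarter circle of radius 1 with probability π/4, so four times the observed fraction approximates π. A minimal sketch of the same map and reduce steps in plain Python, without Spark, follows; the seed and the smaller sample size are assumptions chosen to keep it quick and repeatable.

```python
import random
from functools import reduce

def sample(_):
    # Draw a point uniformly from the unit square and report
    # whether it falls inside the quarter circle of radius 1.
    x, y = random.random(), random.random()
    return 1 if x * x + y * y < 1 else 0

random.seed(0)          # Fixed seed so that repeated runs agree
num_samples = 200000

# Map each index to a 0/1 sample, then reduce the samples by summing them.
count = reduce(lambda a, b: a + b, map(sample, range(num_samples)))
pi_estimate = 4.0 * count / num_samples
print("Pi is approximately %.2f" % pi_estimate)
```

Because addition is both associative and commutative, the summation step is exactly the kind of reduce that can be split across partitions.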