# Map-Reduce and Apache Spark

(C) 2015 Steve Phelps

## Overview

1. Recap of functional programming in Python
2. Python's map and reduce functions
3. Writing parallel code using map
4. The Map-Reduce programming model
5. Using Apache Spark with Python

• Introduction to Parallel Computing, Blaise Barney, Lawrence Livermore National Laboratory.

• Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.

• Spark Programming Guide

• Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.

## History

• The Map-Reduce programming model was popularised by Google (Dean and Ghemawat 2008).

• The first popular open-source implementation was Apache Hadoop, first released in 2011.

• Apache Spark was first released in 2014.

• It was originally developed by Matei Zaharia as a class project, and later a PhD dissertation, at University of California, Berkeley.

• In contrast to Hadoop, Apache Spark:

• is easy to install and configure.
• provides a much more natural iterative workflow

## Resilient distributed datasets

• The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).

• When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce new RDDs.

## Functional programming

Consider the following code:

In :
def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

def quadruple_everything_in(data):
    result = []
    for i in data:
        result.append(4 * i)
    return result

In :
double_everything_in([1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
In :
quadruple_everything_in([1, 2, 3, 4, 5])

Out:
[4, 8, 12, 16, 20]
• The above code violates the "do not repeat yourself" principle of good software engineering practice.

• How can we rewrite the code so that it avoids duplication?

In :
def multiply_by_x_everything_in(x, data):
    result = []
    for i in data:
        result.append(x * i)
    return result

In :
multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
In :
multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])

Out:
[4, 8, 12, 16, 20]
• Now consider the following code:
In :
def squared(x):
    return x*x

def double(x):
    return x*2

def square_everything_in(data):
    result = []
    for i in data:
        result.append(squared(i))
    return result

def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result

In :
square_everything_in([1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]
In :
double_everything_in([1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]
• The above code violates the "do not repeat yourself" principle of good software engineering practice.

• How can we rewrite the code so that it avoids duplication?

## Using functions as values

In :
def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result

In :
apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]
In :
apply_f_to_everything_in(double, [1, 2, 3, 4, 5])

Out:
[2, 4, 6, 8, 10]

### Lambda expressions

• We can use anonymous functions to save having to define a function each time we want to use map.
In :
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]

# Python's map function

• Python has a built-in function map which is much faster than our version.
In :
map(lambda x: x*x, [1, 2, 3, 4, 5])

Out:
[1, 4, 9, 16, 25]
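
A brief portability note (not part of the original Python 2 session): under Python 3, map returns a lazy iterator rather than a list, so the result has to be materialised explicitly.

# Python 3 only: map is lazy, so wrap it in list() to see the values.
list(map(lambda x: x * x, [1, 2, 3, 4, 5]))   # [1, 4, 9, 16, 25]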

## Implementing reduce

• The reduce function is an example of a fold.

• There are different ways we can fold data.

• The following implements a left fold.

In :
def foldl(f, data, z):
    if (len(data) == 0):
        print z
        return z
    else:
        head = data[0]
        tail = data[1:]
        print "Folding", head, "with", tail, "using", z
        partial_result = f(z, head)
        print "Partial result is", partial_result
        return foldl(f, tail, partial_result)

In :
def add(x, y):
    return x + y

foldl(add, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15

Out:
15
In :
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15

Out:
15
In :
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
In :
(((((0 - 1) - 2) - 3) - 4) - 5)

Out:
-15
In :
(1 - (2 - (3 - (4 - (5 - 0)))))

Out:
3
In :
def foldr(f, data, z):
    if (len(data) == 0):
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))

In :
foldl(lambda x, y: x - y,  [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
In :
foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Out:
3

## Python's reduce function

• Python's built-in reduce function is a left fold.
In :
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

Out:
15
In :
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Out:
-15
In :
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15

Out:
-15
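
Another portability note: the examples above rely on Python 2's built-in reduce. Under Python 3, reduce is no longer a built-in and must be imported from functools.

# Python 3 only: reduce lives in the functools module.
from functools import reduce

reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])   # 15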

# Functional programming and parallelism

• Functional programming lends itself to parallel programming.

• The map function can easily be parallelised through data-level parallelism,

• provided that the function we supply as an argument is free from side-effects
• (which is why we avoid working with mutable data).
• We can see this by rewriting our map function as follows:

In :
def perform_computation(f, result, data, i):
    print "Computing the ", i, "th result..."
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result

In :
my_map(lambda x: x * x, [1, 2, 3, 4, 5])

Computing the  0 th result...
Computing the  1 th result...
Computing the  2 th result...
Computing the  3 th result...
Computing the  4 th result...

Out:
[1, 4, 9, 16, 25]

## A multi-threaded map function

In :
from threading import Thread

# Each function evaluation is scheduled on a different core.
def my_job():
print "Processing data:", data[i], "... "
result[i] = f(data[i])
print "Finished job #", i
print "Result was", result[i]

n = len(data)
result = [None] * n
print "Scheduling jobs.. "
for i in range(n):
print "Starting jobs.. "
for i in range(n):
print "Waiting for jobs to finish.. "
for i in range(n):
print "All done."
return result

In :
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])

Scheduling jobs..
Starting jobs..
Processing data: 1 ...
Finished job # 0
Result was 1
Processing data: 2 ...
Finished job # 1
Result was 4
Processing data: 3 ...
Finished job # 2
Result was 9
Processing data: 4 ...
Finished job # 3
Result was 16
Processing data: 5 ...
Finished job # 4
Result was 25
Waiting for jobs to finish..
All done.

Out:
[1, 4, 9, 16, 25]
In :
from numpy.random import uniform
from time import sleep

def a_function_which_takes_a_long_time(x):
sleep(uniform(2, 10))  # Simulate some long computation
return x*x

my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])

Scheduling jobs..
Starting jobs..
Processing data: 1 ...
Processing data: 2 ...
Processing data: 3 ...
Processing data: 4 ...
Processing data: 5 ...
Waiting for jobs to finish..
Finished job # 4
Result was 25
Finished job # 0
Result was 1
Finished job # 3
Result was 16
Finished job # 2
Result was 9
Finished job # 1
Result was 4
All done.

Out:
[1, 4, 9, 16, 25]
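
A caveat on the timings above: in CPython the Global Interpreter Lock (GIL) prevents threads from executing pure-Python code on several cores at once, so the threaded map overlaps nicely here mainly because sleep releases the lock. For CPU-bound functions, one alternative on a single machine is a process pool; a minimal sketch (not part of the original lecture code):

from multiprocessing import Pool

def square(x):
    return x * x

# Worker processes side-step the GIL for CPU-bound functions.
pool = Pool(4)
print pool.map(square, [1, 2, 3, 4, 5])   # [1, 4, 9, 16, 25]
pool.close()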

## Map Reduce

• Map Reduce is a programming model for scalable parallel processing.
• Scalable here means that it can work on big data with very large compute clusters.
• There are many implementations: e.g. Apache Hadoop and Apache Spark.
• We can use Map-Reduce with any programming language:
• Hadoop is written in Java
• Spark is written in Scala, but has a Python interface.
• Languages with functional features, such as Python and Scala, fit very well with the Map-Reduce model:
• However, we don't have to use functional programming.
• A MapReduce implementation will take care of the low-level functionality so that you don't have to worry about:
• network I/O
• network and disk transfer optimisation
• handling of machine failures
• serialization of data
• etc..
• The model is designed to move the processing to where the data resides.

## Typical steps in a Map Reduce Computation

1. ETL (extract, transform, load) a big data set.
2. Map operation: extract something you care about from each row
3. "Shuffle and Sort": group the intermediate results by key and redistribute them across nodes
4. Reduce operation: aggregate, summarise, filter or transform
5. Write the results.
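
As a preview of how these steps look in practice, here is a minimal PySpark sketch; the input path and record format are hypothetical, and sc is the SparkContext introduced later in the lecture.

lines = sc.textFile('/tmp/access_log.txt')               # 1. ETL a big data set
pairs = lines.map(lambda line: (line.split()[0], 1))     # 2. Map: extract what we care about
counts = pairs.reduceByKey(lambda x, y: x + y)           # 3-4. Shuffle and sort, then reduce
counts.saveAsTextFile('/tmp/url_counts')                 # 5. Write the results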

## Callbacks for Map Reduce

• The data set, and the state of each stage of the computation, is represented as a set of key-value pairs.

• The programmer provides a map function:

$\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$

• and a reduce function:

$\operatorname{reduce}(k', \left< v' \right> *) \rightarrow \; \left< k', v'' \right> *$

• The $*$ refers to a collection of values.

• These collections are not ordered.
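
For the word-count problem in the next section, the two callbacks might look roughly as follows; this is a plain-Python sketch (the names wc_map and wc_reduce are illustrative, not part of any framework API).

def wc_map(url, document):
    # emit a <word, 1> pair for every word in the document
    return [(word, 1) for word in document.split()]

def wc_reduce(word, counts):
    # combine the collection of counts emitted for a single word
    return (word, sum(counts))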

## Resilient Distributed Data

• In a Map-Reduce computation these collections are resilient distributed data-sets (RDDs):

• The data is distributed across nodes in a cluster of computers.
• No data is lost if a single node fails.
• Data is typically stored in HBase tables, or HDFS files.
• The map and reduce functions can work in parallel across different keys, or different elements of the collection.
• The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer.

## Word Count Example

• In this simple example, the input is a set of URLs; each record is a document.

• Problem: compute how many times each word has occurred across the data set.

## Word Count: Map

• The input to $\operatorname{map}$ is a mapping:
• Key: URL
• Value: Contents of document

$\left< document1, to \; be \; or \; not \; to \; be \right>$

• In this example, our $\operatorname{map}$ function will process a given URL, and produce a mapping:
• Key: word
• Value: 1
• So our original data-set will be transformed to:

$\left< to, 1 \right>$ $\left< be, 1 \right>$ $\left< or, 1 \right>$ $\left< not, 1 \right>$ $\left< to, 1 \right>$ $\left< be, 1 \right>$

## Word Count: Reduce

• The reduce operation groups values according to their key, and then performs a reduce on each key.

• The collections are partitioned across different storage units; therefore, Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.

• Data in different partitions are reduced separately in parallel.

• The final result is a reduce of the reduced data in each partition.

• Therefore it is very important that our operator is both commutative and associative.

• In our case the function is the + operator, so the reduced data-set is:

$\left< be, 2 \right>$
$\left< not, 1 \right>$
$\left< or, 1 \right>$
$\left< to, 2 \right>$
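
A small illustration of why associativity and commutativity matter: with +, reducing each partition and then reducing the partial results gives the same answer as a single fold over all the data, whereas with a non-associative operator such as subtraction it does not.

values = [1, 2, 3, 4, 5, 6]
partitions = [values[:3], values[3:]]

add = lambda x, y: x + y
print reduce(add, [reduce(add, p) for p in partitions])   # 21, same as reduce(add, values)

sub = lambda x, y: x - y
print reduce(sub, [reduce(sub, p) for p in partitions])   # 3, but reduce(sub, values) == -19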

## Map and Reduce compared with Python

• Notice that these functions are formulated differently from the standard Python functions of the same name.

• The reduce function works with key-value pairs.

• It would be more apt to call it something like reduceByKey.

## MiniMapReduce

• To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.

• This illustrates how a problem can be written in terms of map and reduce operations.

• Note that these are illustrative functions; this is not how Hadoop or Apache Spark actually implement them.

In :
##########################################################
#
#   MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################

def groupByKey(data):
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result

def reduceByKey(f, data):
    key_values = groupByKey(data)
    return map(lambda key:
                   (key, reduce(f, key_values[key])),
               key_values)


## Word-count using MiniMapReduce

In :
data = map(lambda x: (x, 1), "to be or not to be".split())
data

Out:
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
In :
groupByKey(data)

Out:
{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]}
In :
reduceByKey(lambda x, y: x + y, data)

Out:
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

## Parallelising MiniMapReduce

• We can easily turn our Map-Reduce implementation into a parallel, multi-threaded framework by using the my_map_multithreaded function we defined earlier.

• This will allow us to perform map-reduce computations that exploit parallel processing using multiple cores on a single computer.

In :
def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])), key_values.keys())

In :
reduceByKey_multithreaded(lambda x, y: x + y, data)

Scheduling jobs..
Starting jobs..
Processing data: not ...
Finished job # 0
Result was ('not', 1)
Processing data: to ...
Finished job # 1
Result was ('to', 2)
Processing data: or ...
Finished job # 2
Result was ('or', 1)
Processing data: be ...
Finished job # 3
Result was ('be', 2)
Waiting for jobs to finish..
All done.

Out:
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

## Parallelising the reduce step

• Provided that our operator is both associative and commutative we can also parallelise the reduce operation.

• We partition the data into approximately equal subsets.

• We then reduce each subset independently on a separate core.

• The results can be combined in a final reduce step.

### Partitioning the data

In :
def split_data(data, split_points):
    partitions = []
    n = 0
    for i in split_points:
        partitions.append(data[n:i])
        n = i
    partitions.append(data[n:])
    return partitions

data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data

Out:
[['a', 'b', 'c'], ['d', 'e', 'f', 'g']]

### Reducing across partitions in parallel

In :
from threading import Thread

def parallel_reduce(f, partitions):

    n = len(partitions)
    results = [None] * n

    # Reduce each partition on its own thread.
    def job(i):
        results[i] = reduce(f, partitions[i])

    threads = [Thread(target=job, args=(i,)) for i in range(n)]
    for i in range(n):
        threads[i].start()
    for i in range(n):
        threads[i].join()

    # Combine the per-partition results in a final reduce step.
    return reduce(f, results)

parallel_reduce(lambda x, y: x + y, partitioned_data)

Out:
'abcdefg'

## Map-Reduce on a cluster of computers

• The code we have written so far will not allow us to exploit parallelism from multiple computers in a cluster.

• Developing such a framework would be a very large software engineering project.

• There are existing frameworks we can use:

• In this lecture we will cover Apache Spark.

## Apache Spark

• Apache Spark provides an object-oriented library for processing data on the cluster.

• It provides objects which represent resilient distributed datasets (RDDs).

• RDDs behave a bit like Python collections (e.g. lists).

• However:

• the underlying data is distributed across the nodes in the cluster, and
• the collections are immutable.

## Apache Spark and Map-Reduce

• We process the data by using higher-order functions to map RDDs onto new RDDs.

• Each instance of an RDD has at least two methods corresponding to the Map-Reduce workflow:

• map
• reduceByKey
• These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.

• There are also additional RDD methods in the Apache Spark API;

• Apache Spark is a super-set of Map-Reduce.

## Word-count in Apache Spark

In :
words = "to be or not to be".split()
words

Out:
['to', 'be', 'or', 'not', 'to', 'be']

### The SparkContext class

• When working with Apache Spark we invoke methods on an object which is an instance of the class pyspark.context.SparkContext.

• Typically, an instance of this object will be created automatically for you and assigned to the variable sc.

• The parallelize method in SparkContext can be used to turn any ordinary Python collection into an RDD;

• normally we would create an RDD from a large file or an HBase table.
In :
words_rdd = sc.parallelize(words)
words_rdd

Out:
ParallelCollectionRDD at parallelize at PythonRDD.scala:423
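
If no SparkContext has been created for us, for example when running a standalone script rather than a notebook or the PySpark shell, we can construct one explicitly. A minimal sketch, with an arbitrary application name and a local master:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('map-reduce-lecture').setMaster('local[*]')
sc = SparkContext(conf=conf)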

### Mapping an RDD

• Now when we invoke the map or reduceByKey methods on words_rdd we can set up a parallel processing computation across the cluster.
In :
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd

Out:
PythonRDD at RDD at PythonRDD.scala:43
• Notice that we do not have a result yet.

• The computation is not performed until we request the final result to be collected.

• We do this by invoking the collect() method:

In :
word_tuples_rdd.collect()

Out:
[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

### Reducing an RDD

• However, we require additional processing to combine the counts for each word:
In :
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd

Out:
PythonRDD at RDD at PythonRDD.scala:43
• Now we request the final result:
In :
word_counts = word_counts_rdd.collect()
word_counts

Out:
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

### Lazy evaluation

• It is only when we invoke collect() that the processing is performed on the cluster.

• Invoking collect() will cause both the map and reduceByKey operations to be performed.

• If the resulting collection is very large then this can be an expensive operation.
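
If the final collection is too large to bring back to the driver with collect(), one option is to write the RDD straight back to storage instead; a sketch (the output path is hypothetical):

# Each partition is written as a separate part-file under the given directory.
word_counts_rdd.saveAsTextFile('/tmp/word_counts')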

### The head of an RDD

• The take method is similar to collect, but only returns the first $n$ elements.

• This can be very useful for testing.

In :
word_counts_rdd.take(2)

Out:
[('not', 1), ('to', 2)]

### The complete word-count example

In :
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) \
.reduceByKey(lambda x, y: x + y)
counts.collect()

Out:
[('not', 1), ('to', 2), ('or', 1), ('be', 2)]

• Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:

• Sorting: sortByKey, sortBy, takeOrdered
• Mapping: flatMap
• Filtering: filter
• Counting: count
• Set-theoretic: intersection, union
• Many others: see the Transformations section of the programming guide
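
A few of these methods in action on a small parallelized collection; this is a sketch, and the outputs shown in the comments are indicative.

nums = sc.parallelize([5, 3, 1, 4, 2, 4])

nums.count()                                  # 6
nums.filter(lambda x: x % 2 == 0).collect()   # [4, 2, 4]
nums.sortBy(lambda x: x).take(3)              # [1, 2, 3]
nums.flatMap(lambda x: [x, -x]).take(4)       # [5, -5, 3, -3]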

## Creating an RDD from a text file

• In the previous example, we created an RDD from a Python collection.

• This is not typically how we would work with big data.

• More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.

• The following example creates an RDD from a text file on the native filesystem (ext4);

• With bigger data, you would use an HDFS file, but the principle is the same.
• Each element of the RDD corresponds to a single line of text.

In :
genome = sc.textFile('/tmp/genome.txt')


## Genome example

• We will use this RDD to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.

• First we will define some functions to split the bases into sequences of a certain size:

In :
def group_characters(line, n=5):
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if (i % n) == 0:
            yield result
            result = ''

def group_and_split(line):
    return [sequence for sequence in group_characters(line)]

In :
group_and_split('abcdefghijklmno')

Out:
['abcde', 'fghij', 'klmno']
• Now we will transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as per the word-count example.

• Notice that if we simply map each line of text, we will obtain multi-dimensional data:

In :
genome.map(group_and_split).take(2)

Out:
[[u'CAGGG',
u'GCACA',
u'GTCTC',
u'GGCTC',
u'ACTTC',
u'GACCT',
u'CTGCC',
u'TCCCC',
u'AGTTC',
u'AAGTG',
u'ATTCT',
u'CCTGC',
u'CTCAG',
u'TCTCC'],
[u'TGAGT',
u'AGCTG',
u'GGATG',
u'ACAGG',
u'AGTGG',
u'AGCAT',
u'GCCTA',
u'GCTAA',
u'TCTTT',
u'GTATT',
u'TCTAG',
u'TAGAG',
u'ATGCG',
u'GTTTT']]

### Flattening an RDD using flatMap

• We will need to flatten this data in order to turn it into a list of base-sequences.

• We can use the flatMap method:

In :
sequences = genome.flatMap(group_and_split)
sequences.take(3)

Out:
[u'CAGGG', u'GCACA', u'GTCTC']
In :
counts = sequences.map(lambda w: (w, 1)) \
                  .reduceByKey(lambda x, y: x + y)
counts.take(10)

Out:
[(u'TGTCA', 1),
(u'GCCCA', 3),
(u'CCAAG', 5),
(u'GCCCC', 4),
(u'CATGT', 1),
(u'AGATT', 1),
(u'TGTTT', 1),
(u'CCTAT', 4),
(u'TCAGT', 1),
(u'CAGCG', 2)]
• We want to rank each sequence according to its count.

• Therefore the key (first element) of each tuple should be the count.

• Therefore we need to reverse the tuples.

In :
def reverse_tuple(key_value_pair):
    return (key_value_pair[1], key_value_pair[0])

In :
sequences = counts.map(reverse_tuple)
sequences.take(10)

Out:
[(1, u'TGTCA'),
(3, u'GCCCA'),
(5, u'CCAAG'),
(4, u'GCCCC'),
(1, u'CATGT'),
(1, u'AGATT'),
(1, u'TGTTT'),
(4, u'CCTAT'),
(1, u'TCAGT'),
(2, u'CAGCG')]

### Sorting an RDD

• Now we can sort the RDD in descending order of key:
In :
sequences_sorted = sequences.sortByKey(False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences

Out:
[(15, u'AAAAA'),
(9, u'GCAGG'),
(8, u'ACAAA'),
(7, u'GGCCA'),
(7, u'AATTA'),
(7, u'AGGTT'),
(7, u'AGGGA'),
(7, u'CCAGG'),
(7, u'GAGCC'),
(7, u'AAAAC')]
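
The same ranking can also be obtained without sorting the whole RDD, by using takeOrdered on the counts RDD from earlier with the negated count as the ordering key; a sketch (top_ten is an illustrative name):

# Returns the ten (sequence, count) pairs with the largest counts.
top_ten = counts.takeOrdered(10, key=lambda pair: -pair[1])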

## Calculating $\pi$ using Spark

• We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:
1. Inscribe a circle in a square
2. Randomly generate points in the square
3. Determine the number of points in the square that are also in the circle
4. If $r$ is the number of points in the circle divided by the number of points in the square, then $\pi \approx 4 r$.
• Note that the more points generated, the better the approximation

See this tutorial.

In :
import numpy as np

def sample(p):
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 1000000

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
          .reduce(lambda a, b: a + b)
r = float(count) / float(NUM_SAMPLES)
print "Pi is approximately %f" % (4.0 * r)

Pi is approximately 3.142800