# %load ../../preconfig.py
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(color_codes=True)
plt.rcParams['axes.grid'] = False
import numpy as np
#import pandas as pd
#import sklearn
#import itertools
import logging
logger = logging.getLogger()
"big-data" analysis:
manage immense amounts of data quickly.
data is extremely regular $\to$ exploit parallelism.
new software stack:
"distributed file system" $\to$ MapReduce
When designing MapReduce algorithms, we often find that the greatest cost is in the communication.
commodity hardware + network
cluster computing: the new parallel-computing architecture.
racks: compute nodes are stored on racks.
communication:
The nodes on a single rack are connected by a network, typically gigabit Ethernet. Racks are connected to each other by another level of network or a switch.
The bandwidth of inter-rack communication is somewhat greater than that of the intrarack Ethernet, but it is shared by all the pairs of nodes on different racks that may need to communicate.
plt.imshow(plt.imread('./res/fig2_1.png'))
the solution to component failure (loss of a node or a rack):
Files must be stored redundantly.
Computations must be divided into tasks.
DFS: distributed file system
Google File System (GFS)
Hadoop Distributed File System (HDFS)
CloudStore
It is typically used as follows:
Files can be enormous, possibly a terabyte in size.
Files are rarely updated.
how files are managed:
Files are divided into chunks.
Chunks are replicated at different compute nodes of different racks.
master node (or name node): another small file, one per file, used to find the chunks of that file.
the master node is itself replicated, and a directory for the file system as a whole knows where to find its copies.
The directory itself can be replicated, and all participants using the DFS know where the directory copies are.
All you need to write are two functions, called Map and Reduce.
a MapReduce computation executes as follows:
Map function: Map tasks turn the chunk given into a sequence of key-value pairs.
The key-value pairs from each Map task are collected by a master controller and sorted by key.
divide by key: all pairs with same key $\to$ same Reduce task.
Reduce function: The Reduce tasks work on one key at a time, and combine all the values associated with that key in some way.
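The steps above can be simulated in memory with word counting, the usual first MapReduce example. This is a minimal sketch, not a distributed implementation: `map_fn`, `reduce_fn`, and `run_mapreduce` are hypothetical names, and a plain dictionary stands in for the grouping that the system performs.

```python
from collections import defaultdict

def map_fn(chunk):
    """Map: turn one chunk of text into (word, 1) key-value pairs."""
    for word in chunk.split():
        yield word.lower(), 1

def reduce_fn(key, values):
    """Reduce: combine all values associated with one key, here by summing."""
    return key, sum(values)

def run_mapreduce(chunks, map_fn, reduce_fn):
    # Map phase: one Map task per chunk.
    pairs = [kv for chunk in chunks for kv in map_fn(chunk)]
    # Collect and sort by key, then group (done by the system in real MapReduce).
    groups = defaultdict(list)
    for key, value in sorted(pairs):
        groups[key].append(value)
    # Reduce phase: one reducer per key.
    return dict(reduce_fn(k, vs) for k, vs in groups.items())

chunks = ["the cat sat", "the dog sat on the mat"]
counts = run_mapreduce(chunks, map_fn, reduce_fn)
print(counts)  # {'cat': 1, 'dog': 1, 'mat': 1, 'on': 1, 'sat': 2, 'the': 3}
```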
plt.imshow(plt.imread('./res/fig2_2.png'))
The Map function takes an input element as its argument and produces zero or more key-value pairs.
Group: the key-value pairs are grouped by key; this is performed by the system.
partition: hash keys to reduce tasks.
reducer: the application of the Reduce function to a single key and its associated list of values.
a Reduce task executes one or more reducers.
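A sketch of "hash keys to Reduce tasks", where each Reduce task then runs the reducers for every key that hashes to it. `partition` is a hypothetical helper. It uses `zlib.crc32` rather than Python's built-in `hash()`, because `hash()` of a string is randomized per process, while every Map task must send a given key to the same Reduce task.

```python
import zlib

def partition(key, num_reduce_tasks):
    """Map a key to a Reduce task number in 0..num_reduce_tasks-1."""
    # crc32 is deterministic across processes, unlike hash() on str.
    return zlib.crc32(str(key).encode("utf-8")) % num_reduce_tasks

keys = ["the", "cat", "sat", "dog", "on", "mat"]
num_tasks = 2
tasks = {i: [] for i in range(num_tasks)}
for k in keys:
    tasks[partition(k, num_tasks)].append(k)
# Each Reduce task executes one reducer per key in its list.
print(tasks)
```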
Why not execute each reducer as a separate Reduce task for maximum parallelism?
There is overhead associated with each task we create.
number of Reduce tasks $<$ number of reducers.
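There is often significant variation in the lengths of the value lists for different keys, so different reducers take different amounts of time. $\to$ skew.
if keys are sent randomly to Reduce tasks $\to$ we can expect some averaging of the total time the different Reduce tasks need. $\to$ skew can be reduced further by using more Reduce tasks than compute nodes, so a node that gets a long task can be given fewer short ones.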
If a Reduce function is associative and commutative, (the values to be combined can be combined in any order, with the same result), we can push some of what the reducers do to the Map tasks (combiner).
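A small sketch of a combiner for word counting. Because addition is associative and commutative, each Map task can pre-sum its own counts before emitting pairs. The final result is unchanged, but fewer key-value pairs travel to the reducers. The function names are illustrative only.

```python
from collections import Counter, defaultdict

def map_without_combiner(chunk):
    return [(w, 1) for w in chunk.split()]

def map_with_combiner(chunk):
    # Combine locally inside the Map task: one (word, count) pair per word.
    return list(Counter(chunk.split()).items())

def reduce_all(pairs):
    totals = defaultdict(int)
    for word, n in pairs:
        totals[word] += n
    return dict(totals)

chunks = ["a b a a", "b a c"]
plain = [kv for c in chunks for kv in map_without_combiner(c)]
combined = [kv for c in chunks for kv in map_with_combiner(c)]
assert reduce_all(plain) == reduce_all(combined)  # same final counts
print(len(plain), len(combined))  # 7 5
```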
plt.imshow(plt.imread('./res/fig2_3.png'))
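Master fails $\to$ the entire MapReduce job must be restarted.
Map worker fails $\to$ all the Map tasks assigned to this worker must be redone, even completed ones, because their output resides at the failed node; each Reduce task must also be informed that the location of its input from those Map tasks has changed.
Reduce worker fails $\to$ its Reduce tasks are rescheduled on another Reduce worker later.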
Operations that can use MapReduce effectively:
very large matrix-vector multiplications
relational-algebra operations
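Let $\mathbf{M}$ be an $n \times n$ matrix with element $m_{ij}$ in row $i$ and column $j$, and let $\mathbf{v}$ be a vector of length $n$ with $j$th element $v_j$. The product $\mathbf{x} = \mathbf{M}\mathbf{v}$ has $x_i = \sum_{j=1}^{n} m_{ij} v_j$. We first assume that $n$ is large, but not so large that $\mathbf{v}$ cannot fit in main memory and thus be available to every Map task.
Map: each Map task takes a chunk of $\mathbf{M}$ together with the entire vector $\mathbf{v}$, and for each matrix element $m_{ij}$ produces the key-value pair $(i, m_{ij} v_j)$.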
Reduce: simply sums all the values and produces pair $(i, x_i)$.
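A minimal in-memory sketch of matrix-vector multiplication in MapReduce style. It assumes $\mathbf{M}$ is stored sparsely as $(i, j, m_{ij})$ triples, and it uses the hypothetical names `mv_map` and `mv_reduce`. Map emits $(i, m_{ij} v_j)$ for each nonzero element, and Reduce sums the values for each key $i$.

```python
from collections import defaultdict
import numpy as np

def mv_map(entries, v):
    """Map: for each (i, j, m_ij), emit the pair (i, m_ij * v_j)."""
    for i, j, m_ij in entries:
        yield i, m_ij * v[j]

def mv_reduce(pairs):
    """Reduce: sum all values with key i to get x_i."""
    x = defaultdict(float)
    for i, value in pairs:
        x[i] += value
    return dict(x)

M = np.array([[1.0, 2.0], [0.0, 3.0]])
v = np.array([4.0, 5.0])
entries = [(i, j, M[i, j]) for i in range(2) for j in range(2) if M[i, j] != 0]
x = mv_reduce(mv_map(entries, v))
```

Here `x` maps each row index to its entry of $\mathbf{M}\mathbf{v}$ (14 and 15 for this input).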
We can divide the matrix into vertical stripes of equal width and divide the vector into an equal number of horizontal stripes, of the same height.
plt.imshow(plt.imread('./res/fig2_4.png'))
Each Map task is assigned a chunk from one of the stripes of the matrix and gets the entire corresponding stripe of the vector.
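A sketch of the striped variant. It simplifies by making each Map task handle a whole stripe rather than one chunk of it, and `stripe_map_task` and `striped_matvec` are hypothetical names. The point to notice is that a task reading stripe $k$ of the matrix only ever reads stripe $k$ of the vector, so only that piece of $\mathbf{v}$ needs to fit in memory.

```python
from collections import defaultdict
import numpy as np

def stripe_map_task(M, v, cols):
    """Map task for one vertical stripe. For simplicity the sketch passes all of
    v, but it reads only v[cols], the matching horizontal stripe."""
    for i in range(M.shape[0]):
        for j in cols:
            if M[i, j] != 0:
                yield i, M[i, j] * v[j]

def striped_matvec(M, v, num_stripes):
    # Split the column indices (and hence v) into equal-width stripes.
    stripes = np.array_split(np.arange(M.shape[1]), num_stripes)
    x = defaultdict(float)
    for cols in stripes:                # one Map task per stripe
        for i, value in stripe_map_task(M, v, cols):
            x[i] += value               # Reduce: sum the values with key i
    return np.array([x[i] for i in range(M.shape[0])])

rng = np.random.default_rng(0)
M, v = rng.random((6, 6)), rng.random(6)
result = striped_matvec(M, v, num_stripes=3)
```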
One particular application (PageRank calculation) has an additional constraint: the result vector should be partitioned in the same way as the input vector.
We shall see there that the best strategy involves partitioning the matrix $\mathbf{M}$ into square blocks, rather than stripes.
There are many operations on data that can be described easily in terms of the common database-query primitives.
a relation is a table with column headers called attributes.
Rows of the relation are called tuples.
The set of attributes of a relation is called its schema.
$R(A_1, A_2, \dotsc, A_n)$: the relation name is $R$ and its attributes are $A_1, A_2, \dotsc, A_n$.
plt.imshow(plt.imread('./res/fig2_5.png'))
relational algebra: several standard operations on relations.
Selection $\sigma_{C} (R)$: select tuples that satisfy $C$
Projection $\pi_{S} (R)$: produces, for each tuple, only the components for the subset $S$ of the attributes
Union, Intersection, and Difference
Union:
Intersection:
Difference $S - R$:
Natural Join $R \bowtie S$
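Grouping and Aggregation $\gamma_{X} (R)$: where $X$ consists of grouping attributes and aggregated attributes of the form $\theta(A)$, with $\theta$ an aggregation operator such as SUM, COUNT, AVG, MIN, or MAX.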
Let $R(A, B, C)$, for $\gamma_{A, \theta(B)} (R)$:
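The relational operations above can be sketched as MapReduce-style functions run in memory. These are illustrative sketches, not a library API, and every function name is hypothetical. `group_by_key` stands in for the system's grouping step. The set operations tag each tuple with the name of its relation, and they assume each relation is a set (no duplicate tuples). `difference(S, R)` follows the notes' $S - R$. The join is $R(A, B) \bowtie S(B, C)$.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Stand-in for the grouping by key that the system performs."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def selection(R, cond):
    # Map: (t, t) for each t satisfying C. Reduce: identity.
    return list(group_by_key((t, t) for t in R if cond(t)))

def projection(R, positions):
    # Map: t -> (t', t'), keeping only the chosen attributes.
    # Reduce: (t', [t', t', ...]) -> t' once, which eliminates duplicates.
    projected = (tuple(t[i] for i in positions) for t in R)
    return list(group_by_key((tp, tp) for tp in projected))

def tagged(R, S):
    # Map for the set operations: each tuple t -> (t, name of its relation).
    return group_by_key([(t, "R") for t in R] + [(t, "S") for t in S])

def union(R, S):
    return set(tagged(R, S))                # Reduce: every key t once

def intersection(R, S):
    return {t for t, tags in tagged(R, S).items() if len(tags) == 2}

def difference(S, R):
    # S - R: keep t only if its value list is exactly ["S"].
    return {t for t, tags in tagged(R, S).items() if tags == ["S"]}

def natural_join(R, S):
    # R(A, B) join S(B, C). Map: (b, ("R", a)) and (b, ("S", c)).
    groups = group_by_key([(b, ("R", a)) for a, b in R] +
                          [(b, ("S", c)) for b, c in S])
    # Reduce: for each b, pair every a from R with every c from S.
    return [(a, b, c) for b, vals in groups.items()
            for tag_a, a in vals if tag_a == "R"
            for tag_c, c in vals if tag_c == "S"]

def group_aggregate(R, theta):
    # gamma_{A, theta(B)}(R) for R(A, B, C). Map: (a, b, c) -> (a, b).
    groups = group_by_key((a, b) for a, b, _c in R)
    # Reduce: (a, [b1, b2, ...]) -> (a, theta([b1, b2, ...])).
    return {a: theta(bs) for a, bs in groups.items()}

T = [(1, "a", 10), (2, "b", 20), (3, "a", 30)]
print(selection(T, lambda t: t[2] > 15))  # [(2, 'b', 20), (3, 'a', 30)]
print(projection(T, [1]))                 # [('a',), ('b',)]
```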
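$\mathbf{M} \times \mathbf{N}$: treated as a natural join (matching $m_{ij}$ with $n_{jk}$ on the shared index $j$) followed by
grouping and aggregation (summing the products $m_{ij} n_{jk}$ for each $(i, k)$), computed in two MapReduce steps: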
1st MapReduce
2nd MapReduce
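Alternatively, a single MapReduce step suffices. Map: for each element $m_{ij}$ of $\mathbf{M}$, produce the key-value pairs $((i, k), (M, j, m_{ij}))$ for every column $k$ of $\mathbf{N}$; for each element $n_{jk}$ of $\mathbf{N}$, produce $((i, k), (N, j, n_{jk}))$ for every row $i$ of $\mathbf{M}$.
Reduce: Each key $(i, k)$ will have an associated list with all the values $(M, j, m_{ij})$ and $(N, j, n_{jk})$, for all possible values of $j$. The reducer matches each $m_{ij}$ with the $n_{jk}$ that has the same $j$, multiplies each such pair, and sums the products to produce the element in row $i$ and column $k$ of $\mathbf{M} \times \mathbf{N}$.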
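A dense, in-memory sketch of the one-step matrix multiplication. `matmul_one_step` is a hypothetical name. Map sends each $m_{ij}$ to every key $(i, k)$ and each $n_{jk}$ to every key $(i, k)$. Reduce matches values on $j$, multiplies, and sums. The loop structure shows the replication: each element of $\mathbf{M}$ is sent to as many reducers as $\mathbf{N}$ has columns.

```python
from collections import defaultdict
import numpy as np

def matmul_one_step(M, N):
    """One-pass MapReduce sketch of P = M N (dense, in memory)."""
    I, J = M.shape
    J2, K = N.shape
    if J != J2:
        raise ValueError("inner dimensions must agree")
    groups = defaultdict(list)
    # Map: m_ij is sent to all K keys (i, k); n_jk to all I keys (i, k).
    for i in range(I):
        for j in range(J):
            for k in range(K):
                groups[(i, k)].append(("M", j, M[i, j]))
    for j in range(J):
        for k in range(K):
            for i in range(I):
                groups[(i, k)].append(("N", j, N[j, k]))
    # Reduce for key (i, k): match the M and N values on j, multiply, sum.
    P = np.zeros((I, K))
    for (i, k), values in groups.items():
        m = {j: x for tag, j, x in values if tag == "M"}
        n = {j: x for tag, j, x in values if tag == "N"}
        P[i, k] = sum(m[j] * n[j] for j in m.keys() & n.keys())
    return P

M = np.arange(6).reshape(2, 3)
N = np.arange(12).reshape(3, 4)
P = matmul_one_step(M, N)
```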
Ex 2.3.5
some extensions and modifications of MapReduce share the same characteristics:
idea: two-step workflow (Map, Reduce) $\to$ any collection of functions
two experimental systems
advantage: there is no need to store the temporary file that is the output of one MapReduce job in the distributed file system.
plt.imshow(plt.imread('./res/fig2_6.png'))
Many large-scale computations are really recursions.
mutually recursive tasks: tasks feed their outputs back as inputs (the flow graph is not acyclic), so it is not feasible simply to restart a task when some node fails.
solution A: split the work into two steps and back up the intermediate data passed between them.
Example 2.6
plt.imshow(plt.imread('./res/fig2_7.png'))
solution B: back up the entire state of each task at checkpoints, so that after a failure the computation can resume from the last checkpoint.
checkpoints are taken after a fixed number of supersteps.
def calc_prob(p, t):
    return 10 - 9 * ((1 - p)**t)
p = np.linspace(0, 1, 100)
y1 = calc_prob(p, 10)
y2 = calc_prob(p, 100)
plt.plot(p, y1, 'b', p, y2, 'g')
suppose the computation requires $n$ supersteps, and executing one superstep takes time $t$.
for many applications, the bottleneck is moving data among tasks.
The communication cost of a task is the size of the input to the task. We shall often use the number of tuples, rather than bytes, as the measure of size.
The communication cost of an algorithm is the sum of the communication cost of all the tasks implementing that algorithm. We shall focus on the communication cost as the way to measure the efficiency of an algorithm, since the exceptions, where execution time of tasks dominates, are rare in practice.
We count only input size, and not output size.
Besides communication cost, we must also be aware of the importance of wall-clock time, the time it takes a parallel algorithm to finish.
The algorithms shall have the property that the work is divided fairly among the tasks.
example: $R(A, B) \bowtie S(B, C) \bowtie T(C, D)$.
Suppose that the relations $R$, $S$, and $T$ have sizes $r$, $s$, and $t$, respectively, and, for simplicity, that $p$ is the probability that any two tuples from two relations agree on the attribute they share.
Cascade of two two-way joins: $\left( R(A, B) \bowtie S(B, C) \right) \bowtie T(C, D)$, or the same joins in another order.
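1st MapReduce $R(A, B) \bowtie S(B, C)$: communication cost $O(r + s)$; the intermediate result has about $prs$ tuples.
2nd MapReduce (joining that result with $T(C, D)$): communication cost $O(t + prs)$. In total: $O(r + s + t + prs)$.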
Assume:
We plan to use $k$ reducers for the job.
$b, c$ represent the numbers of buckets into which we shall hash $B$- and $C$-values, respectively.
$h$ hashes $B$-values into $b$ buckets.
$g$ hashes $C$-values into $c$ buckets.
we require $b c = k$.
So, the reducer corresponding to bucket pair $(i, j)$ is responsible for joining the tuples $R(u, v), S(v, w)$, and $T(w, x)$ whenever $h(v) = i$ and $g(w) = j$.
$S(v, w)$ to reducer $(h(v), g(w))$. communication cost: $s$
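$R(u, v)$ to the $c$ reducers $(h(v), y)$, one for each of the $c$ possible values of $y$. communication cost: $c r$
$T(w, x)$ to the $b$ reducers $(z, g(w))$, one for each of the $b$ possible values of $z$. communication cost: $b t$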
There is also a fixed cost $r + s + t$ to make each tuple of each relation be input to one of the Map tasks.
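The routing rules above can be checked with a small in-memory sketch. `bucket` is a hypothetical deterministic hash, used with $b$ buckets as $h$ and with $c$ buckets as $g$. The sketch counts how many tuples are shipped to reducers, which should equal $s + cr + bt$. Each $S(v, w)$ goes to exactly one reducer, so every join result is produced exactly once.

```python
import zlib
from collections import defaultdict

def bucket(value, n):
    """Hypothetical deterministic hash of a value into 0..n-1."""
    return zlib.crc32(repr(value).encode("utf-8")) % n

def multiway_join(R, S, T, b, c):
    """R(A,B) join S(B,C) join T(C,D) in one MapReduce job with b*c reducers."""
    def h(v):
        return bucket(v, b)             # hashes B-values into b buckets
    def g(w):
        return bucket(w, c)             # hashes C-values into c buckets
    reducers = defaultdict(lambda: ([], [], []))   # (R, S, T) tuples each
    sent = 0                            # tuples shipped from Map to Reduce
    for u, v in R:                      # R(u, v) -> (h(v), y) for every y
        for y in range(c):
            reducers[(h(v), y)][0].append((u, v))
            sent += 1
    for v, w in S:                      # S(v, w) -> exactly one reducer
        reducers[(h(v), g(w))][1].append((v, w))
        sent += 1
    for w, x in T:                      # T(w, x) -> (z, g(w)) for every z
        for z in range(b):
            reducers[(z, g(w))][2].append((w, x))
            sent += 1
    result = set()
    for r_tuples, s_tuples, t_tuples in reducers.values():
        for u, v in r_tuples:           # each reducer joins its local tuples
            for v2, w in s_tuples:
                if v2 == v:
                    for w2, x in t_tuples:
                        if w2 == w:
                            result.add((u, v, w, x))
    return result, sent

R = [(1, "a"), (2, "b")]
S = [("a", "p"), ("b", "q"), ("a", "q")]
T = [("p", 10), ("q", 20)]
joined, sent = multiway_join(R, S, T, b=2, c=3)
```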
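The problem becomes: $$\operatorname{arg\,min}_{b, c} \; s + cr + bt \quad \text{subject to} \quad bc = k.$$ With a Lagrange multiplier $\lambda$, setting the partial derivatives of $s + cr + bt - \lambda(bc - k)$ to zero gives $r = \lambda b$ and $t = \lambda c$, so $cr = bt$. Combined with $bc = k$, this yields $c = \sqrt{kt / r}$ and $b = \sqrt{kr / t}$, so $cr = bt = \sqrt{krt}$ and $s + cr + bt = s + 2 \sqrt{k r t}$.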
In all, the total communication cost is $r + 2s + t + 2 \sqrt{k r t}$.
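The closed form can be checked numerically. The sketch below uses made-up sizes $r = s = 1000$, $t = 4000$, and $k = 16$, and `multiway_plan` is a hypothetical helper. It computes the real-valued optimum and compares it with a brute-force search over integer splits $bc = k$. In practice $b$ and $c$ must be integers, so the real-valued optimum would need rounding when it is not integral.

```python
import math

def multiway_plan(r, s, t, k):
    """Real-valued optimum of s + c*r + b*t subject to b*c = k, plus the
    fixed cost r + s + t of reading every tuple into a Map task."""
    b = math.sqrt(k * r / t)
    c = math.sqrt(k * t / r)
    total = (r + s + t) + (s + c * r + b * t)
    return b, c, total

print(multiway_plan(r=1000, s=1000, t=4000, k=16))  # (2.0, 8.0, 23000.0)

# Brute force over integer splits b * c = 16: minimize c*r + b*t.
best = min((1000 * c + 4000 * b, b, c)
           for b in range(1, 17) if 16 % b == 0 for c in [16 // b])
print(best)  # (16000, 2, 8)
```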
our goals in this section:
to shrink the wall-clock time
to execute each reducer in main memory
two parameters that characterize families of MapReduce algorithms:
reducer size $q$: the upper bound on the number of values that are allowed to appear in the list associated with a single key. It can be selected with at least two goals in mind: a small $q$ forces many reducers, and hence high parallelism and low wall-clock time; and $q$ can be chosen small enough that the computation at each reducer fits in main memory.
replication rate $r$: the number of key-value pairs produced by all the Map tasks on all the inputs, divided by the number of inputs. It is the average communication from Map tasks to Reduce tasks per input.
We are given a large set $X$ of elements and a symmetric similarity measure $s(x, y)$. The output of the algorithm is those pairs whose similarity exceeds a given threshold $t$.
e.g.: discover similar images in a collection of one million images.
solution:
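One reducer per pair of pictures, with key $\{i, j\}$ and value list $[P_i, P_j]$: this algorithm fails completely. The reducer size is small ($q = 2$), but each picture is sent to 999,999 reducers, so the replication rate is 999,999 and the communication cost is extremely large.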
We can group pictures into $g$ groups, each of $10^6 / g$ pictures.
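A sketch of one natural way to finish the grouping idea. This is an assumption, not spelled out in the notes: use one reducer per unordered pair of groups $\{u, v\}$ and send each picture to every reducer whose pair contains its group. The hypothetical `group_schema` measures $q$ and $r$ on a toy instance and assumes $g \ge 2$ divides $n$. In general this gives $q = 2n/g$ and $r = g - 1$, so for $n = 10^6$ it gives $q = 2 \times 10^6 / g$.

```python
from itertools import combinations

def group_schema(n, g):
    """Assign inputs 0..n-1 to reducers keyed by pairs of groups {u, v}.

    Assumes g >= 2 divides n; input i belongs to group i // (n // g).
    """
    size = n // g
    reducers = {pair: [] for pair in combinations(range(g), 2)}
    for i in range(n):
        u = i // size
        for pair in reducers:
            if u in pair:
                reducers[pair].append(i)
    q = max(len(inputs) for inputs in reducers.values())
    r = sum(len(inputs) for inputs in reducers.values()) / n
    return reducers, q, r

reducers, q, r = group_schema(n=12, g=4)
print(q, r)  # 6 3.0
```

A pair of pictures from the same group reaches $g - 1$ reducers, so one of them (for example, the lowest-numbered pair of groups) should be designated to compare it, to avoid duplicate outputs.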
In this section, we hope to prove lower bounds on the replication rate. The first step is to introduce a graph model of problems.
For each problem solvable by a MapReduce algorithm there is:
A set of inputs.
A set of outputs.
A many-many relationship between the inputs and outputs, which describes which inputs are necessary to produce which outputs.
plt.imshow(plt.imread('./res/fig2_9.png'))
mapping schema expresses how outputs are produced by the various reducers.
A mapping schema for a given problem with a given reducer size $q$ is an assignment of inputs to one or more reducers, such that:
No reducer is assigned more than $q$ inputs.
For every output of the problem, there is at least one reducer that is assigned all the inputs that are related to that output. We say this reducer covers the output.
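The two conditions can be checked mechanically. Here is a sketch with a hypothetical helper, `is_mapping_schema`. Reducers are given as sets of inputs, and each output is given as the set of inputs it depends on. The example uses the all-pairs problem on four inputs.

```python
from itertools import combinations

def is_mapping_schema(reducers, outputs, q):
    """reducers: dict reducer_id -> set of inputs assigned to it.
    outputs: iterable of sets, each the inputs related to one output."""
    if any(len(inputs) > q for inputs in reducers.values()):
        return False                    # some reducer exceeds size q
    # Every output must be covered by some reducer holding all its inputs.
    return all(any(out <= inputs for inputs in reducers.values())
               for out in outputs)

outputs = [set(pair) for pair in combinations(range(4), 2)]
print(is_mapping_schema({"A": {0, 1, 2, 3}}, outputs, q=4))            # True
print(is_mapping_schema({"A": {0, 1, 2, 3}}, outputs, q=3))            # False
print(is_mapping_schema({"A": {0, 1, 2}, "B": {1, 2, 3}}, outputs, 3)) # False
print(is_mapping_schema({"A": {0, 1, 2}, "B": {0, 1, 3},
                         "C": {0, 2, 3}}, outputs, q=3))               # True
```

The third schema respects $q = 3$ but fails the second condition, since no reducer covers the output $\{0, 3\}$.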
The only way the absence of some inputs makes a difference is that we may wish to rethink the desired value of the reducer size $q$ when we select an algorithm from the family of possible algorithms.
minimum possible communication is known if we can prove a matching lower bound.
Here is an outline of the technique:
1. Prove an upper bound $g(q)$ on how many outputs a reducer with $q$ inputs can cover.
2. Determine the total number of outputs produced by the problem.
3. Suppose that there are $k$ reducers, and the $i$th reducer has $q_i \le q$ inputs. Observe that $\sum_{i=1}^k g(q_i)$ must be no less than the number of outputs computed in step (2).
4. Manipulate the inequality from (3) to get a lower bound on $\sum_{i=1}^k q_i$. Often, the trick used is to replace some factors of $q_i$ by their upper bound $q$, but leave a single factor of $q_i$ in the term for $i$.
5. Since $\sum_{i=1}^k q_i$ is the total communication from Map tasks to Reduce tasks, divide the lower bound from (4) on this quantity by the number of inputs. The result is a lower bound on the replication rate.
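As a worked instance, here is a sketch applying the outline to the all-pairs similarity problem above, with $n$ inputs and assuming $q \ge 2$. A reducer with $q$ inputs can cover at most $g(q) = q(q-1)/2$ pairs, and the problem has $n(n-1)/2$ outputs. Then

$$\begin{aligned}
\sum_{i=1}^k \frac{q_i (q_i - 1)}{2} &\ge \frac{n(n-1)}{2} && \text{steps (1) to (3)} \\
(q - 1) \sum_{i=1}^k q_i \;\ge\; \sum_{i=1}^k q_i (q_i - 1) &\ge n(n-1) && \text{step (4), using } q_i - 1 \le q - 1 \\
r = \frac{1}{n} \sum_{i=1}^k q_i &\ge \frac{n - 1}{q - 1} && \text{step (5)}
\end{aligned}$$

For $q = 2$ this gives $r \ge n - 1$, which is 999,999 for one million pictures. So the one-reducer-per-pair algorithm already has the minimum replication rate for its reducer size; its problem is the enormous total communication. Consider grouping into $g$ groups with one reducer per pair of groups, so that $q = 2n/g$ and $r = g - 1$. There the bound is $(n-1)/(2n/g - 1) \approx g/2$, so that scheme is within about a factor of 2 of optimal.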