#!/usr/bin/env python
# coding: utf-8

# # Map-Reduce and Apache Spark
#
# (C) 2015 [Steve Phelps](http://sphelps.net/)
#
# ## Overview
#
# 1. Recap of functional programming in Python
# 2. Python's `map` and `reduce` functions
# 3. Writing parallel code using `map`
# 4. The Map-Reduce programming model
# 5. Using Apache Spark with Python
#
# ## Reading
#
# - [Introduction to Parallel Computing](https://computing.llnl.gov/tutorials/parallel_comp/#DesignPartitioning), Blaise Barney, Lawrence Livermore National Laboratory.
#
# - Dean, J., & Ghemawat, S. (2008). MapReduce: Simplified Data Processing on Large Clusters. Communications of the ACM, 51(1), 107–113.
#
# - [Spark Programming Guide](https://spark.apache.org/docs/latest/programming-guide.html)
#
# - Chapters 1 and 3 of Karau, H., Wendell, P., & Zaharia, M. (2015). Learning Spark: Lightning-Fast Big Data Analysis. O’Reilly.

# ## History
#
# - The Map-Reduce programming model was popularised by Google (Dean and Ghemawat 2008).
#
# - The first popular open-source implementation was Apache Hadoop, which reached its 1.0 release in 2011.
#
# - Apache Spark reached its 1.0 release in 2014.
#
#     - It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.
#
# - In contrast to Hadoop, Apache Spark:
#
#     - is easy to install and configure, and
#     - provides a much more natural *iterative* workflow.

# ## Resilient distributed datasets
#
# - The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
#
# - When working with Apache Spark we iteratively apply functions to every element of these collections in parallel to produce *new* RDDs.

# ## Functional programming
#
# Consider the following code:

# In[1]:

def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

def quadruple_everything_in(data):
    result = []
    for i in data:
        result.append(4 * i)
    return result


# In[2]:

double_everything_in([1, 2, 3, 4, 5])


# In[3]:

quadruple_everything_in([1, 2, 3, 4, 5])


# - The above code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) principle of good software engineering practice.
#
# - How can we rewrite the code so that it avoids duplication?

# In[4]:

def multiply_by_x_everything_in(x, data):
    result = []
    for i in data:
        result.append(x * i)
    return result


# In[5]:

multiply_by_x_everything_in(2, [1, 2, 3, 4, 5])


# In[6]:

multiply_by_x_everything_in(4, [1, 2, 3, 4, 5])


# - Now consider the following code:

# In[7]:

def squared(x):
    return x * x

def double(x):
    return x * 2

def square_everything_in(data):
    result = []
    for i in data:
        result.append(squared(i))
    return result

def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result


# In[8]:

square_everything_in([1, 2, 3, 4, 5])


# In[9]:

double_everything_in([1, 2, 3, 4, 5])


# - The above code violates the ["do not repeat yourself"](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) principle of good software engineering practice.
#
# - How can we rewrite the code so that it avoids duplication?
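# - One answer, developed in the next section, relies on the fact that Python functions are themselves values: they can be assigned to variables and passed as arguments to other functions. A minimal illustration, reusing the `squared` function defined above:

# In[ ]:

# Functions are ordinary values in Python: writing the name without
# parentheses refers to the function object itself rather than calling it.
operation = squared
operation(3)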
# ## Using functions as values

# In[10]:

def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result


# In[11]:

apply_f_to_everything_in(squared, [1, 2, 3, 4, 5])


# In[12]:

apply_f_to_everything_in(double, [1, 2, 3, 4, 5])


# ### Lambda expressions
#
# - We can use anonymous functions to save having to define a function each time we want to use map.

# In[13]:

apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])


# # Python's `map` function
#
# - Python has a built-in function `map` which is much faster than our version.

# In[15]:

list(map(lambda x: x*x, [1, 2, 3, 4, 5]))


# ## Implementing reduce
#
# - The `reduce` function is an example of a [fold](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29).
#
# - There are different ways we can fold data.
#
# - The following implements a *left* fold.

# In[18]:

def foldl(f, data, z):
    if len(data) == 0:
        print(z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print("Folding", head, "with", tail, "using", z)
        partial_result = f(z, data[0])
        print("Partial result is", partial_result)
        return foldl(f, tail, partial_result)


# In[19]:

def add(x, y):
    return x + y

foldl(add, [1, 2, 3, 4, 5], 0)


# In[20]:

foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)


# In[21]:

foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)


# In[22]:

(((((0 - 1) - 2) - 3) - 4) - 5)


# - Subtraction is neither [commutative](https://en.wikipedia.org/wiki/Commutative_property) nor [associative](https://en.wikipedia.org/wiki/Associative_property), so the order in which we apply the fold matters:

# In[23]:

(1 - (2 - (3 - (4 - (5 - 0)))))


# In[24]:

def foldr(f, data, z):
    if len(data) == 0:
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))


# In[25]:

foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)


# In[26]:

foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)


# ## Python's `reduce` function
#
# - Python's built-in `reduce` function is a *left* fold.

# In[29]:

from functools import reduce

reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])


# In[30]:

reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)


# In[31]:

foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)


# # Functional programming and parallelism
#
# - Functional programming lends itself to [parallel programming](https://computing.llnl.gov/tutorials/parallel_comp/#Models).
#
# - The `map` function can easily be parallelised through [data-level parallelism](https://en.wikipedia.org/wiki/Data_parallelism),
#     - provided that the function we supply as an argument is *free from* [side-effects](https://en.wikipedia.org/wiki/Side_effect_%28computer_science%29)
#     - (which is why we avoid working with mutable data).
#
# - We can see this by rewriting `map` as follows:

# In[33]:

def perform_computation(f, result, data, i):
    print("Computing the ", i, "th result...")
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result


# In[34]:

my_map(lambda x: x * x, [1, 2, 3, 4, 5])


# ## A multi-threaded `map` function

# In[38]:

from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):
    # Each function evaluation is scheduled on a separate thread.
    def my_job():
        print("Processing data:", data[i], "... ")
") result[i] = f(data[i]) print("Finished job #", i) print("Result was", result[i]) threads[i] = Thread(target=my_job) def my_map_multithreaded(f, data): n = len(data) result = [None] * n threads = [None] * n print("Scheduling jobs.. ") for i in range(n): schedule_computation_threaded(f, result, data, threads, i) print("Starting jobs.. ") for i in range(n): threads[i].start() print("Waiting for jobs to finish.. ") for i in range(n): threads[i].join() print("All done.") return result # In[39]: my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5]) # In[40]: from numpy.random import uniform from time import sleep def a_function_which_takes_a_long_time(x): sleep(uniform(2, 10)) # Simulate some long computation return x*x my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5]) # ## Map Reduce # # - Map Reduce is a _programming model_ for scalable parallel processing. # - Scalable here means that it can work on big data with very large compute clusters. # - There are many implementations: e.g. Apache Hadoop and Apache Spark. # - We can use Map-Reduce with any programming language: # - Hadoop is written in Java # - Spark is written in Scala, but has a Python interface. # - *Functional programming* languages such as Python or Scala fit very well with the Map Reduce model: # - However, we don't *have* to use functional programming. # - A MapReduce implementation will take care of the low-level functionality so that you don't have to worry about: # - load balancing # - network I/O # - network and disk transfer optimisation # - handling of machine failures # - serialization of data # - etc.. # - The model is designed to move the processing to where the data resides. # ## Typical steps in a Map Reduce Computation # # 1. ETL a big data set. # 2. _Map_ operation: extract something you care about from each row # 3. "Shuffle and Sort": task/node allocation # 4. _Reduce_ operation: aggregate, summarise, filter or transform # 5. Write the results. # ## Callbacks for Map Reduce # # - The data set, and the state of each stage of the computation, is represented as a set of key-value pairs. # # - The programmer provides a map function: # # $\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$ # # - and a reduce function: # # $\operatorname{reduce}(k', \left< k', v'\right> *) \rightarrow \; \left< k', v'' # \right> *$ # # - The $*$ refers to a *collection* of values. # # - These collections are *not* ordered. # ## Resilient Distributed Data # # - In a Map-Reduce computation these collections are resilient distributed data-sets (RDDs): # - The data is distributed across nodes in a cluster of computers. # - No data is lost if a single node fails. # - Data is typically stored in HBase tables, or HDFS files. # - The `map` and `reduce` functions can work in *parallel* across # different keys, or different elements of the collection. # # - The underlying framework (e.g. Hadoop or Apache Spark) allocates data and processing to different nodes, without any intervention from the programmer. # ## Word Count Example # # - In this simple example, the input is a set of URLs, each record is a document. # # - Problem: compute how many times each word has occurred across data set. 
# ## Word Count: Map
#
# - The input to $\operatorname{map}$ is a mapping:
#     - Key: URL
#     - Value: Contents of document
#
# $\left< document1, to \; be \; or \; not \; to \; be \right>$
#
# - In this example, our $\operatorname{map}$ function will process a given URL, and produce a mapping:
#     - Key: word
#     - Value: 1
#
# - So our original data-set will be transformed to:
#
# $\left< to, 1 \right>$
# $\left< be, 1 \right>$
# $\left< or, 1 \right>$
# $\left< not, 1 \right>$
# $\left< to, 1 \right>$
# $\left< be, 1 \right>$

# ## Word Count: Reduce
#
# - The reduce operation groups values according to their key, and then performs a reduce on each key.
#
# - The collections are partitioned across different storage units; therefore Map-Reduce will fold the data in such a way that it minimises data-copying across the cluster.
#
# - Data in different partitions are reduced separately in parallel.
#
# - The final result is a reduce of the reduced data in each partition.
#
# - Therefore it is very important that our operator is *both commutative and associative*.
#
# - In our case the function is the `+` operator:
#
# $\left< be, 2 \right>$
# $\left< not, 1 \right>$
# $\left< or, 1 \right>$
# $\left< to, 2 \right>$

# ## Map and Reduce compared with Python
#
# - Notice that these functions are formulated differently from the standard Python functions of the same name.
#
# - The `reduce` function works with key-value *pairs*.
#
# - It would be more apt to call it something like `reduceByKey`.

# ## MiniMapReduce
#
# - To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.
#
# - This *illustrates* how a problem can be written in terms of `map` and `reduce` operations.
#
# - Note that these are illustrative functions; this is *not* how Hadoop or Apache Spark actually implement them.

# In[45]:

##########################################################
#
#   MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################

def groupByKey(data):
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result

def reduceByKey(f, data):
    key_values = groupByKey(data)
    return list(map(lambda key: (key, reduce(f, key_values[key])), key_values))


# ## Word-count using MiniMapReduce

# In[49]:

data = list(map(lambda x: (x, 1), "to be or not to be".split()))
data


# In[50]:

groupByKey(data)


# In[51]:

reduceByKey(lambda x, y: x + y, data)


# ## Parallelising MiniMapReduce
#
# - We can easily turn our Map-Reduce implementation into a parallel, multi-threaded framework by using the `my_map_multithreaded` function we defined earlier.
#
# - This will allow us to perform map-reduce computations that exploit parallel processing using *multiple* cores on a *single* computer.

# In[56]:

def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])),
        [key for key in key_values])


# In[57]:

reduceByKey_multithreaded(lambda x, y: x + y, data)


# ## Parallelising the reduce step
#
# - Provided that our operator is both associative and commutative we can also parallelise the reduce operation, as the numeric check below illustrates.
#
# - We partition the data into approximately equal subsets.
#
# - We then reduce each subset independently on a separate core.
#
# - The results can be combined in a final reduce step.
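# - A quick numeric sanity check (a minimal sketch, not part of the framework itself) of why the operator must be associative and commutative: reducing each partition separately and then combining the partial results gives the correct answer for `+`, but not for `-`.

# In[ ]:

# Reduce two partitions separately, then combine the partial results.
# For an associative, commutative operator (+) this matches a single
# reduce over the whole list; for subtraction it does not.
from functools import reduce

numbers = [1, 2, 3, 4, 5]
left, right = numbers[:3], numbers[3:]

plus = lambda x, y: x + y
minus = lambda x, y: x - y

print(reduce(plus, numbers), plus(reduce(plus, left), reduce(plus, right)))    # 15 and 15
print(reduce(minus, numbers), minus(reduce(minus, left), reduce(minus, right)))  # -13 and -3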
# ### Partitioning the data

# In[58]:

def split_data(data, split_points):
    partitions = []
    n = 0
    for i in split_points:
        partitions.append(data[n:i])
        n = i
    partitions.append(data[n:])
    return partitions

data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data


# ### Reducing across partitions in parallel

# In[59]:

from threading import Thread

def parallel_reduce(f, partitions):
    n = len(partitions)
    results = [None] * n
    threads = [None] * n

    def job(i):
        results[i] = reduce(f, partitions[i])

    for i in range(n):
        # Pass i as an argument rather than closing over the loop variable,
        # so that each thread reduces the partition it was given.
        threads[i] = Thread(target=job, args=(i,))
        threads[i].start()
    for i in range(n):
        threads[i].join()
    return reduce(f, results)

parallel_reduce(lambda x, y: x + y, partitioned_data)


# ## Map-Reduce on a cluster of computers
#
# - The code we have written so far will *not* allow us to exploit parallelism from multiple computers in a [cluster](https://en.wikipedia.org/wiki/Computer_cluster).
#
# - Developing such a framework would be a very large software engineering project.
#
# - There are existing frameworks we can use:
#     - [Apache Hadoop](https://hadoop.apache.org/)
#     - [Apache Spark](https://spark.apache.org/)
#
# - In this lecture we will cover Apache Spark.

# ## Apache Spark
#
# - Apache Spark provides an object-oriented library for processing data on the cluster.
#
# - It provides objects which represent resilient distributed datasets (RDDs).
#
# - RDDs behave a bit like Python collections (e.g. lists).
#
# - However:
#     - the underlying data is distributed across the nodes in the cluster, and
#     - the collections are *immutable*.

# ## Apache Spark and Map-Reduce
#
# - We process the data by using higher-order functions to map RDDs onto *new* RDDs.
#
# - Each instance of an RDD has at least two *methods* corresponding to the Map-Reduce workflow:
#     - `map`
#     - `reduceByKey`
#
# - These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.
#
# - There are also additional RDD methods in the Apache Spark API;
#     - Apache Spark is a *super-set* of Map-Reduce.

# ## Word-count in Apache Spark

# In[60]:

words = "to be or not to be".split()
words


# ### The `SparkContext` class
#
# - When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.context.SparkContext` class.
#
# - Typically, an instance of this object will be created automatically for you and assigned to the variable `sc`.
#
# - The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;
#     - normally we would create an RDD from a large file or an HBase table.

# In[61]:

words_rdd = sc.parallelize(words)
words_rdd


# ### Mapping an RDD
#
# - Now when we invoke the `map` or `reduceByKey` methods on `words_rdd` we can set up a parallel processing computation across the cluster.

# In[62]:

word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd


# - Notice that we do not have a result yet.
#
# - The computation is not performed until we request the final result to be *collected*.
#
# - We do this by invoking the `collect()` method:

# In[63]:

word_tuples_rdd.collect()


# ### Reducing an RDD
#
# - However, we require additional processing to aggregate the counts for each word:

# In[64]:

word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd


# - Now we request the final result:

# In[65]:

word_counts = word_counts_rdd.collect()
word_counts


# ### Lazy evaluation
#
# - It is only when we invoke `collect()` that the processing is performed on the cluster.
#
# - Invoking `collect()` will cause both the `map` and `reduceByKey` operations to be performed.
#
# - If the resulting collection is very large then this can be an expensive operation.

# ### The head of an RDD
#
# - The `take` method is similar to `collect`, but only returns the first $n$ elements.
#
# - This can be very useful for testing.

# In[66]:

word_counts_rdd.take(2)


# ### The complete word-count example

# In[67]:

text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) \
            .reduceByKey(lambda x, y: x + y)
counts.collect()


# ## Additional RDD transformations
#
# - Apache Spark offers many more methods for operating on collections of tuples over and above the standard Map-Reduce framework:
#
#     - Sorting: `sortByKey`, `sortBy`, `takeOrdered`
#     - Mapping: `flatMap`
#     - Filtering: `filter`
#     - Counting: `count`
#     - Set-theoretic: `intersection`, `union`
#     - Many others: [see the Transformations section of the programming guide](https://spark.apache.org/docs/latest/programming-guide.html#transformations)

# ## Creating an RDD from a text file
#
# - In the previous example, we created an RDD from a Python collection.
#
# - This is *not* typically how we would work with big data.
#
# - More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.
#
# - The following example creates an RDD from a text file on the native filesystem (ext4);
#     - With bigger data, you would use an HDFS file, but the principle is the same.
#
# - Each element of the RDD corresponds to a single *line* of text.

# In[91]:

genome = sc.textFile('../../../data/genome.txt')
genome.take(5)


# ## Genome example
#
# - We will use this RDD to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.
#
# - First we will define some functions to split the bases into sequences of a certain size:

# In[92]:

def group_characters(line, n=5):
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if (i % n) == 0:
            yield result
            result = ''

def group_and_split(line):
    return [sequence for sequence in group_characters(line)]


# In[93]:

group_and_split('abcdefghijklmno')


# - Now we will transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as per the word-count example.
#
# - Notice that if we simply map each line of text, we will obtain multi-dimensional data:

# In[94]:

genome.map(group_and_split).take(2)


# ### Flattening an RDD using `flatMap`
#
# - We will need to flatten this data in order to turn it into a list of base-sequences.
#
# - We can use the `flatMap` method:

# In[95]:

sequences = genome.flatMap(group_and_split)
sequences.take(3)


# In[96]:

counts = \
    sequences.map(
        lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(10)


# - We want to rank each sequence according to its count.
#
# - Therefore the key (first element) of each tuple should be the count.
#
# - Therefore we need to reverse the tuples, as shown in the next cell.
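# - As an aside, a sketch for comparison only: the `sortBy` transformation listed above can sort directly on the count without reversing the tuples. The cells that follow use the tuple-reversal approach.

# In[ ]:

# Alternative sketch: sort directly on the count using sortBy.
counts.sortBy(lambda kv: kv[1], ascending=False).take(10)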
# In[97]:

def reverse_tuple(key_value_pair):
    return (key_value_pair[1], key_value_pair[0])


# In[98]:

sequences = counts.map(reverse_tuple)
sequences.take(10)


# ### Sorting an RDD
#
# - Now we can sort the RDD in descending order of key:

# In[99]:

sequences_sorted = sequences.sortByKey(False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences


# ## Calculating $\pi$ using Spark
#
# - We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:
#
# 1. Inscribe a circle in a square.
# 2. Randomly generate points in the square.
# 3. Determine the number of points in the square that are also in the circle.
# 4. Let $r$ be the number of points in the circle divided by the number of points in the square; then $\pi \approx 4 r$, because the ratio of the circle's area to the square's area is $\pi / 4$.
#
# - Note that the more points generated, the better the approximation.
#
# See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI).

# In[107]:

import numpy as np

def sample(p):
    # The argument p (the element of the RDD) is ignored; we simply draw
    # one random point in the unit square and test whether it falls
    # inside the quarter-circle.
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 5000000
count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
          .reduce(lambda a, b: a + b)
r = float(count) / float(NUM_SAMPLES)
print("Pi is approximately %.3f" % (4.0 * r))
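# - For comparison, a minimal single-machine sanity check of the same Monte-Carlo estimate, vectorised with NumPy (the sample size here is chosen arbitrarily):

# In[ ]:

# Estimate pi without Spark: draw n random points in the unit square
# and count how many fall inside the quarter-circle.
import numpy as np

n = 1000000
xs, ys = np.random.random(n), np.random.random(n)
inside = np.count_nonzero(xs * xs + ys * ys < 1)
print("Pi is approximately %.3f" % (4.0 * inside / n))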