Parallel computing with Python

multiprocessing

The built in module multiprocessing provides functionality to create processes which runs given tasks.

http://docs.python.org/2/library/multiprocessing.html

All strategies for paralleliztion has a rathe large overhead compared to lower level languages such as C or FORTRAN.

The way multiprocessing runs code in parallel is by launching subprocesses with a seperate interpretor for for each process. This means that in order to gain speed the computation we want to perform should be relatively substantial.

(In case you are familiar with threads: It should be noted that Python has a threading module for working with threads, however, all threads will be run on a single CPU.)

Byt using multiprocessing we can utilize the machines we are running code on more efficiently

In [1]:
import multiprocessing
In [2]:
multiprocessing.cpu_count()
Out[2]:
16

Before talking about some more advanced featuers, let's describe the most typical use pattern of multiprocessing.

Note: multiprocessing can be used in the IPython Notebook, but there are sometimes issues with printing from subprocesses. To make things clearer and avoid complications we shall run external scripts in stead.

Process

Processes share nothing

To spawn a process, initiate it with a target function and call the .start() method.

This method will arrange things so that given code will be run in a seperate process from the parent process. To get the parent process to wait until a process has finished before moving on one need to call the .join() method.

In [4]:
import os
os.getpid()
Out[4]:
83928
In [7]:
%%file mp.py
from multiprocessing import Process
import os

def worker():
    print("Worker process {}".format(os.getpid()))

if __name__ == "__main__":
    proc1 = Process(target=worker)
    proc1.start()
    proc2 = Process(target=worker)
    proc2.start()
Overwriting mp.py
In [8]:
%%bash
python mp.py
Worker process 87461
Worker process 87462

To get the target function to actually work on some input, you need to provide the arguments in the constructur of the Process.

In [9]:
%%file mp.py
from multiprocessing import Process
import os

def worker(arg):
    print("Worker process {}, argument was {}".format(os.getpid(), arg))

if __name__ == "__main__":
    proc1 = Process(target=worker, args=(10,))
    proc1.start()
    proc2 = Process(target=worker, args=(11,))
    proc2.start()
Overwriting mp.py
In [10]:
%%bash
python mp.py
Worker process 87490, argument was 10
Worker process 87491, argument was 11

Processes communicate over interprocess communication channel

  • Queue
  • Pipe

Pipe

Gives a pair of connection objects which are connected by a pipe. Uses send and recv methods on objects to comminucate between processes.

Queue

Gives a thread and process safe queue shared between processes. Can contain any pickle-able object.

In [11]:
%%file mp2.py
from multiprocessing import Process, Queue
import os

def worker(tasks, results):
    t = tasks.get()
    result = t * 2
    results.put([os.getpid(), t, "->", result])

if __name__ == "__main__":
    n = 20
    my_tasks = Queue()
    my_results = Queue()
    
    workers = [Process(target=worker, args=(my_tasks, my_results)) for i in range(n)]
    
    for proc in workers:
        proc.start()
    
    for i in range(n):
        my_tasks.put(i)
    
    for i in range(n):
        result = my_results.get()
        print(result)
Overwriting mp2.py
In [12]:
%%bash
python mp2.py
[87680, 0, '->', 0]
[87681, 1, '->', 2]
[87682, 2, '->', 4]
[87683, 3, '->', 6]
[87685, 5, '->', 10]
[87684, 4, '->', 8]
[87687, 7, '->', 14]
[87686, 6, '->', 12]
[87688, 8, '->', 16]
[87689, 9, '->', 18]
[87691, 11, '->', 22]
[87690, 10, '->', 20]
[87693, 13, '->', 26]
[87692, 12, '->', 24]
[87694, 14, '->', 28]
[87695, 15, '->', 30]
[87696, 16, '->', 32]
[87697, 17, '->', 34]
[87699, 19, '->', 38]
[87698, 18, '->', 36]

Because the processes are executed in parallel we can never know the order of results being put in the Queue.

In [13]:
from multiprocessing import Queue
In [14]:
q = Queue()
In [15]:
q.get?

Manager

A special Process which holds Python objects so that other processes can manipulate them. When a managed object is manipulated somewhere, the manager will make sure the change is propagated to any other processes using the managed object.

In [16]:
%%file mp3.py
from multiprocessing import Process, Manager
import os

def worker(l):
    p = os.getpid()
    l[int(str(p)[-2:])] = p


if __name__ == "__main__":
    n = 100
    manager = Manager()
    l = manager.list()
    l.extend([0] * n)
    
    processes = [Process(target=worker, args=(l,)) for i in range(20)]
    
    for proc in processes:
        proc.start()
    
    for proc in processes:
        proc.join()
    
    print(l)
Overwriting mp3.py
In [17]:
%%bash
python mp3.py
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 87824, 87825, 87826, 87827, 87828, 87829, 87830, 87831, 87832, 87833, 87834, 87835, 87836, 87837, 87838, 87839, 87840, 87841, 87842, 87843, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Pool

The Pool class distributes work between workers and collects the results as a list. It is extremely handy for quickly implementing some simple parallelization.

In [20]:
%%file mp.py
import multiprocessing
import os


def task(args):
    print "Running process", os.getpid(), "with args", args
    return os.getpid(), args


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map(task, [1,2,3,4]*3)
    print(result)
Overwriting mp.py
In [21]:
%%bash
python mp.py
Running process 88078 with args 4
Running process 88077 with args 3
Running process 88077 with args 3
Running process 88077 with args 2
Running process 88075 with args 1
Running process 88075 with args 1
Running process 88075 with args 4
Running process 88075 with args 3
Running process 88076 with args 2
Running process 88076 with args 2
Running process 88076 with args 1
Running process 88076 with args 4
[(88075, 1), (88076, 2), (88077, 3), (88078, 4), (88075, 1), (88076, 2), (88077, 3), (88075, 4), (88076, 1), (88077, 2), (88075, 3), (88076, 4)]

The method .map() works like the built in function map(), but will send data from the iterable to different processes. By default it will send one element at a time, but this can be changed with the chunksize parameter.

A similar method called .map_async() usually performs better in parallel, but in that case one has to fetch the results using a .get() method of the returned value of .map_async() (which is an instance of the class AsyncResult).

In [22]:
%%file mp.py
import multiprocessing
import os

def task(args):
    print "Running process", os.getpid(), "with args", args
    return os.getpid(), args


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = pool.map_async(task, [1,2,3,4])
    print(result.get())
Overwriting mp.py
In [23]:
%%bash
python mp.py
Running process 88108 with args 1
Running process 88109 with args 2
Running process 88110 with args 3
Running process 88111 with args 4
[(88108, 1), (88109, 2), (88110, 3), (88111, 4)]

IPython Parallel

http://ipython.org/ipython-doc/dev/parallel/

The strategy for achieving parallelization in IPython Parallel differs from multiprocessing in that you need to start engine processes and a hub process before you want to run code.

One then connect to the hub, which will aid in the distrubution of work across the engines.

When IPython is installed, a program ipcluster is also installed which simplifies starting engines and hub. To start a cluster with 4 workers one would run

$ ipcluster start -n 4


In the IPython Notebook, clusters can also be started from the IPython Dashboard.

In [24]:
from IPython.parallel import Client

Let's try initiating the client before we have started the cluster!

In [25]:
cli = Client()
---------------------------------------------------------------------------
IOError                                   Traceback (most recent call last)
<ipython-input-25-e24bb842824a> in <module>()
----> 1 cli = Client()

/Users/vale/.virtualenvs/devel/lib/python2.7/site-packages/IPython/parallel/client/client.pyc in __init__(self, url_or_file, profile, profile_dir, ipython_dir, context, debug, exec_key, sshserver, sshkey, password, paramiko, timeout, **extra_args)
    409                     url_or_file = os.path.join(self._cd.security_dir, url_or_file)
    410                 if not os.path.exists(url_or_file):
--> 411                     raise IOError("Connection file not found: %r" % url_or_file)
    412             with open(url_or_file) as f:
    413                 cfg = json.loads(f.read())

IOError: Connection file not found: u'/Users/vale/.ipython/profile_default/security/ipcontroller-client.json'

IPython keeps track of addresses and such of local engines in the .ipython directory in your home folder. But Client() can also take a URL or a path to a configuration file with information on how to connect to the cluster.

Now start the cluster (either in terminal or on the Dashboard)

In [26]:
cli = Client()
In [27]:
cli.ids
Out[27]:
[0, 1, 2, 3, 4, 5, 6, 7]

The ids field lists the engines in the cluster.

Say we want to have a look at the pid like we have done for all the multiprocessing exampels.

In [28]:
def get_pid():
    import os
    return os.getpid()
In [29]:
cli[0].apply_sync(get_pid)
Out[29]:
88707
In [30]:
get_pid()
Out[30]:
83928

We need to import os in the function we send to the engine. That engine was started without any imported functions. Every time we want the engines to have some data, we need to explicitly send it to them. Or instructions on how to get it in the case of import.

There is a method for syncing import across engines.

In [33]:
cli[:].apply_sync(get_pid)
Out[33]:
[88707, 88708, 88706, 88709, 88710, 88711, 88712, 88713]

A collection of engines is referred to as an engine pool, the standard view to an engine pool is called a direct view.

In [34]:
dview = cli[:]

The direct view provides a decorator which can be used to make any function parallelized.

In [35]:
@dview.parallel(block=True)
def pstr(x):
    return str(x)

After the .parallel decorater have been used on a function, it is changed in such a way that arguments given to the function will be split up, sent to engines, then put together again after the calculation is done.

The new ParallelFunction also has a .map() method which will run the function for each of the values in an iterable on different engines, and return the list of results when it's done.

In [36]:
str(range(10))
Out[36]:
'[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
In [37]:
pstr(range(10))
Out[37]:
['[0, 1]', '[2, 3]', '[4]', '[5]', '[6]', '[7]', '[8]', '[9]']
In [38]:
pstr.map(range(10))
Out[38]:
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
In [39]:
from numpy import random
In [40]:
@dview.parallel(block=True)
def task(delay):
    import os, time
    
    t0 = time.time()
    pid = os.getpid()
    time.sleep(delay)
    t1 = time.time()
    
    return [pid, t0, t1]
In [43]:
delays = random.rand(4)
In [44]:
task.map(delays)
Out[44]:
[[88707, 1361974304.03308, 1361974304.959194],
 [88708, 1361974304.034218, 1361974304.233133],
 [88706, 1361974304.035217, 1361974304.954711],
 [88709, 1361974304.036044, 1361974304.276678]]
In [45]:
def visualize_tasks(results):
    res = np.array(results)
    fig, ax = plt.subplots(figsize=(10, res.shape[1]))
    
    yticks = []
    yticklabels = []
    tmin = min(res[:,1])
    for n, pid in enumerate(np.unique(res[:,0])):
        yticks.append(n)
        yticklabels.append("%d" % pid)
        for m in np.where(res[:,0] == pid)[0]:
            ax.add_patch(Rectangle((res[m,1] - tmin, n-0.25),
                         res[m,2] - res[m,1], 0.5, color="green", alpha=0.5))
        
    ax.set_ylim(-.5, n+.5)
    ax.set_xlim(0, max(res[:,2]) - tmin + 0.)
    ax.set_yticks(yticks)
    ax.set_yticklabels(yticklabels)
    ax.set_ylabel("PID")
    ax.set_xlabel("seconds")
In [46]:
delays = random.rand(100) / 4.
In [47]:
result = task.map(delays)
In [48]:
visualize_tasks(result)

There are more views to engine pools, one of them is the load_balanced_view which automatically helps utilize engines which normally would sit aroudn idly waiting for other engines to finish.

In [49]:
lbview = cli.load_balanced_view()
In [50]:
@lbview.parallel(block=True)
def lb_task(delay):
    import os, time
    
    t0 = time.time()
    pid = os.getpid()
    time.sleep(delay)
    t1 = time.time()
    
    return [pid, t0, t1]
In [51]:
result = lb_task.map(delays)
In [52]:
visualize_tasks(result)

To make sure that engines have the same modules loaded, one can use the method .sync_imports() of an engine pool view.

In [53]:
with dview.sync_imports():
    import numpy
    import os
importing numpy on engine(s)
importing os on engine(s)
In [54]:
%%px
os.getpid()
Out[0:1]: 88707
Out[1:1]: 88708
Out[2:1]: 88706
Out[3:1]: 88709
Out[4:1]: 88710
Out[5:1]: 88711
Out[6:1]: 88712
Out[7:1]: 88713

Since the engines will not have the same namespace as any interpreter that might want to use them, if we want them to have the same data to work on one need to send that data to the engines.

This is done with the .push() method of an engine pool view.

In [55]:
dview.push(dict(a=np.arange(5), b=np.zeros(5)), block=True)
Out[55]:
[None, None, None, None, None, None, None, None]
In [56]:
dview.pull("a", block=True)
Out[56]:
[array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4])]

For the sake of convenience, pushing and pulling to the namespaces on the engines can be done through a dictionary like syntax

In [57]:
dview["a"]
Out[57]:
[array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4]),
 array([0, 1, 2, 3, 4])]
In [58]:
dview["c"] = "See"
In [59]:
dview.pull("c", block=True)
Out[59]:
['See', 'See', 'See', 'See', 'See', 'See', 'See', 'See']

Something which might be more useful is to push different sections of data to various engines, just like when we applied the parallel function above.

The direct view has the methods .scatter() and .gather().

In [60]:
dview.scatter('x', np.arange(95), block=True)

This will divide the array to four sections, and divide those out to different engines.

In [61]:
%%px
x
Out[0:2]: array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11])
Out[1:2]: array([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])
Out[2:2]: array([24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35])
Out[3:2]: array([36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47])
Out[4:2]: array([48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59])
Out[5:2]: array([60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71])
Out[6:2]: array([72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83])
Out[7:2]: array([84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94])
In [62]:
def x_mean():
    return x.mean()
In [63]:
dview.apply_sync(x_mean)
Out[63]:
[5.5, 17.5, 29.5, 41.5, 53.5, 65.5, 77.5, 89.0]
In [64]:
dview.gather("x", targets=1, block=True)
Out[64]:
array([12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])

An alternative to importing modules in the definition of a function, there is a @require decoratir in IPython.parallel which will cause the required module to be imported before the code in the function is executed.

In [65]:
from IPython.parallel import require
In [66]:
@require("os")
def get_pid():
    return os.getpid()
In [67]:
dview.apply_sync(get_pid)
Out[67]:
[88707, 88708, 88706, 88709, 88710, 88711, 88712, 88713]

Assignment 7

This is a very naive function for factoring integers

In [ ]:
def factorize(n):
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors
        r = n % p
        if r == 0:
            factors.append(p)
            n = n / p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            p += 2
        else:
            p += 1

The task is to factorize all numbers from 2 to 500000, and count the number of unique factors. Then count how many times a factor count occurs. That is, make a histogram of factor counts.

For the sake of clarity, this is what the task would be for the numbers up to 10:

number  factors         unique   num_unique_factors
2       [2]                      1
3       [3]                      1
4       [2, 2]          [2]      1
5       [5]                      1
6       [2, 3]                   2
7       [7]                      1
8       [2, 2, 2]       [2]      1
9       [3, 3]          [3]      1
10      [2, 5]                   2

Which gives us

{1: 7, 2: 2}


1.

First implement this serially, that is, how you normally would. (It should take in the order of 15 seconds for 500000 numbers).

2.

Then make an implementation which uses multiprocessing for parallelization, (the parallelization strategy should be to factor some numbers on some processes, don't worry about the implementation of the factoring function).

3.

Implement the same thing using IPython.parallel. (Remember that you need to start the ipcluster before running a script using IPython.parallel, this is something I have a tendency to forget.)

For this assignment, put everything in one script. Call the script num_factors.py. Put it in the scripts directory of your repository.

The script should take a character as an argument to determining how it will be run. So to run it serially, you run

$ num_factors.py s

With multiprocessing

$ num_factors.py m

and with IPython.parallel

$ num_factors.py i

The script should print a dictionary like in the example above

In [ ]: