An obvious way to improve the performance of Python code is to make it run in parallel. Any relatively new computer has multiple cores, which means that several processors can operate on the same data stored in memory. However, most of the code we have written in the course so far does not take advantage of more than one of them. In addition, there is now widespread availability of computing clusters, such as those offered by Amazon (Vanderbilt also has its own cluster). Clusters allow several computers to work together by exchanging data over a network.
Parallel computing involves breaking a task into several independent sub-tasks, distributing these sub-tasks to available processors or computers, then coordinating the execution of these tasks and combining their outputs in an appropriate way.
There are several different models for parallel processing. Here we will focus on two of them: process-based parallelism with the built-in multiprocessing module, and cluster-based parallelism with IPython's ipyparallel package.
multiprocessing
The simplest way (though probably not the best) to perform parallel computing in Python is via the built-in process-based library for concurrent computing, called multiprocessing.
import multiprocessing
import os
import numpy as np
The multiprocessing module parallelizes by launching multiple processes, each with its own separate interpreter. You may have already heard about threads; processes and threads are not the same: threads run within a single process and share its memory and its Global Interpreter Lock (GIL), whereas processes are fully independent of one another.
Since processes are independent, they have independent GILs, so it is best to run multiprocessing on a machine with multiple CPUs. You can check how many you have on your machine:
multiprocessing.cpu_count()
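As a quick illustration of why processes, rather than threads, are used for CPU-bound work in Python, here is a minimal sketch (the function name, worker count and loop size are arbitrary choices, and on platforms that use the "spawn" start method it should be run as a script rather than in a notebook). Because of the GIL, the thread pool will not finish faster than a single thread, while the process pool typically will:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def count_down(n):
    # Pure-Python, CPU-bound work; the GIL is held while this runs
    while n > 0:
        n -= 1

def timed(executor_cls, workers=4, n=2_000_000):
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        # One copy of the task per worker; list() forces us to wait for all results
        list(ex.map(count_down, [n] * workers))
    return time.perf_counter() - start

print('threads:  ', timed(ThreadPoolExecutor))
print('processes:', timed(ProcessPoolExecutor))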
Process class
The Process class encapsulates a task running in a process. It will usually have a target argument that is some callable (function/method) that is executed when the process runs, along with optional arguments that can be passed to the target.
A Process has several methods, with some useful ones being:

- is_alive: returns True if the process is running.
- join: waits for the process to finish its work and terminate. An optional timeout argument can be passed.
- run: when the process starts, this method is called to invoke the target.
- terminate: kills the process forcibly, without any cleanup.

A Process also has several other non-callable attributes, such as pid, name and authkey.
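As a minimal sketch of these methods (the sleeping target function and the timeouts are arbitrary), a process can be started, polled with is_alive, waited on with join, and forcibly stopped with terminate:
import time

def wait_a_bit(seconds):
    # Placeholder workload that just keeps the process alive for a while
    time.sleep(seconds)

p = multiprocessing.Process(target=wait_a_bit, args=(5,))
p.start()
print(p.name, p.pid, p.is_alive())   # name, process ID and liveness
p.join(timeout=1)                    # wait at most 1 second for it to finish
if p.is_alive():
    p.terminate()                    # still running, so kill it without cleanup
p.join()
print(p.exitcode)                    # a negative exit code means killed by a signal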
Here is a trivial example of using the Process
class, showing that each has its own process ID.
import os
def job(n):
print('I am working on job {0} running on PID {1}'.format(n, os.getpid()))
jobs = []
for i in range(5):
p = multiprocessing.Process(target=job, args=(i,))
jobs.append(p)
p.start()
jobs
We can easily subclass Process
to our liking:
class FibProcess(multiprocessing.Process):
def __init__(self, n):
self.n = n
multiprocessing.Process.__init__(self)
def run(self):
a, b = 0, 1
for i in range(self.n):
a, b = b, a + b
p = FibProcess(10000)
p.start()
print(p.pid)
p.join()
print(p.exitcode)
Queue class
Of course, despite being independent, we would like our processes to communicate with one another, for example, to share data. One way to facilitate this in multiprocessing is via the Queue class, a thread- and process-safe, first-in-first-out (FIFO) data structure that can store any serializable Python object.
A Queue's important methods include:

- put: adds an item to the Queue.
- get: fetches the next item from the Queue.
- close: closes the Queue, preventing the addition of more data.
- empty: returns True if the Queue is empty.
- full: returns True if the Queue is full.
- qsize: returns the approximate current number of items in the Queue.

A subclass of Queue is the JoinableQueue, which has additional methods, notably task_done and join: join blocks until every item that has been put on the queue has been marked as processed with task_done.
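The JoinableQueue is not used in the examples that follow, so here is a minimal sketch of its protocol (the worker function is just an illustration): each get is matched by a call to task_done, and join waits until every item has been accounted for.
def handle_items(jq):
    while True:
        item = jq.get()
        print('Handling {}'.format(item))
        jq.task_done()   # mark this item as processed

jq = multiprocessing.JoinableQueue()
worker = multiprocessing.Process(target=handle_items, args=(jq,), daemon=True)
worker.start()
for item in range(3):
    jq.put(item)
jq.join()   # blocks until task_done has been called once per item put on the queue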
from multiprocessing import Queue
q = Queue()
q.put(-1)
q.put('foobar')
q.put(5)
print(q.get())
print(q.get())
Here's a toy example of Queue
usage, where a process is fed items from a function, until it receives a None
object.
First, a function to execute a task with items from a Queue
:
def consumer(q):
while True:
thing = q.get()
if thing is None:
break
print('Consuming {}'.format(thing))
print("\nFinished consuming")
Complementing this is another function that provisions the Queue
with items:
def producer(sequence, q):
for thing in sequence:
q.put(thing)
Initialize the Queue
and start the consumer
process:
queue = multiprocessing.Queue()
consumer_process = multiprocessing.Process(target=consumer, args=[queue])
consumer_process.start()
Feed the Queue
and process until finished:
stuff = [42, 'foobar', True, range(5)]
producer(stuff, queue)
queue.put(None)
consumer_process.join()
Two things to be aware of when combining processes and queues:

- If you terminate a process that is still accessing a queue, the queue may become corrupted.
- A process that has put items on a queue will not exit until those items have been consumed, so make sure a queue is emptied before joining the process that fed it; otherwise you risk a deadlock.

Pool class
We often have a task that we want to split up among several worker processes in parallel. The Pool class creates a number of processes and the methods for passing work to them. A Pool has the following key methods:
- apply: executes a passed function in a process and returns the result.
- apply_async: the same as apply, but the result is returned asynchronously, via a callback.
- map: a parallel version of apply, which splits an iterable of data into chunks and farms the chunks out to processes.
- map_async: an asynchronous map.

As an example, we will choose a statistical computing task that is embarrassingly parallel. This function generates statistics of bootstrapped samples from a dataset:
def bootstrap(data, nsamples, f):
boot_samples = data[np.random.randint(len(data), size=(nsamples, len(data)))]
return [f(s) for s in boot_samples]
pool = multiprocessing.Pool(processes=4)
some_data = np.random.poisson(4, 25)
result = pool.apply_async(bootstrap, (some_data, 1000, np.mean))
The result is an ApplyResult
object:
result
We may then want to take the result and calculate a confidence interval based on the quantiles.
def bootstrap_ci(boot, alpha=0.05):
lower_index = int(np.floor((0.5*alpha)*len(boot)))
upper_index = int(np.floor((1.-0.5*alpha)*len(boot)))
return boot[lower_index], boot[upper_index]
bootstrap_ci(np.sort(result.get()))
# Clean up
pool.close()
pool.join()
But, since we passed the whole job as a single apply_async call, it ran in only one worker process, so this was not actually a parallel task. We need to use map (here, its asynchronous variant, map_async).
def mapped_bootstrap(n):
return bootstrap(some_data, n, np.mean)
pool = multiprocessing.Pool(processes=4)
map_result = pool.map_async(mapped_bootstrap, [250]*4)
map_result
parallel_results = map_result.get()
[len(p) for p in parallel_results]
bootstrap_ci(np.sort(np.ravel(parallel_results)))
pool.close()
pool.join()
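For comparison, the blocking map method listed above does the same split-and-collect in a single call; this sketch simply reuses the mapped_bootstrap helper defined earlier:
pool = multiprocessing.Pool(processes=4)
# map blocks until every chunk is finished and returns the results in order
blocking_results = pool.map(mapped_bootstrap, [250]*4)
pool.close()
pool.join()
bootstrap_ci(np.sort(np.ravel(blocking_results)))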
The multiprocessing package is very useful for highly parallel tasks that do not need to communicate with each other, other than sending the initial data to the pool of processes and collecting the results.
Parallel IPython
The IPython architecture consists of four components, which reside in the ipyparallel package:
- Engine: The IPython engine is a Python instance that accepts Python commands over a network connection. When multiple engines are started, parallel and distributed computing becomes possible. An important property of an IPython engine is that it blocks while user code is being executed.
- Hub: The hub keeps track of engine connections, schedulers and clients, and persists all task requests and results in a database for later use.
- Schedulers: All actions that can be performed on the engine go through a scheduler. While the engines themselves block when user code is run, the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines.
- Client: The primary object for connecting to a cluster.
(courtesy Min Ragan-Kelley)
This architecture is implemented using the ØMQ messaging library and the associated Python bindings in pyzmq.
In order to use IPython for parallel computing, you will need to start the IPython
controller and two or more IPython engines. The simplest way of doing this is
with the clusters tab, or you can use the ipcluster
command in a terminal:
$ ipcluster start --n=4
This command will start 4 IPython engines on the current host, which is appropriate for many desktop multicore systems. You can also set up IPython clusters that span many nodes in a computing cluster, but that is beyond the scope of this lecture; see the ipyparallel documentation for more information.
To use the IPython cluster in our Python programs or notebooks, we start by creating an instance of ipyparallel.Client
:
from ipyparallel import Client
cli = Client()
This creates a client using the default profile; you can pass an optional profile="my_profile"
argument if you have a different one running.
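For example (assuming a profile named my_profile exists and its cluster has been started, which is not part of the setup above):
# cli = Client(profile="my_profile")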
Using the ids attribute we can retrieve a list of ids for the IPython engines in the cluster:
cli.ids
We can use a DirectView object for execution of tasks, which can be accessed simply by indexing the client:
dv0 = cli[0]
dv0
The above shows just a single engine, but we want all of them:
dview = cli[:]
dview
We can get a view on whatever combination of engines we want:
cli[::2]
cli[1::2]
The block
flag specifies whether to wait for the result, or return an AsyncResult
object immediately:
dview.block = True
Finally, since we want to use IPython's parallel magic commands, we set the DirectView
to be active
:
dview.activate()
Each of these engines is ready to execute tasks. We can selectively run code on individual engines. For example, we can simply use os.getpid to return the ID of the process each engine is running in. Here is the notebook process:
import os
os.getpid()
Here is a single engine's process ID:
dv0.apply_sync(os.getpid)
And here are all the engines, run simultaneously:
dview.apply_sync(os.getpid)
Let's now consider a useful function that we might want to run in parallel. Here is a version of the approximate Bayesian computing (ABC) algorithm that we have seen in previous lectures.
import numpy
def abc(y, N, epsilon=[0.2, 0.8]):
trace = []
while len(trace) < N:
# Simulate from priors
mu = numpy.random.normal(0, 10)
sigma = numpy.random.uniform(0, 20)
x = numpy.random.normal(mu, sigma, 50)
#if (np.linalg.norm(y - x) < epsilon):
if ((abs(x.mean() - y.mean()) < epsilon[0]) &
(abs(x.std() - y.std()) < epsilon[1])):
trace.append([mu, sigma])
return trace
import numpy as np
y = np.random.normal(4, 2, 50)
Let's try running this on one of the cluster engines:
dv0.block = True
dv0.apply(abc, y, 10)
This fails with a NameError
because NumPy has not been imported on the engine to which we sent the task. Each engine has its own namespace, so we need to import whatever modules we will need prior to running our code:
cli[0].execute("import numpy")
dv0.apply(abc, y, 10)
A more efficient way is to import modules into the local and engine namespaces simultaneously, using a context manager:
with dview.sync_imports():
import numpy
t = dview.apply(abc, y, 10)
Easier yet, you can use the parallel cell magic to import everywhere:
%%px
import numpy
You can also use the require
decorator for functions that you wish to use on engines.
from ipyparallel import require
@require("numpy")
def abc(y, N, epsilon=[0.2, 0.8]):
trace = []
while len(trace) < N:
# Simulate from priors
mu = numpy.random.normal(0, 10)
sigma = numpy.random.uniform(0, 20)
x = numpy.random.normal(mu, sigma, 50)
#if (np.linalg.norm(y - x) < epsilon):
if ((abs(x.mean() - y.mean()) < epsilon[0]) &
(abs(x.std() - y.std()) < epsilon[1])):
trace.append([mu, sigma])
return trace
A simple way to run code on an engine is via the execute
method:
dv0.execute('x=3')
dv0['x']
We will often want to send data to our engines, or retrieve objects from them. DirectView
has push
and pull
methods for achieving this.
Recall that Python namespaces are just dictionaries. So, we can update an engine's namespace by pushing a dictionary:
dv0.push({'foo': -3, 'bar': np.arange(10)})
dv0.pull(('x', 'bar'))
Additionally, DirectView objects have scatter and gather methods for distributing data among engines. scatter accepts any container or NumPy array type, while gather assembles the respective return objects in the Client.
# Some Gaussian data
y
dview = cli[:2]
# Send to engines
dview.scatter('y', y)
dview['y']
# Remote execution of function
dview.execute('sum_y = sum(y)')
# Aggregation on client
sum(dview.gather('sum_y'))
The map
method essentially combines scatter
and gather
into a single call:
dview.map(lambda x: sum(x**2), np.split(y, 5))
The DirectView objects we have used so far strictly allocate particular tasks to particular engines. This is often inefficient when tasks take variable amounts of time, leaving some engines idle while others are overworked. We can instead use a load-balanced view, which distributes work approximately equally among engines in order to minimize idle time.
lb_view = cli.load_balanced_view()
lb_view
A LoadBalancedView
, though it works with all the engines (or specified subsets of engines), behaves as if it is working with a single engine.
If you do not specify the engines when the LoadBalancedView
is created, it will use all the engines that are available when it assigns tasks.
for i in range(10):
pid = lb_view.apply_sync(os.getpid)
print('Task {0} ran on process {1}'.format(i, pid))
%%px
def abc(y, N, epsilon=[0.2, 0.8]):
trace = []
while len(trace) < N:
# Simulate from priors
mu = numpy.random.normal(0, 10)
sigma = numpy.random.uniform(0, 20)
x = numpy.random.normal(mu, sigma, 50)
#if (np.linalg.norm(y - x) < epsilon):
if ((abs(x.mean() - y.mean()) < epsilon[0]) &
(abs(x.std() - y.std()) < epsilon[1])):
trace.append([mu, sigma])
return trace
tasks = lb_view.map_async(lambda n: abc(y, n), [20]*5)
tasks.msg_ids
result = np.concatenate(tasks.get())
result[:10]
Another way that you can dispatch tasks to engines is via the parallel decorator. This decorator is a method of the view classes (here, our LoadBalancedView) that control our engine pool. The decorated function is then dispatched to the engines using the map method that the decorator adds to it.
@lb_view.parallel(block=True)
def abc(y, N, epsilon=[0.2, 0.8]):
trace = []
while len(trace) < N:
# Simulate from priors
mu = numpy.random.normal(0, 10)
sigma = numpy.random.uniform(0, 20)
x = numpy.random.normal(mu, sigma, 50)
#if (np.linalg.norm(y - x) < epsilon):
if ((abs(x.mean() - y.mean()) < epsilon[0]) &
(abs(x.std() - y.std()) < epsilon[1])):
trace.append([mu, sigma])
return trace
abc.map([y]*4, [25]*4)
The %px line magic and the %%px cell magic are "parallel execution" statements, which run the given line or cell on all the engines.
%%px
import os
print(os.getpid())
%px b = numpy.random.random()
%px b
%pxresult
displays the output of the last request:
%pxresult
The %pxconfig
magic allows you to configure blocking for the parallel magics.
# Switch to blocking (synchronous) execution
%pxconfig --block
Remember that each engine is just another IPython process, so anything you can do in an IPython session you can also do on an engine.
%px %matplotlib inline
%px samples = abc(y, 100)
%%px
import matplotlib.pyplot as plt
import os
tsamples = numpy.transpose(samples)
plt.hist(tsamples[0])
plt.hist(tsamples[1])
_ = plt.title('PID %i' % os.getpid())
For profiling, we can also use the %timeit
magic to compare performance on the engines:
%%px
%%timeit
s = abc(y, 10)
In order to use Cython in parallel on IPython, we need to load the Cython extension on all engines.
%px %load_ext cython
As an example, let's use the Cythonized Gibbs sampler from the last lecture.
%%px
%%cython
from numpy import zeros, random
from numpy cimport *
from libc.math cimport sqrt
gamma = random.gamma
normal = random.normal
def gibbs(int N=20000, int thin=200):
cdef:
ndarray[float64_t, ndim=2] mat = zeros((N,2))
float64_t x,y = 0
int i,j
for i in range(N):
for j in range(thin):
x = gamma(3, y**2 + 4)
y = normal(1./(x+1), 1./sqrt(2*(x+1)))
mat[i] = x,y
return mat
Now we divide the total number of samples among the engines. In this case the work divides evenly; a more general partitioning of sizes is easy to do as well (a sketch follows a few cells below).
cli.ids
N = 100000
thin = 10
n = N // len(cli.ids)   # integer division, so n can be passed as an integer sample count
dv = cli[:]
dv.block = True   # make push/execute/gather below wait for completion
dv.push(dict(n=n, thin=thin))
# Let's just confirm visually we got what we expect
dv['n']
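As a sketch of the more general partitioning mentioned above, when N does not divide evenly we can give each engine its own chunk size; the helper logic here is illustrative and not part of the original example:
# Chunk sizes that differ by at most one and sum to N
k = len(cli.ids)
sizes = [N // k + (1 if i < N % k else 0) for i in range(k)]
print(sizes, sum(sizes) == N)

# Push a per-engine value of n, so each engine draws its own share of samples
for engine_id, size in zip(cli.ids, sizes):
    cli[engine_id].push(dict(n=size, thin=thin))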
We can time the execution of the Gibbs sampler on the remote nodes:
%pxconfig --noblock
%timeit dv.execute('gibbs(n, thin)')
But a more realistic (and costly) benchmark must also include the cost of bringing the results back from the cluster engines to our local namespace. For that, we assign the call to the variable a
on each node and then use the view's gather
method to pull them back in:
%%timeit
dv.execute('a = gibbs(n, thin)')
a = dv.gather('a')
Let's compare this to the same number of samples executed on a single process:
%load_ext cython
%%cython
from libc.math cimport sqrt
from numpy import zeros, random
from numpy cimport *
gamma = random.gamma
normal = random.normal
def gibbs(int N=20000, int thin=200):
cdef:
ndarray[float64_t, ndim=2] mat = zeros((N,2))
float64_t x,y = 0
int i,j
for i in range(N):
for j in range(thin):
x = gamma(3, y**2 + 4)
y = normal(1./(x+1), 1./sqrt(2*(x+1)))
mat[i] = x,y
return mat
%%timeit
a = gibbs(N, thin)
Exercise
Run parallel chains of the disaster_model example from PyMC and return the resulting traces to your client, for plotting and summarization.
# Write your answer here