New in IPython Parallel 5.0: our AsyncResult object is now a Future (specifically, a subclass of concurrent.futures.Future).
This means it can be integrated into any Future-using application.
import time
import ipyparallel as ipp
rc = ipp.Client()
dv = rc[:]
dv
<DirectView [0, 1, 2, 3]>
def random_norm(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.random.random(n)
    return norm(A, 2)
The basic async API hasn't changed:
f = rc[-1].apply(random_norm, 100)
f
<AsyncResult: random_norm>
f.get()
5.8521376289386202
But the familiar AsyncResult object is now a Future:
f.__class__.mro()
[ipyparallel.client.asyncresult.AsyncResult, concurrent.futures._base.Future, object]
This means that we can use Future APIs to access results, etc.
f.result()
5.8521376289386202
import os
f = rc[-1].apply(os.getpid)
f.add_done_callback(lambda _: print("I got PID: %i" % _.result()))
f.get()
I got PID: 5002
5002
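Because AsyncResults are real Futures, the standard concurrent.futures helpers should work with them as well. Here is a minimal sketch (the per-engine os.getpid task is just illustrative), using as_completed to handle results as they arrive:
from concurrent.futures import as_completed

# submit one task per engine; each apply() returns an AsyncResult (a Future)
futures = [rc[i].apply(os.getpid) for i in rc.ids]
# as_completed yields each Future as soon as its result is ready
for f in as_completed(futures):
    print(f.result())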
A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:
from tornado.gen import coroutine, sleep
from tornado.ioloop import IOLoop
import sys
def sleep_task(t):
    time.sleep(t)
    return os.getpid()
@coroutine
def background():
    """A background coroutine to demonstrate that we aren't blocking"""
    while True:
        yield sleep(1)
        print('.', end=' ')
        sys.stdout.flush()  # not needed after ipykernel 4.3
@coroutine
def work():
    """Submit some work and print the results when complete"""
    for t in [1, 2, 3, 4]:
        ar = rc[:].apply(sleep_task, t)
        result = yield ar  # this waits
        print(result)
loop = IOLoop()
loop.add_callback(background)
loop.run_sync(work)
. [4998, 4997, 4999, 5002] . . [4998, 4997, 4999, 5002] . . . [4998, 4997, 4999, 5002] . . . . [4998, 4997, 4999, 5002]
So if you have an existing async application using coroutines and/or Futures, you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results.
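The same holds for asyncio: because an AsyncResult is a concurrent.futures.Future, it can be adapted with asyncio.wrap_future and awaited inside a coroutine. A rough sketch, reusing the rc client and sleep_task defined above:
import asyncio

async def compute():
    ar = rc[:].apply(sleep_task, 1)       # an AsyncResult, i.e. a concurrent.futures.Future
    pids = await asyncio.wrap_future(ar)  # adapt it to the asyncio event loop
    print(pids)

asyncio.get_event_loop().run_until_complete(compute())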
Executors are a standard Python API provided by various job-submission tools. Having different libraries expose this common API for asynchronous execution is useful because implementations can easily be swapped out and compared, or the best one for a given context can be chosen, without having to change the code.
With IPython Parallel, every View has an .executor property, providing the Executor API for that View.
Just like Views, the assignment of work for an Executor depends on the View from which it was created.
You can get an Executor for any View by accessing View.executor:
ex_all = rc[:].executor
ex_all.view.targets
[0, 1, 2, 3]
even_lbview = rc.load_balanced_view(targets=rc.ids[::2])
ex_even = even_lbview.executor
for pid in ex_even.map(lambda x: os.getpid(), range(10)):
    print(pid)
4998
4999
4998
4998
4998
4999
4999
4998
4999
4999
Typically, though, one will want an Executor for a LoadBalancedView on all the engines, in which case use the top-level Client.executor method:
ex = rc.executor()
ex.view
<LoadBalancedView None>
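Besides map, these executors support the other half of the standard Executor API: submit, which schedules a single call and returns a Future. A quick sketch using the load-balanced executor above:
f = ex.submit(os.getpid)  # schedule one call on the load-balanced view
f.result()                # block until an engine returns its PID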
Let's make a few Executors:
The distributed executor assumes you have started a distributed cluster on the default local interface, e.g.
$> dcluster 127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import distributed
class DistributedExecutor(distributed.Executor):
    """Wrap distributed.Executor to provide the standard Executor.map API

    distributed.Executor.map returns a list of Futures,
    not an iterable of results like everything else.
    See blaze/distributed#91
    """
    def map(self, *args, **kwargs):
        list_of_futures = super().map(*args, **kwargs)
        for f in list_of_futures:
            yield f.result()
N = 4
ip_ex = rc.executor(targets=range(N))
dist_ex = DistributedExecutor('127.0.0.1:8787')
thread_ex = ThreadPoolExecutor(N)
process_ex = ProcessPoolExecutor(N)
executors = [process_ex, thread_ex, ip_ex, dist_ex]
We can submit the same work with the same API, using four different mechanisms for distributing work. The results will be the same:
for executor in executors:
    print(executor.__class__.__name__)
    it = executor.map(str, range(5))
    print(list(it))
ProcessPoolExecutor
['0', '1', '2', '3', '4']
ThreadPoolExecutor
['0', '1', '2', '3', '4']
ViewExecutor
['0', '1', '2', '3', '4']
DistributedExecutor
['0', '1', '2', '3', '4']
This makes it easy to compare the different implementations. We are going to submit some dummy work: allocating arrays of various sizes and computing their 2-norms.
def task(n):
    """Generates a 1xN array and computes its 2-norm"""
    import numpy
    from numpy.linalg import norm
    A = numpy.ones(n)
    return norm(A, 2)
import numpy as np

sizes = np.logspace(20, 24, 16, base=2, dtype=int)
sizes
array([ 1048576, 1261463, 1517571, 1825676, 2196334, 2642245, 3178688, 3824041, 4600417, 5534417, 6658042, 8009791, 9635980, 11592325, 13945857, 16777216])
Run the work locally, to get a reference:
print("Local time:")
%time ref = list(map(task, sizes))
Local time:
CPU times: user 161 ms, sys: 404 ms, total: 565 ms
Wall time: 560 ms
And then run again with the various Executors:
for executor in executors:
    print(executor.__class__.__name__)
    result = executor.map(task, sizes)
    rlist = list(result)
    assert rlist == ref, "%s != %s" % (rlist, ref)
    # time the task assignment
    %timeit list(executor.map(task, sizes))
ProcessPoolExecutor
10 loops, best of 3: 126 ms per loop
ThreadPoolExecutor
10 loops, best of 3: 149 ms per loop
ViewExecutor
10 loops, best of 3: 151 ms per loop
DistributedExecutor
10 loops, best of 3: 141 ms per loop
For this toy work, the stdlib ProcessPoolExecutor appears to perform the best (though in testing, it seems to crash quite a bit). That's useful info. One benefit of IPython Parallel or Distributed Executors over the stdlib Executors is that they do not have to be confined to a single machine. This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes, and then extend the exact same code to make use of multiple machines, just by selecting a different Executor.
That seems pretty useful.
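For example, code written against the Executor interface doesn't need to know which backend it runs on. A small illustrative helper (not from the original notebook) makes the point:
def compute_norms(executor, sizes):
    """Run task over sizes on whatever Executor we're handed."""
    return list(executor.map(task, sizes))

# the same call works in local threads or across a cluster:
assert compute_norms(thread_ex, sizes) == compute_norms(ip_ex, sizes)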