Recall there are three components of IPython parallel: the client, the controller (hub and schedulers), and the engines.
from IPython.parallel import Client
rc = Client()
rc.ids
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Here we use all the engines; indexing the client with a slice returns a DirectView.
dview = rc[:]
Then define the is_prime
function as usual.
with open('../builtin-cpuheavy/prime_list.txt') as f:
    PRIMES = [int(l) for l in f]

def is_prime(n):
    # import inside the function so the import runs
    # when the function is called on each engine
    # (not a good usage pattern though, more on this later)
    import math
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
Use map_async or map_sync to run the function over the inputs in parallel.
ar = dview.map_async(is_prime, PRIMES[:8])
wait_interactive() blocks the notebook and reports the current task status.
Note that interrupting it here only interrupts the notebook itself; the IPython cluster is still running the tasks.
ar.wait_interactive()
8/8 tasks finished after 4 s
done
ar.get()
[True, True, True, True, True, False, True, True]
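Semantically, dview.map_async distributes the same computation that a plain serial map performs. A minimal local sketch with a small hypothetical candidate list (the real input comes from prime_list.txt); the small-n edge cases are handled here so the predicate is self-contained:

```python
import math

def is_prime(n):
    # same predicate as above, with small-n edge cases handled
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

# hypothetical stand-in for PRIMES[:8]
candidates = [97, 98, 99, 101, 103]

# dview.map_async(is_prime, candidates).get() returns the same list,
# just computed across the engines instead of in one process
serial = list(map(is_prime, candidates))  # [True, False, False, True, True]
```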
speedup = ar.serial_time / ar.wall_time
speedup
3.0117662156527576
The metadata for each task's execution can be accessed via ar.metadata.
ar.metadata[:1]
[{'after': [],
  'completed': datetime.datetime(2015, 4, 27, 1, 32, 42, 693804),
  'data': {},
  'engine_id': 0,
  'engine_uuid': '5880a92f-21cb-4b49-8221-e0b9ae659b02',
  'error': None,
  'execute_input': None,
  'execute_result': None,
  'follow': [],
  'msg_id': '275ccb27-345e-4a6e-a9d7-c68e3a705ca7',
  'outputs': [],
  'outputs_ready': True,
  'received': datetime.datetime(2015, 4, 27, 1, 32, 42, 695203),
  'started': datetime.datetime(2015, 4, 27, 1, 32, 41, 695235),
  'status': 'ok',
  'stderr': '',
  'stdout': '',
  'submitted': datetime.datetime(2015, 4, 27, 1, 32, 41, 693984)}]
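Each metadata entry carries submitted/started/completed/received timestamps, so per-task compute time can be derived from them. A small sketch over a sample entry shaped like the output above (the field values here are illustrative):

```python
from datetime import datetime

def task_seconds(md):
    # compute time of one task: completed minus started
    return (md['completed'] - md['started']).total_seconds()

# sample entry mirroring the timestamp fields shown above
sample = {
    'started':   datetime(2015, 4, 27, 1, 32, 41, 695235),
    'completed': datetime(2015, 4, 27, 1, 32, 42, 693804),
}

duration = task_seconds(sample)  # ~1 s for this sample
```

Summing task_seconds over all entries of ar.metadata gives the same quantity ar.serial_time reports.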
If the function depends on any modules, the engines must import them as well, so here the dview.sync_imports() context manager helps with this. Note that import numpy as np will not bind the name np on the engines; the module remains available only as numpy.
with dview.sync_imports():
    import math
    import numpy as np  # this won't work
importing math on engine(s)
importing numpy on engine(s)
def find_np():
    np.random.randint(10)
rc[:2].apply_sync(find_np)
[0:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-10-d9e2624d366e> in find_np()
NameError: name 'np' is not defined

[1:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-10-d9e2624d366e> in find_np()
NameError: name 'np' is not defined
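One workaround is to bind the alias inside the function itself, just as math was imported inside is_prime earlier; whichever engine runs the function then creates the np name in its local scope:

```python
def find_np():
    # importing inside the function binds np on whichever engine runs it
    import numpy as np
    return np.random.randint(10)

# rc[:2].apply_sync(find_np) would now return two random ints
# instead of raising NameError
```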
In the IPython shell, the %%px magic helps with simple parallel setup. A %%px cell executes its statements on all engines; %%px --local executes the statements in the notebook as well.
%%px
import numpy as np
np.random.randint(6)
Out[0:1]: 1
Out[1:1]: 3
Out[2:1]: 5
Out[3:1]: 0
Out[4:1]: 5
Out[5:1]: 0
Out[6:1]: 0
Out[7:1]: 4
Out[8:1]: 2
Out[9:1]: 4
Out[10:1]: 5
Out[11:1]: 0
Try running the following multiple times: since the engines are persistent processes (like remote Python interpreters), the return values stay the same.
%%px
import os
os.getpid()
Out[0:2]: 97098
Out[1:2]: 97099
Out[2:2]: 97100
Out[3:2]: 97102
Out[4:2]: 97104
Out[5:2]: 97106
Out[6:2]: 97108
Out[7:2]: 97110
Out[8:2]: 97112
Out[9:2]: 97114
Out[10:2]: 97116
Out[11:2]: 97118
Pushing / pulling a variable to / from all engines
# push
dview['prog'] = 'val_prime'
# pull
dview['prog']
['val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime', 'val_prime']
# all engines get a portion of x's elements
ar = dview.scatter('x', list(range(15)))
ar.wait()
dview['x']
[[0, 1], [2, 3], [4, 5], [6], [7], [8], [9], [10], [11], [12], [13], [14]]
# get x from all engines and combine them
dview.gather('x', block=True)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
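scatter splits the sequence as evenly as possible across engines: with 15 elements and 12 engines, the first three engines receive two elements each and the rest one, matching the output above. A rough local sketch of that partitioning (not ipyparallel's actual implementation):

```python
def partition(seq, n):
    # split seq into n contiguous chunks; the first len(seq) % n
    # chunks get one extra element, like DirectView.scatter
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

parts = partition(list(range(15)), 12)
# parts[:4] -> [[0, 1], [2, 3], [4, 5], [6]]
```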
Here is another example:
%%px
import numpy as np
rand_n = np.random.randint(0, 10, 6)
dview['rand_n']
[array([3, 6, 0, 2, 1, 6]), array([2, 2, 7, 3, 6, 8]), array([7, 9, 6, 3, 9, 6]), array([4, 8, 2, 6, 8, 6]), array([6, 0, 2, 1, 5, 0]), array([7, 9, 4, 7, 8, 9]), array([6, 9, 8, 4, 5, 3]), array([5, 2, 5, 3, 7, 3]), array([3, 2, 0, 8, 9, 8]), array([2, 5, 8, 8, 6, 5]), array([1, 8, 6, 6, 0, 8]), array([7, 8, 9, 8, 0, 0])]
dview.gather('rand_n', block=True)
array([3, 6, 0, 2, 1, 6, 2, 2, 7, 3, 6, 8, 7, 9, 6, 3, 9, 6, 4, 8, 2, 6, 8, 6, 6, 0, 2, 1, 5, 0, 7, 9, 4, 7, 8, 9, 6, 9, 8, 4, 5, 3, 5, 2, 5, 3, 7, 3, 3, 2, 0, 8, 9, 8, 2, 5, 8, 8, 6, 5, 1, 8, 6, 6, 0, 8, 7, 8, 9, 8, 0, 0])
# sum at each engine
def rand_sum():
    return np.sum(rand_n)
ar = dview.apply_async(rand_sum)
ar
<AsyncResult: finished>
ar.get()
[18, 28, 40, 34, 14, 44, 35, 25, 30, 34, 29, 32]
The parallel sum should equal the serial sum.
sum(ar.get()) == sum(dview.gather('rand_n', block=True))
True
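The equality holds because gather simply concatenates the per-engine chunks, so summing the per-engine sums counts exactly the same elements as summing the gathered array. A local sketch with two illustrative chunks (values taken from the first two engines in the output above):

```python
import numpy as np

# per-engine chunks like dview['rand_n'] (illustrative values)
chunks = [np.array([3, 6, 0, 2, 1, 6]), np.array([2, 2, 7, 3, 6, 8])]

engine_sums = [int(np.sum(c)) for c in chunks]  # what rand_sum returns per engine
gathered = np.concatenate(chunks)               # what gather('rand_n') returns

# both count the same elements, so the totals agree
assert sum(engine_sums) == int(gathered.sum())
```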