Basic Usage

Recall there are three components of IPython parallel:

  • Client (what the notebook connects to)
  • Controller (routes tasks between the client and the engines)
  • Engines (the IPython shells that actually run the code)

In [1]:
from IPython.parallel import Client
rc = Client()
rc.ids
Out[1]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Here we use all the engines: slicing the client with rc[:] returns a DirectView on every engine.

In [2]:
dview = rc[:]

Then define the is_prime function as usual.

In [3]:
with open('../builtin-cpuheavy/prime_list.txt') as f:
    PRIMES = [int(l) for l in f]

def is_prime(n):
    # import inside the function so the import also runs on every
    # engine when the function is called there
    # (not a good pattern though, more on this later)
    import math

    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Run in parallel

Use map_async or map_sync to map the function across the inputs in parallel.

In [4]:
ar = dview.map_async(is_prime, PRIMES[:8])
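
Semantically, map_async spreads across the engines what the builtin map would compute serially. A minimal local sketch (re-defining is_prime here and using small known values instead of the prime_list.txt file):

```python
import math

# local copy of the same trial-division test, mapped serially
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

# small known values instead of the file contents
results = list(map(is_prime, [2, 3, 4, 9, 11, 15]))
# -> [True, True, False, False, True, False]
```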

wait_interactive() blocks the notebook (the client side) and displays the current task status.

Note that interrupting here only interrupts the notebook itself; the IPython cluster keeps running the submitted tasks.

In [5]:
ar.wait_interactive()
   8/8 tasks finished after    4 s
done
In [6]:
ar.get()
Out[6]:
[True, True, True, True, True, False, True, True]
In [7]:
speedup = ar.serial_time / ar.wall_time
speedup
Out[7]:
3.0117662156527576
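
The metric itself is simple: serial_time is the summed per-task compute time, and wall_time is the elapsed time of the parallel run. A toy illustration with hypothetical timings (not the values measured above):

```python
# hypothetical per-task durations (seconds)
task_times = [0.5] * 8

# cost of running the tasks one after another
serial_time = sum(task_times)   # 4.0

# hypothetical elapsed time of the parallel run
wall_time = 1.25

speedup = serial_time / wall_time   # 3.2
```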

The metadata for each task's execution can be accessed via ar.metadata.

In [8]:
ar.metadata[:1]
Out[8]:
[{'after': [],
  'completed': datetime.datetime(2015, 4, 27, 1, 32, 42, 693804),
  'data': {},
  'engine_id': 0,
  'engine_uuid': '5880a92f-21cb-4b49-8221-e0b9ae659b02',
  'error': None,
  'execute_input': None,
  'execute_result': None,
  'follow': [],
  'msg_id': '275ccb27-345e-4a6e-a9d7-c68e3a705ca7',
  'outputs': [],
  'outputs_ready': True,
  'received': datetime.datetime(2015, 4, 27, 1, 32, 42, 695203),
  'started': datetime.datetime(2015, 4, 27, 1, 32, 41, 695235),
  'status': 'ok',
  'stderr': '',
  'stdout': '',
  'submitted': datetime.datetime(2015, 4, 27, 1, 32, 41, 693984)}]

Import modules remotely

If the function uses any modules, the engines have to import them as well. The dview.sync_imports() context manager helps with this. Note that import numpy as np will not bind the np alias on the engines; the module remains available only as numpy.

In [9]:
with dview.sync_imports():
    import math
    import numpy as np  # this won't work
importing math on engine(s)
importing numpy on engine(s)
In [10]:
def find_np():
    np.random.randint(10)

rc[:2].apply_sync(find_np)
[0:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-10-d9e2624d366e> in find_np()
NameError: name 'np' is not defined

[1:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-10-d9e2624d366e> in find_np()
NameError: name 'np' is not defined
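
The failure mirrors a plain namespace issue: sync_imports re-runs import numpy on each engine but never binds the alias np there, so the name lookup inside the function fails at call time. A local sketch of the same situation, using the stdlib math module so it runs anywhere:

```python
import math  # bound as "math"; the alias "m" is never created

def use_m():
    # "m" is looked up in the module namespace at call time
    return m.sqrt(2)

try:
    use_m()
    raised = False
except NameError:
    # the same kind of error the engines raise for the missing "np"
    raised = True
```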

IPython Parallel Magic

In the IPython shell, the %%px cell magic helps with trivial parallel setup: the statements in the cell are executed on all engines.

%%px --local executes the statements in the notebook as well.

In [11]:
%%px
import numpy as np
np.random.randint(6)
Out[0:1]: 1
Out[1:1]: 3
Out[2:1]: 5
Out[3:1]: 0
Out[4:1]: 5
Out[5:1]: 0
Out[6:1]: 0
Out[7:1]: 4
Out[8:1]: 2
Out[9:1]: 4
Out[10:1]: 5
Out[11:1]: 0

Run the following cell multiple times: since the engines are the same long-lived processes (like remote Python interpreters), the returned PIDs stay the same.

In [12]:
%%px
import os
os.getpid()
Out[0:2]: 97098
Out[1:2]: 97099
Out[2:2]: 97100
Out[3:2]: 97102
Out[4:2]: 97104
Out[5:2]: 97106
Out[6:2]: 97108
Out[7:2]: 97110
Out[8:2]: 97112
Out[9:2]: 97114
Out[10:2]: 97116
Out[11:2]: 97118

Passing/Collecting Data

Pushing a variable to all engines, and pulling it back

In [13]:
# push
dview['prog'] = 'val_prime'

# pull
dview['prog']
Out[13]:
['val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime',
 'val_prime']

Splitting a variable across engines

In [19]:
# all engines get a portion of x's elements
ar = dview.scatter('x', list(range(15)))
ar.wait()
In [20]:
dview['x']
Out[20]:
[[0, 1], [2, 3], [4, 5], [6], [7], [8], [9], [10], [11], [12], [13], [14]]
In [21]:
# gather x from all engines and combine
dview.gather('x', block=True)
Out[21]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
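
The uneven split in Out[20] follows scatter's default partitioning: with 15 elements over 12 engines, the first 15 % 12 = 3 engines get one extra element. A plain-Python sketch of that rule (a local illustration, not ipyparallel's actual implementation):

```python
def partition(seq, n):
    # q elements per engine, with the first r engines taking one extra
    q, r = divmod(len(seq), n)
    chunks, i = [], 0
    for k in range(n):
        size = q + (1 if k < r else 0)
        chunks.append(seq[i:i + size])
        i += size
    return chunks

chunks = partition(list(range(15)), 12)
# -> [[0, 1], [2, 3], [4, 5], [6], [7], [8], [9], [10], [11], [12], [13], [14]]

# flattening (what gather does) recovers the original list
flat = [x for c in chunks for x in c]
```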

Here is another example

In [22]:
%%px 
import numpy as np
rand_n = np.random.randint(0, 10, 6)
In [23]:
dview['rand_n']
Out[23]:
[array([3, 6, 0, 2, 1, 6]),
 array([2, 2, 7, 3, 6, 8]),
 array([7, 9, 6, 3, 9, 6]),
 array([4, 8, 2, 6, 8, 6]),
 array([6, 0, 2, 1, 5, 0]),
 array([7, 9, 4, 7, 8, 9]),
 array([6, 9, 8, 4, 5, 3]),
 array([5, 2, 5, 3, 7, 3]),
 array([3, 2, 0, 8, 9, 8]),
 array([2, 5, 8, 8, 6, 5]),
 array([1, 8, 6, 6, 0, 8]),
 array([7, 8, 9, 8, 0, 0])]
In [24]:
dview.gather('rand_n', block=True)
Out[24]:
array([3, 6, 0, 2, 1, 6, 2, 2, 7, 3, 6, 8, 7, 9, 6, 3, 9, 6, 4, 8, 2, 6, 8,
       6, 6, 0, 2, 1, 5, 0, 7, 9, 4, 7, 8, 9, 6, 9, 8, 4, 5, 3, 5, 2, 5, 3,
       7, 3, 3, 2, 0, 8, 9, 8, 2, 5, 8, 8, 6, 5, 1, 8, 6, 6, 0, 8, 7, 8, 9,
       8, 0, 0])
In [25]:
# sum at each engine
def rand_sum():
    return np.sum(rand_n)

ar = dview.apply_async(rand_sum)
In [26]:
ar
Out[26]:
<AsyncResult: finished>
In [27]:
ar.get()
Out[27]:
[18, 28, 40, 34, 14, 44, 35, 25, 30, 34, 29, 32]

The parallel sum should equal the serial sum.

In [28]:
sum(ar.get()) == sum(dview.gather('rand_n', block=True))
Out[28]:
True
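
The check in In[28] can be replayed locally with plain lists; here using the first two per-engine arrays from Out[23] (whose sums, 18 and 28, match the first two entries of Out[27]):

```python
chunks = [[3, 6, 0, 2, 1, 6],   # engine 0's rand_n
          [2, 2, 7, 3, 6, 8]]   # engine 1's rand_n

# what rand_sum returns on each engine
per_engine = [sum(c) for c in chunks]           # [18, 28]

# summing the per-engine results ...
total_parallel = sum(per_engine)

# ... equals summing the gathered (flattened) data
total_serial = sum(x for c in chunks for x in c)
```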