Passing results as input in IPython.parallel

The goal is to pass the result of one task as the input of another, without waiting for the first task to finish.

IPython.parallel does not make this use case particularly easy, but there are a few ways to do it.

The simplest way to do this is to persist the data on the engine, and run subsequent computation on the same engine where the data was loaded.

As usual, start by creating a client and a view:

In [1]:
from IPython import parallel
rc = parallel.Client()
view = rc.load_balanced_view()

Now define our functions. We have three:

  1. load data into the engine's namespace (a random array)
  2. do something with that data (get the 2-norm)
  3. remove data from the engine's namespace

In [2]:
%px import numpy as np

def load_data(sz, name='data'):
    """A function that loads data into a particular name in the global ns.
    In this case, a random array.
    """
    data = np.random.random((sz,sz))
    globals()[name] = data

def get_norm(a):
    """A simple analysis function, gets the 2-norm of an array"""
    return np.linalg.norm(a, 2)

def cleanup_name(name):
    """Delete a name from the engine's namespace"""
    del globals()[name]
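
Before involving a cluster, the same three steps can be exercised in a single process. The following is a minimal local sketch, assuming only that NumPy is installed; the name 'data_demo' and the size 4 are arbitrary choices made for illustration. It runs the load, analyze, clean up sequence that each engine will later perform.

```python
import numpy as np

def load_data(sz, name='data'):
    """Store a random sz-by-sz array under `name` in the global namespace."""
    globals()[name] = np.random.random((sz, sz))

def get_norm(a):
    """Return the 2-norm (largest singular value) of an array."""
    return np.linalg.norm(a, 2)

def cleanup_name(name):
    """Delete a name from the global namespace."""
    del globals()[name]

# 'data_demo' is an illustrative name, not one used on the engines
load_data(4, name='data_demo')
local_norm = get_norm(globals()['data_demo'])
cleanup_name('data_demo')

# after cleanup, the name should be gone again
still_defined = 'data_demo' in globals()

print("4x4 2-norm = %.3f" % local_norm)
print("'data_demo' still defined: %s" % still_defined)
```

On an engine, the only difference is that globals() refers to the engine's namespace rather than the local one.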

Now we submit the work. The relevant bits:

  1. load-balance the data-loading tasks without restriction.
  2. use temp_flags(follow=load_task) to ensure that analysis happens on the same engine as the load.
  3. use parallel.Reference to get engine-local data as an argument to the analysis function, without moving any data around.
  4. clean up the namespace after the analysis is done.

We submit all of this work before we know where the data-loading will take place.
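
To see why a Reference avoids moving data, it helps to picture the receiving side: the task carries only a name, and the name is looked up in the engine's own namespace just before the function is called. The following is a toy, single-process model of that idea, not IPython.parallel's implementation; ToyReference, run_on_engine, and the engine_ns dict are hypothetical stand-ins invented for this sketch.

```python
class ToyReference(object):
    """Hypothetical stand-in for parallel.Reference: it holds only a name."""
    def __init__(self, name):
        self.name = name

def run_on_engine(engine_ns, f, *args):
    """Call f, replacing each ToyReference argument with the engine-local object."""
    resolved = [engine_ns[a.name] if isinstance(a, ToyReference) else a
                for a in args]
    return f(*resolved)

# a plain dict standing in for one engine's namespace
engine_ns = {}

# "load" step: the data is created in the engine namespace and stays there
engine_ns['data_4'] = [[0.5] * 4 for _ in range(4)]

# "analysis" step: only the short name travels with the task
total = run_on_engine(engine_ns,
                      lambda rows: sum(sum(row) for row in rows),
                      ToyReference('data_4'))

# "cleanup" step: remove the name once the analysis is done
del engine_ns['data_4']

print(total)
```

Because the lookup happens in whichever namespace the task lands in, the name must already exist there, which is why the analysis task is pinned to the loading engine with follow.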

In [3]:
loads = {}
norms = {}
cleanups = []
for sz in [2 ** p for p in range(10)]:
    # create unique names, to avoid collision
    name = 'data_%i' % sz
    # load the data into the global namespace
    ar = loads[sz] = view.apply_async(load_data, sz, name=name)
    # submit the analysis to the same engine
    with view.temp_flags(follow=ar):
        # use a Reference to get the engine-local data
        # as an argument to the function
        norms[sz] = ar2 = view.apply_async(get_norm, parallel.Reference(name))
    # submit a final task to clean up anything we put in the global namespace
    with view.temp_flags(follow=ar2):
        # don't call .get() here: blocking inside the loop would
        # serialize the submissions
        cleanups.append(view.apply_async(cleanup_name, name))

# block only after everything has been submitted
for ar in cleanups:
    ar.get()

Now we can display the results of our work:

In [4]:
for sz in sorted(norms):
    ar = norms[sz]
    norm = ar.get()
    print("{0:>4}x{0:<4} 2-norm = {1:7.3f}".format(sz, norm))
   1x1    2-norm =   0.818
   2x2    2-norm =   1.616
   4x4    2-norm =   1.663
   8x8    2-norm =   4.053
  16x16   2-norm =   8.550
  32x32   2-norm =  15.897
  64x64   2-norm =  32.467
 128x128  2-norm =  63.872
 256x256  2-norm = 128.242
 512x512  2-norm = 255.966
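
As a plausibility check on these numbers: a matrix of independent uniform [0, 1) entries is close to the constant matrix with every entry 0.5, whose 2-norm is exactly sz/2, so the values above should approach sz/2 as sz grows. The sketch below checks this locally with NumPy; the seed and the sizes are arbitrary assumptions made for reproducibility, and it does not use the cluster at all.

```python
import numpy as np

rng = np.random.RandomState(0)  # fixed seed, chosen arbitrarily

# the constant 0.5 matrix has rank 1, so its 2-norm is exactly sz/2
const_norm = np.linalg.norm(np.full((8, 8), 0.5), 2)
print("constant 8x8: 2-norm = %.3f" % const_norm)

ratios = {}
for sz in [64, 256]:
    a = rng.random_sample((sz, sz))
    # ratio of the observed 2-norm to the sz/2 estimate
    ratios[sz] = np.linalg.norm(a, 2) / (sz / 2.0)
    print("%4d: 2-norm / (sz/2) = %.3f" % (sz, ratios[sz]))
```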

And finally, verify that no data_* names are left lingering on any of the engines.

In [5]:
%px %who
[stdout:0] Interactive namespace is empty.
[stdout:1] Interactive namespace is empty.
[stdout:2] Interactive namespace is empty.
[stdout:3] Interactive namespace is empty.
[stdout:4] Interactive namespace is empty.
[stdout:5] Interactive namespace is empty.
[stdout:6] Interactive namespace is empty.
[stdout:7] Interactive namespace is empty.