Parallelization in ipyrad using ipyparallel

One of the real strengths of ipyrad is its parallelization: it can distribute work across arbitrarily large computing clusters, including when you are working interactively and remotely. This is done with the ipyparallel package, which is tightly linked to IPython and Jupyter. When you run the command-line ipyrad program, all of the work of ipyparallel is hidden under the hood, which keeps the program user-friendly. With the ipyrad API we take the alternative approach and encourage users to become familiar with the ipyparallel library, so that they better understand how work is being distributed on their system. This allows more flexible parallelization setups, and it makes it easier to use ipyparallel for parallelizing downstream analyses, many examples of which appear in the analysis-tools section of the ipyrad documentation.

Required software

All software required for this tutorial is installed during the ipyrad conda installation.

In [1]:
## conda install ipyrad -c ipyrad

Starting an ipcluster instance

The tricky aspect of using ipyparallel inside of Python (i.e., in a jupyter-notebook) is that you first need to start a cluster instance by running a command-line program called ipcluster. (Alternatively, you can install an extension that makes it possible to start ipcluster from a tab in jupyter notebooks, but I find the command-line tool simpler.) This command starts separate Python "kernels" (instances, which ipyparallel calls engines) on the cluster or computer and ensures that they can all talk to each other. Using advanced options you can even connect kernels across multiple computers, or across nodes of an HPC cluster, which we'll demonstrate.

In [2]:
## ipcluster start --n=4

Start a jupyter-notebook

If you are working on a laptop or a workstation, I typically open two terminals: one to start a notebook and one to start an ipcluster instance.

In [18]:
## jupyter-notebook

Open a notebook

Running jupyter-notebook launches a server and opens a dashboard view in your browser, usually at the address localhost:8888. From the dashboard, go to the menu and select new/notebook/Python to open a new notebook.

Connect to ipcluster in your notebook

Now, from inside a notebook, you can connect to the cluster using the ipyparallel library. Below we create a Client with no additional arguments, which is sufficient in this case since we are using a very basic ipcluster setup.

In [3]:
import ipyparallel as ipp
print(ipp.__version__)
6.0.2
In [4]:
## connect to ipcluster using default arguments
ipyclient = ipp.Client()

## count how many engines are connected
print("{} cores".format(len(ipyclient)))
4 cores
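
Engines can take a few seconds to register after ipcluster starts, so the count above may briefly be lower than the number you requested. The following is a minimal sketch of a polling helper, not part of ipyrad or ipyparallel; the name wait_for_engines and its arguments are hypothetical. It assumes only that len() of the client reports the number of connected engines, as in the cell above.

```python
import time

def wait_for_engines(client, n_expected, timeout=60, poll=0.5):
    """Block until `client` reports at least `n_expected` engines.

    `client` is any object whose len() is the number of connected
    engines, such as the ipyclient above. Returns the engine count,
    or raises RuntimeError if `timeout` seconds pass first.
    """
    start = time.time()
    while True:
        n_connected = len(client)
        if n_connected >= n_expected:
            return n_connected
        if time.time() - start > timeout:
            raise RuntimeError(
                "only {} of {} engines connected after {} seconds"
                .format(n_connected, n_expected, timeout))
        # sleep briefly before asking the client again
        time.sleep(poll)
```

With the cluster started above you would call wait_for_engines(ipyclient, 4) before submitting any work.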

Profiles in ipcluster and how ipyrad uses them

Below we show an example of a common error raised when the Client cannot find the ipcluster instance, in this case because it has a different profile name. When you start an ipcluster instance it keeps track of itself using a specific name (its profile). If you do not name one, the profile called "default" is used, and so this is the profile that the ipp.Client() command will look for (and similarly the profile that ipyrad will look for). If you change the name of the profile then you have to indicate this, like below.

In [22]:
## example connecting to a named profile
mpi = ipp.Client(profile="MPI")
Waiting for connection file: ~/.ipython/profile_MPI/security/ipcontroller-client.json
---------------------------------------------------------------------------
IOError                                   Traceback (most recent call last)
<ipython-input-22-89ba45db61df> in <module>()
      1 ## example connecting to a named profile
----> 2 mpi = ipp.Client(profile="MPI")

/home/deren/miniconda2/lib/python2.7/site-packages/ipyparallel/client/client.pyc in __init__(self, url_file, profile, profile_dir, ipython_dir, context, debug, sshserver, sshkey, password, paramiko, timeout, cluster_id, **extra_args)
    395                         no_file_msg,
    396                     ])
--> 397                     raise IOError(msg)
    398         if url_file is None:
    399             raise IOError(no_file_msg)

IOError: Connection file '~/.ipython/profile_MPI/security/ipcontroller-client.json' not found.
You have attempted to connect to an IPython Cluster but no Controller could be found.
Please double-check your configuration and ensure that a cluster is running.
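
If you would rather get a short message than a full traceback, you can wrap the connection attempt. The sketch below is an illustration only: try_connect is a hypothetical helper name, and it relies on the profile and timeout arguments that appear in the Client signature in the traceback above. It catches both IOError and OSError because the exception type reported can differ between Python versions.

```python
def try_connect(profile="default", timeout=10):
    """Return an ipyparallel Client for `profile`, or None if unavailable."""
    try:
        import ipyparallel as ipp
        return ipp.Client(profile=profile, timeout=timeout)
    except ImportError:
        # assumption: treat a missing ipyparallel install as "no cluster"
        print("ipyparallel is not installed in this environment")
    except (IOError, OSError):
        # raised when no connection file exists for this profile
        print("no running ipcluster found for profile '{}'".format(profile))
        print("start one with: ipcluster start --n=4 --profile={}".format(profile))
    return None
```

Calling try_connect("MPI") before the MPI profile has been started returns None and prints the hint, instead of raising the error shown above.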

Start a second ipcluster instance with a specific profile

By using separate profiles you can have multiple ipcluster instances running at the same time (possibly using different options or connected to different nodes of an HPC cluster) and be sure that you connect to each one distinctly. Start a new instance and then connect to it like below. Here we give it a profile name, and we also tell it to start the engines with MPI by using the --engines flag.

In [5]:
## ipcluster start --n=4 --engines=MPI --profile=MPI
In [6]:
## now you should be able to connect to the MPI profile
mpi = ipp.Client(profile="MPI")

## print mpi info
print("{} cores".format(len(mpi)))
4 cores
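
When you juggle several profiles it can help to build the ipcluster command in one place. This is a small sketch under stated assumptions: both function names are hypothetical, and the only flags used are the ones shown in this tutorial (--n, --engines, --profile), which may be spelled differently in other ipyparallel versions.

```python
import subprocess

def ipcluster_start_args(n, profile=None, engines=None):
    """Build the argument list for an `ipcluster start` command."""
    args = ["ipcluster", "start", "--n={}".format(n)]
    if engines:
        args.append("--engines={}".format(engines))
    if profile:
        args.append("--profile={}".format(profile))
    return args

def launch_ipcluster(n, profile=None, engines=None):
    """Start ipcluster as a background process and return the Popen handle."""
    return subprocess.Popen(ipcluster_start_args(n, profile, engines))

# print the command that matches the MPI example above
print(" ".join(ipcluster_start_args(4, profile="MPI", engines="MPI")))
```

Only the command string is printed here; call launch_ipcluster(4, profile="MPI", engines="MPI") when you actually want the cluster started from Python rather than from a terminal.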

Using a Client object to distribute jobs

For full details of how this works you can read the ipyparallel documentation. Here I will focus on the tools in ipyrad that we have developed to utilize Client objects to distribute work. In general, all you have to do is provide the ipyclient object to the .run() function and ipyrad will take care of the rest.

In [7]:
## the ipyclient object is simply a view to the engines
ipyclient
Out[7]:
<ipyparallel.client.client.Client at 0x7fec0c31f910>
In [8]:
import ipyrad as ip
import ipyrad.analysis as ipa

Example of using a Client in an ipyrad assembly

Here we create an Assembly, and when we call the .run() command we provide a specific ipyclient object as the target to distribute work on. If you do not provide this option, ipyrad will by default look for an ipcluster instance running on the default profile.

In [9]:
## run steps of an ipyrad assembly on a specific ipcluster instance
data = ip.Assembly("example")
data.set_params("sorted_fastq_path", "example_empirical_rad/*.fastq.gz")
data.run("1", ipyclient=mpi)
New Assembly: example
Assembly: example
[####################] 100%  loading reads         | 0:00:12 | s1 | 

Example of using a Client in an analysis (tetrad)

In [10]:
## run ipyrad analysis tools on a specific ipcluster instance
tet = ipa.tetrad(
    name="test",
    data="./analysis-ipyrad/pedic-full_outfiles/pedic-full.snps.phy",
    mapfile="./analysis-ipyrad/pedic-full_outfiles/pedic-full.snps.map",
    nboots=20);

tet.run(ipyclient=mpi)
loading seq array [13 taxa x 14159 bp]
max unlinked SNPs per quartet (nloci): 2777
inferring 715 quartet tree sets
host compute node: [4 cores] on oud
[####################] 100% generating q-sets | 0:00:02 |  
[####################] 100% initial tree      | 0:00:06 |  
[####################] 100% bootstrap trees   | 0:00:07 |  
[####################] 100% calculating stats | 0:00:00 |  

Parallelizing any arbitrary function with ipyparallel

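A very simple example: submit ten small jobs to a load-balanced view, wait for them to finish, and then collect the results.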

In [18]:
## get load-balanced view of our ipyclient
lbview = ipyclient.load_balanced_view()

## a dict to store results
res = {}

## define your func
def my_sum_func(x, y):
    return x, y, sum([x, y])

## submit jobs to cluster with arguments
import random
for job in range(10):
    x = random.randint(0, 10)
    y = random.randint(0, 10)
    
    ## submitting a job returns an async result object
    ## (named rasync because `async` is a reserved word in Python 3.7+)
    rasync = lbview.apply(my_sum_func, x, y)
    
    ## store results
    res[job] = rasync
    
## block until all jobs finish
ipyclient.wait()
Out[18]:
True
In [19]:
## the results objects
res
Out[19]:
{0: <AsyncResult: my_sum_func:finished>,
 1: <AsyncResult: my_sum_func:finished>,
 2: <AsyncResult: my_sum_func:finished>,
 3: <AsyncResult: my_sum_func:finished>,
 4: <AsyncResult: my_sum_func:finished>,
 5: <AsyncResult: my_sum_func:finished>,
 6: <AsyncResult: my_sum_func:finished>,
 7: <AsyncResult: my_sum_func:finished>,
 8: <AsyncResult: my_sum_func:finished>,
 9: <AsyncResult: my_sum_func:finished>}
In [20]:
for ridx in range(10):
    print("job: {}, result: {}".format(ridx, res[ridx].get()))
job: 0, result: (10, 3, 13)
job: 1, result: (6, 7, 13)
job: 2, result: (6, 10, 16)
job: 3, result: (6, 9, 15)
job: 4, result: (6, 9, 15)
job: 5, result: (3, 4, 7)
job: 6, result: (0, 2, 2)
job: 7, result: (1, 7, 8)
job: 8, result: (0, 8, 8)
job: 9, result: (3, 0, 3)
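
Calling .get() on a job that failed on an engine re-raises the error in your notebook, which stops the loop above at the first failure. Here is a minimal sketch of a more forgiving collector. The name collect_results is hypothetical; it assumes only that each stored object has the .ready() and .get() methods of the AsyncResult objects shown above.

```python
def collect_results(jobs):
    """Split a dict of async results into values, failures, and pending keys.

    `jobs` maps a job key to an object with .ready() and .get(), like
    the AsyncResult objects stored in `res` above.
    """
    results, failures, pending = {}, {}, []
    for key, job in jobs.items():
        if not job.ready():
            # still running or queued; check it again later
            pending.append(key)
            continue
        try:
            results[key] = job.get()
        except Exception as err:
            # keep the error so one failed job does not hide the others
            failures[key] = err
    return results, failures, pending
```

For the example above, collect_results(res) would return all ten tuples in the first dict, with an empty failures dict and an empty pending list.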

Starting an ipcluster instance on an HPC cluster

See the multi-node setup in the HPC tunneling tutorial for instructions: http://ipyrad.readthedocs.io/HPC_Tunnel.html

In [ ]:
## an example command to connect to 80 cores across multiple nodes using MPI
## ipcluster start --n=80 --engines=MPI --ip='*' --profile='MPI80'
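
Once a multi-node cluster is running, it is worth checking that the engines really are spread across the nodes you requested. The sketch below is one way to do that, assuming a connected Client; the function name engines_per_host is hypothetical. It uses a direct view on all engines (client[:]) and asks each engine for its hostname.

```python
import socket
from collections import Counter

def engines_per_host(client):
    """Return a dict mapping each hostname to its number of engines."""
    # client[:] is a direct view on all engines; apply_sync runs the
    # function once on every engine and returns a list of the results
    hostnames = client[:].apply_sync(socket.gethostname)
    return dict(Counter(hostnames))
```

For the 80-core example you would connect with ipp.Client(profile="MPI80") and pass that client to engines_per_host; the counts should add up to 80 across your nodes.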