IPython Parallel 7 adds a Cluster API for starting and stopping clusters. This is the new implementation of ipcluster, which can be more easily re-used in Python programs. The ipcluster script is now built on this API.
Controllers and Engines are started with "Launchers", which are objects representing a running process.
Each Cluster has a profile_dir and a cluster_id. The combination of profile_dir and cluster_id uniquely identifies a cluster. You can have many clusters in one profile, but each must have a distinct cluster id.
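The uniqueness rule can be sketched as a registry keyed by the (profile_dir, cluster_id) pair. This is an illustrative sketch only; the register helper here is hypothetical, not part of ipyparallel.

```python
import os

# Illustrative sketch (not ipyparallel internals): a registry keyed by
# the (profile_dir, cluster_id) pair, which must be unique.
clusters = {}

def register(profile_dir, cluster_id):
    key = (os.path.expanduser(profile_dir), cluster_id)
    if key in clusters:
        raise ValueError(f"cluster {cluster_id!r} already exists in {profile_dir!r}")
    clusters[key] = cluster_id
    return key

# Two clusters can share a profile as long as their ids differ:
register("~/.ipython/profile_default", "analysis")
register("~/.ipython/profile_default", "training")
```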
To create a cluster, instantiate a Cluster object:
from ipyparallel import Cluster
cluster = Cluster()
cluster
<Cluster(cluster_id='touchy-1623263478-xhlt', profile_dir='~/.ipython/profile_default')>
To start the cluster:
await cluster.start_controller()
cluster
<Cluster(cluster_id='touchy-1623263478-xhlt', profile_dir='~/.ipython/profile_default', controller=<running>)>
engine_set_id = await cluster.start_engines(n=4)
cluster
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
<Cluster(cluster_id='touchy-1623263478-xhlt', profile_dir='~/.ipython/profile_default', controller=<running>, engine_sets=['1623263481-w75s'])>
As you can see, all methods on the Cluster object are async by default.
Every async method also has a _sync variant, for when you can't (or don't want to) use asyncio.
engine_set_2 = cluster.start_engines_sync(n=2)
engine_set_2
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
'1623263483-iafz'
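The async/_sync pairing can be sketched generically like this. This is an illustrative pattern, not ipyparallel's actual implementation, and MiniCluster is a made-up name: the _sync variant simply runs the coroutine to completion on an event loop.

```python
import asyncio

# Illustrative sketch of the async/_sync pairing (not ipyparallel's
# actual code): the _sync variant runs the coroutine to completion.
class MiniCluster:
    async def start_engines(self, n=1):
        await asyncio.sleep(0)  # stand-in for real async startup work
        return f"engine-set-of-{n}"

    def start_engines_sync(self, n=1):
        return asyncio.run(self.start_engines(n=n))

engine_set = MiniCluster().start_engines_sync(n=2)
```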
At this point, we have a cluster with a controller and six engines in two groups.
There is also a start_cluster
method that starts the controller and one engine set, for convenience:
engine_set_id = await cluster.start_cluster(n=4)
We can get a client object connected to the cluster with connect_client():
rc = cluster.connect_client()
rc.wait_for_engines(6)
rc.ids
[0, 1, 2, 3, 4, 5]
And we can use the classic apply_async(...).get_dict() pattern to get a dict, keyed by engine id, of the hostname and pid of each engine:
def identify():
    import os
    import socket
    return {"host": socket.gethostname(), "pid": os.getpid()}
rc[:].apply_async(identify).get_dict()
{0: {'host': 'touchy', 'pid': 81944}, 1: {'host': 'touchy', 'pid': 81945}, 2: {'host': 'touchy', 'pid': 81946}, 3: {'host': 'touchy', 'pid': 81947}, 4: {'host': 'touchy', 'pid': 81952}, 5: {'host': 'touchy', 'pid': 81953}}
We can send signals to engine sets by id (sending signals to just one engine is still a work in progress):
import signal
import time
ar = rc[:].apply_async(time.sleep, 100)
# oops! I meant 1!
await cluster.signal_engines(signal.SIGINT)
ar.get()
Sending signal 2 to engine(s) 1623263481-w75s
Sending signal 2 to engine(s) 1623263483-iafz
[0:apply]:
KeyboardInterrupt Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[1:apply]:
KeyboardInterrupt Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[2:apply]:
KeyboardInterrupt Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

[3:apply]:
KeyboardInterrupt Traceback (most recent call last)
<string> in <module>
KeyboardInterrupt:

... 2 more exceptions ...
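The KeyboardInterrupt tracebacks above are CPython's normal handling of SIGINT: by default, SIGINT is delivered to Python code as a KeyboardInterrupt exception, which is what surfaces on each engine. A self-contained illustration:

```python
import signal

# By default, CPython turns SIGINT into a KeyboardInterrupt exception,
# which is why interrupted engines report KeyboardInterrupt.
try:
    signal.raise_signal(signal.SIGINT)  # Python 3.8+
    interrupted = False
except KeyboardInterrupt:
    interrupted = True
```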
Now it's time to clean up. Every start_ method has a corresponding stop_ method. We can stop one engine set at a time with stop_engines:
await cluster.stop_engines(engine_set_2)
Stopping engine(s): 1623263483-iafz
Or stop the whole cluster:
await cluster.stop_cluster()
Stopping engine(s): 1623263481-w75s
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 81906}
Cluster can also be used as a context manager, in which case the cluster is started on entry and stopped on exit, and the object bound by as is a connected client. This makes it a lot easier to scope an IPython cluster for the duration of a computation and ensure that it is cleaned up when you are done.
import os
with Cluster(n=4) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Waiting for connection file: ~/.ipython/profile_default/security/ipcontroller-touchy-1623263508-mdel-client.json
Stopping engine(s): 1623263508-5i4g
Stopping controller
{0: 82284, 1: 82282, 2: 82283, 3: 82285}
Controller stopped: {'exit_code': 0, 'pid': 82281}
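The context-manager behavior can be pictured as a small wrapper: start on entry, hand back a client, stop on exit. This is an illustrative sketch, not the actual Cluster implementation, and ManagedCluster is a made-up name.

```python
class ManagedCluster:
    # Hypothetical sketch of Cluster's context-manager protocol.
    def __init__(self, n):
        self.n = n
        self.running = False

    def __enter__(self):
        self.running = True  # stands in for start_cluster()
        return f"client-for-{self.n}-engines"  # stands in for connect_client()

    def __exit__(self, exc_type, exc, tb):
        self.running = False  # stands in for stop_cluster()
        return False  # never swallow exceptions from the body

cluster = ManagedCluster(n=4)
with cluster as rc:
    was_running = cluster.running
```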
It can also be used as an async context manager:
async with Cluster(n=2) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids
Starting 2 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Waiting for connection file: ~/.ipython/profile_default/security/ipcontroller-touchy-1623263514-nqk6-client.json
Stopping engine(s): 1623263514-b9f9
Stopping controller
{0: 82407, 1: 82408}
Controller stopped: {'exit_code': 0, 'pid': 82406}
IPython's mechanism for launching controllers and engines is called Launchers. These are found in ipyparallel.cluster.launcher.
There are two kinds of Launcher: controller launchers, which start and manage a single controller process, and engine set launchers, which start and manage a set of n engines. You can use abbreviations to access the launchers that ship with IPython Parallel, such as 'MPI', 'Local', or 'SGE', or you can pass classes themselves (or their import strings, such as 'mymodule.MyEngineSetLauncher').
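Resolution of those launcher specs can be sketched roughly like this. The helper and registry below are hypothetical (a stdlib class stands in for a launcher class); this is not ipyparallel's actual lookup code.

```python
import importlib

# Hypothetical sketch: accept a class, a short abbreviation, or a dotted
# import string, and resolve it to a class object.
ABBREVIATIONS = {"Ordered": "collections.OrderedDict"}  # stand-in registry

def resolve_launcher(spec):
    if isinstance(spec, type):
        return spec  # already a class
    spec = ABBREVIATIONS.get(spec, spec)  # expand abbreviation, if any
    module_name, _, class_name = spec.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
```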
I'm going to start a cluster with engines using MPI:
cluster = Cluster(n=4, engine_launcher_class='MPI')
await cluster.start_cluster()
rc = cluster.connect_client()
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
rc.wait_for_engines(4)
rc.ids
[0, 1, 2, 3]
Now I'm going to run a test with another new feature:
def uhoh():
    import time
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.rank
    if rank == 0:
        print("rank 0: oh no.")
        1 / 0
    print(f"rank {rank}: barrier")
    MPI.COMM_WORLD.barrier()
ar = rc[:].apply_async(uhoh)
ar.get(timeout=2)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_81840/824703262.py in <module>
     12
     13 ar = rc[:].apply_async(uhoh)
---> 14 ar.get(timeout=2)

~/dev/ip/parallel/ipyparallel/client/asyncresult.py in get(self, timeout)
    227                 raise self.exception()
    228             else:
--> 229                 raise error.TimeoutError("Result not ready.")
    230
    231     def _check_ready(self):

TimeoutError: Result not ready.
Uh oh! We are stuck in the barrier because engine 0 failed.
Let's try interrupting and getting the errors:
import signal
await cluster.signal_engines(signal.SIGINT)
ar.get(timeout=2)
Sending signal 2 to engine(s) 1623263525-cijg
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/var/folders/qr/3vxfnp1x2t1fw55dr288mphc0000gn/T/ipykernel_81840/3902026823.py in <module>
      1 import signal
      2 await cluster.signal_engines(signal.SIGINT)
----> 3 ar.get(timeout=2)

~/dev/ip/parallel/ipyparallel/client/asyncresult.py in get(self, timeout)
    227                 raise self.exception()
    228             else:
--> 229                 raise error.TimeoutError("Result not ready.")
    230
    231     def _check_ready(self):

TimeoutError: Result not ready.
It didn't work! This is because MPI.barrier isn't actually interruptible 😢.
We are going to have to resort to more drastic measures, and restart the engines:
await cluster.restart_engines()
Stopping engine(s): 1623263525-cijg
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
engine set stopped 1623263525-cijg: {'exit_code': -9, 'pid': 82790}
rc.wait_for_engines(4)
rc.ids
[4, 5, 6, 7]
We are now back to having 4 responsive engines. Their IPP engine id may have changed, but I can get back to using them.
def get_rank():
    from mpi4py import MPI
    return MPI.COMM_WORLD.rank
rank_map = rc[:].apply_async(get_rank).get_dict()
rank_map
{4: 0, 5: 3, 6: 1, 7: 2}
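Since the engine ids no longer line up with MPI ranks, it can be handy to invert the mapping so engines can be addressed by rank. A small sketch using the mapping above:

```python
# Invert an {engine_id: mpi_rank} mapping to target engines by MPI rank.
rank_map = {4: 0, 5: 3, 6: 1, 7: 2}  # from the get_rank() call above
engine_by_rank = {rank: engine_id for engine_id, rank in rank_map.items()}
# e.g. rc[engine_by_rank[0]] would address the engine running MPI rank 0
```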
Finally, clean everything up:
await cluster.stop_cluster()
Stopping engine(s): 1623263525-cijg
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 82767}
engine set stopped 1623263525-cijg: {'exit_code': 1, 'pid': 82998}