from IPython.parallel import Client from IPython.core.display import clear_output from IPython.parallel.error import TimeoutError, CompositeError from IPython.parallel.client.view import View import openturns as ot from datetime import timedelta import sys from IPython.display import VimeoVideo VimeoVideo("63250251") class OpenTURNSIPythonParallelFunction(ot.OpenTURNSPythonFunction): """Distributed Python function using IPython.parallel. Parameters ---------- n : int The input dimension. p : int The output dimension. func : callable A Python function implementing the calculation for a single vector (aka NumericalPoint) input X. view : IPython.parallel.DirectView or IPython.parallel.LoadBalancedView A view of your IPython engines (either direct or load-balanced). verbose : boolean, optional Give information on progress of jobs or don't. on_error : string, optional What should be done in case an engine bumps into an error: - 'abort' prints the first error an engine has bumped into and aborts the whole stack; - 'ignore' excepts any error the engines could bump into and fills the outputs with NaN's. interval : float, optional The time interval in-between two checks of jobs status. This basically depends on the job duration. """ def __init__(self, n, p, func, view, verbose=True, on_error='abort', interval=1.): assert callable(func) assert isinstance(view, View) assert on_error in ['abort', 'ignore'] super(OpenTURNSIPythonParallelFunction, self).__init__(n, p) self._func = func self._view = view self._verbose = bool(verbose) self._on_error = on_error self._interval = float(interval) def _exec(self, X): return self._exec_sample([X])[0] def _exec_sample(self, X): jobs = self._view.map_async(self._func, X) Y, done = [None] * len(jobs), [] while not jobs.ready(): jobs.wait(self._interval) if self._verbose: clear_output(wait=True) print("%4i/%i tasks finished after %s." % (jobs.progress, len(jobs), timedelta(0., jobs.elapsed))) sys.stdout.flush() for i in range(len(jobs)): if i in done: continue try: Y[i] = jobs[i] done.append(i) except TimeoutError: break except CompositeError as err: Y.append([np.nan] * self.getOutputDimension()) done.append(i) if self._on_error == 'abort': try: jobs.abort() except AssertionError: pass if float(ot.__version__) < 1.3: err.print_exception() raise err else: pass if self._verbose: print('\ndone') return Y def IPythonParallelFunction(*args, **kwargs): __doc__ = OpenTURNSIPythonParallelFunction.__doc__ return ot.NumericalMathFunction(OpenTURNSIPythonParallelFunction(*args, **kwargs)) client = Client(profile='default') d_view = client.direct_view() lb_view = client.load_balanced_view() with d_view.sync_imports(): from math import sin from time import sleep def foo_implementation(X): """This is the function we want to distribute over our clients.""" Y = [sin(X[1]) / X[0]] sleep(2.) # Say it takes time to run... return Y foo = IPythonParallelFunction(n=2, p=1, func=foo_implementation, view=lb_view) distribution = ot.ComposedDistribution([ot.Uniform(1., 5.)] * 2) some_random_X = distribution.getSample(10) foo(some_random_X) foo([0., 1.]) %%px --target=2 %qtconsole from IPython.display import Image Image('OpenTURNSIPythonParallelFunction_local_qtconsole.png') from IPython.parallel import bind_kernel bind_kernel() %%px --target=2 import socket print('The targeted engine is running on host: %s\n' % socket.gethostname()) %connect_info from IPython.display import Image Image('OpenTURNSIPythonParallelFunction_remote_console.png')