#!/usr/bin/env python # coding: utf-8 # DistArray: Distributed Arrays for Python # ======================================== # # [docs.enthought.com/distarray](http://docs.enthought.com/distarray) # Setup # ----- # Much of this notebook requires an `IPython.parallel` cluster to be running. # Outside the notebook, run # ``` # dacluster start -n4 # ``` # In[1]: # some utility imports from __future__ import print_function from pprint import pprint from matplotlib import pyplot as plt # main imports import numpy import distarray # reduce precision on printed array values numpy.set_printoptions(precision=2) # display figures inline get_ipython().run_line_magic('matplotlib', 'inline') # ### Software Versions # In[2]: print("numpy", numpy.__version__) import matplotlib print("matplotlib", matplotlib.__version__) import h5py print("h5py", h5py.__version__) print("distarray", distarray.__version__) # ### Set a RandomState # # Set a `RandomState` so random numpy arrays don't change between runs. # In[3]: from numpy.random import RandomState prng = RandomState(1234567890) # NumPy Arrays # ------------ # # DistArray is built on NumPy and provides a NumPy-array-like interface. First, let's generate a NumPy array and examine some of its attributes. # In[4]: # a 4-row 5-column NumPy array with random contents nparr = prng.rand(4, 5) nparr # In[5]: # NumPy array attributes print("type:", type(nparr)) print("dtype:", nparr.dtype) print("ndim:", nparr.ndim) print("shape:", nparr.shape) print("itemsize:", nparr.itemsize) print("nbytes:", nparr.nbytes) # DistArrays # ---------- # # We'll make our first `DistArray` out of the NumPy array created above. # In[6]: # First we need a `Context` object. More on this later. # For now, think of this object like the `NumPy` module. # `Context`s manage the worker engines for us. from distarray.globalapi import Context context = Context() # In[7]: # Make a DistArray from a NumPy array. # This will push sections of the original NumPy array out # to the engines. darr = context.fromarray(nparr) darr # In[8]: # Print the array section stored on each engine for i, a in enumerate(darr.get_localarrays()): print(i, a) # In[9]: # DistArrays have similar attributes to NumPy arrays, print("type:", type(darr)) print("dtype:", darr.dtype) print("ndim:", darr.ndim) print("shape:", darr.shape) print("itemsize:", darr.itemsize) print("nbytes:", darr.nbytes) # In[10]: # and some additional attributes. print("targets:", darr.targets) print("context:", darr.context) print("distribution:", darr.distribution) # Universal Functions (ufuncs) # ---------------------------- # In[11]: # NumPy provides `ufuncs`, or Universal Functions, that operate # elementwise over NumPy arrays. numpy.sin(nparr) # In[12]: # DistArray provides ufuncs as well, for `DistArray`s. import distarray.globalapi as da da.sin(darr) # In[13]: # `toarray` makes a NumPy array out of a DistArray, pulling all of the # pieces back to the client. We do this to display the contents of the # DistArray. da.sin(darr).toarray() # In[14]: # A NumPy binary ufunc. nparr + nparr # In[15]: # The equivalent DistArray ufunc. # Notice that a new DistArray is created without # pulling data back to the client. darr + darr # In[16]: # Contents of the resulting DistArray. (darr + darr).toarray() # Reductions # ---------- # # Functions like `sum`, `mean`, `min`, and `max` are known as *reductions*, since they take an array and produce a smaller array or a scalar. In NumPy and DistArray, some of these functions can be applied over a specific ``axis``. # In[17]: # NumPy sum print("sum:", nparr.sum()) print("sum over an axis:", nparr.sum(axis=1)) # In[18]: # DistArray sum print("sum:", darr.sum(), darr.sum().toarray()) print("sum over an axis:", darr.sum(axis=1), darr.sum(axis=1).toarray()) # Indexing and Slicing # -------------------- # # DistArrays support standard NumPy Indexing and distributed slicing, including slices with a step. Slicing is currently only supported for Block (and undistributed) DistArrays. # In[19]: # Our example array, as a reminder: darr.toarray() # In[20]: # The shapes of the local sections of our DistArray darr.localshapes() # In[21]: # Return the value of a single element darr[0, 2] # In[22]: # Take a column slice darr_view = darr[:, 3] # all rows, third column print(darr_view) print(darr_view.toarray()) # In[23]: # Slices return a new DistArray that is a view on the # original, just like in NumPy. # Changes in the view change the original array. darr_view[3] = -0.99 print("view:") print(darr_view.toarray()) print("original:") print(darr.toarray()) # In[24]: # A more complex slice, with negative indices and a step. print(darr[:, 2::2]) print(darr[:-1, 2::2].toarray()) # In[25]: # Incomplete indexing # Grab the first row darr[0] # Distributions # ------------- # Above, when we created a DistArray out of a NumPy array, we didn't specify *how* the elements should be distributed among our engines. `Distribution`s give you control over this, if you want it. In other words, `Distribution`s control which processes own which (global) indices. # In[26]: # Let's look at the `Distribution` object that was created for us # automatically by `fromarray`. distribution = darr.distribution # In[27]: # This is a 2D distribution: its 0th dimension is Block-distributed, # and it's 1st dimension isn't distributed. pprint(distribution.maps) # In[28]: # Plot this Distribution, color-coding which process each global index # belongs to. from distarray.plotting import plot_array_distribution process_coords = [(0, 0), (1, 0), (2, 0), (3, 0)] plot_array_distribution(darr, process_coords, cell_label=False, legend=True) # In[29]: # Check out which sections of this array's 0th dimension are on # each process. distribution.maps[0].bounds # The Distribution above was created for us by `fromarray`, # but DistArray lets us specify more complex distributions. # # Here, we specify that the 0th dimension has a Block distribution ('b') # and the 1st dimension has a Cyclic distribution. # # DistArray supports Block, Cyclic, Block-Cyclic, Unstructured, # and No-distribution dimensions. See the # [ScaLAPACK Documentation](http://netlib.org/scalapack/slug/node75.html) for more information about Distribution types. # In[30]: from distarray.globalapi import Distribution distribution = Distribution(context, shape=(64, 64), dist=('b', 'c')) a = context.zeros(distribution, dtype='int32') plot_array_distribution(a, process_coords, cell_label=False, legend=True) # Redistribution # -------------- # # Since `DistArray`s are distributed, the equivalent to NumPy's `reshape` (`distribute_as`) can be a more complex and costly operation. For convenience, you can supply either a `shape` or a full `Distribution` object. Only Block distributions (and No-dist) are currently redistributable. # In[31]: darr # In[32]: darr.toarray() # In[33]: # simple reshaping reshaped = darr.distribute_as((10, 2)) reshaped # In[34]: reshaped.toarray() # In[35]: # A more complex resdistribution, # changing shape, dist, and targets dist = Distribution(context, shape=(5, 4), dist=('b', 'b'), targets=(1, 3)) darr.distribute_as(dist) # Contexts # -------- # Context objects manage the setup of and communication to the worker processes for DistArray objects. They also act as the namespace to which # DistArray creation functions are attached. # In[36]: print("targets:", context.targets) print("comm:", context.comm) # In[37]: context.zeros((5, 3)) # In[38]: context.ones((20, 20)) # Parallel IO # ----------- # # DistArray has support for reading NumPy `.npy` files in parallel, for reading *and* writing `.dnpy` files in parallel (our own flat-file format), and reading and writing HDF5 files in parallel (if you have a parallel build of `h5py`). # In[39]: # load .npy files in parallel numpy.save("/tmp/outfile.npy", nparr) distribution = Distribution(context, nparr.shape) new_darr = context.load_npy("/tmp/outfile.npy", distribution) new_darr # In[40]: # save to .dnpy (a built-in flat-file format based on .npy) context.save_dnpy("/tmp/outfile", darr) # In[41]: # load from .dnpy context.load_dnpy("/tmp/outfile") # In[42]: # save DistArrays to .hdf5 files in parallel context.save_hdf5("/tmp/outfile.hdf5", darr, mode='w') # In[43]: # load DistArrays from .hdf5 files in parallel (using h5py) context.load_hdf5("/tmp/outfile.hdf5", distribution) # Context.apply # ------------- # # Global view, local control. The `apply` method on a `Context` allows you to write functions that are applied *locally* (that is, on the engines) to each section of a DistArray. This allows you to push your computation close to your data, avoiding communication round-trips and possibly speeding up your computations. # In[44]: def get_local_random(): """Function to be applied locally.""" import numpy return numpy.random.randint(10) context.apply(get_local_random) # In[45]: def get_local_var(darr): """Another local computation.""" return darr.ndarray.var() context.apply(get_local_var, args=(darr.key,)) # Context.register # ---------------- # # `Context.register` is similar to `Context.apply`, but it allows you to *register* your function with a `Context` up front, and then call it repeatedly, with a nice syntax. # In[46]: def local_demean(la): """Return the local array with the mean removed.""" return la.ndarray - la.ndarray.mean() context.register(local_demean) # In[47]: context.local_demean(darr) # MPI-only Execution # ------------------------- # Instead of using an IPython client (which uses ZeroMQ to communicate to the engines), you can run your DistArray code in MPI-only mode (using an extra MPI process for the client). This can be more performant. # In[48]: # an example script to run in MPI-only mode get_ipython().run_line_magic('cd', 'julia_set') get_ipython().system('python benchmark_julia.py -h') # In[49]: # Compile kernel.pyx get_ipython().system('python setup.py build_ext --inplace') # Run the benchmarking script with 5 MPI processes: # 4 worker processes and 1 client process get_ipython().system('mpiexec -np 5 python benchmark_julia.py --kernel=cython -r1 1024') # Distributed Array Protocol # -------------------------- # # Already have a library with its own distributed arrays? Use the Distributed Array Protocol to work with DistArray. # # The Distributed Array Protocol (DAP) is a process-local protocol that allows two subscribers, called the "producer" and the "consumer" or the "exporter" and the "importer", to communicate the essential data and metadata necessary to share a distributed-memory array between them. This allows two independently developed components to access, modify, and update a distributed array without copying. The protocol formalizes the metadata and buffers involved in the transfer, allowing several distributed array projects to collaborate, facilitating interoperability. By not copying the underlying array data, the protocol allows for efficient sharing of array data. # # http://distributed-array-protocol.readthedocs.org/en/rel-0.9.0/ # In[50]: def return_protocol_structure(la): return la.__distarray__() context.apply(return_protocol_structure, (darr.key,)) # Acknowledgement and Disclaimer # ------------------------------ # # This material is based upon work supported by the Department of Energy under Award Number DE-SC0007699. # # This report was prepared as an account of work sponsored by an agency of the United States Government. Neither the United States Government nor any agency thereof, nor any of their employees, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness of any information, apparatus, product, or process disclosed, or represents that its use would not infringe privately owned rights. Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.