This notebook demonstrates the latest version of the unyt_dask_array implementation at https://github.com/chrishavlin/unyt/tree/dask_unyt. This implementation adds dask as an optional dependency to unyt and subclasses dask.array.core.Array to create a unyt array with dask abilities.
The main access point is the unyt_from_dask function, which takes a dask array and user-specified units information and creates a unyt_dask_array object:
from unyt.dask_array import unyt_from_dask
from dask import array as dask_array
x = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'm')
x
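The chunks=(1000, 1000) argument above splits the 10000 x 10000 array into independent blocks that dask can process one at a time. As a rough, stdlib-only sketch of how a regular chunk specification tiles a 2-D shape, the hypothetical helper chunk_slices below lists the block slices. It is not part of dask or unyt and only mimics the idea.

```python
from itertools import product


def chunk_slices(shape, chunks):
    """Return the index slices of every block tiling an N-D array.

    Hypothetical helper: it only mimics how a regular chunk spec such as
    chunks=(1000, 1000) partitions a (10000, 10000) array.
    """
    per_axis = []
    for size, step in zip(shape, chunks):
        # the last block on an axis is shorter when step does not divide size
        per_axis.append([slice(start, min(start + step, size))
                         for start in range(0, size, step)])
    # one tuple of slices per block, e.g. (row_slice, col_slice) in 2-D
    return list(product(*per_axis))


blocks = chunk_slices((10000, 10000), (1000, 1000))
print(len(blocks))  # 100
print(blocks[0])    # (slice(0, 1000, None), slice(0, 1000, None))
```

Ten blocks per axis give 100 blocks in total, each small enough to hold in memory on its own.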
This array behaves like a dask array: applying operations only builds the dask task graph, and nothing is computed yet:
result = (x * 2).mean()
result
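To make the "build the graph first, compute later" idea concrete, here is a toy, stdlib-only sketch. LazyQuantity is a hypothetical class, not the unyt_dask_array API: each operation wraps the previous one in a new zero-argument function and carries the unit string along, and the data are only read when compute() is called.

```python
class LazyQuantity:
    """Hypothetical toy: a deferred computation tagged with a unit string.

    Each operation returns a new LazyQuantity holding a zero-argument
    function; nothing is evaluated until compute() is called.
    """

    def __init__(self, thunk, units):
        self._thunk = thunk
        self.units = units

    def __mul__(self, factor):
        # scaling by a plain number leaves the units unchanged
        return LazyQuantity(lambda: [v * factor for v in self._thunk()], self.units)

    def mean(self):
        def _mean():
            values = self._thunk()
            return sum(values) / len(values)
        return LazyQuantity(_mean, self.units)

    def compute(self):
        # only now is the chain of deferred functions actually executed
        return self._thunk(), self.units


calls = []


def load():
    calls.append("load")  # records when the source data is actually read
    return [0.25, 0.5, 0.75]


x = LazyQuantity(load, "m")
result = (x * 2).mean()
print(calls)             # [] : only the graph has been built
print(result.compute())  # (1.0, 'm')
print(calls)             # ['load']
```

Real dask graphs are dictionaries of tasks that a scheduler can reorder and parallelize; the nested closures here only capture the deferral.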
When we execute that graph with compute(), we get back a plain unyt_quantity or unyt_array, depending on the number of elements returned:
result.compute()
unyt_quantity(1.00004815, 'm')
result = (x * 2).mean(1)
result.compute()
unyt_array([0.99678626, 0.99558319, 1.00601329, ..., 1.00921666, 0.99226075, 1.00205869], 'm')
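A reduction such as mean() over a chunked array is usually evaluated from small per-chunk partial results rather than from the whole array at once. The sketch below is a simplified, stdlib-only illustration: chunked_mean combines (sum, count) pairs, and the hypothetical wrap helper stands in for choosing between a quantity and an array by element count. We have not checked how the branch actually makes that choice.

```python
def chunked_mean(chunks):
    """Mean of a list of 1-D chunks, combined from per-chunk partials.

    Simplified sketch of a blockwise reduction: each chunk contributes
    a (sum, count) pair, and only those pairs are combined at the end.
    """
    partials = [(sum(chunk), len(chunk)) for chunk in chunks]
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


def row_means(rows):
    # reducing along axis 1 leaves one value per row, i.e. an array result
    return [sum(r) / len(r) for r in rows]


def wrap(values, units):
    # hypothetical stand-in for returning unyt_quantity vs unyt_array
    if len(values) == 1:
        return ("quantity", values[0], units)
    return ("array", list(values), units)


data = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
print(wrap([chunked_mean(data)], "m"))  # ('quantity', 3.5, 'm')
print(wrap(row_means(data), "m"))       # ('array', [1.5, 3.5, 5.5], 'm')
```

Carrying counts matters when chunks differ in size: for [[1.0], [2.0, 3.0, 4.0]] the true mean is 2.5, while averaging the two chunk means would give 2.0.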
Adding or subtracting follows unyt behavior: both operands must carry units. To add a constant, wrap it in a unyt_quantity:
# this will error
result = x + 2
---------------------------------------------------------------------------
UnitOperationError                        Traceback (most recent call last)
<ipython-input-6-d2b95cf4ae1c> in <module>
      1 # this will error
----> 2 result = x + 2

~/src/yt_general/unyt/unyt/dask_array.py in wrapper(*args, **kwargs)
    190 funcname = the_func.__name__
    191 ufunc = getattr(ua.unyt_quantity, funcname)
--> 192 newargs, unyt_result = _prep_ufunc(ufunc, *args, extract_dask=True, **kwargs)
    193
    194 dasksuperfunk = getattr(Array, funcname)

~/src/yt_general/unyt/unyt/dask_array.py in _prep_ufunc(ufunc, extract_dask, *input, **kwargs)
    166 # apply the operation to the hidden unyt_quantities
    167 input, unyt_inputs = _sanitize_unit_args(*input)
--> 168 unyt_result = ufunc(*unyt_inputs, **kwargs)
    169
    170 if extract_dask:

~/src/yt_general/unyt/unyt/array.py in __array_ufunc__(self, ufunc, method, *inputs, **kwargs)
   1769 raise UnitOperationError(ufunc, u0, u1)
   1770 else:
-> 1771 raise UnitOperationError(ufunc, u0, u1)
   1772 conv, offset = u1.get_conversion_factor(u0, inp1.dtype)
   1773 new_dtype = np.dtype("f" + str(inp1.dtype.itemsize))

UnitOperationError: The <ufunc 'add'> operator for unyt_arrays with units "m" (dimensions "(length)") and "dimensionless" (dimensions "1") is not well defined.
from unyt import unyt_quantity
result = x + unyt_quantity(10, 'm')
result
result.mean().compute()
unyt_quantity(10.50002407, 'm')
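The error above comes from a dimension check: a bare 2 is treated as dimensionless, and a length plus a dimensionless number is not well defined. A minimal stdlib-only sketch of that rule follows. The DIMENSIONS table, UnitMismatch and add_same_units are hypothetical stand-ins, not unyt's implementation, and unit conversion is deliberately left out.

```python
# Hypothetical lookup table: unit symbol -> dimension (lengths only)
DIMENSIONS = {"m": "length", "cm": "length", "km": "length", "dimensionless": "1"}


class UnitMismatch(Exception):
    """Hypothetical stand-in for unyt's UnitOperationError."""


def add_same_units(a, b):
    """Add two (value, unit) pairs that already share a unit.

    A bare number is treated as dimensionless, mirroring the error above.
    """
    a = a if isinstance(a, tuple) else (a, "dimensionless")
    b = b if isinstance(b, tuple) else (b, "dimensionless")
    (va, ua), (vb, ub) = a, b
    if DIMENSIONS[ua] != DIMENSIONS[ub]:
        raise UnitMismatch(
            f"cannot add {ua!r} ({DIMENSIONS[ua]}) and {ub!r} ({DIMENSIONS[ub]})"
        )
    if ua != ub:
        # same dimension but different units: conversion is not handled here
        raise ValueError("units differ; convert before adding")
    return (va + vb, ua)


print(add_same_units((0.5, "m"), (10, "m")))  # (10.5, 'm')
try:
    add_same_units((0.5, "m"), 2)
except UnitMismatch as err:
    print(err)  # cannot add 'm' (length) and 'dimensionless' (1)
```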
The unyt_dask_array class converts between units of the same dimensions before calculating, following normal unyt behavior:
result = x + unyt_quantity(1000, 'cm')
result.mean().compute()
unyt_quantity(10.50002407, 'm')
result = x.to('km') + unyt_quantity(1000, 'cm')
result.mean().compute().to('m')
unyt_quantity(10.50002407, 'm')
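The conversion step can be sketched with a table of scale factors. In this stdlib-only sketch, TO_METRES, convert and add_lengths are hypothetical. Following the km example above, where the result had to be converted back with .to('m'), we assume the sum is expressed in the first operand's units.

```python
# Hypothetical conversion table: metres per unit
TO_METRES = {"m": 1.0, "cm": 0.01, "km": 1000.0}


def convert(value, from_units, to_units):
    # go through metres: value [from] -> metres -> value [to]
    return value * TO_METRES[from_units] / TO_METRES[to_units]


def add_lengths(a, b):
    """Add two (value, unit) lengths, expressing the result in a's units."""
    (va, ua), (vb, ub) = a, b
    return (va + convert(vb, ub, ua), ua)


print(add_lengths((0.5, "m"), (1000, "cm")))  # value ≈ 10.5, units 'm'

value_km, units = add_lengths((0.0005, "km"), (1000, "cm"))
print(units)                            # km
print(convert(value_km, units, "m"))    # ≈ 10.5
```

Both paths give the same physical length; only the units of the intermediate result differ.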
The same applies when combining multiple unyt_dask_arrays:
x1 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'm')
x2 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'cm')
x3 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'km')
x4 = (x1 * x2 + x3 * x2) / x1
x4
x4.mean().compute()
unyt_quantity(5.565572e-05, 'km')
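Multiplying and dividing arrays with different units combines the units themselves: scale factors multiply and dimension exponents add. The stdlib-only sketch below represents a unit as a hypothetical (scale in metres, {dimension: exponent}) pair. It is a simplified model, not unyt's unit registry, but it shows why m*cm and km*cm can be added (both are length**2) and why (m*cm + km*cm)/m is a length.

```python
# Hypothetical representation: (scale relative to SI, {dimension: exponent})
M = (1.0, {"length": 1})
CM = (0.01, {"length": 1})
KM = (1000.0, {"length": 1})


def mul(u, v):
    scale = u[0] * v[0]
    dims = dict(u[1])
    for d, p in v[1].items():
        dims[d] = dims.get(d, 0) + p
    # drop dimensions whose exponents cancel out
    return (scale, {d: p for d, p in dims.items() if p != 0})


def div(u, v):
    inverse = (1.0 / v[0], {d: -p for d, p in v[1].items()})
    return mul(u, inverse)


print(mul(M, CM)[1])        # {'length': 2}
# km*cm converted to m*cm: ratio of the two scales, ≈ 1000
ratio = mul(KM, CM)[0] / mul(M, CM)[0]
# (km*cm)/m is a length whose scale is ≈ 10 m
print(div(mul(KM, CM), M))
```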
If a dask client is active, then execution is managed by the client:
from dask.distributed import Client
client = Client(threads_per_worker=4, n_workers=1)
client
x_da = unyt_from_dask(dask_array.random.random((10000, 10000), chunks=(1000, 1000)), 'm')
x_da.min()
x_da.to('km')
x_da.to('km').max().compute()
unyt_quantity(0.001, 'km')
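With a client active, the per-chunk work is farmed out to workers. As a loose analogy only (dask.distributed schedules tasks very differently), the stdlib sketch below runs a per-chunk max in a thread pool, combines the partial maxima, and then converts the result from metres to kilometres. The chunk sizes and the random data are illustrative assumptions.

```python
import random
from concurrent.futures import ThreadPoolExecutor


def chunk_max(chunk):
    # the per-chunk task: reduce one block to a single value
    return max(chunk)


random.seed(0)
# 100 chunks of 1000 values in [0, 1), standing in for a chunked array in metres
chunks = [[random.random() for _ in range(1000)] for _ in range(100)]

# max_workers=4 loosely mirrors threads_per_worker=4 in the Client call above
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_maxima = list(pool.map(chunk_max, chunks))

overall_max_m = max(partial_maxima)      # combine step
overall_max_km = overall_max_m / 1000.0  # metres -> kilometres
print(overall_max_km)  # just under 0.001
```

Because max is associative, combining per-chunk maxima gives the same answer as scanning the whole array, which is what makes this kind of reduction easy to parallelize.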