Dask task scheduling exploration for https://github.com/dask/distributed/issues/2602.
import dask.array as da
from distributed import Client
def my_custom_function(f):
    # a pretend custom function that would do a bunch of stuff along
    # axes 0 and 2 and then reduce the data heavily
    return f.ravel()[::15][None, :]
shape = (80000, 100, 500)
nc0 = 4
nc1 = 2
chunks0 = (shape[0] // nc0, shape[1], shape[2])
chunks1 = (shape[0], shape[1] // nc1, shape[2])
data = da.random.random(shape, chunks=chunks0)
# now rechunk the data to permit me to do some computations along different axes
# this aggregates chunks along axis 0 and dis-aggregates along axis 1
data_rc = data.rechunk(chunks1)
# apply that function to each chunk
res = data_rc.map_blocks(my_custom_function, dtype=data.dtype,
                         drop_axis=[1, 2], new_axis=[1], chunks=(1, 15))
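A quick sanity check on the chunk structure before and after the rechunk (just a sketch; nothing is computed here):
print(data.numblocks)     # (4, 1, 1): four chunks along axis 0
print(data_rc.numblocks)  # (1, 2, 1): two chunks along axis 1 instead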
Here are the components of the computation.
data
data_rc
res
res.visualize(rankdir="LR", color="order")
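The color="order" option colors each task by the priority that dask's static ordering assigns it. To inspect the raw priorities rather than the picture, a minimal sketch:
from dask.order import order
priorities = order(dict(res.__dask_graph__()))
# maps each task key to an integer; lower numbers are meant to run earlier,
# and these are the values that color="order" paints onto the graph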
So to do anything with output block (0, 0, 0) (the 4th set of squares in the visualization above), we'll need information from every input block. This is a generally hard problem.
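One way to make that concrete is to cull the graph down to a single output key and count how many upstream tasks survive (a rough sketch; it only counts tasks, nothing runs):
from dask.core import flatten
from dask.optimization import cull
graph = dict(res.__dask_graph__())
first_key = next(iter(flatten(res.__dask_keys__())))  # one output block's key
needed, _ = cull(graph, [first_key])
print(f"{len(needed)} of {len(graph)} tasks feed into that single output block")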
The rechunking really hurts. One possible solution is to rechunk only the second axis, leaving the chunking along the first axis unchanged.
chunks2 = (shape[0] // nc0, shape[1] // nc1, shape[2])  # keep axis 0 chunking, only rechunk axis 1
data_rc2 = data.rechunk(chunks2)
# apply that function to each chunk
res2 = data_rc2.map_blocks(my_custom_function, dtype=data.dtype,
                           drop_axis=[1, 2], new_axis=[1], chunks=(1, 15))
data_rc2
res2.visualize(rankdir='LR', color='order')
This does increase the number of output chunks (presumably by a factor of the original number of chunks along axis 0), which will put us up against other limitations of the scheduler. But it avoids the requirement that accessing any piece of output data needs information from every input block.
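A quick way to see the trade-off is to compare the block counts of the two results (again just a sketch; no computation is triggered):
print("res  numblocks:", res.numblocks)
print("res2 numblocks:", res2.numblocks)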