import xarray as xr
import dask.array as da
import numpy as np
import pandas as pd
import dask
# Shape of the synthetic 6-D array and its dask chunking: the two leading
# axes (ensemble, init_date) are split into single-element chunks; the
# remaining four axes (lat, lead_time, level, lon) are kept whole per chunk.
size = (3, 24, 96, 5, 9, 10)
chunks = (1, 1, 96, 5, 9, 10)
# Lazy random dask array — nothing is materialized until .compute()/.to_zarr().
arr = da.random.random(size, chunks=chunks)
# arr  # leftover notebook display expression (no-op in a script); disabled.
# Coordinate values for each dimension, keyed by dimension name. Dict
# insertion order is guaranteed (Python 3.7+) and matches the axis order of
# `size`/`arr`, so unzipping the items yields aligned (dims, coords) tuples.
items = dict(
    ensemble=np.arange(size[0]),
    init_date=pd.date_range(start='1960', periods=size[1]),
    lat=np.arange(size[2]).astype(float),
    lead_time=np.arange(size[3]),
    level=np.arange(size[4]).astype(float),
    lon=np.arange(size[5]).astype(float),
)
# zip(*...) accepts any iterable directly; the intermediate list() was redundant.
dims, coords = zip(*items.items())
array = xr.DataArray(arr, coords=coords, dims=dims)
dset = xr.Dataset({'data': array})
# dset  # leftover notebook display expression; its pasted repr follows:
# <xarray.Dataset> Dimensions: (ensemble: 3, init_date: 24, lat: 96, lead_time: 5, level: 9, lon: 10) Coordinates: * ensemble (ensemble) int64 0 1 2 * init_date (init_date) datetime64[ns] 1960-01-01 1960-01-02 ... 1960-01-24 * lat (lat) float64 0.0 1.0 2.0 3.0 4.0 ... 91.0 92.0 93.0 94.0 95.0 * lead_time (lead_time) int64 0 1 2 3 4 * level (level) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 * lon (lon) float64 0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 Data variables: data (ensemble, init_date, lat, lead_time, level, lon) float64 dask.array<chunksize=(1, 1, 96, 5, 9, 10), meta=np.ndarray>
# Round-trip the dataset through a zarr store so we can compare task graphs
# for the in-memory random-backed dataset against a disk-backed zarr one.
zarr_path = "/tmp/data.zarr"
dset.to_zarr(zarr_path, mode='w')
dset2 = xr.open_zarr(zarr_path)

# Datasets to compare, and the dask `fuse_ave_width` settings to try
# (None -> leave the default, 200 -> aggressive task fusion).
dsets = [dset, dset2]
fuse_ave_widths = [None, 200]
# Render the optimized task graph of a groupby-month mean for each dataset,
# with and without aggressive task fusion. Indentation was lost in the paste
# and is reconstructed here to match the printed output below.
#
# NOTE(review): the loop variable deliberately reuses the module-level name
# `dset`; after the loop, `dset` is `dset2` (the zarr-backed dataset), and
# the .compute() calls later in the file run against that one.
for dset in dsets:
    for fuse_ave_width in fuse_ave_widths:
        filename = '{}-{}'.format('zarr' if dset is dset2 else 'random',
                                  'fused' if fuse_ave_width else 'unfused')
        with dask.config.set(fuse_ave_width=fuse_ave_width):
            # optimize_graph=True applies dask's graph optimizations (incl.
            # fusion governed by fuse_ave_width) before drawing.
            dask.visualize(dset['data'].groupby("init_date.month").mean(dim="init_date"),
                           optimize_graph=True, filename=filename)
        print(filename)
# Pasted output: random-unfused random-fused zarr-unfused zarr-fused
# Spin up a dask.distributed client so the .compute() calls below execute on
# it. Client() with no arguments starts a local cluster (see dask.distributed
# docs); this is a side-effectful call, not a pure import.
from distributed import Client

c = Client()
# c  # leftover notebook display expression; it rendered the HTML repr below:
# Client
# |
# Cluster
# |
# Compute the groupby-month mean once with default settings, then again with
# aggressive task fusion, to compare behavior on the distributed client.
# NOTE(review): `dset` here is the zarr-backed dataset, left bound by the
# visualization loop above — presumably intentional; confirm.
dset['data'].groupby("init_date.month").mean(dim="init_date").compute()

# The with-block body lost its indentation in the paste; restored here. The
# trailing notebook `;` (output suppression) was a no-op in a script and is dropped.
with dask.config.set(fuse_ave_width=200):
    dset['data'].groupby("init_date.month").mean(dim="init_date").compute()