While chunk shape should be determined by the target performance of the end-member use cases (e.g. time series extraction 8x slower than map extraction), chunk size (in MiB) should be determined by how long it takes to read all the data in a variable with a cloud-native approach. Reading all the data is itself a typical use case, arising when computing averages, anomalies, global metrics, etc.
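As a quick sanity check, the uncompressed chunk size in MiB follows directly from the chunk shape and dtype. A minimal helper (illustrative only, not part of this notebook) reproduces the two native chunk sizes tested below:

```python
import numpy as np

def chunk_size_mib(shape, dtype=np.float32):
    """Uncompressed size in MiB of a chunk with the given shape and dtype."""
    return np.prod(shape) * np.dtype(dtype).itemsize / 2**20

# The two native chunk shapes (time, lat, lon) compared in this notebook:
print(round(chunk_size_mib((2160, 10, 50)), 1))   # → 4.1
print(round(chunk_size_mib((2160, 10, 100)), 1))  # → 8.2
```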
Here we test reading two different native chunk sizes (4.1 MiB and 8.2 MiB), then use Dask to create larger virtual chunk sizes, and see which is fastest on a cluster with 60 threads.
import fsspec
import xarray as xr
Here we are using Dask Gateway on the ESIP Qhub (JupyterHub with Dask/Kubernetes) running in the AWS us-west-2 region. We create a cluster with 60 threads (30 workers, each with 2 threads).
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','rsignell','lib'))
import ebdpy as ebd
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster, gateway = ebd.start_dask_cluster(
    profile=profile, worker_max=worker_max,
    region=region, use_existing_cluster=True,
    adaptive_scaling=False, wait_for_cluster=True,
    worker_profile='Medium Worker',
    propagate_env=True)
Region: us-west-2
No Cluster running. Starting new cluster.
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=30
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://nebari.esipfed.org/gateway/clusters/dev.34e2aae08db54f2b8db088265022a168/status
Elapsed time to wait for 30 live workers: 30/30 workers - 138 seconds
Propagating environment variables to workers
Using environment: users/users-pangeo
url = 's3://dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/'
While most S3 requests take 50-100 milliseconds, a few slow requests can take dozens of seconds or even minutes. So let's avoid those by setting the timeout to 5 seconds.
fs = fsspec.filesystem('s3', requester_pays=True,
config_kwargs={'connect_timeout':5, 'read_timeout':5})
fs.ls('dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/')
['dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat050_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat050_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat050_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat100_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat100_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time010_lat100_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time012_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time048_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time048_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat050_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat050_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat050_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat100_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat100_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time050_lat100_lon100', 
'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat050_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat050_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat050_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat100_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat100_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time100_lat100_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time120_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time120_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time1440_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time1440_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time1440_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time720_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time720_lat010_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/single_time_smaller_spatial_chunks_time001_lat050_lon050', 
'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/single_time_smaller_spatial_chunks_time001_lat050_lon100', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/single_time_smaller_spatial_chunks_time001_lat100_lon050']
fs.glob('dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/*time2160*')
['dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon010', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon050', 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon100']
url = 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon050/inst.zarr'
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True, chunks={})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-f8c33014242b9f317ee8c4eefed9885aBCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 10, 50), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
13.4 s ± 1.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
url = 'dieumy-chunking-study/dieumynguyen_rechunked/geos-fp-global__inst/hybrid_time2160_lat010_lon100/inst.zarr'
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True, chunks={})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-d33ed18357693975b9da79650bf3a72bBCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 10, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
9.52 s ± 641 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: it's significantly faster to load the 8.2 MiB native chunks than the 4.1 MiB native chunks.
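The cells below hold the 8.2 MiB native chunk shape (2160, 10, 100) fixed and grow the virtual (Dask) chunks along `lat`, so each Dask task aggregates whole native chunks. A small sketch (hypothetical helper, not part of the notebook) enumerates the uncompressed virtual chunk sizes about to be tested:

```python
import numpy as np

def chunk_size_mib(shape, dtype=np.float32):
    """Uncompressed size in MiB of a chunk with the given shape and dtype."""
    return np.prod(shape) * np.dtype(dtype).itemsize / 2**20

# Virtual chunk shapes tried below: native (2160, 10, 100) chunks
# stacked along lat in multiples of 10.
for lat in (20, 40, 100, 200, 500):
    print(lat, round(chunk_size_mib((2160, lat, 100)), 1))
```

These sizes correspond to the roughly 16, 33, 82, 164 and 412 MiB figures quoted in the findings below.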
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True,
chunks={'time':2160, 'lat':20, 'lon':100})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-93a410bfc8735b21f0f8ba7e64366ab4BCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 20, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
8.99 s ± 221 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: it's slightly faster to load 16.4 MiB virtual chunks than 8.2 MiB native chunks. 16.4 MiB native chunks would likely be even faster.
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True,
chunks={'time':2160, 'lat':40, 'lon':100})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-c05fac689792930e00d1de79f3b41054BCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 40, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
10.1 s ± 2.71 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: about the same time to load 33 MiB virtual chunks as 16 MiB virtual chunks.
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True,
chunks={'time':2160, 'lat':100, 'lon':100})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-3ce70cfd63ba2aaecac20c472de335e5BCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 100, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
9.63 s ± 685 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: about the same time to load 82 MiB virtual chunks as 16.4 MiB virtual chunks.
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True,
chunks={'time':2160, 'lat':200, 'lon':100})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-8bef34d2d5a9a6788dde832ad153c792BCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 200, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
10.2 s ± 495 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: slightly slower to load 164 MiB virtual chunks than 82 MiB virtual chunks. Some of the workers don't have enough tasks to stay completely busy.
ds = xr.open_dataset(fs.get_mapper(url), engine='zarr', consolidated=True,
chunks={'time':2160, 'lat':500, 'lon':100})
ds.BCEXTTAU
<xarray.DataArray 'BCEXTTAU' (time: 5136, lat: 721, lon: 1152)> dask.array<open_dataset-9f841ddfc8182a4ab15608f8ef614d75BCEXTTAU, shape=(5136, 721, 1152), dtype=float32, chunksize=(2160, 500, 100), chunktype=numpy.ndarray> Coordinates: * lat (lat) float64 -90.0 -89.75 -89.5 -89.25 ... 89.25 89.5 89.75 90.0 * lon (lon) float64 -180.0 -179.7 -179.4 -179.1 ... 179.1 179.4 179.7 * time (time) datetime64[ns] 2020-06-01 ... 2020-12-31T23:00:00 Attributes: fmissing_value: 999999986991104.0 long_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ standard_name: Black Carbon Extinction AOT [550 nm] __ENSEMBLE__ units: 1 valid_range: [-999999986991104.0, 999999986991104.0] vmax: 999999986991104.0 vmin: -999999986991104.0
%%timeit
dsmean = ds.BCEXTTAU.mean(dim='time').compute()
The slowest run took 11.33 times longer than the fastest. This could mean that an intermediate result is being cached. 46 s ± 52.8 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Finding: slower still to load 412 MiB virtual chunks than 164 MiB virtual chunks.
gateway.stop_cluster(cluster_name=cluster.name)
print(f'stopped {cluster.name}')
stopped dev.34e2aae08db54f2b8db088265022a168
From this testing, the optimal uncompressed chunk size for this dataset is around 16-82 MiB.
It's the compressed chunk size that really matters for performance, though, and if we assume a typical 2x compression ratio, the optimal compressed chunk size should be around 8-41 MiB. This is in the same range as the 8-16 MiB byte-range requests mentioned in the AWS white paper on optimizing S3 performance, and within the 10-200 MiB range commonly used in the Pangeo community.
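The mapping from the uncompressed sweet spot to an on-disk target can be sketched with a trivial helper (hypothetical; the 2x ratio is an assumption that varies by variable and codec):

```python
def estimated_compressed_mib(uncompressed_mib, compression_ratio=2.0):
    """Rough on-disk chunk size, assuming a typical compression ratio."""
    return uncompressed_mib / compression_ratio

# The ~16-82 MiB uncompressed sweet spot found above maps to roughly:
low = estimated_compressed_mib(16.4)
high = estimated_compressed_mib(82)
print(low, high)  # → 8.2 41.0
```

In practice, checking the actual compressed chunk sizes in the Zarr store (e.g. with `fs.du` on a chunk key) is more reliable than assuming a ratio.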