# Flattened notebook: rechunk NWM LDAS 1-km data (kerchunk references on S3)
# into a cloud-optimized Zarr store using rechunker on a Dask cluster.
import fsspec
import fsspec.implementations.reference
import zarr
import xarray as xr
from pathlib import Path
from rechunker import rechunk
import rechunker
# Record the rechunker version in use (output below: '0.5.1').
rechunker.__version__
'0.5.1'
We use a custom helper function, ebd.start_dask_cluster,
to set options on this cluster. We don't have to use this helper; it just cuts down on the lines of code in notebooks.
# Make the shared helper library importable, then start (or attach to) a
# Dask Gateway cluster.  `ebdpy` is a project-local module — its
# `start_dask_cluster` signature is assumed from this call site; confirm
# against its source if behavior seems off.
import sys
import os
sys.path.append('/shared/users/rsignell/lib')
import ebdpy as ebd
os.environ['AWS_PROFILE'] = 'esip-qhub' # use env vars for AWS credentials to write
client, cluster, gateway = ebd.start_dask_cluster(
profile=os.environ['AWS_PROFILE'],
worker_max=30,
region='us-west-2',
use_existing_cluster=True,   # reuse a running cluster if one exists
adaptive_scaling=False,      # fixed worker count, no autoscaling
wait_for_cluster=True,
propagate_env=True)          # forward AWS creds from env to the workers
# Open the NWM LDAS dataset through a lazy DataFrame-backed reference
# filesystem: the reference index lives at `s3_lazy_refs` and points into
# the original files on S3.  Both index and data are read anonymously.
s3_lazy_refs = 's3://esip-qhub-public/nwm/LDAS-1k/lazyrefs'
fs = fsspec.implementations.reference.DFReferenceFileSystem(s3_lazy_refs, lazy=True, target_options={"anon": True},
remote_protocol="s3", remote_options={"anon": True})
m = fs.get_mapper("")
# Chunking: one time step per chunk over the full spatial domain
# (3840 x 4608 float64 ~= 142 MB per chunk, per the repr below).
ds = xr.open_dataset(m, engine="zarr", chunks={'time':1, 'y':3840, 'x':4608},
backend_kwargs=dict(consolidated=False))
ds
<xarray.Dataset> Dimensions: (time: 116631, y: 3840, x: 4608, vis_nir: 2, soil_layers_stag: 4) Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06 Dimensions without coordinates: vis_nir, soil_layers_stag Data variables: (12/21) ACCET (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ACSNOM (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ALBEDO (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ALBSND (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> ALBSNI (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> COSZ (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ... ... SNOWH (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> SOIL_M (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> SOIL_W (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> TRAD (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> UGDRNOFF (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> crs object ... Attributes: Conventions: CF-1.6 GDAL_DataType: Generic TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 model_configuration: retrospective model_initialization_time: 1979-02-01_00:00:00 model_output_type: land model_output_valid_time: 1979-02-01_03:00:00 model_total_valid_times: 472 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
# Keep only the three variables of interest plus the `crs` grid-mapping
# variable; everything else is dropped from the dataset.
ds = ds[['ACCET', 'SNEQV', 'FSNO', 'crs']]
ds
<xarray.Dataset> Dimensions: (time: 116631, y: 3840, x: 4608) Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Data variables: ACCET (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> SNEQV (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> FSNO (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> crs object ... Attributes: Conventions: CF-1.6 GDAL_DataType: Generic TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 model_configuration: retrospective model_initialization_time: 1979-02-01_00:00:00 model_output_type: land model_output_valid_time: 1979-02-01_03:00:00 model_total_valid_times: 472 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
# Preview the first 144 time steps of ACCET (3-hourly output, so ~18 days).
ds['ACCET'].isel(time=slice(0,144))
<xarray.DataArray 'ACCET' (time: 144, y: 3840, x: 4608)> dask.array<getitem, shape=(144, 3840, 4608), dtype=float64, chunksize=(1, 3840, 4608), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Attributes: esri_pe_string: PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT... grid_mapping: crs long_name: Accumulated total ET units: mm valid_range: [-100000, 100000000]
# Inspect the dataset's global attributes.
ds.attrs
{'Conventions': 'CF-1.6', 'GDAL_DataType': 'Generic', 'TITLE': 'OUTPUT FROM WRF-Hydro v5.2.0-beta2', 'code_version': 'v5.2.0-beta2', 'model_configuration': 'retrospective', 'model_initialization_time': '1979-02-01_00:00:00', 'model_output_type': 'land', 'model_output_valid_time': '1979-02-01_03:00:00', 'model_total_valid_times': 472, 'proj4': '+proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1=30.0 +lat_2=60.0 +lat_0=40.0 +lon_0=-97.0 +x_0=0 +y_0=0 +k_0=1.0 +nadgrids=@null +wktext +no_defs'}
# Set up writable S3 stores for rechunker's temp and target locations, and
# compare the proposed chunk layout against the source layout.
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)
temp_name = 'esip-qhub/testing/usgs/nwm1km.tmp'
target_name = 'esip-qhub/testing/usgs/nwm1km.zarr'

# Remove leftovers from a previous run; tolerate a clean bucket instead of
# letting `rm` raise FileNotFoundError.
for _path in (temp_name, target_name):
    try:
        fs_write.rm(_path, recursive=True)
    except FileNotFoundError:
        pass

# Pass explicitly writable zarr FSStore objects rather than bare fsspec
# mappers: `zarr.group()` wraps a plain mapper in a read-only FSStore,
# which makes rechunker fail with ReadOnlyError (see the traceback below).
temp_store = zarr.storage.FSStore(f's3://{temp_name}', mode='w')
target_store = zarr.storage.FSStore(f's3://{target_name}', mode='w')

# Drop the scalar `crs` variable before rechunking; `drop_vars` is the
# non-deprecated spelling of `Dataset.drop`.
ds = ds.drop_vars('crs')

# a: number of target time chunks; b: number of target spatial chunks.
# Their ratio (~4.6) gauges how "time-long vs space-small" the layout is.
a = len(ds.time)/(144/2)
b = (len(ds.x) * len(ds.y))/((96*2)*(132*2))
a/b
4.640266927083334
# Build the rechunk plan for the first 144 time steps: from one-timestep,
# full-domain chunks to chunks that are long in time and small in space.
# Chunk sizes must be integers — `144/2` yields the float 72.0, so use
# floor division instead.
rechunked = rechunk(ds.isel(time=slice(0, 144)),
                    target_chunks={'y': 96*2, 'x': 132*2, 'time': 144//2},
                    target_store=target_store, temp_store=temp_store,
                    max_mem='3.5GiB')
--------------------------------------------------------------------------- ReadOnlyError Traceback (most recent call last) Cell In[20], line 1 ----> 1 rechunked = rechunk(ds.isel(time=slice(0,144)), target_chunks={'y':96*2, 'x':132*2, 'time':144/2}, 2 target_store=target_store, temp_store=temp_store, max_mem='3.5GiB') File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/rechunker/api.py:297, in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor) 294 if isinstance(executor, str): 295 executor = _get_executor(executor) --> 297 copy_spec, intermediate, target = _setup_rechunk( 298 source=source, 299 target_chunks=target_chunks, 300 max_mem=max_mem, 301 target_store=target_store, 302 target_options=target_options, 303 temp_store=temp_store, 304 temp_options=temp_options, 305 ) 306 plan = executor.prepare_plan(copy_spec) 307 return Rechunked(executor, plan, source, intermediate, target) File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/rechunker/api.py:395, in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options) 392 attrs = _encode_zarr_attributes(attrs) 394 if temp_store is not None: --> 395 temp_group = zarr.group(temp_store) 396 else: 397 temp_group = None File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/zarr/hierarchy.py:1355, in group(store, overwrite, chunk_store, cache_attrs, synchronizer, path, zarr_version) 1352 requires_init = overwrite or not contains_group(store, path) 1354 if requires_init: -> 1355 init_group(store, overwrite=overwrite, chunk_store=chunk_store, 1356 path=path) 1358 return Group(store, read_only=False, chunk_store=chunk_store, 1359 
cache_attrs=cache_attrs, synchronizer=synchronizer, path=path, 1360 zarr_version=zarr_version) File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/zarr/storage.py:643, in init_group(store, overwrite, path, chunk_store) 640 store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) # type: ignore 642 # initialise metadata --> 643 _init_group_metadata(store=store, overwrite=overwrite, path=path, 644 chunk_store=chunk_store) 646 if store_version == 3: 647 # TODO: Should initializing a v3 group also create a corresponding 648 # empty folder under data/root/? I think probably not until there 649 # is actual data written there. 650 pass File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/zarr/storage.py:706, in _init_group_metadata(store, overwrite, path, chunk_store) 704 key = _prefix_to_group_key(store, _path_to_prefix(path)) 705 if hasattr(store, '_metadata_class'): --> 706 store[key] = store._metadata_class.encode_group_metadata(meta) # type: ignore 707 else: 708 store[key] = encode_group_metadata(meta) File /home/conda/users/6da996e8bac98e9914efe8740ae916e58e2ced6a618659b47fccce8f5bc9cd6a-20230314-182641-028005-117-pangeo/lib/python3.10/site-packages/zarr/storage.py:1398, in FSStore.__setitem__(self, key, value) 1396 def __setitem__(self, key, value): 1397 if self.mode == 'r': -> 1398 raise ReadOnlyError() 1399 key = self._normalize_key(key) 1400 path = self.dir_path(key) ReadOnlyError: object is read-only
%%time
# Run the rechunk plan on the Dask cluster; retries absorb transient S3 errors.
rechunked.execute(retries=10)
# Write consolidated .zmetadata so the target opens with a single metadata read.
zarr.convenience.consolidate_metadata(target_store)
# Re-open the rechunked store to verify the new chunk layout.
ds2 = xr.open_dataset(target_store, engine='zarr', chunks={})
ds2
ds2.ACCET
import hvplot.xarray
# Quick-look time series at a single grid cell (y=2000, x=2000).
ds2.ACCET[:,2000,2000].hvplot(x='time')
# Re-confirm the rechunker version used for this run (reproducibility note).
import rechunker
rechunker.__version__