# Core stack for reading Kerchunk reference datasets and rechunking them:
# fsspec for filesystem abstraction, zarr for chunked storage, xarray for
# labeled arrays, rechunker for the chunk-layout conversion.
import fsspec
import fsspec.implementations.reference
import zarr
import xarray as xr
from pathlib import Path
from rechunker import rechunk
import rechunker
# Record the rechunker version used for this run (output: '0.5.1').
rechunker.__version__
'0.5.1'
# NOTE(review): zarr is already imported above; this re-import is redundant
# but harmless. Kept to record the zarr version (output: '2.14.2').
import zarr
zarr.__version__
'2.14.2'
We use a custom helper function, `ebd.start_dask_cluster`,
to create the Dask cluster and set its options. Using this helper is optional; it just cuts down on boilerplate lines of code in notebooks.
# Start (or reconnect to) a Dask Gateway cluster via a site-specific helper.
# ebdpy lives outside the standard search path, so add its directory first.
import sys
import os
sys.path.append('/shared/users/rsignell/lib')
import ebdpy as ebd
os.environ['AWS_PROFILE'] = 'esip-qhub' # use env vars for AWS credentials to write
# Fixed pool of 30 workers (adaptive scaling off); reuse an existing cluster
# if one is running, and propagate env vars (AWS creds) to the workers.
client, cluster, gateway = ebd.start_dask_cluster(
profile=os.environ['AWS_PROFILE'],
worker_max=30,
region='us-west-2',
worker_profile='Medium Worker',
use_existing_cluster=True,
adaptive_scaling=False,
wait_for_cluster=False,
propagate_env=True)
Region: us-west-2 Existing Dask clusters: Cluster Index c_idx: 0 / Name: dev.afcc5ab9bbf14962b459748d1e7e428f ClusterStatus.RUNNING Using existing cluster [0]. Setting Fixed Scaling workers=30 Reconnect client to clear cache client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub): https://nebari.esipfed.org/gateway/clusters/dev.afcc5ab9bbf14962b459748d1e7e428f/status Propagating environment variables to workers Using environment: users/users-pangeo
# S3 location of the Parquet-backed ("lazy") Kerchunk reference set for the
# NWM 1-km LDAS output.
s3_lazy_refs = 's3://esip-qhub-public/nwm/LDAS-1k/lazyrefs'
%%time
# DFReferenceFileSystem loads references on demand rather than reading the
# whole reference set up front; both the references and the referenced data
# are public (anon=True).
fs = fsspec.implementations.reference.DFReferenceFileSystem(s3_lazy_refs, lazy=True, target_options={"anon": True},
remote_protocol="s3", remote_options={"anon": True})
m = fs.get_mapper("")
# chunks={'time':1, ...} matches the native one-time-step-per-chunk layout
# (see the chunksize=(1, 3840, 4608) dask arrays in the repr below).
# consolidated=False: this reference set has no consolidated metadata key.
ds = xr.open_dataset(m, engine="zarr", chunks={'time':1, 'y':3840, 'x':4608},
backend_kwargs=dict(consolidated=False))
CPU times: user 4.64 s, sys: 281 ms, total: 4.92 s Wall time: 7.34 s
# Display the lazily-opened dataset (116631 time steps, 21 data variables).
ds
<xarray.Dataset> Dimensions: (time: 116631, y: 3840, x: 4608, vis_nir: 2, soil_layers_stag: 4) Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06 Dimensions without coordinates: vis_nir, soil_layers_stag Data variables: (12/21) ACCET (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ACSNOM (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ALBEDO (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ALBSND (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> ALBSNI (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> COSZ (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> ... ... SNOWH (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> SOIL_M (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> SOIL_W (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray> TRAD (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> UGDRNOFF (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> crs object ... Attributes: Conventions: CF-1.6 GDAL_DataType: Generic TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 model_configuration: retrospective model_initialization_time: 1979-02-01_00:00:00 model_output_type: land model_output_valid_time: 1979-02-01_03:00:00 model_total_valid_times: 472 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
# Keep only the three variables of interest plus the 'crs' grid-mapping
# variable; everything else is dropped from the rechunking job.
ds = ds[['ACCET', 'SNEQV', 'FSNO', 'crs']]
ds
<xarray.Dataset> Dimensions: (time: 116631, y: 3840, x: 4608) Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Data variables: ACCET (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> SNEQV (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> FSNO (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray> crs object ... Attributes: Conventions: CF-1.6 GDAL_DataType: Generic TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 model_configuration: retrospective model_initialization_time: 1979-02-01_00:00:00 model_output_type: land model_output_valid_time: 1979-02-01_03:00:00 model_total_valid_times: 472 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
# Preview the first 144 time steps (18 days at 3-hourly output) of ACCET —
# the same slice that is rechunked below.
ds['ACCET'].isel(time=slice(0,144))
<xarray.DataArray 'ACCET' (time: 144, y: 3840, x: 4608)> dask.array<getitem, shape=(144, 3840, 4608), dtype=float64, chunksize=(1, 3840, 4608), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Attributes: esri_pe_string: PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT... grid_mapping: crs long_name: Accumulated total ET units: mm valid_range: [-100000, 100000000]
# Inspect the global attributes (carried through to the rechunked store).
ds.attrs
{'Conventions': 'CF-1.6', 'GDAL_DataType': 'Generic', 'TITLE': 'OUTPUT FROM WRF-Hydro v5.2.0-beta2', 'code_version': 'v5.2.0-beta2', 'model_configuration': 'retrospective', 'model_initialization_time': '1979-02-01_00:00:00', 'model_output_type': 'land', 'model_output_valid_time': '1979-02-01_03:00:00', 'model_total_valid_times': 472, 'proj4': '+proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1=30.0 +lat_2=60.0 +lat_0=40.0 +lon_0=-97.0 +x_0=0 +y_0=0 +k_0=1.0 +nadgrids=@null +wktext +no_defs'}
# Writable (authenticated) S3 filesystem for the rechunker temp and target
# stores. skip_instance_cache avoids reusing the earlier anonymous instance.
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)
temp_name = 'esip-qhub/testing/usgs/nwm1km.tmp'
target_name = 'esip-qhub/testing/usgs/nwm1km.zarr'
# Remove leftovers from any previous run. fs.rm() raises FileNotFoundError
# when the path does not exist, which would abort a fresh run — ignore it.
for _path in (temp_name, target_name):
    try:
        fs_write.rm(_path, recursive=True)
    except FileNotFoundError:
        pass
# Mutable-mapping views of the two stores, as required by rechunker.
temp_store = fs_write.get_mapper(temp_name)
target_store = fs_write.get_mapper(target_name)
# Remove the 'crs' grid-mapping variable before planning the rechunk
# (presumably because rechunker should only process the gridded variables —
# TODO confirm). Dataset.drop() is deprecated for variables; drop_vars()
# is the supported, behavior-identical call.
ds = ds.drop_vars('crs')
# a: number of target-sized time chunks across the full series (chunk = 72).
a = len(ds.time)/(144/2)
# b: number of target-sized spatial chunks per 2-D field (192 x 264 cells).
b = (len(ds.x) * len(ds.y))/((96*2)*(132*2))
# Ratio of time-chunk count to spatial-chunk count (~4.64).
a/b
4.640266927083334
#client.close()
#from dask.distributed import Client
#client = Client(threads_per_worker=1)
#client.amm.start()
# Plan the rechunking of the first 144 time steps from the native
# (1, 3840, 4608) layout to (72, 192, 264) chunks.
# BUG FIX: 'time': 144/2 produced a float (72.0); chunk sizes must be
# integers, so use floor division. max_mem bounds per-task memory so
# intermediate copies fit on the workers.
rechunked = rechunk(ds.isel(time=slice(0, 144)),
                    target_chunks={'y': 96*2, 'x': 132*2, 'time': 144//2},
                    target_store=target_store, temp_store=temp_store,
                    max_mem='2.8GiB')
%%time
# Run the rechunk plan on the Dask cluster; retries=10 re-submits tasks that
# fail transiently (e.g. S3 hiccups). NOTE(review): this run died with
# KilledWorker on copy_intermediate_to_write (see traceback below) —
# workers likely ran out of memory; inspect worker logs / lower max_mem.
rechunked.execute(retries=10)
--------------------------------------------------------------------------- KilledWorker Traceback (most recent call last) File <timed eval>:1 File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/rechunker/api.py:70, in Rechunked.execute(self, **kwargs) 55 def execute(self, **kwargs): 56 """ 57 Execute the rechunking. 58 (...) 68 :func:`rechunker.rechunk`. 69 """ ---> 70 self._executor.execute_plan(self._plan, **kwargs) 71 return self._target File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/rechunker/executors/dask.py:48, in DaskPipelineExecutor.execute_plan(self, plan, **kwargs) 47 def execute_plan(self, plan: Delayed, **kwargs): ---> 48 return dask.compute(*plan, **kwargs) File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs) 596 keys.append(x.__dask_keys__()) 597 postcomputes.append(x.__dask_postcompute__()) --> 599 results = schedule(dsk, keys, **kwargs) 600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/client.py:3168, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs) 3166 should_rejoin = False 3167 try: -> 3168 results = self.gather(packed, asynchronous=asynchronous, direct=direct) 3169 finally: 3170 for f in futures.values(): File 
/home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/client.py:2328, in Client.gather(self, futures, errors, direct, asynchronous) 2326 else: 2327 local_worker = None -> 2328 return self.sync( 2329 self._gather, 2330 futures, 2331 errors=errors, 2332 direct=direct, 2333 local_worker=local_worker, 2334 asynchronous=asynchronous, 2335 ) File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/utils.py:345, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs) 343 return future 344 else: --> 345 return sync( 346 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs 347 ) File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/utils.py:412, in sync(loop, func, callback_timeout, *args, **kwargs) 410 if error: 411 typ, exc, tb = error --> 412 raise exc.with_traceback(tb) 413 else: 414 return result File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/utils.py:385, in sync.<locals>.f() 383 future = wait_for(future, callback_timeout) 384 future = asyncio.ensure_future(future) --> 385 result = yield future 386 except Exception: 387 error = sys.exc_info() File /home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self) 766 exc_info = None 768 try: --> 769 value = future.result() 770 except Exception: 771 exc_info = sys.exc_info() File 
/home/conda/users/fe7def5c6f998e02e59202a7068dedd36daae23d351ac1df13b4f646de5e193b-20230315-151621-145696-124-pangeo/lib/python3.10/site-packages/distributed/client.py:2191, in Client._gather(self, futures, errors, direct, local_worker) 2189 exc = CancelledError(key) 2190 else: -> 2191 raise exception.with_traceback(traceback) 2192 raise exc 2193 if errors == "skip": KilledWorker: Attempted to run task ('copy_intermediate_to_write-99b543990cee64af465f6100726ae64c', 0) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tls://10.10.18.231:44083. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
# Consolidate the target store's metadata into a single .zmetadata key so
# later opens need only one object read for all metadata.
zarr.convenience.consolidate_metadata(target_store)
<zarr.hierarchy.Group '/'>
# Re-open the (partially written — see failure above) rechunked store to
# verify the new (72, 192, 264) chunk layout. chunks={} loads each variable
# as a dask array using the on-disk chunking.
ds2 = xr.open_dataset(target_store, engine='zarr', chunks={})
ds2
<xarray.Dataset> Dimensions: (time: 144, y: 3840, x: 4608) Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Data variables: ACCET (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray> FSNO (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray> SNEQV (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray> Attributes: Conventions: CF-1.6 GDAL_DataType: Generic TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 code_version: v5.2.0-beta2 model_configuration: retrospective model_initialization_time: 1979-02-01_00:00:00 model_output_type: land model_output_valid_time: 1979-02-01_03:00:00 model_total_valid_times: 472 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
# Confirm ACCET now has the target chunksize=(72, 192, 264).
ds2.ACCET
<xarray.DataArray 'ACCET' (time: 144, y: 3840, x: 4608)> dask.array<open_dataset-8603faf5ce88c8b55595cf260c04d3ffACCET, shape=(144, 3840, 4608), dtype=float64, chunksize=(72, 192, 264), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19 * x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06 * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06 Attributes: esri_pe_string: PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT... grid_mapping: crs long_name: Accumulated total ET units: mm valid_range: [-100000, 100000000]
# Time-series read test: with time-chunked data, extracting one (y, x) point
# touches only 2 chunks instead of 144. hvplot.xarray registers .hvplot.
import hvplot.xarray
ds2.ACCET[:,2000,2000].hvplot(x='time')