import fsspec
import fsspec.implementations.reference
import zarr
import xarray as xr
from pathlib import Path
import rechunker
from rechunker import rechunk
rechunker.__version__
'0.5.1'
zarr.__version__
'2.14.3.dev8'
We use a custom helper function ebd.start_dask_cluster to set options on this cluster. The helper isn't required; it just cuts down on lines of code in notebooks.
import sys
import os
sys.path.append('/shared/users/rsignell/lib')
import ebdpy as ebd
os.environ['AWS_PROFILE'] = 'esip-qhub' # use env vars for AWS credentials to write
client, cluster, gateway = ebd.start_dask_cluster(
profile=os.environ['AWS_PROFILE'],
worker_max=20,
region='us-west-2',
use_existing_cluster=True,
adaptive_scaling=False,
wait_for_cluster=False,
propagate_env=True)
Region: us-west-2
Existing Dask clusters:
Cluster Index c_idx: 0 / Name: dev.11b2890de0664f619f14067739e3f9a1 ClusterStatus.RUNNING
Using existing cluster [0].
Setting Fixed Scaling workers=10
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://nebari.esipfed.org/gateway/clusters/dev.11b2890de0664f619f14067739e3f9a1/status
Propagating environment variables to workers
Using environment: users/users-pangeo
# open the (lazy, parquet-backed) kerchunk reference file system for the 1-km NWM LDAS output
s3_lazy_refs = 's3://esip-qhub-public/nwm/LDAS-1k/lazyrefs'
fs = fsspec.implementations.reference.DFReferenceFileSystem(s3_lazy_refs, lazy=True, target_options={"anon": True},
                                                            remote_protocol="s3", remote_options={"anon": True})
m = fs.get_mapper("")

# open as a Dataset with dask chunks matching the native storage:
# one time step per chunk, covering the full spatial domain
ds = xr.open_dataset(m, engine="zarr", chunks={'time':1, 'y':3840, 'x':4608},
                     backend_kwargs=dict(consolidated=False))
ds
<xarray.Dataset>
Dimensions:   (time: 116631, y: 3840, x: 4608, vis_nir: 2, soil_layers_stag: 4)
Coordinates:
  * time      (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00
  * x         (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y         (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06
Dimensions without coordinates: vis_nir, soil_layers_stag
Data variables: (12/21)
    ACCET     (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    ACSNOM    (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    ALBEDO    (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    ALBSND    (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray>
    ALBSNI    (time, y, vis_nir, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray>
    COSZ      (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    ...        ...
    SNOWH     (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    SOIL_M    (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray>
    SOIL_W    (time, y, soil_layers_stag, x) float64 dask.array<chunksize=(1, 3840, 1, 4608), meta=np.ndarray>
    TRAD      (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    UGDRNOFF  (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    crs       object ...
Attributes:
    Conventions:                CF-1.6
    GDAL_DataType:              Generic
    TITLE:                      OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:               v5.2.0-beta2
    model_configuration:        retrospective
    model_initialization_time:  1979-02-01_00:00:00
    model_output_type:          land
    model_output_valid_time:    1979-02-01_03:00:00
    model_total_valid_times:    472
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
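Each (time, y, x) float64 variable is 116631 × 3840 × 4608 × 8 bytes, roughly 16.5 TB uncompressed, so the full 21-variable dataset runs to hundreds of terabytes. A quick sanity check of the total in-memory size (a sketch using xarray's nbytes, which sums all variables):
# total uncompressed size of the full dataset, in TB
print(f"{ds.nbytes / 1e12:.0f} TB")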
# keep three variables of interest (plus crs, dropped below) for this demo
ds = ds[['ACCET', 'SNEQV', 'FSNO', 'crs']]
ds
<xarray.Dataset>
Dimensions:  (time: 116631, y: 3840, x: 4608)
Coordinates:
  * time     (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00
  * x        (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y        (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06
Data variables:
    ACCET    (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    SNEQV    (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    FSNO     (time, y, x) float64 dask.array<chunksize=(1, 3840, 4608), meta=np.ndarray>
    crs      object ...
Attributes:
    Conventions:                CF-1.6
    GDAL_DataType:              Generic
    TITLE:                      OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:               v5.2.0-beta2
    model_configuration:        retrospective
    model_initialization_time:  1979-02-01_00:00:00
    model_output_type:          land
    model_output_valid_time:    1979-02-01_03:00:00
    model_total_valid_times:    472
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
ds['ACCET'].isel(time=slice(0,144))
<xarray.DataArray 'ACCET' (time: 144, y: 3840, x: 4608)>
dask.array<getitem, shape=(144, 3840, 4608), dtype=float64, chunksize=(1, 3840, 4608), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19
  * x        (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y        (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06
Attributes:
    esri_pe_string:  PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT...
    grid_mapping:    crs
    long_name:       Accumulated total ET
    units:           mm
    valid_range:     [-100000, 100000000]
ds.attrs
{'Conventions': 'CF-1.6',
 'GDAL_DataType': 'Generic',
 'TITLE': 'OUTPUT FROM WRF-Hydro v5.2.0-beta2',
 'code_version': 'v5.2.0-beta2',
 'model_configuration': 'retrospective',
 'model_initialization_time': '1979-02-01_00:00:00',
 'model_output_type': 'land',
 'model_output_valid_time': '1979-02-01_03:00:00',
 'model_total_valid_times': 472,
 'proj4': '+proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1=30.0 +lat_2=60.0 +lat_0=40.0 +lon_0=-97.0 +x_0=0 +y_0=0 +k_0=1.0 +nadgrids=@null +wktext +no_defs'}
# writable S3 filesystem (credentials come from the AWS_PROFILE set above)
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)
temp_name = 'esip-qhub/testing/usgs/nwm1km.tmp'
target_name = 'esip-qhub/testing/usgs/nwm1km.zarr'

# clear out results from any previous run
fs_write.rm(temp_name, recursive=True)
fs_write.rm(target_name, recursive=True)
temp_store = fs_write.get_mapper(temp_name)
target_store = fs_write.get_mapper(target_name)
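Note that fs_write.rm raises an error if the path does not exist, so on a fresh bucket you may want to guard the deletions (a small sketch using only standard fsspec calls):
# delete only stores left over from earlier runs
for path in [temp_name, target_name]:
    if fs_write.exists(path):
        fs_write.rm(path, recursive=True)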
ds = ds.drop_vars('crs')   # drop the scalar crs variable before rechunking
# compare the number of chunks along time to the number of chunks in space
# for the proposed target chunking (72 time steps; 192 x 264 spatial tiles)
a = len(ds.time)/(144/2)                          # chunks along the time dimension
b = (len(ds.x) * len(ds.y))/((96*2)*(132*2))      # chunks in the spatial plane
a/b
4.640266927083334
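The resulting target chunks are a reasonable size for object storage. A quick check of the uncompressed chunk size, 72 × 192 × 264 float64 values:
chunk_bytes = 72 * 192 * 264 * 8                 # float64 is 8 bytes per value
print(f"{chunk_bytes / 1e6:.1f} MB per chunk")   # about 29 MB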
client
Client-06ee89e9-c2c5-11ed-81db-7a80b6b85ee3
Connection method: Cluster object    Cluster type: dask_gateway.GatewayCluster
Dashboard: https://nebari.esipfed.org/gateway/clusters/dev.11b2890de0664f619f14067739e3f9a1/status
# rechunk a 144-step test subset from (1, 3840, 4608) to (72, 192, 264) chunks
rechunked = rechunk(ds.isel(time=slice(0,144)), target_chunks={'y':96*2, 'x':132*2, 'time':144//2},
                    target_store=target_store, temp_store=temp_store, max_mem='3.5GiB')
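max_mem caps the memory each rechunker task may use and determines the intermediate chunking written to temp_store; it should sit comfortably below per-worker memory. Displaying the Rechunked object before executing summarizes the copy plan (the level of detail depends on the rechunker version):
rechunked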
%%time
rechunked.execute(retries=10)
---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
File <timed eval>:1

File .../site-packages/rechunker/api.py:70, in Rechunked.execute(self, **kwargs)
---> 70     self._executor.execute_plan(self._plan, **kwargs)

File .../site-packages/rechunker/executors/dask.py:48, in DaskPipelineExecutor.execute_plan(self, plan, **kwargs)
---> 48     return dask.compute(*plan, **kwargs)

File .../site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
--> 599     results = schedule(dsk, keys, **kwargs)

    ... (distributed client and event-loop frames elided) ...

File .../site-packages/distributed/client.py:2191, in Client._gather(self, futures, errors, direct, local_worker)
-> 2191     raise exception.with_traceback(traceback)

KilledWorker: Attempted to run task ('copy_intermediate_to_write-bca90f45d4dc080cca14b54ce5a10d1f', 2)
on 3 different workers, but all those workers died while running it. The last worker that attempt to run
the task was tls://10.10.105.181:35291. Inspecting worker logs is often a good next step to diagnose what
went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
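The copy_intermediate_to_write tasks are killing workers, which usually points to memory pressure. Two hedged next steps (not part of the original run): pull recent worker logs through the distributed client, and retry with a smaller max_mem so each copy task holds less data (clearing the temp and target stores first).
# look at recent worker logs for out-of-memory kills (standard distributed API)
logs = client.get_worker_logs()

# hypothetical retry with a smaller per-task memory budget
# (clear temp_store and target_store before re-running rechunk)
rechunked = rechunk(ds.isel(time=slice(0, 144)),
                    target_chunks={'y': 192, 'x': 264, 'time': 72},
                    target_store=target_store, temp_store=temp_store,
                    max_mem='2GiB')
rechunked.execute(retries=10)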
client
Client-06ee89e9-c2c5-11ed-81db-7a80b6b85ee3
Connection method: Cluster object    Cluster type: dask_gateway.GatewayCluster
Dashboard: https://nebari.esipfed.org/gateway/clusters/dev.11b2890de0664f619f14067739e3f9a1/status
zarr.convenience.consolidate_metadata(target_store)
<zarr.hierarchy.Group '/'>
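Consolidating metadata copies every variable's .zarray and .zattrs keys into a single .zmetadata object at the root of the store, so readers can discover the whole hierarchy with one S3 request instead of one per variable.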
ds2 = xr.open_dataset(target_store, engine='zarr', chunks={})
ds2
<xarray.Dataset>
Dimensions:  (time: 144, y: 3840, x: 4608)
Coordinates:
  * time     (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19
  * x        (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y        (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06
Data variables:
    ACCET    (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray>
    FSNO     (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray>
    SNEQV    (time, y, x) float64 dask.array<chunksize=(72, 192, 264), meta=np.ndarray>
Attributes:
    Conventions:                CF-1.6
    GDAL_DataType:              Generic
    TITLE:                      OUTPUT FROM WRF-Hydro v5.2.0-beta2
    code_version:               v5.2.0-beta2
    model_configuration:        retrospective
    model_initialization_time:  1979-02-01_00:00:00
    model_output_type:          land
    model_output_valid_time:    1979-02-01_03:00:00
    model_total_valid_times:    472
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
ds2.ACCET
<xarray.DataArray 'ACCET' (time: 144, y: 3840, x: 4608)>
dask.array<open_dataset-7b535523c333df3540f431d9e01965feACCET, shape=(144, 3840, 4608), dtype=float64, chunksize=(72, 192, 264), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 1979-02-01T03:00:00 ... 1979-02-19
  * x        (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
  * y        (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06
Attributes:
    esri_pe_string:  PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT...
    grid_mapping:    crs
    long_name:       Accumulated total ET
    units:           mm
    valid_range:     [-100000, 100000000]
import hvplot.xarray
ds2.ACCET[:,2000,2000].hvplot(x='time')
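This point time series is exactly the access pattern the rechunking targets: with the original (1, 3840, 4608) chunks, reading one point across 144 time steps touches 144 chunks (a full spatial field each time), while the new (72, 192, 264) chunks require only 2. A quick check (run in its own notebook cell):
%%time
# pull the same point time series into memory; only 2 chunks are read
ts = ds2.ACCET[:, 2000, 2000].compute()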