import fsspec
import xarray as xr
import intake
from time import sleep
#%run /shared/users/environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above
## path name with a helper appropriate to your compute environment. Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# Dask cluster sizing: 30 workers with 2 threads apiece.
n_workers = 30
nthreads = 2
import coiled

# Launch a Coiled-managed Dask cluster on AWS Graviton (ARM) spot
# instances, falling back to on-demand when spot capacity is unavailable.
# wait_for_workers=True blocks until all requested workers have started.
cluster_kwargs = dict(
    region="us-east-1",
    compute_purchase_option="spot_with_fallback",
    arm=True,
    scheduler_port=443,
    wait_for_workers=True,
    n_workers=n_workers,
    worker_options=dict(nthreads=nthreads),
    account="dask",
)
cluster = coiled.Cluster(**cluster_kwargs)
client = cluster.get_client()
╭───────────────────────────────────────── Coiled Cluster ─────────────────────────────────────────╮ │ https://cloud.coiled.io/clusters/248586?account=dask │ ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─────────────────── Overview ───────────────────╮╭──────────────── Configuration ─────────────────╮ │ ││ │ │ Cluster Name: dask-2ea9de36-8 ││ Region: us-east-1 │ │ ││ │ │ Scheduler Status: started ││ Scheduler Instance Type: m7g.xlarge │ │ ││ │ │ Dashboard Address: ││ Worker Instance Type(s): t4g.xlarge (30) │ │ https://cluster-cczhf.dask.host:8787?token=CfI ││ │ │ JzmUSNG_R68ET ││ Workers Requested: 30 │ │ ││ │ ╰────────────────────────────────────────────────╯╰────────────────────────────────────────────────╯ ╭─────────────────────────────────── (2023/07/27 12:57:43 UTC) ────────────────────────────────────╮ │ │ │ All workers ready. │ │ │ │ │ ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
# Open the HyTEST intake catalog (the master list of HyTEST datasets)
# straight from its GitHub raw URL, then display the available entries.
catalog_url = (
    "https://raw.githubusercontent.com/hytest-org/hytest/"
    "main/dataset_catalog/hytest_intake_catalog.yml"
)
hytest_cat = intake.open_catalog(catalog_url)
list(hytest_cat)
['conus404-catalog', 'conus404-drb-eval-tutorial-catalog', 'nhm-v1.0-daymet-catalog', 'nhm-v1.1-c404-bc-catalog', 'nhm-v1.1-gridmet-catalog', 'nwis-streamflow-usgs-gages-onprem', 'nwis-streamflow-usgs-gages-cloud', 'nwm21-streamflow-usgs-gages-onprem', 'nwm21-streamflow-usgs-gages-cloud', 'nwm21-streamflow-cloud', 'nwm21-scores', 'lcmap-cloud', 'rechunking-tutorial-cloud']
# Drill into the CONUS404 sub-catalog and display its dataset entries.
conus404_key = 'conus404-catalog'
cat = hytest_cat[conus404_key]
list(cat)
['conus404-hourly-onprem', 'conus404-hourly-cloud', 'conus404-hourly-osn', 'conus404-hourly-osn2', 'conus404-daily-diagnostic-onprem', 'conus404-daily-diagnostic-cloud', 'conus404-daily-diagnostic-osn', 'conus404-daily-onprem', 'conus404-daily-cloud', 'conus404-daily-osn', 'conus404-daily-osn2', 'conus404-monthly-onprem', 'conus404-monthly-cloud', 'conus404-monthly-osn']
# Benchmark 1: CONUS404 hourly data read from the S3 cloud copy.
# Compute the 1990 time-mean of hourly accumulated precipitation
# (PREC_ACC_NC) on the Dask cluster and time it with %timeit.
year = '1990'
dataset = 'conus404-hourly-cloud'
# Show where this catalog entry's Zarr store lives.
print(cat[dataset].urlpath)
ds = cat[dataset].to_dask()
# IPython magic: repeats the full read + reduce to get a mean/stddev timing.
%timeit da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()
s3://nhgf-development/conus404/conus404_hourly_202209.zarr 18.8 s ± 881 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# Benchmark 2: same computation against the OSN (Open Storage Network)
# copy of the hourly dataset, for comparison with the S3 cloud copy.
dataset = 'conus404-hourly-osn'
# Show the custom S3-compatible endpoint and the store path for this copy.
print(cat[dataset].storage_options['client_kwargs']['endpoint_url'])
print(cat[dataset].urlpath)
ds = cat[dataset].to_dask()
# Reuses `year` set in the previous cell ('1990').
%timeit da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()
https://renc.osn.xsede.org s3://rsignellbucket2/hytest/conus404/conus404_hourly_202302.zarr 12.3 s ± 353 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# Benchmark 3: same computation against a second OSN endpoint
# (usgs.osn.mghpcc.org), completing the three-way storage comparison.
dataset = 'conus404-hourly-osn2'
# Show the custom S3-compatible endpoint and the store path for this copy.
print(cat[dataset].storage_options['client_kwargs']['endpoint_url'])
print(cat[dataset].urlpath)
ds = cat[dataset].to_dask()
# Reuses `year` set two cells above ('1990').
%timeit da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()
https://usgs.osn.mghpcc.org s3://usgspod-testbucket/hytest/conus404/conus404_hourly_202302.zarr 11.5 s ± 185 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# Tear down: disconnect the client first, pause briefly so the
# disconnect settles, then shut the Coiled cluster down to stop billing.
client.close()
sleep(5)
cluster.shutdown()