#!/usr/bin/env python
# coding: utf-8

# # Compare data load times for CONUS404
#
# Computes the mean over 1 year of hourly CONUS404 data with 60 CPUs
# (30 workers w/ 2 CPUs each).
#
# ## Test case #2: Use compute in US-EAST-1
# * AWS S3
# * OSN pod design #1 at RENCI
# * OSN pod design #2 at MGHPCC

import fsspec
import xarray as xr
import intake
from time import sleep

# FIX: this nbconvert-exported script has a python shebang but calls
# get_ipython(), a name that is only predefined inside a live IPython
# session -- run as a plain script it raised NameError.  Importing it
# explicitly resolves the name (the %timeit magics below still need an
# IPython kernel to actually execute).
from IPython import get_ipython

# %run /shared/users/environment_set_up/Start_Dask_Cluster_Nebari.ipynb
# If this notebook is not being run on Nebari/ESIP, replace the above
# path name with a helper appropriate to your compute environment. Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb

# Cluster sizing: 30 workers x 2 threads each = 60 CPUs total.
n_workers = 30
nthreads = 2

import coiled

# Spin up a Dask cluster in us-east-1 on ARM spot instances (falling back
# to on-demand) and block until all workers are up, so the benchmarks
# below measure storage throughput rather than cluster scale-up time.
cluster = coiled.Cluster(
    region="us-east-1",
    compute_purchase_option="spot_with_fallback",
    arm=True,
    scheduler_port=443,
    wait_for_workers=True,
    n_workers=n_workers,
    worker_options=dict(nthreads=nthreads),
    account='dask'
)
client = cluster.get_client()

# Open the HyTEST data intake catalog.
hytest_cat = intake.open_catalog(
    r"https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml"
)
list(hytest_cat)

# Open the conus404 sub-catalog.
cat = hytest_cat['conus404-catalog']
list(cat)

# Year of hourly data to average over in each benchmark.
year = '1990'

# #### AWS S3 storage in us-west-2
# (No custom endpoint to report for the AWS entry -- just the URL path.)
dataset = 'conus404-hourly-cloud'
print(cat[dataset].urlpath)
ds = cat[dataset].to_dask()
get_ipython().run_line_magic('timeit', "da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()")

# #### OSN storage pod design #1 at RENCI
dataset = 'conus404-hourly-osn'
print(cat[dataset].storage_options['client_kwargs']['endpoint_url'])
print(cat[dataset].urlpath)
ds = cat[dataset].to_dask()
get_ipython().run_line_magic('timeit', "da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()")

# #### OSN storage pod design #2 at MGHPCC
dataset = 'conus404-hourly-osn2'
# Benchmark the third storage target (OSN pod design #2 at MGHPCC):
# report the endpoint and object path, open the dataset lazily, then time
# computing the 1-year mean of hourly precipitation on the Dask cluster.
entry = cat[dataset]
print(entry.storage_options['client_kwargs']['endpoint_url'])
print(entry.urlpath)
ds = entry.to_dask()
get_ipython().run_line_magic('timeit', "da = ds.PREC_ACC_NC.sel(time=year).mean(dim='time').compute()")

# ## Stop cluster
# Close the client first, pause briefly so in-flight messages settle,
# then tear the Coiled cluster down.
client.close()
sleep(5)
cluster.shutdown()