Example using dask-gateway with a JupyterHub sandbox, intake-esm (on S3 NetCDF)
Author: A.Radhakrishnan, Feb 2021
using t2.xlarge
from netCDF4 import Dataset
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import xarray as xr
import intake,yaml
import intake_esm
import numpy as np
%matplotlib inline
import sys
from dask_gateway import Gateway
def launchDask():
    """Return a dask-gateway cluster, reusing an existing one when possible.

    Asks a default ``Gateway()`` for its list of clusters. If the list is
    non-empty, connects to the first cluster listed and prints a notice
    followed by the cluster; otherwise creates a new ``GatewayCluster()``.

    Returns:
        The connected (existing) or newly created cluster object.
    """
    # TODO: wrap the gateway calls in try/except
    gateway = Gateway()
    clusters = gateway.list_clusters()
    if clusters:
        # Reuse the first cluster the gateway reports instead of starting another.
        cluster = gateway.connect(clusters[0].name)
        print("using existing connection")
        print(cluster)
    else:
        from dask_gateway import GatewayCluster
        cluster = GatewayCluster()
    return cluster
LAUNCH DASK CLUSTER
# Obtain the cluster handle via launchDask() (defined above).
cluster = launchDask()
# Optional, left disabled: install ipywidgets from within the notebook.
#%pip install ipywidgets
using existing connection GatewayCluster<pdub.08aa64e1f7134720b507fd602d19393a, status=running>
# Display the cluster to see its settings; it gets scaled up a couple of cells further on.
cluster
VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n …
# Worker-count bounds passed to cluster.adapt() further down.
MAX_WORKERS_ALLOWED = 30  # upper limit on workers for now (see message below)
min_w = 0
max_w = 30
# Cap the *maximum*: the previous check tested and clamped min_w, which
# contradicted the message and never limited max_w.
if max_w > MAX_WORKERS_ALLOWED:
    print("Let's restrict the max workers to 30 for now")
    max_w = MAX_WORKERS_ALLOWED
# Keep the lower bound from exceeding the (possibly capped) upper bound.
if min_w > max_w:
    min_w = max_w
# Attach a distributed Client to the cluster obtained earlier.
from distributed import Client
client = Client(cluster)
# Bare expression: in the notebook this displays the client summary.
client
Client
|
Cluster
|
MONITOR THE DASHBOARD from link above
ADAPTIVELY SCALE CLUSTER, so when nothing is going on we use minimum nodes in the setting below.
# Enable adaptive scaling between the min_w and max_w bounds set earlier.
cluster.adapt(minimum=min_w, maximum=max_w)
We will now use our intake_esm catalog and the intake_esm API for data exploration and search. Then use xarray and matplotlib as needed.
Catalog:
esgf-world.json is the ESM collection spec file for the netCDF data in the S3 bucket esgf-world. The cell below points to the most up-to-date catalog (updated periodically using a CatalogBuilder API).
Other examples can be found here as well:
You can refer to https://github.com/aradhakrishnanGFDL/gfdl-aws-analysis/tree/community/esm-collection-spec-examples for the most recent catalogs. More examples can be found in https://github.com/aradhakrishnanGFDL/gfdl-aws-analysis/tree/community/examples
# URL of the esgf-world ESM collection spec (JSON) hosted on S3.
col_url = "https://cmip6-nc.s3.us-east-2.amazonaws.com/esgf-world.json"
# Open the collection as an intake-esm datastore for searching.
col = intake.open_esm_datastore(col_url)
Examples to just search for what we want from the catalog
# Facet values used to narrow the catalog down to the data of interest.
expname_filter = ['historical']
table_id_filter = 'Omon'
model_filter = 'GFDL-ESM4'
variable_id_filter = "thetao"
ens_filter = "r1i1p1f1"  # defined for reference; not passed to the search below
#version_filter = "v20190726"
grid_label="gn"

# Collect the search facets in one place, then query the catalog with them.
search_facets = {
    "grid_label": "gn",
    "experiment_id": expname_filter,
    "mip_table": table_id_filter,
    "model": model_filter,
    "variable": variable_id_filter,
    "version": "v20190726",
}
cat = col.search(**search_facets)

# Load the matching assets into a dict of datasets: one chunk per time step,
# anonymous access to the S3 bucket.
dset_dict = cat.to_dataset_dict(
    cdf_kwargs={'chunks': {'time': 1}},
    storage_options={'anon':True},
)
--> The keys in the returned dictionary of datasets are constructed as follows: 'project.institute.model.experiment_id.mip_table'
# Select the GFDL-ESM4 historical Omon dataset by its dictionary key
# ('project.institute.model.experiment_id.mip_table').
ds = dset_dict['CMIP6.NOAA-GFDL.GFDL-ESM4.historical.Omon']
# Pull out the thetao variable.
thetao = ds.thetao
# Bare expression: displays the variable's summary in the notebook.
thetao
<xarray.DataArray 'thetao' (ensemble_member: 1, time: 1980, lev: 35, y: 576, x: 720)> dask.array<broadcast_to, shape=(1, 1980, 35, 576, 720), dtype=float32, chunksize=(1, 1, 35, 576, 720), chunktype=numpy.ndarray> Coordinates: lat (y, x) float32 dask.array<chunksize=(576, 720), meta=np.ndarray> lon (y, x) float32 dask.array<chunksize=(576, 720), meta=np.ndarray> * time (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00 * x (x) float64 -299.8 -299.2 -298.8 ... 58.75 59.25 59.75 * y (y) float64 -77.91 -77.72 -77.54 ... 89.47 89.68 89.89 * lev (lev) float64 2.5 10.0 20.0 32.5 ... 5.5e+03 6e+03 6.5e+03 * ensemble_member (ensemble_member) <U8 'r1i1p1f1' Attributes: long_name: Sea Water Potential Temperature units: degC cell_methods: area: mean where sea time: mean cell_measures: area: areacello volume: volcello standard_name: sea_water_potential_temperature original_name: thetao
|
|
|
array([cftime.DatetimeNoLeap(1850, 1, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(1850, 2, 15, 0, 0, 0, 0), cftime.DatetimeNoLeap(1850, 3, 16, 12, 0, 0, 0), ..., cftime.DatetimeNoLeap(2014, 10, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(2014, 11, 16, 0, 0, 0, 0), cftime.DatetimeNoLeap(2014, 12, 16, 12, 0, 0, 0)], dtype=object)
array([-299.75, -299.25, -298.75, ..., 58.75, 59.25, 59.75])
array([-77.907938, -77.723813, -77.539688, ..., 89.472 , 89.6832 , 89.8944 ])
array([2.5000e+00, 1.0000e+01, 2.0000e+01, 3.2500e+01, 5.1250e+01, 7.5000e+01, 1.0000e+02, 1.2500e+02, 1.5625e+02, 2.0000e+02, 2.5000e+02, 3.1250e+02, 4.0000e+02, 5.0000e+02, 6.0000e+02, 7.0000e+02, 8.0000e+02, 9.0000e+02, 1.0000e+03, 1.1000e+03, 1.2000e+03, 1.3000e+03, 1.4000e+03, 1.5375e+03, 1.7500e+03, 2.0625e+03, 2.5000e+03, 3.0000e+03, 3.5000e+03, 4.0000e+03, 4.5000e+03, 5.0000e+03, 5.5000e+03, 6.0000e+03, 6.5000e+03])
array(['r1i1p1f1'], dtype='<U8')
# Display the thetao variable again, this time accessed directly from the dataset.
ds.thetao
<xarray.DataArray 'thetao' (ensemble_member: 1, time: 1980, lev: 35, y: 576, x: 720)> dask.array<broadcast_to, shape=(1, 1980, 35, 576, 720), dtype=float32, chunksize=(1, 1, 35, 576, 720), chunktype=numpy.ndarray> Coordinates: lat (y, x) float32 dask.array<chunksize=(576, 720), meta=np.ndarray> lon (y, x) float32 dask.array<chunksize=(576, 720), meta=np.ndarray> * time (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00 * x (x) float64 -299.8 -299.2 -298.8 ... 58.75 59.25 59.75 * y (y) float64 -77.91 -77.72 -77.54 ... 89.47 89.68 89.89 * lev (lev) float64 2.5 10.0 20.0 32.5 ... 5.5e+03 6e+03 6.5e+03 * ensemble_member (ensemble_member) <U8 'r1i1p1f1' Attributes: long_name: Sea Water Potential Temperature units: degC cell_methods: area: mean where sea time: mean cell_measures: area: areacello volume: volcello standard_name: sea_water_potential_temperature original_name: thetao
|
|
|
array([cftime.DatetimeNoLeap(1850, 1, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(1850, 2, 15, 0, 0, 0, 0), cftime.DatetimeNoLeap(1850, 3, 16, 12, 0, 0, 0), ..., cftime.DatetimeNoLeap(2014, 10, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(2014, 11, 16, 0, 0, 0, 0), cftime.DatetimeNoLeap(2014, 12, 16, 12, 0, 0, 0)], dtype=object)
array([-299.75, -299.25, -298.75, ..., 58.75, 59.25, 59.75])
array([-77.907938, -77.723813, -77.539688, ..., 89.472 , 89.6832 , 89.8944 ])
array([2.5000e+00, 1.0000e+01, 2.0000e+01, 3.2500e+01, 5.1250e+01, 7.5000e+01, 1.0000e+02, 1.2500e+02, 1.5625e+02, 2.0000e+02, 2.5000e+02, 3.1250e+02, 4.0000e+02, 5.0000e+02, 6.0000e+02, 7.0000e+02, 8.0000e+02, 9.0000e+02, 1.0000e+03, 1.1000e+03, 1.2000e+03, 1.3000e+03, 1.4000e+03, 1.5375e+03, 1.7500e+03, 2.0625e+03, 2.5000e+03, 3.0000e+03, 3.5000e+03, 4.0000e+03, 4.5000e+03, 5.0000e+03, 5.5000e+03, 6.0000e+03, 6.5000e+03])
array(['r1i1p1f1'], dtype='<U8')
# Select the lev=2.5 level and average it over the time dimension.
thetao_mean = thetao.sel(lev=2.5).mean(dim='time')
# Plot the time-mean field; the %time IPython magic reports how long the call takes.
%time thetao_mean.plot(figsize=[10,8], vmin=-2, vmax=38, cmap='gist_ncar')
CPU times: user 1.06 s, sys: 125 ms, total: 1.19 s Wall time: 1min 50s
<matplotlib.collections.QuadMesh at 0x7f693d7b6ca0>
# Tear down: close the client first, then the cluster handle.
# NOTE(review): if the cluster came from gateway.connect(), close() may only
# release the local handle and leave the remote cluster running — confirm
# against the dask-gateway documentation.
client.close()
cluster.close()