# Re-run this notebook against different xarray releases to test them.
target_version = "0.19.0"
# Run the two install lines below once, then comment them out again and
# restart the notebook kernel so the freshly installed versions get imported.
# !pip install xarray=={target_version} --upgrade
# !pip install cmip6_preprocessing --upgrade
import xarray as xr

xr.__version__
'0.19.0'
import subprocess
import logging
from distributed import WorkerPlugin
from dask_gateway import GatewayCluster
from distributed import Client
class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup. A single
        requirement string is also accepted and treated as a one-item list.
    """

    def __init__(self, packages):
        # Normalise to a list: a bare string would otherwise be rejected by
        # list concatenation (or split into characters if unpacked).
        if isinstance(packages, str):
            packages = [packages]
        self.packages = list(packages)

    def setup(self, worker):
        import sys

        logger = logging.getLogger("distributed.worker")
        # Use the interpreter that is running this worker. A bare "python" is
        # resolved via PATH and may belong to a different environment, in which
        # case the install would not affect the worker at all.
        cmd = [sys.executable, "-m", "pip", "install", "--upgrade", *self.packages]
        returncode = subprocess.call(cmd)
        if returncode != 0:
            # Don't claim success when pip failed. Log instead of raising so a
            # failed install does not abort worker startup.
            logger.error(
                "pip install of %s failed with exit code %d", self.packages, returncode
            )
        else:
            logger.info("Installed %s", self.packages)
# Start a Dask Gateway cluster and connect a client to it.
cluster = GatewayCluster()
client = cluster.get_client()

# Pin the workers' xarray to the same version as this client.
# If you comment out the plugin creation/registration below, the compute at the end will break!
plugin = PipPlugin([f'xarray=={target_version}'])
client.register_worker_plugin(plugin)

client
Client
|
Cluster
|
# quickly confirm the versions on the workers
def check():
    """Return the xarray version importable in the current process.

    Intended to be executed on each worker through ``client.run``.
    """
    from xarray import __version__ as xarray_version

    return xarray_version
# Request four workers, but only block until two of them have joined.
cluster.scale(4)
client.wait_for_workers(n_workers=2)

# Mapping of worker address -> xarray version imported on that worker.
worker_versions = client.run(check)
worker_versions
{'tls://10.36.65.5:44121': '0.19.0', 'tls://10.36.67.5:34405': '0.19.0'}
The xarray version reported by every worker needs to match the client version above (`target_version`); otherwise you might run into trouble!
# Super simplistic cloud data example (eliminating intake-esm and cmip6_preprocessing):
# open a single CMIP6 zarr store directly by its cloud URL.
store = 'gs://cmip6/CMIP6/CMIP/E3SM-Project/E3SM-1-0/historical/r3i1p1f1/Omon/thetao/gr/v20200129/'
ds = xr.open_zarr(store)

# now compute something: average over the lon/lat dimensions and load thetao into memory
ds.mean(dim=['lon', 'lat'])['thetao'].load()
<xarray.DataArray 'thetao' (time: 1980, lev: 60)> array([[13.4562645 , 13.393538 , 13.235398 , ..., 0.9334626 , 0.90995324, 0.9026728 ], [13.619133 , 13.573829 , 13.468156 , ..., 0.93346804, 0.91000783, 0.902723 ], [13.543202 , 13.505928 , 13.432397 , ..., 0.933512 , 0.91005 , 0.902773 ], ..., [13.882875 , 13.848218 , 13.763058 , ..., 1.0088891 , 0.9831473 , 0.9660718 ], [13.666238 , 13.620644 , 13.528952 , ..., 1.0089248 , 0.9831997 , 0.9661009 ], [13.815262 , 13.749072 , 13.5942955 , ..., 1.0088788 , 0.98319495, 0.9661138 ]], dtype=float32) Coordinates: * lev (lev) float64 5.0 15.0 25.0 35.0 ... 4.875e+03 5.125e+03 5.375e+03 * time (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00
array([[13.4562645 , 13.393538 , 13.235398 , ..., 0.9334626 , 0.90995324, 0.9026728 ], [13.619133 , 13.573829 , 13.468156 , ..., 0.93346804, 0.91000783, 0.902723 ], [13.543202 , 13.505928 , 13.432397 , ..., 0.933512 , 0.91005 , 0.902773 ], ..., [13.882875 , 13.848218 , 13.763058 , ..., 1.0088891 , 0.9831473 , 0.9660718 ], [13.666238 , 13.620644 , 13.528952 , ..., 1.0089248 , 0.9831997 , 0.9661009 ], [13.815262 , 13.749072 , 13.5942955 , ..., 1.0088788 , 0.98319495, 0.9661138 ]], dtype=float32)
array([5.000000e+00, 1.500000e+01, 2.500000e+01, 3.500000e+01, 4.500000e+01, 5.500000e+01, 6.500000e+01, 7.500000e+01, 8.500000e+01, 9.500000e+01, 1.050000e+02, 1.150000e+02, 1.250000e+02, 1.350000e+02, 1.450000e+02, 1.550000e+02, 1.650984e+02, 1.754790e+02, 1.862912e+02, 1.976603e+02, 2.097113e+02, 2.225783e+02, 2.364088e+02, 2.513701e+02, 2.676542e+02, 2.854836e+02, 3.051192e+02, 3.268679e+02, 3.510934e+02, 3.782275e+02, 4.087846e+02, 4.433777e+02, 4.827367e+02, 5.277280e+02, 5.793729e+02, 6.388626e+02, 7.075633e+02, 7.870025e+02, 8.788252e+02, 9.847059e+02, 1.106204e+03, 1.244567e+03, 1.400497e+03, 1.573946e+03, 1.764003e+03, 1.968944e+03, 2.186457e+03, 2.413972e+03, 2.649001e+03, 2.889385e+03, 3.133405e+03, 3.379794e+03, 3.627671e+03, 3.876452e+03, 4.125768e+03, 4.375393e+03, 4.625191e+03, 4.875084e+03, 5.125028e+03, 5.375000e+03])
array([cftime.DatetimeNoLeap(1850, 1, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(1850, 2, 15, 0, 0, 0, 0), cftime.DatetimeNoLeap(1850, 3, 16, 12, 0, 0, 0), ..., cftime.DatetimeNoLeap(2014, 10, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(2014, 11, 16, 0, 0, 0, 0), cftime.DatetimeNoLeap(2014, 12, 16, 12, 0, 0, 0)], dtype=object)
# also works with cmip6_preprocessing and intake-esm
import intake
from cmip6_preprocessing.preprocessing import combined_preprocessing

col = intake.open_esm_datastore(
    "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
)
z_kwargs = {"consolidated": True, "use_cftime": True}
query = dict(
    experiment_id=["historical"],
    table_id=["Omon"],
    variable_id=['thetao'],
    member_id=['r3i1p1f1'],
    grid_label=["gr"],
    source_id=[
        "E3SM-1-0",
    ],
)
dset_dict = col.search(**query).to_dataset_dict(
    zarr_kwargs=z_kwargs,
    storage_options={"token": "anon"},
    preprocess=combined_preprocessing,
    aggregate=False
)
# The query pins source, experiment, member, table, variable and grid, so it
# should normally match a single store. Take that dataset directly instead of
# hard-coding intake-esm's long key (it embeds the zstore path, dcpp_init_year
# and version, and its format depends on the intake-esm release). Fail loudly
# if the match is ambiguous rather than silently picking one.
if len(dset_dict) != 1:
    raise ValueError(
        f"Expected exactly one dataset for query {query}, "
        f"got {len(dset_dict)}: {sorted(dset_dict)}"
    )
(ds,) = dset_dict.values()
# now compute something (the preprocessed dataset uses x/y where the raw store
# in the previous example used lon/lat)
ds.mean(['x', 'y']).thetao.load()
--> The keys in the returned dictionary of datasets are constructed as follows: 'activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.zstore.dcpp_init_year.version'
<xarray.DataArray 'thetao' (time: 1980, lev: 60)> array([[13.4562645 , 13.393538 , 13.235398 , ..., 0.9334626 , 0.90995324, 0.9026728 ], [13.619133 , 13.573829 , 13.468156 , ..., 0.93346804, 0.91000783, 0.902723 ], [13.543202 , 13.505928 , 13.432397 , ..., 0.933512 , 0.91005 , 0.902773 ], ..., [13.882875 , 13.848218 , 13.763058 , ..., 1.0088891 , 0.9831473 , 0.9660718 ], [13.666238 , 13.620644 , 13.528952 , ..., 1.0089248 , 0.9831997 , 0.9661009 ], [13.815262 , 13.749072 , 13.5942955 , ..., 1.0088788 , 0.98319495, 0.9661138 ]], dtype=float32) Coordinates: * lev (lev) float64 5.0 15.0 25.0 35.0 ... 4.875e+03 5.125e+03 5.375e+03 * time (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00
array([[13.4562645 , 13.393538 , 13.235398 , ..., 0.9334626 , 0.90995324, 0.9026728 ], [13.619133 , 13.573829 , 13.468156 , ..., 0.93346804, 0.91000783, 0.902723 ], [13.543202 , 13.505928 , 13.432397 , ..., 0.933512 , 0.91005 , 0.902773 ], ..., [13.882875 , 13.848218 , 13.763058 , ..., 1.0088891 , 0.9831473 , 0.9660718 ], [13.666238 , 13.620644 , 13.528952 , ..., 1.0089248 , 0.9831997 , 0.9661009 ], [13.815262 , 13.749072 , 13.5942955 , ..., 1.0088788 , 0.98319495, 0.9661138 ]], dtype=float32)
array([5.000000e+00, 1.500000e+01, 2.500000e+01, 3.500000e+01, 4.500000e+01, 5.500000e+01, 6.500000e+01, 7.500000e+01, 8.500000e+01, 9.500000e+01, 1.050000e+02, 1.150000e+02, 1.250000e+02, 1.350000e+02, 1.450000e+02, 1.550000e+02, 1.650984e+02, 1.754790e+02, 1.862912e+02, 1.976603e+02, 2.097113e+02, 2.225783e+02, 2.364088e+02, 2.513701e+02, 2.676542e+02, 2.854836e+02, 3.051192e+02, 3.268679e+02, 3.510934e+02, 3.782275e+02, 4.087846e+02, 4.433777e+02, 4.827367e+02, 5.277280e+02, 5.793729e+02, 6.388626e+02, 7.075633e+02, 7.870025e+02, 8.788252e+02, 9.847059e+02, 1.106204e+03, 1.244567e+03, 1.400497e+03, 1.573946e+03, 1.764003e+03, 1.968944e+03, 2.186457e+03, 2.413972e+03, 2.649001e+03, 2.889385e+03, 3.133405e+03, 3.379794e+03, 3.627671e+03, 3.876452e+03, 4.125768e+03, 4.375393e+03, 4.625191e+03, 4.875084e+03, 5.125028e+03, 5.375000e+03])
array([cftime.DatetimeNoLeap(1850, 1, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(1850, 2, 15, 0, 0, 0, 0), cftime.DatetimeNoLeap(1850, 3, 16, 12, 0, 0, 0), ..., cftime.DatetimeNoLeap(2014, 10, 16, 12, 0, 0, 0), cftime.DatetimeNoLeap(2014, 11, 16, 0, 0, 0, 0), cftime.DatetimeNoLeap(2014, 12, 16, 12, 0, 0, 0)], dtype=object)
I have tested this with xarray 0.19.0,
and both examples work. Once I comment out the plugin-registration lines mentioned in the cluster configuration cell, both fail!