For this example, we'll open 9 members of a 100-member ensemble of precipitation and temperature data. The analysis we do below is quite simple, but the problem is a good illustration of an I/O-bound task.
Link to dataset: https://www.earthsystemgrid.org/dataset/gridded_precip_and_temp.html
%matplotlib inline
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from dask.distributed import Client, progress
# HPC
# client = Client(scheduler_file='/glade/scratch/jhamman/scheduler.json')
# client
from dask_kubernetes import KubeCluster

# Spin up a Dask cluster as Kubernetes worker pods; start with ten workers.
# Displaying the cluster object renders the scaling widget.
worker_count = 10
cluster = KubeCluster(n_workers=worker_count)
cluster
VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n <style scoped>\n .…
# Attach a distributed Client to the Kubernetes cluster so that dask
# collections computed below are scheduled on its workers; displaying
# the client shows the dashboard link and worker status.
client = Client(cluster)
client
Client
|
Cluster
|
We try accessing a traditional atmospheric dataset stored in the cloud using two approaches.
The dataset has dimensions of time, latitude, longitude, and ensemble member. Each format is self-describing.
### Load with FUSE File system
# Open the ensemble NetCDF files through the GCS FUSE mount, stacking the
# per-member files along a new 'ensemble' dimension and chunking 50 time
# steps per dask task.
fuse_glob = '/gcs/newmann-met-ensemble-netcdf/conus_ens_00*.nc'
ds = xr.open_mfdataset(fuse_glob, engine='netcdf4',
                       concat_dim='ensemble', chunks={'time': 50})
### Load with Cloud object storage
import gcsfs

# Anonymous, read-only handle on Google Cloud Storage for this project.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='anon',
                         access='read_only')

# Key/value mapping over the bucket path holding the zarr store; skip the
# existence check and never create the store.
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/newman-met-ensemble',
                              gcs=fs, check=False, create=False)

# Alternative to the FUSE load above: read the same data as zarr.
#ds = xr.open_zarr(gcsmap)
### Load with intake catalog service
# import intake
# cat = intake.Catalog('https://raw.githubusercontent.com/pangeo-data/pangeo/master/gce/catalog.yaml')
# ds = cat.newmann_zarr.read_chunked()
# Display the dataset repr: dimensions, dask-backed variables, and
# attributes. No data is loaded by this step.
ds
<xarray.Dataset> Dimensions: (ensemble: 9, lat: 224, lon: 464, time: 12054) Coordinates: * time (time) datetime64[ns] 1980-01-01 1980-01-02 1980-01-03 ... * lon (lon) float64 -124.9 -124.8 -124.7 -124.6 -124.4 -124.3 ... * lat (lat) float64 25.06 25.19 25.31 25.44 25.56 25.69 25.81 25.94 ... Dimensions without coordinates: ensemble Data variables: elevation (ensemble, lat, lon) float64 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)> pcp (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 50, 224, 464)> t_mean (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 50, 224, 464)> t_range (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 50, 224, 464)> mask (ensemble, lat, lon) int32 dask.array<shape=(9, 224, 464), chunksize=(1, 224, 464)> t_max (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 50, 224, 464)> t_min (ensemble, time, lat, lon) float64 dask.array<shape=(9, 12054, 224, 464), chunksize=(1, 50, 224, 464)> Attributes: history: Version 1.0 of ensemble dataset, created Decem... nco_openmp_thread_number: 1 institution: National Center for Atmospheric Research (NCAR... title: CONUS daily 12-km gridded ensemble precipitati... source: Generated using version 1.0 of CONUS ensemble ... references: Newman et al. 2015: Gridded Ensemble Precipita...
A quick plot of the mask to give us an idea of our spatial domain
# Load the elevation field for the first ensemble member into cluster
# memory, hide non-positive (ocean) cells, and map the domain.
elev = ds['elevation'].isel(ensemble=0).persist()
elevation = elev.where(elev > 0)
elevation.plot(figsize=(10, 6))
plt.title('Domain Elevation')
Text(0.5,1,'Domain Elevation')
We calculate the intra-ensemble range of the mean daily temperature in this dataset. This gives us a sense of the uncertainty.
# Time-mean temperature for each ensemble member, then max-minus-min
# across members: the intra-ensemble spread at every grid cell.
member_climatology = ds['t_mean'].mean(dim='time')
spread = (member_climatology.max(dim='ensemble')
          - member_climatology.min(dim='ensemble'))
spread
<xarray.DataArray 't_mean' (lat: 224, lon: 464)> dask.array<shape=(224, 464), dtype=float64, chunksize=(224, 464)> Coordinates: * lon (lon) float64 -124.9 -124.8 -124.7 -124.6 -124.4 -124.3 -124.2 ... * lat (lat) float64 25.06 25.19 25.31 25.44 25.56 25.69 25.81 25.94 ...
The expressions above didn't actually compute anything; they just built the task graph. To do the computations, we call the `compute` or `persist` methods:
# Kick off the computation on the cluster and keep the result resident
# in worker memory; progress() renders a live progress bar for the
# running task graph.
spread = spread.persist()
progress(spread)
VBox()
# robust=True clips the colormap to the 2nd-98th percentile so a few
# outlier cells don't wash out the rest of the map.
spread.plot(robust=True, figsize=(10, 6))
plt.title('Intra-ensemble range in mean annual temperature')
--------------------------------------------------------------------------- KeyboardInterrupt Traceback (most recent call last) <ipython-input-9-2ee9827831a1> in <module>() ----> 1 spread.plot(robust=True, figsize=(10, 6)) 2 plt.title('Intra-ensemble range in mean annual temperature') /opt/conda/lib/python3.6/site-packages/xarray/plot/plot.py in __call__(self, **kwargs) 372 373 def __call__(self, **kwargs): --> 374 return plot(self._da, **kwargs) 375 376 @functools.wraps(hist) /opt/conda/lib/python3.6/site-packages/xarray/plot/plot.py in plot(darray, row, col, col_wrap, ax, rtol, subplot_kws, **kwargs) 155 kwargs['ax'] = ax 156 --> 157 return plotfunc(darray, **kwargs) 158 159 /opt/conda/lib/python3.6/site-packages/xarray/plot/plot.py in newplotfunc(darray, x, y, figsize, size, aspect, ax, row, col, col_wrap, xincrease, yincrease, add_colorbar, add_labels, vmin, vmax, cmap, center, robust, extend, levels, infer_intervals, colors, subplot_kws, cbar_ax, cbar_kwargs, **kwargs) 607 608 # Pass the data as a masked ndarray too --> 609 zval = darray.to_masked_array(copy=False) 610 611 _ensure_plottable(xval, yval) /opt/conda/lib/python3.6/site-packages/xarray/core/dataarray.py in to_masked_array(self, copy) 1453 Masked where invalid values (nan or inf) occur. 
1454 """ -> 1455 isnull = pd.isnull(self.values) 1456 return np.ma.MaskedArray(data=self.values, mask=isnull, copy=copy) 1457 /opt/conda/lib/python3.6/site-packages/xarray/core/dataarray.py in values(self) 402 def values(self): 403 """The array's data as a numpy.ndarray""" --> 404 return self.variable.values 405 406 @values.setter /opt/conda/lib/python3.6/site-packages/xarray/core/variable.py in values(self) 385 def values(self): 386 """The variable's data as a numpy.ndarray""" --> 387 return _as_array_or_item(self._data) 388 389 @values.setter /opt/conda/lib/python3.6/site-packages/xarray/core/variable.py in _as_array_or_item(data) 209 TODO: remove this (replace with np.asarray) once these issues are fixed 210 """ --> 211 data = np.asarray(data) 212 if data.ndim == 0: 213 if data.dtype.kind == 'M': /opt/conda/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 490 491 """ --> 492 return array(a, dtype, copy=False, order=order) 493 494 /opt/conda/lib/python3.6/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs) 1190 1191 def __array__(self, dtype=None, **kwargs): -> 1192 x = self.compute() 1193 if dtype and x.dtype != dtype: 1194 x = x.astype(dtype) /opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 152 dask.base.compute 153 """ --> 154 (result,) = compute(self, traverse=False, **kwargs) 155 return result 156 /opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 405 keys = [x.__dask_keys__() for x in collections] 406 postcomputes = [x.__dask_postcompute__() for x in collections] --> 407 results = get(dsk, keys, **kwargs) 408 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) 409 /opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs) 2096 try: 2097 results = self.gather(packed, asynchronous=asynchronous, -> 
2098 direct=direct) 2099 finally: 2100 for f in futures.values(): /opt/conda/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous) 1506 return self.sync(self._gather, futures, errors=errors, 1507 direct=direct, local_worker=local_worker, -> 1508 asynchronous=asynchronous) 1509 1510 @gen.coroutine /opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs) 613 return future 614 else: --> 615 return sync(self.loop, func, *args, **kwargs) 616 617 def __repr__(self): /opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs) 249 else: 250 while not e.is_set(): --> 251 e.wait(10) 252 if error[0]: 253 six.reraise(*error[0]) /opt/conda/lib/python3.6/threading.py in wait(self, timeout) 549 signaled = self._flag 550 if not signaled: --> 551 signaled = self._cond.wait(timeout) 552 return signaled 553 /opt/conda/lib/python3.6/threading.py in wait(self, timeout) 297 else: 298 if timeout > 0: --> 299 gotit = waiter.acquire(True, timeout) 300 else: 301 gotit = waiter.acquire(False) KeyboardInterrupt:
We can compute a crude estimate of average seasonal snowfall using the temperature and precipitation variables in our dataset. Here, we'll look at the first 4 ensemble members and make some maps of the seasonal total snowfall in each ensemble member.
# Crude snowfall proxy: precipitation on days where the mean temperature
# is below freezing, totalled per quarter (quarters anchored in March so
# that Dec-Feb falls in a single bin).
below_freezing = ds['t_mean'] < 0.
da_snow = ds['pcp'].where(below_freezing).resample(time='QS-Mar').sum('time')

# Mean seasonal total for the first four ensemble members, computed
# eagerly on the cluster.
seasonal_snow = (da_snow
                 .isel(ensemble=slice(0, 4))
                 .groupby('time.season')
                 .mean('time')
                 .persist())
progress(seasonal_snow)

# Put the seasons in calendar order, winter first.
seasonal_snow = seasonal_snow.sel(season=['DJF', 'MAM', 'JJA', 'SON'])

# Facet the maps: one column per season, one row per ensemble member.
# Units here are mm/season.
seasonal_snow.plot.pcolormesh(col='season', row='ensemble', cmap='Blues', robust=True)
In the previous two examples, we've mostly reduced the time and/or ensemble dimension. Here, we'll do a reduction operation on the spatial dimension to look at a time series of extreme precipitation events near Austin, TX (30.2672° N, 97.7431° W).
# Spatial reduction: annual maximum daily precipitation in a small box
# around Austin, TX, tracked through time for every ensemble member.
austin_lon = -97.7431
austin_lat = 30.2672
buf = 0.25  # look at Austin +/- 0.25 deg
ds_tx = ds.sel(lon=slice(austin_lon - buf, austin_lon + buf),
               lat=slice(austin_lat - buf, austin_lat + buf))

# 'AS' = year-start frequency: annual max of daily precipitation, then
# the max over the box, computed eagerly on the cluster.
pcp_ann_max = ds_tx['pcp'].resample(time='AS').max('time')
pcp_ann_max_ts = pcp_ann_max.max(('lat', 'lon')).persist()
progress(pcp_ann_max_ts)

# One line per ensemble member.
ax = pcp_ann_max_ts.transpose().to_pandas().plot(title='Maximum precipitation near Austin, Tx')