For this example, we'll open 9 members of a 100-member ensemble of precipitation and temperature data. The analysis below is quite simple, but the problem is a good illustration of an IO-bound task.
Link to dataset: https://www.earthsystemgrid.org/dataset/gridded_precip_and_temp.html
%matplotlib inline
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from dask.distributed import Client, progress
# HPC
# client = Client(scheduler_file='/glade/scratch/jhamman/scheduler.json')
# client
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
cluster
client = Client(cluster)
client
We try accessing a traditional atmospheric dataset stored on the cloud using a few different approaches.
The dataset has dimensions of time, latitude, longitude, and ensemble member. Each format is self-describing.
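As a sketch of this layout, here is a tiny synthetic xarray Dataset with the same dimension names (the toy sizes and coordinate values are made up; the real ensemble is far larger):

```python
import numpy as np
import xarray as xr

# Toy stand-in for the real ensemble: 2 members, 3 time steps, 4x5 grid.
ds_toy = xr.Dataset(
    {"pcp": (("ensemble", "time", "lat", "lon"), np.zeros((2, 3, 4, 5)))},
    coords={
        "ensemble": np.arange(2),
        "lat": np.linspace(30.0, 33.0, 4),
        "lon": np.linspace(-100.0, -96.0, 5),
    },
)
print(dict(ds_toy.sizes))  # dimension name -> length
```

The real dataset has the same four dimensions, just with many more elements along each one.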
### Load with FUSE File system
# ds = xr.open_mfdataset('/gcs/newmann-met-ensemble-netcdf/conus_ens_00*.nc',
#                        engine='netcdf4', concat_dim='ensemble', chunks={'time': 50})
### Load with Cloud object storage
import gcsfs
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='anon', access='read_only')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/gmet_v1.zarr',
                              gcs=fs, check=False, create=False)
ds = xr.open_zarr(gcsmap)
### Load with intake catalog service
# import intake
# cat = intake.Catalog('https://raw.githubusercontent.com/pangeo-data/pangeo/master/gce/catalog.yaml')
# ds = cat.gmet_v1.read_chunked()
# Print dataset
ds
A quick plot of the elevation field (masked to land points) gives us an idea of our spatial domain.
if 'ensemble' in ds['elevation'].dims:
elevation = ds['elevation'].isel(ensemble=0).persist()
else:
elevation = ds['elevation'].persist()
elevation = elevation.where(elevation > 0)
elevation.plot(figsize=(10, 6))
plt.title('Domain Elevation')
We calculate the intra-ensemble range of mean daily temperature in this dataset. This gives us a sense of the uncertainty across ensemble members.
temp_mean = ds['t_mean'].mean(dim='time')
spread = (temp_mean.max(dim='ensemble')
- temp_mean.min(dim='ensemble'))
spread
The expressions above didn't actually compute anything. They just built the task graph. To do the computations, we call the `compute` or `persist` methods:
spread = spread.persist()
progress(spread)
spread
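To make the `compute`/`persist` distinction concrete, here is a minimal, self-contained sketch with a small dask array (no cluster required; it runs on the default local scheduler):

```python
import dask.array as da

x = da.ones((1000, 1000), chunks=(250, 250))
lazy = (x + 1).mean()      # builds a task graph; nothing runs yet
print(lazy)                # a lazy dask scalar, not a number

result = lazy.compute()    # executes the graph, returns a concrete value
print(result)              # 2.0

kept = (x + 1).persist()   # also executes, but stays a dask array held in memory
```

`compute` pulls the final answer back as an in-memory value, while `persist` materializes the chunks on the workers but keeps the object as a dask collection for further lazy operations.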
spread.attrs['units'] = 'degC'
spread.plot(robust=True, figsize=(10, 6))
plt.title('Intra-ensemble range in mean annual temperature')
We can compute a crude estimate of average seasonal snowfall using the temperature and precipitation variables in our dataset. Here, we'll look at the first 4 ensemble members and make some maps of the seasonal total snowfall in each ensemble member.
da_snow = ds['pcp'].where(ds['t_mean'] < 0.).resample(time='QS-Mar').sum('time')
seasonal_snow = da_snow.isel(ensemble=slice(0, 4)).groupby('time.season').mean('time').persist()
progress(seasonal_snow)
# properly sort the seasons
seasonal_snow = seasonal_snow.sel(season=['DJF', 'MAM', 'JJA', 'SON'])
seasonal_snow.attrs['units'] = 'mm/season'
seasonal_snow
seasonal_snow.plot.pcolormesh(col='season', row='ensemble', cmap='Blues', robust=True)
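The `'QS-Mar'` frequency deserves a note: it bins days into quarters anchored on March, which lines up with the meteorological seasons (DJF/MAM/JJA/SON) used above. A minimal sketch with synthetic daily data (the dates and values here are made up for illustration):

```python
import numpy as np
import pandas as pd
import xarray as xr

# One synthetic value per day for the year 2000.
time = pd.date_range("2000-01-01", "2000-12-31", freq="D")
daily = xr.DataArray(np.ones(len(time)), coords={"time": time}, dims="time")

# Quarters anchored on March align with the meteorological seasons.
totals = daily.resample(time="QS-Mar").sum("time")
print(totals["time"].dt.season.values)
```

The January and February days fall into a quarter labeled with the previous December, which is why the first and last bins are both DJF; the `groupby('time.season').mean` step above then averages those partial seasons together.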
In the previous two examples, we've mostly reduced the time and/or ensemble dimension. Here, we'll do a reduction operation on the spatial dimension to look at a time series of extreme precipitation events near Austin, TX (30.2672° N, 97.7431° W).
buf = 0.25 # look at Austin +/- 0.25 deg
ds_tx = ds.sel(lon=slice(-97.7431-buf, -97.7431+buf), lat=slice(30.2672-buf, 30.2672+buf))
pcp_ann_max = ds_tx['pcp'].resample(time='AS').max('time')
pcp_ann_max_ts = pcp_ann_max.max(('lat', 'lon')).persist()
progress(pcp_ann_max_ts)
ax = pcp_ann_max_ts.transpose().to_pandas().plot(title='Maximum precipitation near Austin, TX', legend=False)