#!/usr/bin/env python
# coding: utf-8

# Analysis of Gridded Ensemble Precipitation and Temperature Estimates over the Contiguous United States
# ====
#
# For this example, we'll open 9 members of a 100-member ensemble of precipitation and temperature data. The analysis we do below is quite simple, but the problem is a good illustration of an IO-bound task.
#
# Link to dataset: https://www.earthsystemgrid.org/dataset/gridded_precip_and_temp.html

# In[ ]:

get_ipython().run_line_magic('matplotlib', 'inline')
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt

# ### Connect to Dask Distributed Cluster

# In[ ]:

from dask.distributed import Client, progress

# HPC
# client = Client(scheduler_file='/glade/scratch/jhamman/scheduler.json')
# client

from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
cluster

# In[ ]:

client = Client(cluster)
client

# ### Open Dataset
#
# We try storing a traditional atmospheric dataset on the cloud with three approaches:
#
# 1. Hosting raw netCDF files on a FUSE-mounted file system based on the [gcsfs](https://gcsfs.readthedocs.io/en/latest/) Python project
# 2. Storing the data in an easier-to-read but newer format, [Zarr](http://zarr.readthedocs.io/en/stable/)
# 3. Loading an [Intake](https://intake.readthedocs.io/en/latest/) catalog file from GCS, specifying the location of the Zarr data
#
# The dataset has dimensions of time, latitude, longitude, and ensemble member. Each format is self-describing.

# In[ ]:

### Load with FUSE file system
# ds = xr.open_mfdataset('/gcs/newmann-met-ensemble-netcdf/conus_ens_00*.nc',
#                        engine='netcdf4', concat_dim='ensemble', chunks={'time': 50})

### Load with cloud object storage
import gcsfs
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='anon', access='read_only')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/gmet_v1.zarr', gcs=fs, check=False, create=False)
ds = xr.open_zarr(gcsmap)

### Load with Intake catalog service
# import intake
# cat = intake.Catalog('https://raw.githubusercontent.com/pangeo-data/pangeo/master/gce/catalog.yaml')
# ds = cat.gmet_v1.read_chunked()

# In[ ]:

# Print dataset
ds

# ### Figure: Elevation and domain mask
#
# A quick plot of the mask to give us an idea of our spatial domain.

# In[ ]:

if 'ensemble' in ds['elevation'].dims:
    elevation = ds['elevation'].isel(ensemble=0).persist()
else:
    elevation = ds['elevation'].persist()
elevation = elevation.where(elevation > 0)
elevation.plot(figsize=(10, 6))
plt.title('Domain Elevation')

# ### Intra-ensemble range
#
# We calculate the intra-ensemble range of the mean daily temperature in this dataset. This gives us a sense of uncertainty.

# In[ ]:

temp_mean = ds['t_mean'].mean(dim='time')
spread = (temp_mean.max(dim='ensemble') -
          temp_mean.min(dim='ensemble'))
spread

# ### Calling compute
# The expressions above didn't actually compute anything. They just built the task graph. To do the computations, we call the `compute` or `persist` methods:

# In[ ]:

spread = spread.persist()
progress(spread)
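# As a quick aside, here is a minimal sketch of how the two methods differ,
# using the small 2-D `spread` field persisted above: `persist` kicks off the
# computation on the cluster and returns a lazy, chunked object, while
# `compute` blocks and returns the finished result as an in-memory,
# NumPy-backed object in this notebook's process. The name `spread_local` is
# just illustrative.

# In[ ]:

spread_local = spread.compute()  # blocks until the result arrives locally
print(type(spread_local.data))   # numpy.ndarray rather than a dask array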
# #### Figure: Intra-ensemble range

# In[ ]:

spread
spread.attrs['units'] = 'degC'

# In[ ]:

spread.plot(robust=True, figsize=(10, 6))
plt.title('Intra-ensemble range in mean annual temperature')

# ### Average seasonal snowfall
#
# We can compute a crude estimate of average seasonal snowfall using the temperature and precipitation variables in our dataset. Here, we'll look at the first 4 ensemble members and make some maps of the seasonal total snowfall in each ensemble member.

# In[ ]:

da_snow = ds['pcp'].where(ds['t_mean'] < 0.).resample(time='QS-Mar').sum('time')
seasonal_snow = da_snow.isel(ensemble=slice(0, 4)).groupby('time.season').mean('time').persist()
progress(seasonal_snow)

# In[ ]:

# properly sort the seasons
seasonal_snow = seasonal_snow.sel(season=['DJF', 'MAM', 'JJA', 'SON'])
seasonal_snow.attrs['units'] = 'mm/season'
seasonal_snow

# #### Figure: Average seasonal snowfall totals

# In[ ]:

seasonal_snow.plot.pcolormesh(col='season', row='ensemble', cmap='Blues', robust=True)

# ### Extract a time series of annual maximum precipitation events over a region
#
# In the previous two examples, we mostly reduced the time and/or ensemble dimensions. Here, we'll do a reduction over the spatial dimensions to look at a time series of extreme precipitation events near Austin, TX (30.2672° N, 97.7431° W).

# In[ ]:

buf = 0.25  # look at Austin +/- 0.25 deg
ds_tx = ds.sel(lon=slice(-97.7431 - buf, -97.7431 + buf),
               lat=slice(30.2672 - buf, 30.2672 + buf))

# In[ ]:

pcp_ann_max = ds_tx['pcp'].resample(time='AS').max('time')

# In[ ]:

pcp_ann_max_ts = pcp_ann_max.max(('lat', 'lon')).persist()
progress(pcp_ann_max_ts)

# #### Figure: Time series of maximum precipitation near Austin, TX

# In[ ]:

ax = pcp_ann_max_ts.transpose().to_pandas().plot(title='Maximum precipitation near Austin, TX', legend=False)
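# When the analysis is finished, it's good practice to release the cluster
# resources. A minimal sketch, assuming the `client` and `cluster` objects
# created at the top of this notebook are still in scope:

# In[ ]:

client.close()   # disconnect this notebook from the scheduler
cluster.close()  # tear down the Kubernetes worker pods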