Use Xarray, Dask and hvPlot from the HoloViz tool suite to explore the National Water Modle Reanalysis Version 2. We read from a cloud-optimized Zarr dataset that is part of the AWS Open Data Program, and we use a Dask cluster to parallelize computation and reading of data chunks.
import xarray as xr
import fsspec
import numpy as np
import hvplot.pandas
import hvplot.xarray
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
This is not required, but speeds up computations. Here we start a local cluster that just uses the cores available on the computer running the notebook server, but there are many other ways to set up Dask clusters that can scale larger than this.
For example, if you use Qhub to install JuptyerHub with a Dask Gateway running on Kubernetes, you could start a cluster (with a specified environment and worker profile), scale it, and connect to it thusly:
from dask_gateway import Gateway
from dask.distributed import Client
gateway = Gateway()
# see Gateway options to use in new_cluster by doing: gateway.cluster_options()
cluster = gateway.new_cluster(environment='pangeo', profile='Pangeo Worker')
cluster.scale(20)
client = Client(cluster)
cluster
#client.close();cluster.shutdown() # shutdown client and cluster
from dask.distributed import Client
client = Client()
client
Client
|
Cluster
|
Open Zarr datasets in Xarray using a mapper from fsspec. We use anon=True
for free-access public buckets like the AWS Open Data Program, and requester_pays=True
for requester-pays public buckets.
url = 's3://noaa-nwm-retro-v2-zarr-pds'
%%time
ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)
CPU times: user 1.5 s, sys: 91.5 ms, total: 1.59 s Wall time: 3.77 s
var='streamflow'
ds[var]
<xarray.DataArray 'streamflow' (time: 227904, feature_id: 2729077)> dask.array<xarray-streamflow, shape=(227904, 2729077), dtype=float64, chunksize=(672, 30000), chunktype=numpy.ndarray> Coordinates: * feature_id (feature_id) int32 101 179 181 ... 1180001803 1180001804 latitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> longitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> * time (time) datetime64[ns] 1993-01-01 ... 2018-12-31T23:00:00 Attributes: grid_mapping: crs long_name: River Flow units: m3 s-1 valid_range: [0, 50000000]
|
array([ 101, 179, 181, ..., 1180001802, 1180001803, 1180001804], dtype=int32)
|
|
array(['1993-01-01T00:00:00.000000000', '1993-01-01T01:00:00.000000000', '1993-01-01T02:00:00.000000000', ..., '2018-12-31T21:00:00.000000000', '2018-12-31T22:00:00.000000000', '2018-12-31T23:00:00.000000000'], dtype='datetime64[ns]')
print(f'Variable size: {ds[var].nbytes/1e12:.1f} TB')
Variable size: 5.0 TB
%%time
imax = ds[var].sel(time='2017-06-01 00:00:00').argmax().values
CPU times: user 626 ms, sys: 75.6 ms, total: 702 ms Wall time: 16.3 s
Let's plot the whole hindcast time series at that location
%%time
ds[var][:,imax].hvplot(grid=True)
CPU times: user 1.82 s, sys: 181 ms, total: 2 s Wall time: 38.2 s
streamflow_April_2010 = ds[var].sel(time=slice('2010-04-01 00:00','2010-04-30 23:00'))
print(f'Variable size: {streamflow_April_2010.nbytes/1e9:.1f} GB')
Variable size: 15.7 GB
%%time
var_mean = streamflow_April_2010.mean(dim='time').compute()
CPU times: user 2.61 s, sys: 297 ms, total: 2.91 s Wall time: 51.4 s
Convert Xarray to Pandas dataframe so we can use hvplot.points for visualization
df = var_mean.to_pandas().to_frame()
The dataframe just has streamflow, so add longitude and latitude as columns
df = df.assign(latitude=ds['latitude'])
df = df.assign(longitude=ds['longitude'])
df.rename(columns={0: "transport"}, inplace=True)
p = df.hvplot.points('longitude', 'latitude', crs=ccrs.PlateCarree(),
c='transport', colorbar=True, size=14)
We don't want to plot all the 2.7M points individually, so aggregate to 0.02 degree resolution and rasterize with datashader. Use a log scale for visualization since there is a large dynamic range in streamflow.
g = rasterize(p, aggregator='mean', x_sampling=0.02, y_sampling=0.02, width=500).opts(tools=['hover'],
aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))
Plot the rasterized streamflow data on an OpenStreetMap tile service basemap
g * gv.tile_sources.OSM
#client.close(); cluster.close()