Use Xarray, Dask and hvPlot from the HoloViz tool suite to explore the National Water Modle Reanalysis Version 2. We read from a cloud-optimized Zarr dataset that is part of the AWS Open Data Program, and we use a Dask cluster to parallelize computation and reading of data chunks.
import xarray as xr
import fsspec
import numpy as np
import hvplot.pandas
import hvplot.xarray
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
This is not required, but speeds up computations. Once can start a local cluster by just doing:
from dask.distributed import Client
client = Client()
but there are many other ways to set up Dask clusters that can scale larger than this.
Since we used Qhub to install JupyterHub with a Dask Gateway running on Kubernetes, we can start a Dask cluster (with a specified environment and worker profile), scale it, and connect to it thusly:
from dask_gateway import Gateway
from dask.distributed import Client
gateway = Gateway()
# see Gateway options to use in new_cluster by doing: gateway.cluster_options()
cluster = gateway.new_cluster(environment='pangeo', profile='Pangeo Worker')
cluster.scale(20)
client = Client(cluster)
cluster
#client.close();cluster.shutdown() # shutdown client and cluster
VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n …
Open Zarr datasets in Xarray using a mapper from fsspec. We use anon=True
for free-access public buckets like the AWS Open Data Program, and requester_pays=True
for requester-pays public buckets.
url = 's3://noaa-nwm-retro-v2-zarr-pds'
%%time
ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)
CPU times: user 1.44 s, sys: 122 ms, total: 1.56 s Wall time: 3.77 s
var='streamflow'
ds[var]
<xarray.DataArray 'streamflow' (time: 227904, feature_id: 2729077)> dask.array<xarray-streamflow, shape=(227904, 2729077), dtype=float64, chunksize=(672, 30000), chunktype=numpy.ndarray> Coordinates: * feature_id (feature_id) int32 101 179 181 ... 1180001803 1180001804 latitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> longitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> * time (time) datetime64[ns] 1993-01-01 ... 2018-12-31T23:00:00 Attributes: grid_mapping: crs long_name: River Flow units: m3 s-1 valid_range: [0, 50000000]
|
array([ 101, 179, 181, ..., 1180001802, 1180001803, 1180001804], dtype=int32)
|
|
array(['1993-01-01T00:00:00.000000000', '1993-01-01T01:00:00.000000000', '1993-01-01T02:00:00.000000000', ..., '2018-12-31T21:00:00.000000000', '2018-12-31T22:00:00.000000000', '2018-12-31T23:00:00.000000000'], dtype='datetime64[ns]')
print(f'Variable size: {ds[var].nbytes/1e12:.1f} TB')
Variable size: 5.0 TB
%%time
imax = ds[var].sel(time='2017-06-01 00:00:00').argmax().values
CPU times: user 64.7 ms, sys: 728 µs, total: 65.4 ms Wall time: 11.7 s
Let's plot the whole hindcast time series at that location
%%time
ds[var][:,imax].hvplot(grid=True)
CPU times: user 164 ms, sys: 32.4 ms, total: 196 ms Wall time: 9.61 s
streamflow_April_2010 = ds[var].sel(time=slice('2010-04-01 00:00','2010-04-30 23:00'))
print(f'Variable size: {streamflow_April_2010.nbytes/1e9:.1f} GB')
Variable size: 15.7 GB
%%time
var_mean = streamflow_April_2010.mean(dim='time').compute()
CPU times: user 151 ms, sys: 44.2 ms, total: 195 ms Wall time: 12.8 s
Convert Xarray to Pandas dataframe so we can use hvplot.points for visualization
df = var_mean.to_pandas().to_frame()
The dataframe just has streamflow, so add longitude and latitude as columns
df = df.assign(latitude=ds['latitude'])
df = df.assign(longitude=ds['longitude'])
df.rename(columns={0: "transport"}, inplace=True)
p = df.hvplot.points('longitude', 'latitude', crs=ccrs.PlateCarree(),
c='transport', colorbar=True, size=14)
We don't want to plot all the 2.7M points individually, so aggregate to 0.02 degree resolution and rasterize with datashader. Use a log scale for visualization since there is a large dynamic range in streamflow.
g = rasterize(p, aggregator='mean', x_sampling=0.02, y_sampling=0.02, width=500).opts(tools=['hover'],
aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))
Plot the rasterized streamflow data on an OpenStreetMap tile service basemap
g * gv.tile_sources.OSM
# client.close(); cluster.shutdown()