Compute and visualize the mean annual river discharge from the National Water Model v2 for 2.7 million rivers in minutes using Pangeo.
import xarray as xr
import fsspec
import numpy as np
import hvplot.pandas
import hvplot.xarray
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
from dask_gateway import Gateway
from dask.distributed import Client
# Connect to the Dask Gateway: reuse an existing cluster if one is running,
# otherwise start a fresh one.
gateway = Gateway()
# Query the gateway ONCE and reuse the result — the original called
# list_clusters() four times, each a round-trip to the gateway service.
clusters = gateway.list_clusters()
if clusters:
    print('Existing Dask clusters:')
    for c in clusters:
        print('Cluster Name:', c.name, c.status)
# New or connect:
# If no cluster is running, create a new one, else connect to the first one found (idx=0, change if other cluster should be running)
idx = 0
if not clusters:
    print('No Cluster running... starting new')
    cluster = gateway.new_cluster(environment='default', profile='Pangeo Worker')
else:
    print('Found Clusters running... connecting to first ')
    cluster = gateway.connect(clusters[idx].name)
No Cluster running... starting new
#from dask.distributed import Client, progress
#from dask_kubernetes import KubeCluster
#cluster = KubeCluster()
#cluster.adapt(minimum=4, maximum=20);
# Request a fixed pool of 20 workers (not adaptive scaling).
cluster.scale(20)
# Bare expression: displays the cluster widget in the notebook.
cluster
VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n …
client = Client(cluster)
%%time
# Lazily open the NWM v2.0 retrospective Zarr store on S3 (anonymous access).
# consolidated=True fetches all dataset metadata in a single request.
ds = xr.open_zarr(fsspec.get_mapper('s3://noaa-nwm-retro-v2.0-zarr-pds/',
                 anon=True), consolidated=True)
CPU times: user 2.51 s, sys: 92.9 ms, total: 2.6 s Wall time: 1min 7s
ds.streamflow
<xarray.DataArray 'streamflow' (time: 227904, feature_id: 2729077)> dask.array<xarray-streamflow, shape=(227904, 2729077), dtype=float64, chunksize=(672, 30000), chunktype=numpy.ndarray> Coordinates: * feature_id (feature_id) int32 101 179 181 ... 1180001803 1180001804 latitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> longitude (feature_id) float32 dask.array<chunksize=(2729077,), meta=np.ndarray> * time (time) datetime64[ns] 1993-01-01 ... 2018-12-31T23:00:00 Attributes: grid_mapping: crs long_name: River Flow units: m3 s-1 valid_range: [0, 50000000]
|
array([ 101, 179, 181, ..., 1180001802, 1180001803, 1180001804], dtype=int32)
|
|
array(['1993-01-01T00:00:00.000000000', '1993-01-01T01:00:00.000000000', '1993-01-01T02:00:00.000000000', ..., '2018-12-31T21:00:00.000000000', '2018-12-31T22:00:00.000000000', '2018-12-31T23:00:00.000000000'], dtype='datetime64[ns]')
# Materialize the reach coordinates as NumPy arrays (reused later for the
# DataFrame) and echo the domain's north-east corner.
lat, lon = ds.latitude.values, ds.longitude.values
print(lat.max(), lon.max())
52.86352 -66.99203
Let's find the site with the largest streamflow on June 1, 2017
# Positional index of the reach with the largest streamflow at 2017-06-01 00:00.
imax = ds.streamflow.sel(time='2017-06-01 00:00:00').argmax().values
# Its NWM feature_id (notebook display).
ds.feature_id[imax].values
array(19085497, dtype=int32)
Let's plot the full 1993–2018 hourly time series at that location
%%time
# Interactive plot of the full hourly time series at the peak-flow reach.
ds.streamflow[:,imax].hvplot(grid=True)
CPU times: user 195 ms, sys: 40.1 ms, total: 235 ms Wall time: 15.8 s
var='streamflow'
# Uncompressed size of the variable in GB (~5 TB, per the output below).
ds[var].nbytes/1e9
4975.740516864
%%time
# Time-mean streamflow over calendar year 2017; .compute() triggers the
# distributed computation and pulls the 2.7M-element result to the client.
var_mean = ds[var].sel(time=slice('2017-01-01','2017-12-31')).mean(dim='time').compute()
CPU times: user 385 ms, sys: 73.8 ms, total: 459 ms Wall time: 1min 33s
# Build a DataFrame of mean flow plus reach coordinates for plotting.
df = var_mean.to_pandas().to_frame()
df = df.assign(latitude=lat)
df = df.assign(longitude=lon)
# Rename the data column by POSITION, not by the literal key 0: to_frame()
# names the column after the source DataArray (here 'streamflow'), so the
# original rename(columns={0: ...}) silently did nothing and left no
# 'transport' column for hvplot to color by.
df.rename(columns={df.columns[0]: "transport"}, inplace=True)
# Datashader-rasterized point plot on a log color scale over OSM tiles.
p = df.hvplot.points('longitude', 'latitude', crs=ccrs.PlateCarree(),
                     c='transport', colorbar=True, size=14)
g = rasterize(p, aggregator='mean', x_sampling=0.02, y_sampling=0.02, width=500).opts(tools=['hover'],
              aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))
g * gv.tile_sources.OSM
# Record the software environment for reproducibility: one version per line,
# in the same order as the original import;print one-liners.
import xarray
import zarr
import fsspec
import s3fs
for pkg in (xarray, zarr, fsspec, s3fs):
    print(pkg.__version__)
0.16.2 2.6.1 0.8.4 0.4.2
client.close(); cluster.shutdown()