import xarray as xr
import numpy as np
import pandas as pd
import fsspec
# AWS S3 filesystem; the bucket is requester-pays, so egress is billed to your AWS account
fs = fsspec.filesystem('s3', anon=False, requester_pays=True)
# anonymous access to an Open Storage Network (OSN) endpoint (defined for reference; not used below)
fs_osn = fsspec.filesystem('s3', anon=True, client_kwargs=dict(endpoint_url='https://ncsa.osn.xsede.org'))
ncfile_on_s3 = 's3://floodid-louisiana-model-data/gstofs/sample_run/fort.63.nc'
fs.size(ncfile_on_s3)/1e9 #GB
2.781632181
%%time
# open the NetCDF file lazily, straight from S3; the Dask chunks match the file's internal HDF5 chunking (see ds.zeta.encoding below)
ds = xr.open_dataset(fs.open(ncfile_on_s3), drop_variables=['nvel'], chunks={'time':1, 'node':511400})
CPU times: user 2.88 s, sys: 196 ms, total: 3.07 s Wall time: 5.12 s
ds.zeta.encoding
{'chunksizes': (1, 511400), 'fletcher32': False, 'shuffle': True, 'zlib': True, 'complevel': 2, 'source': '<File-like object S3FileSystem, floodid-louisiana-model-data/gstofs/sample_run/fort.63.nc>', 'original_shape': (40, 12784991), 'dtype': dtype('<f8'), '_FillValue': -99999.0, 'coordinates': 'time y x'}
ds.zeta
<xarray.DataArray 'zeta' (time: 40, node: 12784991)>
dask.array<open_dataset-9d8ed5545d2239c017b6d200f9ff4ff0zeta, shape=(40, 12784991), dtype=float64, chunksize=(1, 511400), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2016-09-01T12:00:00 2016-09-02 ... 2016-09-21
    x        (node) float64 dask.array<chunksize=(511400,), meta=np.ndarray>
    y        (node) float64 dask.array<chunksize=(511400,), meta=np.ndarray>
Dimensions without coordinates: node
Attributes:
    long_name:      water surface elevation above geoid
    standard_name:  sea_surface_height_above_geoid
    location:       node
    mesh:           adcirc_mesh
    units:          m
#fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, client_kwargs={'endpoint_url': 'https://renc.osn.xsede.org'})
#ds = xr.open_dataset(fs.get_mapper('s3://rsignellbucket2/esip/adcirc/ike'), engine='zarr',
# backend_kwargs=dict(consolidated=False), chunks={'time':90})
#fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True, client_kwargs={'endpoint_url': 'https://mghp.osn.xsede.org'})
#ds = xr.open_dataset(fs.get_mapper('s3://rsignellbucket1/esip/adcirc/ike'), engine='zarr',
# backend_kwargs=dict(consolidated=False), chunks={'time':90})
How many GB of sea surface height data do we have?
ds['zeta'].nbytes/1.e9
4.09119712
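The in-memory size is larger than the 2.8 GB file on disk because the variable is zlib-compressed (complevel 2, per the encoding above), roughly a 1.5x ratio. A quick check:
# zeta dominates the file, so this ratio is a reasonable ballpark for the compression factor
ds['zeta'].nbytes / fs.size(ncfile_on_s3)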
# Alternative: a LocalCluster on this machine would also work for smaller jobs
#from dask.distributed import LocalCluster, Client
#cluster = LocalCluster()
#client = Client(cluster)
#client
#client.close()
We want to take the maximum over the time dimension. Let's use a Dask cluster to distribute the memory and compute load, getting our work done faster!
import os
import configparser

def set_credentials(profile='default', region='us-west-2', endpoint=None, cfile=None):
    """Set AWS environment variables based on a profile from the user credentials file."""
    cp = configparser.ConfigParser()
    if not cfile:
        cfile = os.path.expanduser('~/.aws/credentials')
    if not endpoint:
        endpoint = f's3.{region}.amazonaws.com'
    cp.read(cfile)
    os.environ['AWS_ACCESS_KEY_ID'] = cp[profile]['aws_access_key_id']
    os.environ['AWS_SECRET_ACCESS_KEY'] = cp[profile]['aws_secret_access_key']
    os.environ['AWS_S3_ENDPOINT'] = endpoint
    os.environ['AWS_REGION'] = region

set_credentials()
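A quick sanity check that the credential variables are now set, without echoing the secret values:
for var in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_S3_ENDPOINT', 'AWS_REGION']:
    print(var, 'is set' if var in os.environ else 'is MISSING')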
from dask_gateway import Gateway
import os
# instantiate dask gateway
gateway = Gateway()
# specify cluster options
options = gateway.cluster_options()
options.conda_environment = 'users/users-pangeo'
options.profile = 'Medium Worker' # 2 threads/worker
# set environment variables for cluster workers (including AWS credential environment variables)
options.environment_vars = dict(os.environ)
# create new cluster
cluster = gateway.new_cluster(options)
# get the client for the cluster
client = cluster.get_client()
%%time
n_workers=30
cluster.scale(n_workers)  # returns immediately; workers come up in the background
#client.wait_for_workers(n_workers=n_workers)  # uncomment to block until all workers are ready
CPU times: user 2.43 ms, sys: 0 ns, total: 2.43 ms Wall time: 6.34 ms
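As an aside, Gateway clusters can also scale adaptively between bounds instead of to a fixed size; a commented-out alternative in the same spirit:
#cluster.adapt(minimum=2, maximum=n_workers)  # add/remove workers with load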
client
Client-81ec0e33-56f8-11ee-828e-b6b13b65153d
Connection method: Cluster object
Cluster type: dask_gateway.GatewayCluster
Dashboard: https://nebari.esipfed.org/gateway/clusters/dev.bdfaccdf504b4547bb4a802489537821/status
#client.close()
This is the compute-intensive step: reading all the elevation data and taking the maximum over time.
%%time
max_var = ds['zeta'].max(dim='time').compute()
max_var
CPU times: user 609 ms, sys: 416 ms, total: 1.02 s Wall time: 2min
<xarray.DataArray 'zeta' (node: 12784991)>
array([nan, nan, nan, ..., nan, nan, nan])
Coordinates:
    x        (node) float64 -73.68 -73.68 -73.68 -73.68 ... -90.07 -90.07 -90.07
    y        (node) float64 42.75 42.75 42.75 42.75 ... 30.03 30.03 30.03 30.03
Dimensions without coordinates: node
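The leading values are NaN because many mesh nodes never hold a valid water level during this run (typically dry nodes in the ADCIRC mesh); a quick count:
n_nan = int(np.isnan(max_var).sum())
print(f'{n_nan:,} of {max_var.sizes["node"]:,} nodes have no valid water level')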
import numpy as np
import geoviews as gv
import hvplot.xarray
import holoviews.operation.datashader as dshade
dshade.datashade.precompute = True  # cache projected data so pan/zoom re-renders are faster
#max_var = ds['zeta'].isel(time=20)
%%time
# stack lon, lat and the max water level into a vertex table for the TriMesh
v = np.vstack((ds['x'], ds['y'], max_var)).T
verts = pd.DataFrame(v, columns=['x','y','vmax'])
# project lon/lat points to web mercator for display on map tiles
points = gv.operation.project_points(gv.Points(verts, vdims=['vmax']))
# ADCIRC element connectivity is 1-based, so shift to 0-based indices
tris = pd.DataFrame(ds['element'].values.astype('int')-1, columns=['v0','v1','v2'])
CPU times: user 4.47 s, sys: 1.23 s, total: 5.7 s Wall time: 25.5 s
tiles = gv.tile_sources.OSM
trimesh = gv.TriMesh((tris, points), label='ADCIRC Global Water Level (m)')
# rasterize the ~12.8M-node mesh with Datashader so it renders interactively
mesh = dshade.rasterize(trimesh).opts(cmap='turbo', colorbar=True, width=650, height=500)
tiles * mesh
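To keep the figure around, holoviews can write the overlay to a standalone HTML file (a commented-out suggestion; the filename is arbitrary):
#import holoviews as hv
#hv.save(tiles * mesh, 'adcirc_max_water_level.html')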
Because Xarray does not yet understand that x and y are coordinate variables on this triangular mesh, we create our own simple function to find the closest point. If we had many points to look up, we could use a fancier tree algorithm (see the cKDTree sketch below).
# find the indices of the points in (x,y) closest to the points in (xi,yi)
def nearxy(x, y, xi, yi):
    ind = np.ones(len(xi), dtype=int)
    for i in range(len(xi)):
        dist = np.sqrt((x - xi[i])**2 + (y - yi[i])**2)
        ind[i] = dist.argmin()
    return ind
ds['x'].min().values  # quick sanity check: westernmost node longitude
#just offshore of Galveston
lat = 29.0329856
lon = -95.1535041
ind = nearxy(ds['x'].values,ds['y'].values,[lon], [lat])
ds['zeta'][:,ind].hvplot(x='time', grid=True)
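For many query points, building a spatial index once beats the brute-force loop. A minimal sketch assuming scipy is available in this environment (it uses the same planar lon/lat distance as nearxy):
from scipy.spatial import cKDTree
# build the tree once over all mesh nodes, then query in bulk
tree = cKDTree(np.column_stack([ds['x'].values, ds['y'].values]))
_, ind_tree = tree.query([[lon, lat]])
assert (ind_tree == ind).all()  # should match the brute-force result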
Be a good citizen and shut down your cluster when you are done using it.
cluster.shutdown()
/home/conda/users/226e91a965d185c2173ace6d0daf5747b886d5f2b5299f8dbce39e2e7fb456a3-20230830-192612-585397-241-pangeo/lib/python3.10/site-packages/dask_gateway/client.py:1014: RuntimeWarning: coroutine 'rpc.close_rpc' was never awaited
  self.scheduler_comm.close_rpc()
This RuntimeWarning comes from dask_gateway's own shutdown path and can be safely ignored.