%matplotlib inline
import xarray as xr
import matplotlib.pyplot as plt
import s3fs
import utide
from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster
For Dask, the workers need UTide too, so we created utide-worker.yaml,
which is just the default worker YAML
with utide from conda-forge added.
env:
- name: GCSFUSE_BUCKET
value: pangeo-data
- name: EXTRA_CONDA_PACKAGES
value: utide -c conda-forge
Since we are customizing the workers, they take about a minute to spin up....
# Launch a Dask cluster on Kubernetes from the customized worker template
# (workers need UTide installed -- see the EXTRA_CONDA_PACKAGES env above).
cluster = KubeCluster.from_yaml('/home/jovyan/worker-template.yaml')
cluster.scale(10);  # request 10 workers; trailing ';' suppresses cell output
cluster
VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n <style scoped>\n .…
# Attach a client so subsequent Dask computations run on the cluster above.
client = Client(cluster)
client
Client
|
Cluster
|
# GCS (alternative storage backend, kept for reference)
#fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')
#fmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)
# AWS s3
# Open the ROMS sea-surface-height (zeta) Zarr store from a public S3 bucket.
# decode_times=False keeps ocean_time as raw numbers (seconds -- see the
# 1800 s spacing in the printed dataset) so it can be converted to days below.
fs = s3fs.S3FileSystem(anon=True)
fmap = s3fs.S3Map('rsignell/tides', s3=fs)
#fmap = s3fs.S3Map('rsignell/nwm/tiny3a', s3=fs)
ds = xr.open_zarr(fmap, decode_times=False)
ds
<xarray.Dataset> Dimensions: (eta_rho: 324, ocean_time: 1441, xi_rho: 1542) Coordinates: lat_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)> lon_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)> * ocean_time (ocean_time) float64 0.0 1.8e+03 3.6e+03 5.4e+03 7.2e+03 ... Dimensions without coordinates: eta_rho, xi_rho Data variables: zeta (ocean_time, eta_rho, xi_rho) float32 dask.array<shape=(1441, 324, 1542), chunksize=(91, 41, 193)> Attributes: CPP_options: GSB, ADD_FSOBC, ADD_M2OBC, ANA_BSFLUX, ANA_BTFLUX, ANA... Conventions: CF-1.4 NCO: 4.7.3 NLM_LBC: \nEDGE: WEST SOUTH EAST NORTH \nzeta: Cha C... ana_file: ROMS/Functionals/ana_btflux.h, ROMS/Functionals/ana_fs... code_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale... compiler_command: /opt/intel/impi/5.0.1.035/intel64/bin/mpif90 compiler_flags: -heap-arrays -fp-model precise -ip -O3 -xW -free compiler_system: ifort cpu: x86_64 file: ocean_his_gsb_tides_55nb.nc format: netCDF-3 64bit offset file frc_file_01: ../forcings/tide_forc_GSB_55.nc grd_file: ../grids/GSB_55nb.nc header_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale... header_file: gsb.h his_file: ocean_his_gsb_tides_55nb.nc history: Mon Mar 12 09:36:34 2018: ncks -O -v ocean_time,zeta,l... os: Linux rst_file: ocean_rst.nc script_file: svn_rev: svn_url: https:://myroms.org/svn/src tiling: 036x010 title: Great south Bay type: ROMS/TOMS history file var_info: varinfo.dat
# Array dimensions: dt = number of time steps, n x m = rho-point grid size.
# n and m are reused below when building the per-cell task list and reshaping.
dt, n, m = ds['zeta'].shape
print(dt,n,m)
1441 324 1542
# Convert ocean_time from seconds to days for use as the UTide time axis.
t = ds['ocean_time'].values/(3600*24)
#client.get_versions(check=True)
from utide import solve
import numpy as np
import warnings
# Representative latitude of the Great South Bay grid, passed to utide.solve.
lat = 40.7
%%time
# Single-point sanity check: run the harmonic analysis at grid cell (10, 10).
# 'val' keeps the amplitude array ('Lsmaj') so its dtype/shape can seed the
# Dask array metadata later.
# NOTE(review): 0*t passes zeros as the second data component -- presumably
# treating zeta as a scalar series; confirm against the utide.solve signature.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")  # silence warnings raised during the fit
    acoef = solve(t, ds['zeta'][:,10,10].values, 0*t, lat, trend=False,
                  nodal=False, Rayleigh_min=0.95, method='ols', conf_int='linear')
val = acoef['Lsmaj']
solve: matrix prep ... solution ... diagnostics ... done. CPU times: user 312 ms, sys: 188 ms, total: 500 ms Wall time: 6.94 s
# Names of the resolved tidal constituents, in output order (M2 is first).
acoef['name']
array(['M2', 'N2', 'S2', 'K1', 'O1', 'ETA2', 'M6', 'M4', 'Q1', 'UPS1', 'OO1', 'J1', 'NO1', '2Q1', 'MO3', 'M3', 'SK3', 'MK3', 'MN4', 'MSF', 'MS4', 'S4', '2MK5', '2SK5', '3MK7', '2MN6', '2MS6', '2SM6', 'M8'], dtype=object)
import dask.array as da
from dask import delayed
Make UTide's `solve` a delayed function
so that Dask can schedule each call as a task.
# Wrap utide.solve so each call records a lazy Dask task instead of running.
usolve = delayed(solve)
Load all the data to start, since it's only 2GB
%%time
# Load all of zeta into local memory up front -- it is only ~2.9 GB
# (size printed below), so the per-cell tasks can slice a NumPy array.
z = ds['zeta'][:].compute()
print(z.nbytes/1e9)
2.879740512 CPU times: user 4.12 s, sys: 7.87 s, total: 12 s Wall time: 21.7 s
Specify the subsampling interval for calculating tides: nsub = 4 means use every 4th grid point.
# Subsampling stride: analyze every nsub-th grid point in each direction.
nsub = 4
Create a list of delayed functions (one for each grid cell)
%%time
with warnings.catch_warnings():
warnings.simplefilter("ignore")
coefs = [usolve(t, z[:,j*nsub,i*nsub], t*0.0, lat,
trend=False, nodal=False, Rayleigh_min=0.95, method='ols',
conf_int='linear') for j in range(int(n/nsub)) for i in range(int(m/nsub))]
CPU times: user 14 s, sys: 324 ms, total: 14.3 s Wall time: 14.1 s
Construct a list of Dask arrays
# Turn each delayed coefficient bundle into a lazy 1-D Dask array of the
# per-constituent amplitudes ('Lsmaj'), using the trial solve's 'val'
# for the required dtype/shape metadata.
arrays = [da.from_delayed(coef['Lsmaj'], dtype=val.dtype, shape=val.shape) for coef in coefs]
Stack the arrays
# Stack the per-cell amplitude vectors into one (ncells, nconstituents) array.
stack = da.stack(arrays, axis=0)
stack
dask.array<stack, shape=(31185, 29), dtype=float64, chunksize=(1, 29)>
%%time
# Execute the whole graph on the cluster: ~31k independent UTide solves.
amps = stack.compute()
CPU times: user 2min 38s, sys: 21.5 s, total: 3min Wall time: 4min 50s
# for debugging
#pods = cluster.pods()
#print(cluster.logs(pods[0]))
# M2 is the first constituent in acoef['name'], so column 0 of 'amps' holds
# the M2 amplitudes; reshape the flat cell list back onto the subsampled
# (eta, xi) grid.  Floor division (//) replaces int(n/nsub): same dims,
# without the float division round trip.
m2amp = amps[:, 0].reshape((n // nsub, m // nsub))
%matplotlib inline
import matplotlib.pyplot as plt
# Quick-look map of the M2 elevation amplitude on the subsampled grid.
plt.figure(figsize=(12,8))
plt.pcolormesh(m2amp)
plt.colorbar()
plt.title('M2 Elevation Amplitude');