%matplotlib inline
import xarray as xr
import matplotlib.pyplot as plt
import gcsfs
try:
import utide
except:
!conda install utide -y
import utide
from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster
For Dask, the workers need UTide too, so we created `utide-worker.yaml`, which is just the default `worker.yaml` with the addition of UTide from conda-forge:
env:
- name: GCSFUSE_BUCKET
value: pangeo-data
- name: EXTRA_CONDA_PACKAGES
value: utide -c conda-forge
Since we are customizing the workers, they take about a minute to spin up.
# Launch a Dask Kubernetes cluster from the custom worker spec
# (utide-worker.yaml adds UTide from conda-forge to each worker pod).
cluster = KubeCluster.from_yaml('/home/jovyan/utide-worker.yaml')
# Ask for 10 workers; trailing semicolon suppresses the notebook cell output.
cluster.scale(10);
# Displaying the cluster object renders the interactive scaling widget.
cluster
VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n <style scoped>\n .…
# Attach a distributed client so subsequent dask computations run on the cluster;
# displaying it shows the scheduler address and dashboard link.
client = Client(cluster)
client
# Open the Zarr store of model sea-surface height (zeta) directly from
# Google Cloud Storage, read-only.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)
# decode_times=False keeps ocean_time as raw numbers so we can rescale it ourselves.
ds = xr.open_zarr(gcsmap, decode_times=False)
ds
# Unpack the zeta dimension sizes: dt time steps on an (n x m) horizontal grid.
dt, n, m = ds['zeta'].shape
print(dt,n,m)
# Convert ocean_time to days for UTide.
# NOTE(review): assumes ocean_time is in seconds -- TODO confirm against the store metadata.
t = ds['ocean_time'].values/(3600*24)
#client.get_versions(check=True)
from utide import solve
import numpy as np
import warnings
# Latitude passed to UTide (used for its astronomical/nodal corrections).
lat = 40.7
%%time
with warnings.catch_warnings():
warnings.simplefilter("ignore")
acoef = solve(t, ds['zeta'][:,10,10].values, 0*t, lat, trend=False,
nodal=False, Rayleigh_min=0.95, method='ols', conf_int='linear')
val = acoef['Lsmaj']
acoef['name']
# Dask pieces for building the lazy per-grid-cell task graph.
import dask.array as da
from dask import delayed
Make UTide's `solve` a delayed function:
usolve = delayed(solve)
Load all the data to start, since it's only 2GB
%%time
# Materialize the full zeta array locally (about 2 GB) so the delayed tasks get
# in-memory slices rather than each re-reading from GCS.
z = ds['zeta'][:].compute()
# Report the size in GB.
print(z.nbytes/1e9)
Specify the subset interval for calculating tides (e.g., nsub = 4 means do every 4th point):
nsub = 10
Create a list of delayed functions (one for each grid cell)
%%time
with warnings.catch_warnings():
warnings.simplefilter("ignore")
coefs = [usolve(t, z[:,j*nsub,i*nsub], t*0.0, lat,
trend=False, nodal=False, Rayleigh_min=0.95, method='ols',
conf_int='linear') for j in range(int(n/nsub)) for i in range(int(m/nsub))]
Construct a list of Dask arrays
arrays = [da.from_delayed(coef['Lsmaj'], dtype=val.dtype, shape=val.shape) for coef in coefs]
Stack the arrays
# Stack the per-cell results into one (ncells, nconstituents) dask array.
stack = da.stack(arrays, axis=0)
stack
%%time
# Trigger the distributed computation; amps comes back as a NumPy array.
amps = stack.compute()
# Grab worker pod handles (handy for pulling logs when debugging).
pods = cluster.pods()
#print(cluster.logs(pods[0]))
# Reshape the flat per-cell amplitudes back onto the subsampled grid.
# // (floor division) replaces int(n/nsub): identical for these positive ints,
# without the float round trip.
# NOTE(review): assumes constituent index 0 is M2 -- check acoef['name'] above.
m2amp = amps[:,0].reshape(n//nsub, m//nsub)
# Map the M2 elevation amplitude on the subsampled grid.
%matplotlib inline
import matplotlib.pyplot as plt
plt.figure(figsize=(12,8))
plt.pcolormesh(m2amp)
plt.colorbar()
# Trailing semicolon suppresses the text repr of the title object in the notebook.
plt.title('M2 Elevation Amplitude');