#!/usr/bin/env python
# coding: utf-8

# Testing Utide on Pangeo with Dask distributed
# ====

# In[1]:

get_ipython().run_line_magic('matplotlib', 'inline')
import xarray as xr
import matplotlib.pyplot as plt
import gcsfs

try:
    import utide
except ImportError:
    # Install Utide into the notebook environment if it is missing
    get_ipython().system('conda install utide -y')
    import utide


# In[2]:

from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster


# For Dask, the workers need Utide too, so we created `utide-worker.yaml`, which is just
# `default-worker.yaml` with utide added from conda-forge:
# ```
# env:
#   - name: GCSFUSE_BUCKET
#     value: pangeo-data
#   - name: EXTRA_CONDA_PACKAGES
#     value: utide -c conda-forge
# ```
#
# Since we are customizing the workers, they take about a minute to spin up.

# In[3]:

cluster = KubeCluster.from_yaml('/home/jovyan/utide-worker.yaml')
cluster.scale(10);
cluster


# In[ ]:

client = Client(cluster)
client


# ### Open an Xarray Dataset

# In[ ]:

fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)


# In[ ]:

ds = xr.open_zarr(gcsmap, decode_times=False)


# In[ ]:

ds


# In[ ]:

dt, n, m = ds['zeta'].shape   # (time, eta, xi)
print(dt, n, m)


# In[ ]:

t = ds['ocean_time'].values / (3600 * 24)   # convert time from seconds to days


# In[ ]:

# client.get_versions(check=True)


# In[ ]:

from utide import solve
import numpy as np
import warnings

lat = 40.7


# Test the tidal solution on a single grid cell first.

# In[ ]:

get_ipython().run_cell_magic('time', '', """with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    acoef = solve(t, ds['zeta'][:, 10, 10].values, 0*t, lat, trend=False,
                  nodal=False, Rayleigh_min=0.95, method='ols', conf_int='linear')
    val = acoef['Lsmaj']
""")


# In[ ]:

acoef['name']


# In[ ]:

import dask.array as da
from dask import delayed


# Make Utide `solve` a delayed function

# In[ ]:

usolve = delayed(solve)


# Load all the data to start, since it's only 2 GB

# In[ ]:

get_ipython().run_cell_magic('time', '', """z = ds['zeta'][:].compute()
print(z.nbytes / 1e9)
""")


# Specify the subsampling interval for the tidal calculation; e.g., nsub = 4 means use every 4th grid point.

# In[ ]:

nsub = 10


# Create a list of delayed functions (one for each grid cell)

# In[ ]:

get_ipython().run_cell_magic('time', '', """with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    coefs = [usolve(t, z[:, j*nsub, i*nsub], t*0.0, lat,
                    trend=False, nodal=False, Rayleigh_min=0.95, method='ols',
                    conf_int='linear')
             for j in range(int(n/nsub)) for i in range(int(m/nsub))]
""")


# Construct a list of Dask arrays; the test result `val` supplies the dtype and shape of each delayed output.

# In[ ]:

arrays = [da.from_delayed(coef['Lsmaj'], dtype=val.dtype, shape=val.shape)
          for coef in coefs]


# Stack the arrays

# In[ ]:

stack = da.stack(arrays, axis=0)


# In[ ]:

stack


# In[ ]:

get_ipython().run_cell_magic('time', '', 'amps = stack.compute()\n')


# In[ ]:

pods = cluster.pods()
# print(cluster.logs(pods[0]))


# In[ ]:

# Column 0 corresponds to the first constituent in acoef['name'] (M2 here); reshape onto the subsampled grid
m2amp = amps[:, 0].reshape((int(n/nsub), int(m/nsub)))


# In[ ]:

get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt


# In[ ]:

plt.figure(figsize=(12, 8))
plt.pcolormesh(m2amp)
plt.colorbar()
plt.title('M2 Elevation Amplitude');
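
# A follow-up sketch, not from the original notebook: wrapping the subsampled M2 amplitudes
# in an xarray DataArray gives the plot labeled dimensions and keeps the result self-describing.
# The dimension names `eta_rho` and `xi_rho` are assumptions about the ROMS grid, not read from `ds`.

# In[ ]:

# Hypothetical labeling of the subsampled result; adjust the dims to match ds['zeta'].
m2da = xr.DataArray(m2amp, dims=('eta_rho', 'xi_rho'), name='M2 elevation amplitude')
m2da.plot(figsize=(12, 8));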