#!/usr/bin/env python
# coding: utf-8

# Testing Utide on Pangeo with Dask distributed
# ====

# In[2]:


get_ipython().run_line_magic('matplotlib', 'inline')
import xarray as xr
import matplotlib.pyplot as plt
import s3fs
import utide


# In[3]:


from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster


# For Dask, the workers need Utide too. So we created `utide-worker.yaml`, which is just the `default-worker.yaml` with utide added from conda-forge:
# ```
# env:
#   - name: GCSFUSE_BUCKET
#     value: pangeo-data
#   - name: EXTRA_CONDA_PACKAGES
#     value: utide -c conda-forge
# ```
#
# Since we are customizing the workers, they take about a minute to spin up.

# In[5]:


cluster = KubeCluster.from_yaml('/home/jovyan/worker-template.yaml')
cluster.scale(10)
cluster


# In[7]:


client = Client(cluster)
client


# ### Open an Xarray Dataset

# In[8]:


# GCS
#fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')
#fmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)


# In[13]:


# AWS S3
fs = s3fs.S3FileSystem(anon=True)
fmap = s3fs.S3Map('rsignell/tides', s3=fs)
#fmap = s3fs.S3Map('rsignell/nwm/tiny3a', s3=fs)


# In[14]:


ds = xr.open_zarr(fmap, decode_times=False)


# In[15]:


ds


# In[16]:


dt, n, m = ds['zeta'].shape   # (time, eta, xi)
print(dt, n, m)


# In[17]:


t = ds['ocean_time'].values/(3600*24)   # convert seconds to days for utide


# In[18]:


#client.get_versions(check=True)


# In[19]:


from utide import solve
import numpy as np
import warnings

lat = 40.7


# Run a single-point analysis first, both to check the workflow and to capture the shape and dtype of the result for later use:

# In[20]:


get_ipython().run_cell_magic('time', '', 'with warnings.catch_warnings():\n    warnings.simplefilter("ignore")\n    acoef = solve(t, ds[\'zeta\'][:,10,10].values, 0*t, lat, trend=False,\n                  nodal=False, Rayleigh_min=0.95, method=\'ols\', conf_int=\'linear\')\n    val = acoef[\'Lsmaj\']\n')


# In[21]:


acoef['name']   # names of the resolved constituents


# In[22]:


import dask.array as da
from dask import delayed


# Make Utide `solve` a delayed function

# In[23]:


usolve = delayed(solve)


# Load all the data up front, since it's only about 2 GB

# In[24]:


get_ipython().run_cell_magic('time', '', "z = ds['zeta'][:].compute()\nprint(z.nbytes/1e9)\n")


# Specify the subsampling interval for the tidal calculation: `nsub = 4` means solve at every 4th grid point

# In[25]:


nsub = 4


# Create a list of delayed `solve` calls (one for each subsampled grid cell)

# In[26]:


get_ipython().run_cell_magic('time', '', 'with warnings.catch_warnings():\n    warnings.simplefilter("ignore")\n    coefs = [usolve(t, z[:,j*nsub,i*nsub], t*0.0, lat,\n                    trend=False, nodal=False, Rayleigh_min=0.95, method=\'ols\',\n                    conf_int=\'linear\') for j in range(int(n/nsub)) for i in range(int(m/nsub))]\n')


# Construct a list of Dask arrays, using the dtype and shape of `val` from the single-point solve above

# In[27]:


arrays = [da.from_delayed(coef['Lsmaj'], dtype=val.dtype, shape=val.shape) for coef in coefs]


# Stack the arrays

# In[28]:


stack = da.stack(arrays, axis=0)


# In[29]:


stack


# In[30]:


get_ipython().run_cell_magic('time', '', 'amps = stack.compute()\n')


# In[ ]:


# for debugging
#pods = cluster.pods()
#print(cluster.logs(pods[0]))


# In[39]:


# column 0 is the first constituent (M2 here; see acoef['name'] above);
# reshape the flat list of cells back onto the subsampled grid
m2amp = amps[:,0].reshape((int(n/nsub), int(m/nsub)))


# In[40]:


get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt


# In[41]:


plt.figure(figsize=(12,8))
plt.pcolormesh(m2amp)
plt.colorbar()
plt.title('M2 Elevation Amplitude');


# In[ ]:
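# A debugging tip for the delayed pattern above: any single task in the `coefs` list can be computed eagerly, which is a cheap sanity check of the `usolve` wrapper before (or after) submitting thousands of tasks. A minimal sketch using only names defined above:

# In[ ]:


# compute one delayed solve for grid cell (0, 0); with an active distributed
# client this runs on the cluster, otherwise on the local scheduler
single = coefs[0].compute()
print(single['name'][0], single['Lsmaj'][0])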
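# An optional follow-up sketch: wrapping `m2amp` in an `xarray.DataArray` keeps the subsampled grid indices attached, so the plot axes are labeled. The `eta_rho`/`xi_rho` dimension names are assumed ROMS-style names, not read from the dataset.

# In[ ]:


# map each subsampled cell back to its index in the original grid
m2 = xr.DataArray(m2amp, dims=['eta_rho', 'xi_rho'],
                  coords={'eta_rho': np.arange(int(n/nsub))*nsub,
                          'xi_rho': np.arange(int(m/nsub))*nsub},
                  name='M2_amplitude')
m2.plot(figsize=(12, 8))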
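# When finished, release the Kubernetes worker pods with the standard dask.distributed / dask-kubernetes cleanup calls:

# In[ ]:


client.close()
cluster.close()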