%matplotlib inline
import xarray as xr
import matplotlib.pyplot as plt
import gcsfs
try:
import utide
except:
!conda install utide -y
import utide
Solving environment: done ==> WARNING: A newer version of conda exists. <== current version: 4.5.4 latest version: 4.5.11 Please update conda by running $ conda update -n base conda ## Package Plan ## environment location: /opt/conda added / updated specs: - utide The following packages will be downloaded: package | build ---------------------------|----------------- utide-0.2.2 | py_0 83 KB conda-forge The following NEW packages will be INSTALLED: utide: 0.2.2-py_0 conda-forge Downloading and Extracting Packages utide-0.2.2 | 83 KB | ####################################### | 100% Preparing transaction: done Verifying transaction: done Executing transaction: done
# Spin up a Dask cluster on Kubernetes from a worker spec YAML
# (presumably the worker image bundles utide -- verify against the YAML).
from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('/home/jovyan/utide-worker.yaml')
# Request 20 workers; the trailing ';' suppresses the cell's return value.
cluster.scale(20);
cluster
VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n <style scoped>\n .…
# Attach this notebook to the cluster's scheduler; displaying `client`
# shows the dashboard link and worker summary.
client = Client(cluster)
client
Client
|
Cluster
|
# GCS
# Open the ROMS sea-surface-height (zeta) history stored as Zarr on
# Google Cloud Storage.  decode_times=False keeps ocean_time as raw
# numeric seconds rather than datetime64 values.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', access='read_only')
fmap = gcsfs.mapping.GCSMap('pangeo-data/rsignell/ocean_his_tide_zeta', gcs=fs)
ds = xr.open_zarr(fmap, decode_times=False)
ds
<xarray.Dataset> Dimensions: (eta_rho: 324, ocean_time: 1441, xi_rho: 1542) Coordinates: lat_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)> lon_rho (eta_rho, xi_rho) float64 dask.array<shape=(324, 1542), chunksize=(81, 771)> * ocean_time (ocean_time) float64 0.0 1.8e+03 3.6e+03 5.4e+03 7.2e+03 ... Dimensions without coordinates: eta_rho, xi_rho Data variables: zeta (ocean_time, eta_rho, xi_rho) float32 dask.array<shape=(1441, 324, 1542), chunksize=(91, 41, 193)> Attributes: CPP_options: GSB, ADD_FSOBC, ADD_M2OBC, ANA_BSFLUX, ANA_BTFLUX, ANA... Conventions: CF-1.4 NCO: 4.7.3 NLM_LBC: \nEDGE: WEST SOUTH EAST NORTH \nzeta: Cha C... ana_file: ROMS/Functionals/ana_btflux.h, ROMS/Functionals/ana_fs... code_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale... compiler_command: /opt/intel/impi/5.0.1.035/intel64/bin/mpif90 compiler_flags: -heap-arrays -fp-model precise -ip -O3 -xW -free compiler_system: ifort cpu: x86_64 file: ocean_his_gsb_tides_55nb.nc format: netCDF-3 64bit offset file frc_file_01: ../forcings/tide_forc_GSB_55.nc grd_file: ../grids/GSB_55nb.nc header_dir: /cxfs/projects/usgs/hazards/cmgp/woodshole/aaretxabale... header_file: gsb.h his_file: ocean_his_gsb_tides_55nb.nc history: Mon Mar 12 09:36:34 2018: ncks -O -v ocean_time,zeta,l... os: Linux rst_file: ocean_rst.nc script_file: svn_rev: svn_url: https:://myroms.org/svn/src tiling: 036x010 title: Great south Bay type: ROMS/TOMS history file var_info: varinfo.dat
# Unpack dimension sizes: time steps (dt), eta_rho rows (n), xi_rho cols (m)
dt, n, m = ds['zeta'].shape
print(dt,n,m)
1441 324 1542
# Convert ocean_time from seconds to days -- the day-based time axis is
# what is passed to utide.solve below.
t = ds['ocean_time'].values/(3600*24)
#client.get_versions(check=True)
from utide import solve
import numpy as np
import warnings
# Latitude handed to utide.solve (dataset title says Great South Bay)
lat = 40.7
%%time
# Single-cell harmonic analysis: sanity-checks utide locally and captures
# the result structure (constituent names; amplitude array shape/dtype).
with warnings.catch_warnings():
    warnings.simplefilter("ignore")  # utide emits noisy runtime warnings
    # t is in days; 0*t is presumably a zero v-series so the analysis is
    # elevation-only -- TODO confirm against the utide.solve signature.
    acoef = solve(t, ds['zeta'][:,10,10].values, 0*t, lat, trend=False,
                  nodal=False, Rayleigh_min=0.95, method='ols', conf_int='linear')
    # Major-axis amplitudes; val is reused later for dtype/shape metadata.
    val = acoef['Lsmaj']
solve: matrix prep ... solution ... diagnostics ... done. CPU times: user 765 ms, sys: 149 ms, total: 914 ms Wall time: 8.44 s
# Display the tidal constituents the solver resolved for this record
acoef['name']
array(['M2', 'N2', 'S2', 'K1', 'O1', 'ETA2', 'M6', 'M4', 'Q1', 'UPS1', 'OO1', 'J1', 'NO1', '2Q1', 'MO3', 'M3', 'SK3', 'MK3', 'MN4', 'MSF', 'MS4', 'S4', '2MK5', '2SK5', '3MK7', '2MN6', '2MS6', '2SM6', 'M8'], dtype=object)
import dask.array as da
from dask import delayed, compute
Specify the subset interval used to calculate tides; `nsub = 4` means analyze every 4th grid point.
nsub = 4  # subsample stride: analyze every 4th grid point in each direction
Load all the data to start, since even without subsetting it is only 2GB
%%time
# Pull the subsampled elevation series into local memory so the per-cell
# solves below don't each re-read from cloud storage.
z = ds['zeta'][:,::nsub,::nsub].compute()
print(z.nbytes/1e9)  # loaded size in GB
0.180217224 CPU times: user 5.57 s, sys: 1.23 s, total: 6.8 s Wall time: 14.4 s
# Refresh dimension sizes for the subsampled grid
dt, n, m = z.shape
Create a list of delayed functions (one for each grid cell)
%%time
# Build one delayed utide.solve task per subsampled grid cell (n*m tasks).
# NOTE(review): this warnings filter is active only while the task graph is
# constructed here; it does not suppress warnings raised later on the
# workers when the tasks actually execute.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    coefs = [delayed(solve)(t, z[:,j,i], t*0.0, lat,
                            trend=False, nodal=False, Rayleigh_min=0.95, method='ols',
                            conf_int='linear') for j in range(n) for i in range(m)]
CPU times: user 22.5 s, sys: 2.56 s, total: 25 s Wall time: 22.5 s
Construct a list of Dask arrays
# Wrap each delayed result's 'Lsmaj' (per-constituent amplitudes) in a Dask
# array; dtype/shape metadata come from the single-point solve (val) above.
arrays = [da.from_delayed(coef['Lsmaj'], dtype=val.dtype, shape=val.shape) for coef in coefs]
Stack the arrays
# Stack the per-cell amplitude vectors into one (ncells, nconstituents) array
stack = da.stack(arrays, axis=0)
Below we inspect the stacked Dask array, then trigger the computation of all the delayed tasks — converting the stack to a concrete NumPy array is where all the work actually happens.
stack
dask.array<stack, shape=(31266, 29), dtype=float64, chunksize=(1, 29)>
%%time
amps = np.array(stack)
CPU times: user 3min 38s, sys: 23.5 s, total: 4min 1s Wall time: 5min 42s
# Column 0 is M2 (first entry of acoef['name']); fold the flat per-cell
# results back onto the subsampled (eta, xi) grid.
m2amp = amps[:, 0].reshape(n, m)
%matplotlib inline
import matplotlib.pyplot as plt
# Map the M2 elevation amplitude over the model grid (index coordinates;
# trailing ';' suppresses the title object's repr in the notebook).
plt.figure(figsize=(12,8))
plt.pcolormesh(m2amp)
plt.colorbar()
plt.title('M2 Elevation Amplitude');
# for debugging
#pods = cluster.pods()
#print(cluster.logs(pods[0]))