%matplotlib inline
import xarray as xr
from dask.distributed import Client, progress, LocalCluster
#pangeo='USGS-HPC-YETI'
pangeo='WHOI-HPC-POSEIDON'
if pangeo=='USGS-HPC-YETI':
    from dask_jobqueue import SLURMCluster
    import os
    cluster = SLURMCluster(processes=4, threads=1, memory='16GB',
                           project='woodshole', walltime='01:00:00', queue='normal',
                           interface='ib0')
    workers = cluster.start_workers(2)
    print(cluster.job_script())
    zarr_dataset = '/lustre/projects/hazards/cmgp/woodshole/rsignell/test_week5c'
if pangeo=='WHOI-HPC-POSEIDON':
    zarr_dataset = '/vortexfs1/usgs/rsignell/data/nwm/test_week5c'
    from dask_jobqueue import SLURMCluster
    import os
    cluster = SLURMCluster(processes=4, threads=1, memory='16GB',
                           project='science', walltime='03:00:00', queue='compute')
    workers = cluster.start_workers(5)
    print(cluster.job_script())
#!/bin/bash

#SBATCH -J dask-worker
#SBATCH -e dask-worker.err
#SBATCH -o dask-worker.out
#SBATCH -p compute
#SBATCH -A science
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=60G
#SBATCH -t 03:00:00

/vortexfs1/usgs/rsignell/miniconda3/envs/pangeo/bin/dask-worker tcp://10.141.0.12:33093 --nthreads 1 --nprocs 4 --memory-limit 16GB --name dask-worker-7 --death-timeout 60
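The fixed worker counts above could also be replaced by adaptive scaling, where the scheduler launches and retires SLURM jobs as load demands. A minimal sketch, assuming a recent dask-jobqueue (the bounds here are illustrative):

# Let the scheduler request between 1 and 5 worker jobs as needed
# (adaptive-scaling API of recent dask-jobqueue; bounds are illustrative)
cluster.adapt(minimum=1, maximum=5)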
if pangeo=='ESIP-AWS-S3':
    from dask_kubernetes import KubeCluster
    cluster = KubeCluster.from_yaml('/home/jovyan/worker-template.yaml')
    cluster.scale(10);
    import s3fs
    fs = s3fs.S3FileSystem(anon=True)
    zarr_dataset = s3fs.S3Map('rsignell/nwm/test_week5c', s3=fs)
if pangeo=='Jetstream-OpenStack':
    from dask_kubernetes import KubeCluster
    cluster = KubeCluster.from_yaml('/home/jovyan/worker-template.yaml')
    cluster.scale(2);
    import s3fs
    url = 'https://iu.jetstream-cloud.org:8080'
    fs = s3fs.S3FileSystem(client_kwargs=dict(endpoint_url=url), anon=True)
    # assign to zarr_dataset so the open_zarr call below works on this branch too
    zarr_dataset = s3fs.S3Map('rsignell/nwm/test_week', s3=fs)
client = Client(cluster)
client
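Before submitting work, it is worth confirming that workers have actually connected to the scheduler. A minimal check, assuming a reasonably recent dask.distributed:

# Block until at least 4 workers are up, then list their addresses
# (wait_for_workers is available in recent dask.distributed releases)
client.wait_for_workers(n_workers=4)
print(list(client.scheduler_info()['workers']))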
ds = xr.open_zarr(zarr_dataset)
ds
<xarray.Dataset>
Dimensions:   (time: 168, x: 4608, y: 3840)
Coordinates:
  * time      (time) datetime64[ns] 2018-04-01T01:00:00 2018-04-01T02:00:00 ...
  * x         (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
  * y         (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
Data variables:
    LWDOWN    (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    PSFC      (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    Q2D       (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    RAINRATE  (time, y, x) float32 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    SWDOWN    (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    T2D       (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    U2D       (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    V2D       (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
Attributes:
    model_initialization_time:  2018-04-01_00:00:00
    model_output_valid_time:    2018-04-01_01:00:00
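The dataset holds eight forcing variables on a 168 x 3840 x 4608 grid, all backed by lazy dask arrays, so opening it reads only metadata. To gauge the total size across all variables (Dataset.nbytes sums over every variable):

# Total in-memory size of the full dataset, in GB
print(ds.nbytes / 1e9)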
var='T2D'
ds[var].nbytes/1.e9
23.78170368
ds[var].mean(dim='time')
<xarray.DataArray 'T2D' (y: 3840, x: 4608)>
dask.array<shape=(3840, 4608), dtype=float64, chunksize=(384, 288)>
Coordinates:
  * x        (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
  * y        (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
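The mean is still lazy: the repr above describes a dask graph, not computed values. Calling persist() in the next cell runs the reduction on the cluster and keeps the result in distributed worker memory. A sketch of the eager alternative, if the result should come back to the notebook instead:

# Eager alternative: run the reduction and return a NumPy-backed
# DataArray locally, rather than leaving it in worker memory
mean_local = ds[var].mean(dim='time').compute()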
mean_var = ds[var].mean(dim='time').persist()
progress(mean_var)
[dask progress bar widget]
isub=4
mean_var[::isub,::isub].plot.imshow(figsize=(8,6));
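Striding with [::isub, ::isub] keeps every fourth point. A block-averaged downsample is an alternative, sketched here assuming an xarray recent enough to provide coarsen:

# Average isub x isub blocks instead of subsampling every isub-th value
# (requires DataArray.coarsen; boundary='trim' drops the ragged edges)
mean_var.coarsen(x=isub, y=isub, boundary='trim').mean().plot.imshow(figsize=(8, 6));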
%%time
ds1d = ds[var][:,2000,2000]
ds1d.plot()
CPU times: user 46 ms, sys: 1.9 ms, total: 47.9 ms
Wall time: 194 ms
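The extraction is fast because each chunk spans the full time dimension (time chunksize 168), so a single point's whole time series lives in one chunk. Points can also be selected by coordinate value rather than integer index; the x/y values below are illustrative, with method='nearest' snapping to the closest grid point:

# Hypothetical coordinate-based point selection (x/y values are illustrative)
ds_pt = ds[var].sel(x=-5.0e5, y=3.0e5, method='nearest')
ds_pt.plot()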
cluster.stop_workers(workers)
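stop_workers is the early dask-jobqueue API. In recent versions the equivalent teardown is to close the cluster and client; a minimal sketch under that assumption:

# Release the SLURM worker jobs and disconnect
# (modern dask-jobqueue/distributed API)
cluster.close()
client.close()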