from dask.distributed import Client, progress, LocalCluster
import pandas as pd
import xarray as xr
import s3fs
# Start a local Dask cluster and connect a client to it. LocalCluster() is
# called with no arguments, so what it starts depends on the machine you are
# using.
cluster = LocalCluster()
client = Client(cluster)
# Bare expression: in a notebook this displays the client/cluster summary.
client
Client
|
Cluster
|
# THREDDS (dodsC) root under which the short-range forcing files are grouped
# into one directory per date (YYYYMMDD).
root = 'http://tds.renci.org:8080/thredds/dodsC/nwm/forcing_short_range/'
# One timestamp per hour, both endpoints included (11 entries).
# An explicit offset object is used instead of the 'H' alias, which newer
# pandas releases deprecate in favour of 'h'; the object form works on both.
dates = pd.date_range(start='2018-04-07T18:00', end='2018-04-08T04:00',
                      freq=pd.offsets.Hour())
# For every timestamp, build the URL of that hour's "f001" file:
#   <root><YYYYMMDD>/nwm.t<HH>z.short_range.forcing.f001.conus.nc
urls = [
    '{}{}/nwm.t{}z.short_range.forcing.f001.conus.nc'.format(
        root, cycle.strftime('%Y%m%d'), cycle.strftime('%H'))
    for cycle in dates
]
%%time
ds = xr.open_mfdataset(urls,concat_dim='time')
CPU times: user 480 ms, sys: 116 ms, total: 596 ms Wall time: 8.58 s
# Remove the 'ProjectionCoordinateSystem' variable from the dataset.
# drop_vars is the explicit replacement for the deprecated Dataset.drop when
# dropping variables by name.
ds = ds.drop_vars(['ProjectionCoordinateSystem'])
# Bare expression: in a notebook this displays the dataset summary.
ds
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 11, time: 11, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-07T18:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ... * time (time) datetime64[ns] 2018-04-07T19:00:00 ... Dimensions without coordinates: nv Data variables: time_bounds (time, nv) datetime64[ns] dask.array<shape=(11, 2), chunksize=(1, 2)> T2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> LWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> Q2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> Attributes: model_initialization_time: 2018-04-07_18:00:00 model_output_valid_time: 2018-04-07_19:00:00 DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time
# anon=False: connect to S3 with credentials rather than anonymously.
fs = s3fs.S3FileSystem(anon=False)
# Location of the Zarr store on S3 (presumably "<bucket>/<prefix>"; confirm).
f_zarr = 'rsignell/nwm/test03'
# Wrap that location as a mapping object that can be passed as a Zarr store.
d = s3fs.S3Map(f_zarr, s3=fs)
# Write the dataset to the store; mode='w' is the write mode (per the xarray
# docs it overwrites an existing store). %time reports how long this single
# statement takes.
%time ds.to_zarr(store=d, mode='w')
CPU times: user 26.3 s, sys: 5.69 s, total: 32 s Wall time: 12min
<xarray.backends.zarr.ZarrStore at 0x7fdc40044e10>
# Re-open the Zarr store through the same S3 mapping used for writing.
ds2 = xr.open_zarr(d)
# Bare expression: in a notebook this displays the dataset summary.
ds2
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 11, time: 11, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-07T18:00:00 ... * time (time) datetime64[ns] 2018-04-07T19:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ... Dimensions without coordinates: nv Data variables: LWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> Q2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> T2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> time_bounds (time, nv) datetime64[ns] dask.array<shape=(11, 2), chunksize=(11, 2)> Attributes: DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time model_initialization_time: 2018-04-07_18:00:00 model_output_valid_time: 2018-04-07_19:00:00