from dask.distributed import Client, progress, LocalCluster
import pandas as pd
import xarray as xr
import s3fs
# Start a Dask cluster on this machine and connect a client to it.
# LocalCluster() is called with no arguments, so worker/thread/memory
# settings are its defaults -- tune them for the machine you are using.
cluster = LocalCluster()
client = Client(address=cluster)
client  # last expression of the cell: shows the client/cluster summary
Client
|
Cluster
|
# OPeNDAP base URL of the NWM short-range forcing collection.
root = 'http://tds.renci.org:8080/thredds/dodsC/nwm/forcing_short_range/'
# Hourly timestamps (inclusive on both ends -> 11 values). pd.offsets.Hour()
# is used instead of the 'H' alias, which newer pandas deprecates.
dates = pd.date_range(start='2018-04-01T18:00', end='2018-04-02T04:00',
                      freq=pd.offsets.Hour())
# One URL per timestamp: the date selects the directory, the hour fills the
# `tHHz` slot of the file name; every URL points at the f001 file.
urls = [
    f"{root}{stamp.strftime('%Y%m%d')}/nwm.t{stamp.strftime('%H')}z"
    ".short_range.forcing.f001.conus.nc"
    for stamp in dates
]
# S3 path (bucket/prefix) the zarr copy is written to and read from.
f_zarr = 'rsignell/nwm/test01'
%%time
ds = xr.open_mfdataset(urls,concat_dim='time')
CPU times: user 488 ms, sys: 48 ms, total: 536 ms Wall time: 7.22 s
# Show the combined dataset's summary (dimensions, variables, attributes).
ds
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 11, time: 11, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-01T18:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... * time (time) datetime64[ns] 2018-04-01T19:00:00 ... Dimensions without coordinates: nv Data variables: time_bounds (time, nv) datetime64[ns] dask.array<shape=(11, 2), chunksize=(1, 2)> ProjectionCoordinateSystem (time) |S64 b'' b'' b'' b'' b'' b'' b'' b'' ... T2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> LWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> Q2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> Attributes: model_initialization_time: 2018-04-01_18:00:00 model_output_valid_time: 2018-04-01_19:00:00 DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time
fs = s3fs.S3FileSystem(anon=False)
d = s3fs.S3Map(f_zarr, s3=fs)
%time ds.to_zarr(store=d, mode='w')
CPU times: user 24.3 s, sys: 5.58 s, total: 29.9 s Wall time: 12min 2s
<xarray.backends.zarr.ZarrStore at 0x7f2b3c186b38>
# Fresh mapping onto the same S3 prefix, used to read the store back.
s3map = s3fs.S3Map(f_zarr, s3=fs)
# auto_chunk=False: variables are not wrapped in dask arrays, so the
# chunk/shape mismatch that auto_chunk=True hits is never triggered.
ds2 = xr.open_zarr(
    s3map,
    auto_chunk=False,
)
ds2
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 11, time: 11, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-01T18:00:00 ... * time (time) datetime64[ns] 2018-04-01T19:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... Dimensions without coordinates: nv Data variables: LWDOWN (time, y, x) float64 ... PSFC (time, y, x) float64 ... ProjectionCoordinateSystem (time) |S64 ... Q2D (time, y, x) float64 ... RAINRATE (time, y, x) float32 ... SWDOWN (time, y, x) float64 ... T2D (time, y, x) float64 ... U2D (time, y, x) float64 ... V2D (time, y, x) float64 ... time_bounds (time, nv) datetime64[ns] ... Attributes: DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time model_initialization_time: 2018-04-01_18:00:00 model_output_valid_time: 2018-04-01_19:00:00
# auto_chunk=True raised "Chunks and shape must be of the same length":
# ProjectionCoordinateSystem is a fixed-width bytes (|S64) variable of shape
# (time,), but its stored zarr chunk encoding apparently keeps an extra
# character dimension, giving chunks=(11, 64) for shape (11,). Dropping that
# variable at decode time (before chunking) lets every data variable be
# opened as a dask array with the chunks it was written with.
ds3 = xr.open_zarr(s3map, auto_chunk=True,
                   drop_variables=['ProjectionCoordinateSystem'])
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) <ipython-input-15-b3733a1ec58f> in <module>() ----> 1 ds3 = xr.open_zarr(s3map, auto_chunk=True) /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in open_zarr(store, group, synchronizer, auto_chunk, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables) 476 477 variables = OrderedDict([(k, maybe_chunk(k, v)) --> 478 for k, v in ds.variables.items()]) 479 return ds._replace_vars_and_dims(variables) 480 else: /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in <listcomp>(.0) 476 477 variables = OrderedDict([(k, maybe_chunk(k, v)) --> 478 for k, v in ds.variables.items()]) 479 return ds._replace_vars_and_dims(variables) 480 else: /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in maybe_chunk(name, var) 471 token2 = tokenize(name, var._data) 472 name2 = 'zarr-%s' % token2 --> 473 return var.chunk(chunks, name=name2, lock=None) 474 else: 475 return var /opt/conda/lib/python3.6/site-packages/xarray/core/variable.py in chunk(self, chunks, name, lock) 820 data = indexing.ImplicitToExplicitIndexingAdapter( 821 data, indexing.OuterIndexer) --> 822 data = da.from_array(data, chunks, name=name, lock=lock) 823 824 return type(self)(self.dims, data, self._attrs, self._encoding, /opt/conda/lib/python3.6/site-packages/dask/array/core.py in from_array(x, chunks, name, lock, asarray, fancy, getitem) 1977 >>> a = da.from_array(x, chunks=(1000, 1000), lock=True) # doctest: +SKIP 1978 """ -> 1979 chunks = normalize_chunks(chunks, x.shape) 1980 if name in (None, True): 1981 token = tokenize(x, chunks) /opt/conda/lib/python3.6/site-packages/dask/array/core.py in normalize_chunks(chunks, shape) 1907 raise ValueError( 1908 "Chunks and shape must be of the same length/dimension. 
" -> 1909 "Got chunks=%s, shape=%s" % (chunks, shape)) 1910 1911 if shape is not None: ValueError: Chunks and shape must be of the same length/dimension. Got chunks=(11, 64), shape=(11,)