from dask.distributed import Client, progress, LocalCluster
import pandas as pd
import xarray as xr
import s3fs
# Start a Dask LocalCluster with default arguments and connect a Client to it.
# Original note: the right cluster setup depends on the machine you are using.
cluster = LocalCluster()
client = Client(cluster)
# Bare expression: in a notebook this displays the client/cluster summary.
client
Client
|
Cluster
|
# Base URL of the short-range forcing files on the THREDDS server.
root = 'http://tds.renci.org:8080/thredds/dodsC/nwm/forcing_short_range/'
# Hourly timestamps; both endpoints are included, so this yields 18:00, 19:00
# and 20:00.  The lowercase 'h' alias is used because pandas deprecated the
# uppercase 'H' spelling in 2.2.
dates = pd.date_range(start='2018-04-07T18:00', end='2018-04-07T20:00', freq='h')
# One URL per timestamp:
#   <root><YYYYMMDD>/nwm.t<HH>z.short_range.forcing.f001.conus.nc
urls = [
    '{}{}/nwm.t{}z.short_range.forcing.f001.conus.nc'.format(
        root, stamp.strftime('%Y%m%d'), stamp.strftime('%H'))
    for stamp in dates
]
%%time
# Open all URLs in `urls` as a single dataset, concatenated along 'time'.
# (%%time must stay on the first line of the cell; it reports the timing.)
ds = xr.open_mfdataset(urls,concat_dim='time')
CPU times: user 156 ms, sys: 28 ms, total: 184 ms Wall time: 2.15 s
# Display the summary of the combined dataset.
ds
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 3, time: 3, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-07T18:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... * time (time) datetime64[ns] 2018-04-07T19:00:00 ... Dimensions without coordinates: nv Data variables: time_bounds (time, nv) datetime64[ns] dask.array<shape=(3, 2), chunksize=(1, 2)> ProjectionCoordinateSystem (time) |S64 b'' b'' b'' T2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> LWDOWN (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> Q2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> Attributes: model_initialization_time: 2018-04-07_18:00:00 model_output_valid_time: 2018-04-07_19:00:00 DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time
# Check the dtype of the one non-numeric data variable; the recorded result
# is a fixed-width byte string (S64).
ds['ProjectionCoordinateSystem'].dtype
dtype('S64')
# Drop the fixed-width byte-string variable (dtype |S64) before writing to
# Zarr.  With it included, reading the store back with xr.open_zarr failed
# with "Chunks and shape must be of the same length/dimension.
# Got chunks=(3, 64), shape=(3,)".
# NOTE(review): recorded outputs further down were produced before this drop
# was enabled; re-run the notebook to refresh them.
ds = ds.drop(['ProjectionCoordinateSystem'])
ds
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 3, time: 3, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-07T18:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... * time (time) datetime64[ns] 2018-04-07T19:00:00 ... Dimensions without coordinates: nv Data variables: time_bounds (time, nv) datetime64[ns] dask.array<shape=(3, 2), chunksize=(1, 2)> ProjectionCoordinateSystem (time) |S64 b'' b'' b'' T2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> LWDOWN (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> Q2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(3, 3840, 4608), chunksize=(1, 3840, 4608)> Attributes: model_initialization_time: 2018-04-07_18:00:00 model_output_valid_time: 2018-04-07_19:00:00 DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time
# S3 filesystem handle; anon=False means anonymous access is not used.
fs = s3fs.S3FileSystem(anon=False)
# Location of the Zarr store on S3 (presumably '<bucket>/<prefix>' - confirm).
f_zarr = 'rsignell/nwm/test02'
# Mapping view of that location, passed to xarray as the Zarr store.
d = s3fs.S3Map(f_zarr, s3=fs)
# Write the dataset to the store with mode='w' and time the call.
%time ds.to_zarr(store=d, mode='w')
CPU times: user 8.84 s, sys: 1.2 s, total: 10 s Wall time: 3min 35s
<xarray.backends.zarr.ZarrStore at 0x7fcbb9fa85f8>
# Read the Zarr store back from S3.
# NOTE(review): the recorded traceback below (chunks=(3, 64) vs shape=(3,))
# comes from a run in which the |S64 variable ProjectionCoordinateSystem was
# presumably still part of the written dataset.
ds2 = xr.open_zarr(d)
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) <ipython-input-15-8198db1c8578> in <module>() ----> 1 ds2 = xr.open_zarr(d) /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in open_zarr(store, group, synchronizer, auto_chunk, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables) 476 477 variables = OrderedDict([(k, maybe_chunk(k, v)) --> 478 for k, v in ds.variables.items()]) 479 return ds._replace_vars_and_dims(variables) 480 else: /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in <listcomp>(.0) 476 477 variables = OrderedDict([(k, maybe_chunk(k, v)) --> 478 for k, v in ds.variables.items()]) 479 return ds._replace_vars_and_dims(variables) 480 else: /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in maybe_chunk(name, var) 471 token2 = tokenize(name, var._data) 472 name2 = 'zarr-%s' % token2 --> 473 return var.chunk(chunks, name=name2, lock=None) 474 else: 475 return var /opt/conda/lib/python3.6/site-packages/xarray/core/variable.py in chunk(self, chunks, name, lock) 820 data = indexing.ImplicitToExplicitIndexingAdapter( 821 data, indexing.OuterIndexer) --> 822 data = da.from_array(data, chunks, name=name, lock=lock) 823 824 return type(self)(self.dims, data, self._attrs, self._encoding, /opt/conda/lib/python3.6/site-packages/dask/array/core.py in from_array(x, chunks, name, lock, asarray, fancy, getitem) 1988 >>> a = da.from_array(x, chunks=(1000, 1000), lock=True) # doctest: +SKIP 1989 """ -> 1990 chunks = normalize_chunks(chunks, x.shape) 1991 if name in (None, True): 1992 token = tokenize(x, chunks) /opt/conda/lib/python3.6/site-packages/dask/array/core.py in normalize_chunks(chunks, shape) 1918 raise ValueError( 1919 "Chunks and shape must be of the same length/dimension. 
" -> 1920 "Got chunks=%s, shape=%s" % (chunks, shape)) 1921 1922 if shape is not None: ValueError: Chunks and shape must be of the same length/dimension. Got chunks=(3, 64), shape=(3,)
# Display the dataset read back from Zarr (requires the open_zarr call to
# have succeeded, otherwise ds2 is undefined).
ds2
# Render matplotlib figures inline in the notebook.
%matplotlib inline
# Subsampling stride: keep every 4th point along y and x for the map plot.
isub=4
# Image of T2D at the first time step, subsampled by isub in both directions.
ds2['T2D'][0,::isub,::isub].plot.imshow()
# T2D over all time steps at the single grid point y index 1000, x index 1000.
ds2['T2D'][:,1000,1000].plot()