try:
import s3fs
except:
!conda install s3fs -y
import s3fs
from dask.distributed import Client, progress, LocalCluster
from dask_kubernetes import KubeCluster
import pandas as pd
import xarray as xr
import gcsfs
import zarr
# Switch between a Kubernetes-backed cluster and a cluster on this machine.
# False reproduces the original hard-coded choice (LocalCluster).
USE_KUBERNETES = False

if USE_KUBERNETES:
    # Worker specification comes from the YAML file; then scale to 10 workers.
    cluster = KubeCluster.from_yaml('/home/jovyan/s3worker.yml')
    cluster.scale(10)
else:
    cluster = LocalCluster()
# Bare expression: in a notebook this displays the cluster widget.
cluster
Failed to display Jupyter Widget of type VBox.
If you're reading this message in the Jupyter Notebook or JupyterLab Notebook, it may mean that the widgets JavaScript is still loading. If this message persists, it likely means that the widgets JavaScript library is either not installed or not enabled. See the Jupyter Widgets Documentation for setup instructions.
If you're reading this message in another frontend (for example, a static rendering on GitHub or NBViewer), it may mean that your frontend doesn't currently support widgets.
# Attach a dask.distributed client to the cluster object created earlier in
# the notebook.
client = Client(cluster)
# Bare expression: echoes the client summary as the cell output.
client
Client
|
Cluster
|
# S3 path of the source Zarr store that is opened below.
s3_bucket = 'rsignell/nwm/test01'
%%time
# Non-anonymous S3 filesystem (anon=False), wrapped in an S3Map so the path
# can be handed to xarray as a Zarr store.
fs = s3fs.S3FileSystem(anon=False)
s3map = s3fs.S3Map(s3_bucket, s3=fs)
ds = xr.open_zarr(s3map, auto_chunk=False) # auto_chunk=True fails
CPU times: user 1.61 s, sys: 146 ms, total: 1.76 s Wall time: 13.7 s
%%time
# Re-chunk the dataset with a chunk length of 1 along the 'time' dimension;
# the other dimensions are not given a chunk size here.
ds2 = ds.chunk({'time':1})
CPU times: user 5 ms, sys: 0 ns, total: 5 ms Wall time: 4.89 ms
# Bare expression: echoes the re-chunked dataset's summary as the cell output.
ds2
<xarray.Dataset> Dimensions: (nv: 2, reference_time: 11, time: 11, x: 4608, y: 3840) Coordinates: * reference_time (reference_time) datetime64[ns] 2018-04-01T18:00:00 ... * time (time) datetime64[ns] 2018-04-01T19:00:00 ... * x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 ... * y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... Dimensions without coordinates: nv Data variables: LWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> PSFC (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> ProjectionCoordinateSystem (time) |S64 dask.array<shape=(11,), chunksize=(1,)> Q2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> RAINRATE (time, y, x) float32 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> SWDOWN (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> T2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> U2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> V2D (time, y, x) float64 dask.array<shape=(11, 3840, 4608), chunksize=(1, 3840, 4608)> time_bounds (time, nv) datetime64[ns] dask.array<shape=(11, 2), chunksize=(1, 2)> Attributes: DODS.strlen: 0 DODS_EXTRA.Unlimited_Dimension: time model_initialization_time: 2018-04-01_18:00:00 model_output_valid_time: 2018-04-01_19:00:00
%matplotlib inline
import matplotlib.pyplot as plt
plt.imshow(ds2['T2D'][0,::4,::4])
<matplotlib.image.AxesImage at 0x7f1ef6003518>
# Step 1: authenticate interactively once, in the notebook process. A
# successful browser flow stores the credentials in gcsfs's token cache.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser')
# Step 2: rebuild the filesystem from the cached credentials and use that one
# from here on. A filesystem created with token='browser' repeats the
# interactive prompt every time it is unpickled, which raises
# "EOFError: EOF when reading a line" on dask workers (they have no stdin)
# as soon as a task that references the filesystem is deserialized.
# NOTE(review): assumes the workers can read the same gcsfs token cache, as
# with a LocalCluster on this machine -- confirm before using remote workers.
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token='cache')
Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=586241054156-0asut23a7m10790r2ik24309flribp7j.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdevstorage.full_control&state=jJGf9eUxVnolqMXCA76NgrWJKhkEQ7&prompt=consent&access_type=offline Enter the authorization code: 4/AACL3wNVP5WEJiqhH1ikswPwLn8nUDtoDXtOvJPZGC0MN6-EyeOlplQ
# GCS path that the dataset is written to below.
bucket = 'pangeo-data/rsignell/nwm/test01'
# Mapping over the GCS path, built on the `fs` filesystem object; it is passed
# to to_zarr as the store.
gcsmap = gcsfs.mapping.GCSMap(bucket, gcs=fs, check=False, create=False)
#client.get_versions(check=True)
# Write the re-chunked dataset to the GCS store and time the call.
%time ds2.to_zarr(store=gcsmap, mode='w')
distributed.scheduler - ERROR - error from worker tcp://127.0.0.1:39575: EOF when reading a line distributed.utils - ERROR - EOF when reading a line Traceback (most recent call last): File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f result[0] = yield make_coro() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run value = future.result() File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run yielded = self.gen.throw(*exc_info) File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1378, in _gather traceback) File "/opt/conda/lib/python3.6/site-packages/six.py", line 692, in reraise raise value.with_traceback(tb) File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads return pickle.loads(x) File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 1011, in __setstate__ self.connect(self.token) File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 407, in connect self.__getattribute__('_connect_' + method)() File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 379, in _connect_browser credentials = flow.run_console() File "/opt/conda/lib/python3.6/site-packages/google_auth_oauthlib/flow.py", line 362, in run_console code = input(authorization_code_message) EOFError: EOF when reading a line
--------------------------------------------------------------------------- EOFError Traceback (most recent call last) <timed eval> in <module>() /opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in to_zarr(self, store, mode, synchronizer, group, encoding) 1163 from ..backends.api import to_zarr 1164 return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer, -> 1165 group=group, encoding=encoding) 1166 1167 def __unicode__(self): /opt/conda/lib/python3.6/site-packages/xarray/backends/api.py in to_zarr(dataset, store, mode, synchronizer, group, encoding) 777 # I think zarr stores should always be sync'd immediately 778 # TODO: figure out how to properly handle unlimited_dims --> 779 dataset.dump_to_store(store, sync=True, encoding=encoding) 780 return store /opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims) 1068 unlimited_dims=unlimited_dims) 1069 if sync: -> 1070 store.sync() 1071 1072 def to_netcdf(self, path=None, mode='w', format=None, group=None, /opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in sync(self) 365 366 def sync(self): --> 367 self.writer.sync() 368 369 /opt/conda/lib/python3.6/site-packages/xarray/backends/common.py in sync(self) 268 if self.sources: 269 import dask.array as da --> 270 da.store(self.sources, self.targets, lock=self.lock) 271 self.sources = [] 272 self.targets = [] /opt/conda/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs) 953 954 if compute: --> 955 result.compute(**kwargs) 956 return None 957 else: /opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 153 dask.base.compute 154 """ --> 155 (result,) = compute(self, traverse=False, **kwargs) 156 return result 157 /opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 402 postcomputes = [a.__dask_postcompute__() if is_dask_collection(a) 403 
else (None, a) for a in args] --> 404 results = get(dsk, keys, **kwargs) 405 results_iter = iter(results) 406 return tuple(a if f is None else f(next(results_iter), *a) /opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, **kwargs) 2073 try: 2074 results = self.gather(packed, asynchronous=asynchronous, -> 2075 direct=direct) 2076 finally: 2077 for f in futures.values(): /opt/conda/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous) 1498 return self.sync(self._gather, futures, errors=errors, 1499 direct=direct, local_worker=local_worker, -> 1500 asynchronous=asynchronous) 1501 1502 @gen.coroutine /opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs) 613 return future 614 else: --> 615 return sync(self.loop, func, *args, **kwargs) 616 617 def __repr__(self): /opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs) 252 e.wait(10) 253 if error[0]: --> 254 six.reraise(*error[0]) 255 else: 256 return result[0] /opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb) 691 if value.__traceback__ is not tb: 692 raise value.with_traceback(tb) --> 693 raise value 694 finally: 695 value = None /opt/conda/lib/python3.6/site-packages/distributed/utils.py in f() 236 yield gen.moment 237 thread_state.asynchronous = True --> 238 result[0] = yield make_coro() 239 except Exception as exc: 240 logger.exception(exc) /opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self) 1097 1098 try: -> 1099 value = future.result() 1100 except Exception: 1101 self.had_exception = True /opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self) 1105 if exc_info is not None: 1106 try: -> 1107 yielded = self.gen.throw(*exc_info) 1108 finally: 1109 # Break up a reference to itself 
/opt/conda/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker) 1376 six.reraise(type(exception), 1377 exception, -> 1378 traceback) 1379 if errors == 'skip': 1380 bad_keys.add(key) /opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb) 690 value = tp() 691 if value.__traceback__ is not tb: --> 692 raise value.with_traceback(tb) 693 raise value 694 finally: /opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads() 57 def loads(x): 58 try: ---> 59 return pickle.loads(x) 60 except Exception: 61 logger.info("Failed to deserialize %s", x[:10000], exc_info=True) /opt/conda/lib/python3.6/site-packages/gcsfs/core.py in __setstate__() 1009 def __setstate__(self, state): 1010 self.__dict__.update(state) -> 1011 self.connect(self.token) 1012 1013 /opt/conda/lib/python3.6/site-packages/gcsfs/core.py in connect() 405 break 406 else: --> 407 self.__getattribute__('_connect_' + method)() 408 self.method = method 409 /opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _connect_browser() 377 def _connect_browser(self): 378 flow = InstalledAppFlow.from_client_config(client_config, [self.scope]) --> 379 credentials = flow.run_console() 380 self.tokens[(self.project, self.access)] = credentials 381 self._save_tokens() /opt/conda/lib/python3.6/site-packages/google_auth_oauthlib/flow.py in run_console() 360 print(authorization_prompt_message.format(url=auth_url)) 361 --> 362 code = input(authorization_code_message) 363 364 self.fetch_token(code=code) EOFError: EOF when reading a line
# Re-open the store from GCS to check what was written.
# works if auto_chunk=False
ds3 = xr.open_zarr(gcsmap, auto_chunk=False)
ds3
# NOTE(review): same store opened with auto_chunk=True; the remark above
# implies this variant is the one that does not work -- confirm.
ds4 = xr.open_zarr(gcsmap, auto_chunk=True)