#!/usr/bin/env python
# coding: utf-8

# # Rechunk the kerchunked dataset

# In[1]:

import fsspec
import fsspec.implementations.reference
import rechunker
import zarr
import xarray as xr

# In[2]:

rechunker.__version__

# In[3]:

zarr.__version__

# In[4]:

# Convenience for re-runs: shut down any cluster left over from a previous session.
#client.close(); cluster.close()

# #### Start a local Dask cluster
# Start a `LocalCluster` with many single-threaded workers; one thread per
# worker avoids thread contention during the compression-heavy rechunk.

# In[5]:

from dask.distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=40, threads_per_worker=1)
client = Client(cluster)
client

# #### Open Combined Kerchunked Dataset

# In[6]:

combined_json = '/caldera/hytest_scratch/scratch/rsignell/sudhir.json'

# In[7]:

get_ipython().run_cell_magic('time', '', 'fs = fsspec.filesystem("reference", fo=combined_json, ref_storage_args={\'skip_instance_cache\': True})\nm = fs.get_mapper("")\n\nds = xr.open_dataset(m, engine="zarr", mask_and_scale=False, decode_cf=False, decode_times=False,\n                     backend_kwargs={\'consolidated\': False}, chunks={})\n')

# In[8]:

ds

# In[9]:

# Subset to the first 144 time steps (one target time chunk) for this test run.
ds = ds.isel(time=slice(0, 144))

# In[10]:

ds

# In[11]:

ds.nbytes / 1e9  # uncompressed size in GB

# #### Set up temporary and target Zarr stores on local scratch

# In[12]:

fs_write = fsspec.filesystem('file')

# In[13]:

temp_name = '/caldera/hytest_scratch/scratch/rsignell/AORC/rechunk/test01.tmp'
target_name = '/caldera/hytest_scratch/scratch/rsignell/AORC/rechunk/test01.zarr'

# In[14]:

temp_store = fs_write.get_mapper(temp_name)
target_store = fs_write.get_mapper(target_name)

# In[15]:

# Rechunker requires empty stores, so remove any leftovers from earlier runs.
for name in (temp_name, target_name):
    try:
        fs_write.rm(name, recursive=True)
    except FileNotFoundError:
        pass  # nothing to remove on a fresh run

# #### Back-of-envelope sizing (scratch cells kept from the original run)

# In[16]:

nt = len(ds.time)   # time steps in this subset (144)
nt = 365 * 24 * 30  # overwritten: hourly steps over ~30 years, i.e. the full record

# In[17]:

a = nt / 144  # number of 144-step time chunks in the full record
a

# In[18]:

b = (len(ds.longitude) * len(ds.latitude)) / ((2 * 200) * (2 * 200))  # number of 400x400 spatial chunks
b

# In[19]:

a / b  # ratio of time chunks to spatial chunks

# In[20]:

2.8 / 4  # scratch arithmetic

# In[21]:

# (Optional) restart with a simpler client and active memory management:
# client.close()
# from dask.distributed import Client
# client = Client(threads_per_worker=1)
# client.amm.start()

# In[25]:

import zarr.storage
from numcodecs import Zstd

# Write all output arrays with Zstandard compression (zarr 2.x global default).
zarr.storage.default_compressor = Zstd(level=9)

# In[26]:

(200 * 200 * 144) * 2 / 1e6  # MB per 200x200x144 chunk of 2-byte values (~11.5 MB)

# In[27]:

190 / 40 * 0.7  # presumably GB per worker: 190 GB / 40 workers with ~70% headroom; informs max_mem below

# In[28]:

chunks = {'latitude': 256, 'longitude': 256, 'time': 144}
verbose = True
mem = '2.8GB'

group_chunks = {}
# Build a chunk tuple for every variable: take the sizes from `chunks`,
# clipped to the array length, and use a single full-length chunk for any
# dimension not listed in `chunks`.
for var in ds.variables:
    group_chunks[var] = []
    for di in ds[var].dims:
        if di in chunks:
            group_chunks[var].append(min(chunks[di], len(ds[di])))
        else:
            group_chunks[var].append(len(ds[di]))
    group_chunks[var] = tuple(group_chunks[var])

if verbose:
    print(f"Rechunking to: {group_chunks}")
    print(f"mem: {mem}")
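# The next cell is an added sanity check, not part of the original notebook:
# it estimates the uncompressed size of one chunk per variable from
# `group_chunks` and each variable's dtype, to confirm every chunk sits far
# below the 2.8 GB `max_mem` budget handed to rechunker.

# In[ ]:

import numpy as np

def chunk_size_mb(chunk_shape, dtype):
    """Uncompressed size of a single chunk, in MB."""
    return np.prod(chunk_shape) * np.dtype(dtype).itemsize / 1e6

for var in ds.variables:
    print(f"{var}: {chunk_size_mb(group_chunks[var], ds[var].dtype):.2f} MB/chunk")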
# #### Rechunk!

# In[29]:

from dask.distributed import performance_report

# In[30]:

# Tidy the coordinate metadata: add CF standard names and drop the
# `missing_value` attribute, which is not valid on coordinate variables.
for coord in ('time', 'longitude', 'latitude'):
    ds[coord].attrs['standard_name'] = coord
    ds[coord].attrs.pop('missing_value', None)

# In[31]:

get_ipython().run_cell_magic('time', '', 'rechunked = rechunker.rechunk(ds, target_chunks=group_chunks, max_mem=mem,\n                              target_store=target_store, temp_store=temp_store)\n')

# In[32]:

get_ipython().run_cell_magic('time', '', 'with performance_report(filename="dask-report-whole.html"):\n    rechunked.execute(retries=10)\n')

# In[33]:

# Consolidate metadata so the target store opens with a single read.
zarr.convenience.consolidate_metadata(target_store)

# #### Explore the rechunked dataset

# In[ ]:

ds2 = xr.open_dataset(target_store, engine='zarr', chunks={})

# In[ ]:

ds2.nbytes / 1e9  # uncompressed data size in GB

# In[ ]:

ds2

# In[ ]:

ds2.APCP_surface

# In[ ]:

import hvplot.xarray

# In[ ]:

get_ipython().run_cell_magic('time', '', "da = ds2.APCP_surface.sel(longitude=-115.18, latitude=46.65, method='nearest').load()\n")

# In[ ]:

da.hvplot(x='time', grid=True)

# In[ ]:

client.close()
cluster.close()
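# The cell below is an added verification sketch, not part of the original
# notebook. With the cluster shut down, plain zarr is enough to confirm the
# target store got the intended layout: assuming `APCP_surface` has dims
# ordered (time, latitude, longitude), the chunk plan above implies chunks
# of (144, 256, 256).

# In[ ]:

g = zarr.open_consolidated(target_store)
print(g['APCP_surface'].chunks)      # expected: (144, 256, 256)
print(g['APCP_surface'].compressor)  # expected: Zstd(level=9)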