#!/usr/bin/env python
# coding: utf-8

# # Convert lots of small NetCDFs to one big Zarr
# The National Water Model writes a new NetCDF file for each hour, resulting in 8760 files per year and 227904 files for the entire 26-year reanalysis (1993-01-01 00:00 - 2018-12-31 23:00).
#
# For small datasets, rechunking the data to Zarr would be as simple as:
#
# ```
# import xarray as xr
# ds = xr.open_mfdataset('*.nc')
# ds = ds.chunk({'time':672, 'feature_id':30000})
# ds.to_zarr('all_nc.zarr', consolidated=True)
# ```
#
# For large datasets, this approach is slow and uses too much memory. Here we process the data in batches of 672 hourly files (one time chunk) at a time.
#
# For each batch, we create an xarray dataset with `open_mfdataset`, then use [rechunker](https://github.com/pangeo-data/rechunker) to write a rechunked Zarr dataset for that batch. We then append each batch (each time chunk) along the time dimension, building up the overall dataset.
#
# The nice part of this approach is that if something goes wrong with a batch, we can fix the problem and just carry on appending.

# In[1]:

import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster, performance_report
import zarr
import time
import shutil


# In[2]:

import os
os.chdir('/usgs/gamone/data2/rsignell/data/NWM2')


# Build a list of filenames for open_mfdataset

# In[3]:

dates = pd.date_range(start='1993-01-01 00:00', end='2018-12-31 23:00', freq='1h')
files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'), date.strftime('%Y%m%d%H%M')) for date in dates]


# In[4]:

len(files)


# In[5]:

dset = xr.open_dataset(files[0])


# In[6]:

dset


# A nice chunk size for object storage is on the order of 100 MB.

# In[7]:

time_chunk_size = 672
feature_chunk_size = 30000


# In[8]:

81312/672


# In[9]:

nh_chunks = len(dset.feature_id)/feature_chunk_size
nh_chunks


# In[10]:

nt_chunks = int(np.ceil(len(files)/time_chunk_size))
nt_chunks


# In[11]:

(time_chunk_size * feature_chunk_size) * 8 / 1e6


# ... close enough to 100 MB.

# Create a function to drop the coordinates and variables that interfere with `open_mfdataset`

# In[12]:

def drop_coords(ds):
    ds = ds.drop(['reference_time', 'feature_id', 'crs'])
    return ds.reset_coords(drop=True)


# In[13]:

#cluster.close(); client.close()


# Create a local dask cluster

# In[14]:

#cluster = LocalCluster()   # for this first step, having 32 workers is faster
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
cluster


# In[15]:

client = Client(cluster)


# Tell blosc not to use threads, since we are using dask to parallelize

# In[16]:

numcodecs.blosc.use_threads = False


# In[17]:

zarr_step = 'zarr/step'      # rechunked output for a single batch
zarr_chunked = 'zarr/nwm'    # the full Zarr store we append each batch to
zarr_temp = 'zarr/tmp'       # temporary store used by rechunker


# Step our way through the dataset, reading one chunk along the time dimension at a time, to avoid dask reading too many chunks before writing and blowing out memory. The first time chunk is written to Zarr; the others are appended.
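# The write-then-append pattern used inside the loop below, pulled out for clarity
# (a minimal sketch; `batch_ds` is a hypothetical name for one rechunked batch):
#
# ```
# if i == 0:
#     batch_ds.to_zarr(zarr_chunked, consolidated=True, mode='w')           # create the store
# else:
#     batch_ds.to_zarr(zarr_chunked, consolidated=True, append_dim='time')  # extend along time
# ```
#
# Because each batch is written independently, a failed batch can be rerun by restarting
# the loop at the failing index (e.g. `for i in range(i_failed, nt_chunks)`, where
# `i_failed` is a placeholder for the batch that errored), and the appends continue from there.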
# #### Set up Rechunker

# In[18]:

from rechunker import rechunk


# In[19]:

chunk_mem_factor = 0.8   # fraction of worker memory for each chunk (seems to be the max possible)
worker_mem = cluster.worker_spec[0]['options']['memory_limit'] / 1e9
max_mem = '{:.2f}GB'.format(chunk_mem_factor * worker_mem)
print(max_mem)


# #### Do the big loop

# In[20]:

# %%time
for i in range(nt_chunks):
    print(i)
    istart = i * time_chunk_size
    istop = int(np.min([(i + 1) * time_chunk_size, len(files)]))

    ds = xr.open_mfdataset(files[istart:istop], parallel=True,
                           preprocess=drop_coords, combine='by_coords',
                           concat_dim='time', join='override')

    # add back in the 'feature_id' coordinate removed by preprocessing
    ds.coords['feature_id'] = dset.coords['feature_id']

    # rechunk this batch to zarr using rechunker

    # remove the temp and step zarr stores left over from the previous batch
    try:
        shutil.rmtree(zarr_temp)
        while os.path.exists(zarr_temp):   # wait until it is really gone
            pass
    except:
        pass

    try:
        shutil.rmtree(zarr_step)
        while os.path.exists(zarr_step):   # wait until it is really gone
            pass
    except:
        pass

    chunk_plan = {}
    for var in ds.data_vars:
        if len(ds[var].dims) == 2:
            var_chunk = (time_chunk_size, feature_chunk_size)
            chunk_plan[var] = var_chunk

    array_plan = rechunk(ds, chunk_plan, max_mem, zarr_step,
                         temp_store=zarr_temp)

    with performance_report(filename="dask-report.html"):
        result = array_plan.execute(retries=10)

    # read back in the zarr chunk rechunker wrote
    ds = xr.open_zarr(zarr_step)

    # write the first time chunk, append all later ones along the time dimension
    if i == 0:
        ds.to_zarr(zarr_chunked, consolidated=True, mode='w')
    else:
        ds.to_zarr(zarr_chunked, consolidated=True, append_dim='time')


# The workflow threw an error on the last, partial time chunk because rechunker doesn't think the chunk_plan is valid. But a partial last chunk is valid, so here we just rechunk the last batch with xarray (without rechunker) and append it to the overall Zarr dataset.

# In[22]:

ds1 = ds.chunk({'feature_id': feature_chunk_size, 'time': time_chunk_size})


# In[23]:

ds1.to_zarr('./zarr/last_step', consolidated=True, mode='w')


# In[24]:

ds2 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/last_step', consolidated=True)


# In[25]:

ds2.to_zarr(zarr_chunked, consolidated=True, append_dim='time')


# Check the resulting chunked dataset for the correct start time and stop time, and for any gaps. If there are no gaps, we should get a single unique value of 3600 s for the difference between the hourly time steps.

# In[26]:

ds1 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/nwm', consolidated=True)
print(ds1.time[0].values)
print(ds1.time[-1].values)


# In[27]:

d1 = ds1.time.diff(dim='time').values / 1e9   # convert datetime64 nanoseconds to seconds


# In[30]:

np.unique(d1)


# In[ ]:

#cluster.close(); client.close()


# In[36]:

import hvplot.xarray
# quick sanity check: plot the full time series for a single feature
ds1.streamflow[:, 1000].plot()
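# As a last check on the chunking itself, the chunk sizes written to the consolidated
# store can be inspected directly (a minimal sketch; it assumes the `zarr/nwm` store
# and the `streamflow` variable used above):
#
# ```
# import xarray as xr
# ds = xr.open_zarr('zarr/nwm', consolidated=True)
# print(ds['streamflow'].encoding['chunks'])   # expect (672, 30000)
# ```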