The National Water Model writes a new NetCDF file for each hour, resulting in 8760 files per year and 227904 files for the entire 26-year reanalysis (1993-01-01 00:00 through 2018-12-31 23:00).
For small datasets, rechunking the data to Zarr would be as simple as:
import xarray as xr
ds = xr.open_mfdataset('*.nc')
ds = ds.chunk({'time':672, 'feature_id':30000})
ds.to_zarr('all_nc.zarr', consolidated=True)
For large datasets, this approach is too slow and uses too much memory. Here we process the data in batches of 672 hourly files at a time (one time chunk).
For each batch, we create an xarray dataset with open_mfdataset, then use rechunker, which creates a rechunked Zarr dataset for that batch. We then append each batch (each time chunk) along the time dimension, building up our overall dataset.
The nice part of this approach is that if something goes wrong with a batch, we can fix the problem and just carry on appending.
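As an illustration (an addition, not part of the original workflow), here is a minimal sketch of how we might figure out where to resume, assuming the 672-step batch size and the chunked store path zarr/nwm used below, and assuming the store only ever contains complete, successfully appended batches:
import xarray as xr
# Hypothetical resume helper: count the time steps already appended to the
# chunked store and derive the index of the next batch to process.
existing = xr.open_zarr('zarr/nwm', consolidated=True)
next_batch = len(existing.time) // 672   # 672 hourly steps per batch
print('restart the batch loop at i =', next_batch)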
import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster, performance_report
import zarr
import time
import shutil
import os
os.chdir('/usgs/gamone/data2/rsignell/data/NWM2')
Build a list of filenames for open_mfdataset
dates = pd.date_range(start='1993-01-01 00:00',end='2018-12-31 23:00', freq='1h')
files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'),date.strftime('%Y%m%d%H%M')) for date in dates]
len(files)
227904
dset = xr.open_dataset(files[0])
dset
<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
  * time            (time) datetime64[ns] 1993-01-01
  * reference_time  (reference_time) datetime64[ns] 1992-11-01
  * feature_id      (feature_id) int32 101 179 181 ... 1180001803 1180001804
    latitude        (feature_id) float32 ...
    longitude       (feature_id) float32 ...
Data variables:
    crs             |S1 b''
    order           (feature_id) int32 ...
    elevation       (feature_id) float32 ...
    streamflow      (feature_id) float64 ...
    q_lateral       (feature_id) float64 ...
    velocity        (feature_id) float64 ...
    qSfcLatRunoff   (feature_id) float64 ...
    qBucket         (feature_id) float64 ...
    qBtmVertRunoff  (feature_id) float64 ...
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    model_initialization_time:  1992-11-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    1993-01-01_00:00:00
    model_total_valid_times:    1464
    stream_order_output:        1
    cdm_datatype:               Station
    Conventions:                CF-1.6
    code_version:               v5.1.0-alpha11
    model_output_type:          channel_rt
    model_configuration:        retrospective
    dev_OVRTSWCRT:              1
    dev_NOAH_TIMESTEP:          3600
    dev_channel_only:           0
    dev_channelBucket_only:     0
    dev:                        dev_ prefix indicates development/internal me...
A nice chunk size for object storage is on the order of 100 MB.
time_chunk_size = 672
feature_chunk_size = 30000
nh_chunks = len(dset.feature_id)/feature_chunk_size
nh_chunks
90.96923333333334
nt_chunks = int(np.ceil(len(files)/time_chunk_size))
nt_chunks
340
(time_chunk_size * feature_chunk_size )*8 / 1e6
161.28
... Close enough to 100 MB.
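To experiment with other chunk shapes, a small helper (an addition, not in the original notebook) makes the same arithmetic reusable:
def chunk_size_mb(n_time, n_feature, itemsize=8):
    # approximate uncompressed chunk size in MB for one (time, feature_id)
    # chunk of float64 data (itemsize = 8 bytes)
    return n_time * n_feature * itemsize / 1e6
chunk_size_mb(672, 30000)   # -> 161.28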
Create a preprocessing function that drops the coordinates and variables that interfere with concatenation in open_mfdataset
def drop_coords(ds):
    # drop the per-file coordinates and the scalar crs variable so the files
    # concatenate cleanly along time; feature_id is added back after opening
    ds = ds.drop(['reference_time', 'feature_id', 'crs'])
    return ds.reset_coords(drop=True)
Create a local dask cluster
#cluster = LocalCluster() # for this first step, having 32 workers is faster
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
cluster
client = Client(cluster)
Tell blosc not to use threads since we are using dask to parallelize
numcodecs.blosc.use_threads = False
zarr_step = 'zarr/step'
zarr_chunked = 'zarr/nwm'
zarr_temp = 'zarr/tmp'
Step our way through the dataset, reading one chunk along the time dimension at a time, so that dask doesn't read too many chunks before writing and blow out memory. The first time chunk is written to Zarr; each subsequent chunk is appended.
from rechunker import rechunk
chunk_mem_factor = 0.8 #fraction of worker memory for each chunk (seems to be the max possible)
worker_mem = cluster.worker_spec[0]['options']['memory_limit']/1e9
max_mem = '{:.2f}GB'.format(chunk_mem_factor * worker_mem)
print(max_mem)
13.50GB
%%time
for i in range(nt_chunks):
print(i)
istart = i * time_chunk_size
istop = int(np.min([(i+1) * time_chunk_size, len(files)]))
ds = xr.open_mfdataset(files[istart:istop], parallel=True,
preprocess=drop_coords, combine='by_coords',
concat_dim='time', join='override')
# add back in the 'feature_id' coordinate removed by preprocessing
ds.coords['feature_id'] = dset.coords['feature_id']
# chunk this step to zarr using rechunker
    # remove the temp and step zarr datasets
try:
shutil.rmtree(zarr_temp)
while os.path.exists(zarr_temp): # check if it still exists
pass
except:
pass
try:
shutil.rmtree(zarr_step)
while os.path.exists(zarr_step): # check if it still exists
pass
except:
pass
chunk_plan={}
for var in ds.data_vars:
if len(ds[var].dims)==2:
var_chunk = (time_chunk_size, feature_chunk_size)
chunk_plan[var] = var_chunk
array_plan = rechunk(ds, chunk_plan, max_mem, zarr_step,
temp_store=zarr_temp)
with performance_report(filename="dask-report.html"):
result = array_plan.execute(retries=10)
# read back in the zarr chunk rechunker wrote
ds = xr.open_zarr(zarr_step)
if i==0:
ds.to_zarr(zarr_chunked, consolidated=True, mode='w')
else:
ds.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor)
    301         target_options=target_options,
    302         temp_store=temp_store,
--> 303         temp_options=temp_options,
    304     )
    305     plan = executor.prepare_plan(copy_spec)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    381             temp_store_or_group=temp_group,
    382             temp_options=options,
--> 383             name=name,
    384         )
    385         copy_spec.write.array.attrs.update(variable_attrs)  # type: ignore

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    484         itemsize,
    485         max_mem,
--> 486         consolidate_reads=consolidate_reads,
    487     )
    488

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/algorithm.py in rechunking_plan(shape, source_chunks, target_chunks, itemsize, max_mem, consolidate_reads, consolidate_writes)
    127
    128     if consolidate_writes:
--> 129         write_chunks = consolidate_chunks(shape, target_chunks, itemsize, max_mem)
    130     else:
    131         write_chunks = tuple(target_chunks)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/algorithm.py in consolidate_chunks(shape, chunks, itemsize, max_mem, chunk_limits)
     51             chunk_limit_per_axis[n_ax] = shape[n_ax]
     52         else:
---> 53             raise ValueError(f"Invalid chunk_limits {chunk_limits}.")
     54
     55         chunk_mem = itemsize * prod(chunks)

ValueError: Invalid chunk_limits (96, 2729077).
The workflow threw an error on the last partial chunk: that chunk contains only 96 time steps (227904 - 339*672 = 96), so Rechunker rejects the chunk plan because the requested 672-step time chunk is larger than the array being written. A partial last chunk is perfectly valid in Zarr, though, so here we just rechunk the last batch with plain xarray (no rechunker) and append it to the overall Zarr dataset.
ds1 = ds.chunk({'feature_id':feature_chunk_size, 'time':time_chunk_size})
ds1.to_zarr('./zarr/last_step', consolidated=True, mode='w')
<xarray.backends.zarr.ZarrStore at 0x7f53d6518530>
ds2 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/last_step', consolidated=True)
ds2.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
<xarray.backends.zarr.ZarrStore at 0x7f544c8b4410>
Check the resulting chunked dataset for the correct start and stop times, and for any gaps. If there are no gaps we should get just a single unique value of 3600 s for the difference between the hourly time steps.
ds1 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/nwm', consolidated=True)
print(ds1.time[0].values)
print(ds1.time[-1].values)
1993-01-01T00:00:00.000000000
2018-12-31T23:00:00.000000000
d1 = ds1.time.diff(dim='time').values/1e9 # convert timedelta64 nanoseconds to seconds
np.unique(d1)
array([3600], dtype='timedelta64[ns]')
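As an extra, optional sanity check (not part of the original workflow), one could also compare a single time step in the rechunked store against the original NetCDF file it came from:
# Hypothetical spot check: the first hourly NetCDF file vs. the first time
# step of the rechunked Zarr store.
ds_nc = xr.open_dataset(files[0])
print(np.allclose(ds1.streamflow.sel(time=ds_nc.time.values[0]).values,
                  ds_nc.streamflow.values, equal_nan=True))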
#cluster.close(); client.close()
import hvplot.xarray
ds1.streamflow[:,1000].plot()
[<matplotlib.lines.Line2D at 0x7f53f34c1110>]