The National Water Model writes a new NetCDF file for each hour, resulting in 8760 files for a year! Here's how we're converting a big pile of little NetCDF files to Zarr.
In theory, this would be as simple as:
import xarray as xr
ds = xr.open_mfdataset('*.nc')
ds.to_zarr('all_nc.zarr', consolidated=True)
In practice, we usually want to rechunk, and xarray has trouble with certain NetCDF elements, so it's a bit more complicated...
import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster
Build a list of filenames for open_mfdataset
dates = pd.date_range(start='2017-01-01 00:00',end='2017-12-31 23:00', freq='1h')
files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'),date.strftime('%Y%m%d%H%M')) for date in dates]
len(files)
8760
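Before opening anything, it's worth confirming that every expected hourly file is actually on disk; a quick sketch (os is from the standard library and isn't imported above):
import os
missing = [f for f in files if not os.path.exists(f)]
print(f'{len(missing)} missing files out of {len(files)}')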
dset = xr.open_dataset(files[0])
dset
<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
  * time            (time) datetime64[ns] 2017-01-01
  * reference_time  (reference_time) datetime64[ns] 2016-10-01
  * feature_id      (feature_id) int32 101 179 181 ... 1180001803 1180001804
    latitude        (feature_id) float32 ...
    longitude       (feature_id) float32 ...
Data variables:
    crs             |S1 ...
    order           (feature_id) int32 ...
    elevation       (feature_id) float32 ...
    streamflow      (feature_id) float64 ...
    q_lateral       (feature_id) float64 ...
    velocity        (feature_id) float64 ...
    qSfcLatRunoff   (feature_id) float64 ...
    qBucket         (feature_id) float64 ...
    qBtmVertRunoff  (feature_id) float64 ...
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    model_initialization_time:  2016-10-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    2017-01-01_00:00:00
    model_total_valid_times:    2208
    stream_order_output:        1
    cdm_datatype:               Station
    Conventions:                CF-1.6
    code_version:               v5.1.0-alpha11
    model_output_type:          channel_rt
    model_configuration:        retrospective
    dev_OVRTSWCRT:              1
    dev_NOAH_TIMESTEP:          3600
    dev_channel_only:           0
    dev_channelBucket_only:     0
    dev:                        dev_ prefix indicates development/internal me...
A nice chunk size for object storage is on the order of 100 MB.
time_chunk_size = 672
feature_chunk_size = 30000
len(files)/time_chunk_size
13.035714285714286
nchunks = len(dset.feature_id)/feature_chunk_size
nchunks
90.96923333333334
nt_chunks = int(np.ceil(len(files)/time_chunk_size))
nt_chunks
14
(time_chunk_size * feature_chunk_size )*8 / 1e6
161.28
... Close enough to 100 MB.
Create a function to drop stuff that messes up open_mfdataset
def drop_coords(ds):
    ds = ds.drop_vars(['reference_time', 'feature_id', 'crs'])
    return ds.reset_coords(drop=True)
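As a quick sanity check, we can apply the preprocessor to the single file we already opened and confirm that reference_time, feature_id and crs are gone (illustrative only, not required for the conversion):
drop_coords(dset)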
Create a local dask cluster
cluster = LocalCluster()
cluster
client = Client(cluster)
Tell Blosc not to use threads, since we are using Dask to parallelize
numcodecs.blosc.use_threads = False
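If you want explicit control over the compressor rather than relying on the Zarr defaults, a per-variable encoding can be passed to to_zarr. A minimal sketch, assuming a Blosc/zstd compressor (the run below just used the defaults):
compressor = numcodecs.Blosc(cname='zstd', clevel=5, shuffle=numcodecs.Blosc.SHUFFLE)
encoding = {'streamflow': {'compressor': compressor}}
# pass encoding=encoding on the first to_zarr call (mode='w'); appended chunks reuse it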
Step through the dataset, reading one chunk along the time dimension at a time, to avoid Dask reading too many chunks before writing and blowing out memory. The first time chunk is written to Zarr; the rest are appended.
%%time
for i in range(nt_chunks):
    print(i)
    istart = i * time_chunk_size
    istop = int(np.min([(i + 1) * time_chunk_size, len(files)]))
    # combine='by_coords' infers the concatenation along the 'time' coordinate
    ds = xr.open_mfdataset(files[istart:istop], parallel=True,
                           preprocess=drop_coords, combine='by_coords')
    # add back in the 'feature_id' coordinate removed by preprocessing
    ds.coords['feature_id'] = dset.coords['feature_id']
    ds1 = ds.chunk(chunks={'time': time_chunk_size, 'feature_id': feature_chunk_size})
    if i == 0:
        ds1.to_zarr('zarr/2017f', consolidated=True, mode='w')
    else:
        ds1.to_zarr('zarr/2017f', consolidated=True, append_dim='time')
0 1 2 3 4 5 6 7 8 9 10 11 12 13
CPU times: user 40min 35s, sys: 2min 39s, total: 43min 14s
Wall time: 1h 59min 41s
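Once the loop finishes, it's worth opening the store back up to confirm the shape and chunking came out as intended (a quick check, not part of the timed run above):
ds_zarr = xr.open_zarr('zarr/2017f', consolidated=True)
print(ds_zarr.dims)
print(ds_zarr['streamflow'].chunks)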