This notebook demonstrates how to convert a netCDF dataset stored as a collection of HDF5 files in blob storage to Zarr format.
import argparse
import itertools
import adlfs
import dask.array
import fsspec
import h5py
import numpy as np
import xarray as xr
import zarr
We're working with the Daymet dataset. There are a few aspects of this dataset that matter for the conversion to Zarr.

The data variables (tmin, tmax, prcp, etc.) are indexed by (time, x, y). There's one file per variable, per year, per region, named {variable}_{year}_{region}.nc4. So the raw data is like

daymetv3_dayl_1980_hawaii.nc4
daymetv3_tmax_1980_hawaii.nc4
...
daymetv3_tmax_1980_na.nc4
...
daymetv3_tmax_2010_hawaii.nc4
And here's the repr for the dataset that we'll generate for Hawaii.
<xarray.Dataset>
Dimensions: (nv: 2, time: 14600, x: 284, y: 584)
Coordinates:
lat (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
lon (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
* time (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
* x (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
* y (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
dayl (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
lambert_conformal_conic float32 ...
prcp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
srad (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
swe (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
time_bnds (time, nv) datetime64[ns] dask.array<chunksize=(14600, 2), meta=np.ndarray>
tmax (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
tmin (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
vp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
yearday (time) int16 dask.array<chunksize=(14600,), meta=np.ndarray>
Our overall strategy is to copy over the attrs we wish to preserve (global attributes and per-variable attributes), copy the variables like lat, lon, or lambert_conformal_conic that are not indexed by time (we only need to copy them once), and then copy the time-indexed data variables in parallel.

This is a deliberately low-level strategy that attempts to do the conversion as efficiently as possible. In particular, we avoid any expensive inference operations in xarray, and we've set up the problem to be as parallel as possible.
We have a few helpers. These do the bulk of the work of actually copying the data from HDF5 to Zarr.
The most important aspect to notice is that we pass in an offset. Recall that each year's data is in a separate netCDF file, but our output Zarr store will contain the data for all the years. offset tells us where in the output array this data goes.
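To make the offset idea concrete, here's a minimal sketch with a plain NumPy array standing in for the Zarr store (the shapes here are made up for illustration):

```python
import numpy as np

# The output "store" holds every year along axis 0: 2 years x 365 days.
full = np.zeros((730, 4))

# One source file holds a single year's worth of data.
year_1981 = np.ones((365, 4))

# offset says where along the time axis this year's data lands.
offset = 365  # 1981 is the second year
full[offset:offset + year_1981.shape[0]] = year_1981

assert full[:365].sum() == 0        # 1980's region untouched
assert full[365:].sum() == 365 * 4  # 1981's region written
```

Because each year writes to a disjoint region, the writes for different years can happen in any order, or concurrently.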
# -----------------------------------------------------------------------------
# utils
def process_file(file, offset, url, storage_options=None):
    """
    Copy the data for a single variable for a single year.
    """
    storage_options = storage_options or {}
    fs = adlfs.AzureBlobFileSystem(account_name="daymeteuwest")
    h5file = h5py.File(fs.open(file))
    group = zarr.group(fsspec.get_mapper(url, **storage_options))
    variable = file.split("_")[2]
    dataset = h5file[variable]
    slices = dask.array.core.slices_from_chunks(dask.array.empty_like(dataset).chunks)
    for slice_ in slices:
        time_slice, *rest = slice_
        time_slice = slice(time_slice.start + offset, time_slice.stop + offset)
        target_slice = (time_slice,) + tuple(rest)
        group[variable][target_slice] = dataset[slice_]
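For reference, dask's slices_from_chunks is what expands a chunk layout into the index tuples iterated above. A small standalone example (the chunk layout here is made up to match the Hawaii shapes):

```python
from dask.array.core import slices_from_chunks

# chunks: time split into two chunks of 365; y and x each in one chunk
slices = slices_from_chunks(((365, 365), (584,), (284,)))
print(slices[0])  # (slice(0, 365, None), slice(0, 584, None), slice(0, 284, None))
print(slices[1])  # (slice(365, 730, None), slice(0, 584, None), slice(0, 284, None))
```

Each tuple addresses exactly one chunk, so copying chunk-by-chunk never reads more than one chunk's worth of data into memory.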
process_expanding needs to be called once per expanding variable (it loops over all the years internally). This updates variables like yearday that are indexed by time. However, they aren't necessarily chunked, and so we can't simply use process_file.
def process_expanding(variable, files, offsets, url, storage_options=None):
    """
    Write all the years for an "expanding" variable like time.
    """
    storage_options = storage_options or {}
    fs = adlfs.AzureBlobFileSystem(account_name="daymeteuwest")
    for file, offset in zip(files, offsets):
        h5file = h5py.File(fs.open(file))
        group = zarr.group(fsspec.get_mapper(url, **storage_options))
        dataset = h5file[variable]
        slice_ = (slice(offset, dataset.shape[0] + offset),)
        if dataset.ndim > 1:
            slice_ += tuple(slice(i) for i in dataset.shape[1:])
        group[variable][slice_] = dataset
def decode_encode_attr(v):
    if isinstance(v, bytes):
        v = v.decode("utf-8")  # is this always utf-8?
    return xr.backends.zarr.encode_zarr_attr_value(v)
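The decoding step matters because Zarr stores attrs as JSON, and the raw bytes h5py often returns for string attributes aren't JSON-serializable. A minimal illustration (the attribute value is made up):

```python
import json

raw = b"mm/day"  # h5py frequently hands back attribute strings as bytes

# json.dumps(raw) would raise TypeError; decode first.
value = raw.decode("utf-8") if isinstance(raw, bytes) else raw
encoded = json.dumps({"units": value})
print(encoded)  # {"units": "mm/day"}
```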
This section defines all of the static metadata, like the URL of the source data and the various variables. We just know this ahead of time.
ACCOUNT_NAME = "daymeteuwest"
# TODO: Think about what's static vs. dynamic.
# VARIABLES, REGIONS, and YEARS could be dynamic.
VARIABLES = [
"tmin",
"tmax",
"prcp",
"srad",
"vp",
"swe",
"dayl",
]
YEARS = list(range(1980, 2020))
REGIONS = ["na", "hawaii", "puertorico"]
# These are all static properties of the dataset. Must be provided
# by the user.
# Coordinates in xarray parlance.
COORDINATES = ["lat", "lon", "time", "x", "y"]
# These are dimensions in xarray's data model, but they aren't our
# main variables of interest. They are the same for all the variables
# within a year, so we only need to copy them once per year.
EXTRA_VARIABLBES = ["lambert_conformal_conic", "time_bnds", "yearday"]
# These variables expand in time. They must be copied each year.
EXPANDING = {"time", "time_bnds", "yearday"}
missing_value = -9999.0
# These are attrs that are dropped by xarray. I discovered these
# by manually loading with `open_mdfdataset` on a couple of years
# and merging.
ATTRS_DROP = {
"_FillValue",
"DIMENSION_LIST",
"_nc3_strict",
"CLASS",
"NAME",
"_Netcdf4Dimid",
"REFERENCE_LIST",
}
def year_key(x):
    return x.split("_")[3]
We're writing to Azure Blob Storage. We'll use fsspec and adlfs to create the mapping for the Zarr store. You could replace this with any fsspec-compatible URL.
import os
region = "hawaii"
account_name = "daymeteuwest"
store_url = "az://daymet-zarr/daymet_v3_{region}.zarr"
sas_token = os.environ['SAS_TOKEN'] # Grants permission to write the Azure Blob container
url = store_url.format(region=region)
storage_options = dict(
account_name=account_name,
sas_token=sas_token,
)
store = fsspec.get_mapper(url, **storage_options)
group = zarr.group(store)
# -------------------------------------------------------------------------
# Metadata Processing
# Gather a bit of metadata ahead of time.
fs = adlfs.AzureBlobFileSystem(account_name=ACCOUNT_NAME)
Now we gather all of the attrs that xarray typically handles for us. For the most part, this involves opening the HDF5 dataset for each variable, copying its attrs (after decoding the bytes), and then fixing a few up.

We also handle the attrs for the dataset as a whole. To verify this, I used xarray to merge the data for a couple of years and compared my attrs against xarray's.

The most important addition here is the _ARRAY_DIMENSIONS attribute. This is a convention used by xarray when loading datasets from Zarr. I've explicitly created them by looking at what xarray did. I suspect they could be discovered from the dataset.
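One possible way to discover these dynamically — untested against Daymet itself, so treat it as a sketch — is via HDF5 dimension scales, which netCDF4 files use to record dimension names. Here it's demonstrated on a small synthetic file:

```python
import io

import h5py
import numpy as np


def guess_array_dimensions(dataset):
    """Guess netCDF dimension names from attached HDF5 dimension scales."""
    names = []
    for dim in dataset.dims:
        if len(dim) > 0:
            # The attached scale's HDF5 path gives the dimension name.
            names.append(dim[0].name.lstrip("/"))
        else:
            names.append(dim.label)  # may be "" if no scale is attached
    return names


# Build a tiny in-memory HDF5 file mimicking the netCDF layout.
bio = io.BytesIO()
with h5py.File(bio, "w") as f:
    f["time"] = np.arange(3.0)
    f["tmin"] = np.zeros((3, 2), dtype="f4")
    f["time"].make_scale("time")
    f["tmin"].dims[0].attach_scale(f["time"])
    dims = guess_array_dimensions(f["tmin"])
print(dims)  # the first dimension resolves to "time"
```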
templates = [x for x in files if "1980" in x]
# These are expected by xarray when reading a Zarr file.
# I don't know how best to discover these dynamically.
ATTRS_ARRAY_DIMENSIONS = {
"time_bnds": ["time", "nv"],
"srad": ["time", "y", "x"],
"tmin": ["time", "y", "x"],
"time": ["time"],
"swe": ["time", "y", "x"],
"dayl": ["time", "y", "x"],
"prcp": ["time", "y", "x"],
"x": ["x"],
"y": ["y"],
"vp": ["time", "y", "x"],
"tmax": ["time", "y", "x"],
"yearday": ["time"],
"lat": ["y", "x"],
"lon": ["y", "x"],
}
# Stage 1: Attrs handling
global_attrs = {
    k: decode_encode_attr(v)
    for k, v in h5py.File(fs.open(templates[0])).attrs.items()
    if k not in ATTRS_DROP
}
attrs = {".zattrs": global_attrs}
for file in templates:
    h5file = h5py.File(fs.open(file))
    variable = file.split("_")[2]
    dataset = h5file[variable]
    attrs[variable] = {
        k: decode_encode_attr(v)
        for k, v in dataset.attrs.items()
        if k not in ATTRS_DROP
    }
    attrs[variable]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS[variable]
    attrs[variable]["missing_value"] = missing_value
h5file = h5py.File(fs.open(templates[0]))
for variable in EXTRA_VARIABLBES + COORDINATES:
    dataset = h5file[variable]
    attrs[variable] = {
        k: decode_encode_attr(v)
        for k, v in dataset.attrs.items()
        if k not in ATTRS_DROP
    }
    attrs[variable]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS.get(variable, [])
    if "_FILL_VALUE" in attrs[variable]:
        attrs[variable]["missing_value"] = missing_value
# final attrs fixups.
# I have no idea how xarray gets these.
attrs["time_bnds"].update(
calendar="proleptic_gregorian", units="days since 1980-01-01 00:00:00"
)
attrs["time"].update(units="days since 1980-01-01T00:00:00+00:00")
We now "preallocate" the Zarr arrays, one for each variable, using the netCDF files from 1980 as a "template". This is fast, since we just write the .zarray and .zattrs files; we aren't actually copying any data yet.

Note that we do a bit of work here to get "nice" chunks. The chunks in the HDF5 files were a bit small for my taste (something like 4-8 MB, if memory serves). We use dask.array to infer a good chunksize, and we explicitly set the chunking along the first dimension (time) to 365. This works well for our problem, and results in uniform chunks for each year.

Note that we use the full_shape, which covers all the years, rather than the shape of the dataset (which only covers one year).
for file in templates:  # this is fast, no need to parallelize
    h5file = h5py.File(fs.open(file))
    variable = file.split("_")[2]
    dataset = h5file[variable]
    chunks = dask.array.empty_like(dataset).chunksize
    # We explicitly use a chunksize of 365 on the time axis.
    # This makes it easier to append in the future, since the chunks will always be
    # uniform. We won't have to rechunk a dangling chunk of data from the previous year.
    chunks = (365,) + chunks[1:]
    v = group.empty(
        dataset.name, shape=full_shape, dtype=dataset.dtype, chunks=chunks, overwrite=True
    )
    v.attrs.update(attrs[dataset.name.lstrip("/")])
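To get a feel for the inferred chunking, you can ask dask directly what it would pick for an array of the full Hawaii shape. The exact numbers depend on dask's configured target chunk size, so treat this as illustrative:

```python
import dask.array

# A lazily-allocated array with the full (time, y, x) shape for Hawaii.
arr = dask.array.empty((14600, 584, 284), dtype="f4", chunks="auto")
print(arr.chunksize)  # dask's auto-chosen chunks, sized near its default target

# The notebook then overrides the time axis to 365 for year-aligned chunks.
chunks = (365,) + arr.chunksize[1:]
print(chunks)
```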
Next, we process the coordinates (like time, x, y) and "extra variables" (like lambert_conformal_conic) -- there's probably a better name for the "extra variables".

We handle these differently based on whether they're indexed by time ("expanding"). If they are expanding, we again pre-allocate the full array shape, rather than the shape from just this file (which represents a single year).

If a coordinate is not expanding (i.e. it's static from year to year, like lat), then we just copy it now. This is the first time we're actually copying data, rather than just metadata! This could be parallelized, but it doesn't take too long. I'm also not sure what the best practices are for chunking coordinate dimensions.

We also handle lambert_conformal_conic specially, since it's a scalar.
h5file = h5py.File(fs.open(files[0]))
for coord in COORDINATES + EXTRA_VARIABLBES:  # this is slowish. Maybe parallelize on 1 machine.
    dataset = h5file[coord]
    print("handle", coord)
    if coord == "lambert_conformal_conic":
        # scalar!
        group[dataset.name] = np.array(dataset)
        v = group[dataset.name]
    else:
        if coord not in EXPANDING:
            shape = dataset.shape
        else:
            shape = (full_shape[0],) + dataset.shape[1:]
        shape = tuple(int(x) for x in shape)  # NumPy ints -> Python ints
        # We explicitly *don't* want this chunked, and so we specify chunks=shape.
        # TODO: figure out a good chunking for NA. Its `lat` array is 250MB.
        v = group.empty(
            dataset.name, shape=shape, dtype=dataset.dtype, overwrite=True, chunks=shape
        )
        if coord not in EXPANDING:
            # Actually read some static data
            v[:] = np.array(dataset)
    v.attrs.update(attrs[dataset.name.lstrip("/")])
handle lat
handle lon
handle time
handle x
handle y
handle lambert_conformal_conic
handle time_bnds
handle yearday
The raw data is split into separate files by year, like daymetv3_prcp_1980_hawaii.nc4, daymetv3_prcp_1981_hawaii.nc4, and so on. Ideally all of the prcp data will be stored in a single Zarr array.

One option would be to write the first year and then serially append to that array year by year. This should work, but it would be slow, since we'd need to wait for 1980 to finish before we could start on 1981, and so on.

To maximize parallelism, we pre-compute a list of offsets. These indicate the starting index for each year. By discovering these ahead of time, we can write to the Zarr store in parallel. (Each year happens to contribute 365 time steps, so we could just define this as 365 * np.arange(len(YEARS)). But discovering it from the data isn't too expensive.)
files = fs.glob(f"daymet/daymet_v3_*_{region}.nc4")
dayl_files = [x for x in files if "dayl" in x]
shapes = [h5py.File(fs.open(f))["dayl"].shape for f in dayl_files]
full_shape = (np.array(shapes)[:, 0].sum(),) + shapes[0][1:]
offsets = np.cumsum(np.array(shapes)[:, 0])
offsets -= offsets[0]
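Since Daymet uses a 365-day calendar (leap days are dropped), the offsets discovered from the data should match the closed-form version. A quick check with synthetic per-year shapes:

```python
import numpy as np

# Three synthetic per-year shapes, 365 time steps each.
shapes = [(365, 584, 284), (365, 584, 284), (365, 584, 284)]

# Offsets discovered from the data, as in the notebook.
offsets = np.cumsum(np.array(shapes)[:, 0])
offsets -= offsets[0]

# Closed-form equivalent for a fixed 365-day year.
expected = 365 * np.arange(len(shapes))
assert (offsets == expected).all()
print(offsets)  # [  0 365 730]
```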
We're now ready to copy the data. This is the expensive step, so we've taken care to make it easy to parallelize. We'll use the process_file function from earlier, which will be mapped over all the (variable, year) combinations. The only trick here is ensuring that we associate the right offset with each year. So we group by year and then index into offsets.
# Stage 3: Process
batches = itertools.groupby(sorted(files, key=year_key), year_key)
params = []
for i, (_, files_) in enumerate(batches):
    for file in files_:
        params.append((file, offsets[i]))
files2, offsets2 = zip(*params)
files2[:21:7], offsets2[:21:7]
(('daymet/daymet_v3_dayl_1980_hawaii.nc4', 'daymet/daymet_v3_dayl_1981_hawaii.nc4', 'daymet/daymet_v3_dayl_1982_hawaii.nc4'), (0, 365, 730))
We could parallelize this in any number of ways. We've set things up to work with the concurrent.futures API, which Dask implements. But we could also use Azure Batch at this point.

The memory usage on each worker will be modest: just the maximum size of a chunk (something like 250 MB) times the number of tasks it's running concurrently.
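Because everything is expressed as plain functions mapped over argument lists, the same pattern also runs on the standard library's concurrent.futures without Dask. A toy sketch, where copy_one is a hypothetical stand-in for process_file:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def copy_one(file, offset, url=None):
    # Stand-in for process_file: just report what we'd copy and where.
    return (file, offset, url)


files2 = ["dayl_1980.nc4", "dayl_1981.nc4"]  # hypothetical inputs
offsets2 = [0, 365]

with ThreadPoolExecutor(max_workers=4) as pool:
    # partial pins the constant kwargs, mirroring client.map(..., url=url).
    results = list(pool.map(partial(copy_one, url="az://out.zarr"), files2, offsets2))
print(results)
```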
from dask_gateway import GatewayCluster
from distributed import wait
cluster = GatewayCluster()
cluster.adapt(minimum=1, maximum=50)
client = cluster.get_client()
cluster
futures = client.map(process_file, files2, offsets2, url=url, storage_options=storage_options)
We also need to copy the data for the EXPANDING variables, which grow in time but aren't chunked. These can be done in parallel by variable, but not by time (since they aren't chunked).
dayl_params = [x for x in params if "dayl" in x[0]]
dayl_files, dayl_offsets = zip(*dayl_params)
extra_futures = client.map(
    process_expanding, EXPANDING, files=dayl_files, offsets=dayl_offsets, url=url, storage_options=storage_options
)
_ = wait(futures + extra_futures)
As usual, we want to consolidate the zarr metadata.
# Stage 4: Finalize
zarr.consolidate_metadata(store)
<zarr.hierarchy.Group '/'>
At this point, we should have a cloud-optimized, analysis-ready dataset that can be read by Zarr or xarray.
Note: you'll want a version of adlfs with these two pull requests included
import fsspec
import xarray as xr
store = fsspec.get_mapper(
"az://daymet-zarr/daymet_v3_hawaii.zarr",
account_name="daymeteuwest",
)
%time ds = xr.open_zarr(store, consolidated=True)
CPU times: user 187 ms, sys: 31.3 ms, total: 219 ms
Wall time: 555 ms
ds
<xarray.Dataset>
Dimensions: (nv: 2, time: 14600, x: 284, y: 584)
Coordinates:
lat (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
lon (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
* time (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
* x (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
* y (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
dayl (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
lambert_conformal_conic float32 ...
prcp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
srad (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
swe (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
time_bnds (time, nv) datetime64[ns] dask.array<chunksize=(14600, 2), meta=np.ndarray>
tmax (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
tmin (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
vp (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
yearday (time) int16 dask.array<chunksize=(14600,), meta=np.ndarray>
This is a lot of code. Some of it can certainly be abstracted and put in a library (pangeo-forge?). Some of it is specific to the dataset. But hopefully we can establish some best practices.