A full archive of ERA5 reanalysis data has been made available as a free-to-access public data set on AWS. To aid efficient cloud access to this data, the provider has reproduced the native netCDF variables as Zarr stores. While this is convenient for the end user, the burden of processing and the cost of duplicating the original data is significant.
Kerchunk seeks to provide some middle ground by creating a sidecar file to go along with the native netCDF dataset. While this requires some preprocessing by the data provider, it does not require any duplication of data. Only a single 200 MB sidecar file is necessary to allow serverless, Zarr-like access to the native netCDF datasets.
import fsspec
import ujson
import xarray as xr
import hvplot.xarray
import panel as pn
import cartopy.crs as ccrs
import zstandard
import configparser
from pathlib import Path
import os
from kerchunk.combine import MultiZarrToZarr, auto_dask, JustLoad
Kerchunk is a new package and changes frequently; it is best to install from the main branch at https://github.com/fsspec/kerchunk.git
# Record which kerchunk version produced these results — the API changes often.
import kerchunk

kerchunk_version = kerchunk.__version__
print(kerchunk_version)
0.1.0+8.gff16c05
First create a filesystem pointing to the location of the air pressure at mean sea level files on the S3 bucket. ERA5 is a public data set, so anonymous access works fine.
#fs = fsspec.filesystem('s3', anon=True, config_kwargs={'connect_timeout':5, 'read_timeout':5})
Next create an S3 file system to save the files, since we will be using Dask workers on a Kubernetes cluster
# Authenticated S3 filesystem for reading/writing the kerchunk sidecar JSONs.
# Short timeouts fail fast if the bucket is unreachable.
s3_timeouts = {'connect_timeout': 5, 'read_timeout': 5}
fs2 = fsspec.filesystem('s3', anon=False, config_kwargs=s3_timeouts)

json_dir = 's3://esip-qhub/noaa/ERA5_jsons'
# One JSON per month: the leading ?????? matches the YYYYMM prefix.
monthly_keys = fs2.glob(f'{json_dir}/??????air_pressure_at_mean_sea_level.json')
json_list = []
for key in monthly_keys:
    # glob returns bare bucket keys; restore the s3:// protocol prefix
    json_list.append(f's3://{key}')
print(len(json_list))
print(json_list[0])
528 s3://esip-qhub/noaa/ERA5_jsons/197901air_pressure_at_mean_sea_level.json
%%time
fs_ = fsspec.filesystem("reference", fo=json_list[0], ref_storage_args={'skip_instance_cache':True},
remote_protocol='s3', remote_options={'anon':True})
m = fs_.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={})
print(ds)
<xarray.Dataset> Dimensions: (time0: 744, lat: 721, lon: 1440) Coordinates: * lat (lat) float32 90.0 89.75 ... -89.75 -90.0 * lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8 * time0 (time0) datetime64[ns] 1979-01-01 ... 197... Data variables: air_pressure_at_mean_sea_level (time0, lat, lon) float32 dask.array<chunksize=(24, 100, 100), meta=np.ndarray> Attributes: institution: ECMWF source: Reanalysis title: ERA5 forecasts CPU times: user 1.73 s, sys: 157 ms, total: 1.89 s Wall time: 4.43 s
All looks good, except for 0 lon being translated to NaN. This is a result of no fill_value being assigned in the original netCDF file, and zarr defaulting to 0 as a fill_value. We can use the postprocess method to modify this.
import zarr
def modify_fill_value(out):
    """Set an explicit fill_value of -999 on the lat/lon coordinates.

    The source netCDF defines no _FillValue, so zarr falls back to 0 and the
    genuine 0.0 longitude decodes as NaN; -999 never collides with a real
    coordinate value. Mutates the reference set in place and returns it.
    """
    store = zarr.open(out)
    for coord in ('lon', 'lat'):
        store[coord].fill_value = -999
    return out
def postprocess(out):
    """Kerchunk post-processing hook: patch the coordinate fill values."""
    return modify_fill_value(out)
Now we can use MultiZarrToZarr to combine the individual JSON files along the 'time0' dimension. There is a serial method and a new method that uses Dask. Let's try both, so spin up a Dask cluster.
import sys, os

# Make the shared helper library (ebdpy) importable on this hub.
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

aws_profile = 'esip-qhub'
aws_region = 'us-west-2'
endpoint = f's3.{aws_region}.amazonaws.com'
# A single credentials call suffices; the earlier version called
# set_credentials twice (first without region/endpoint), which was
# redundant — the second call superseded the first.
ebd.set_credentials(profile=aws_profile, region=aws_region, endpoint=endpoint)

worker_max = 10
client, cluster = ebd.start_dask_cluster(
    profile=aws_profile,
    worker_max=worker_max,
    region=aws_region,
    use_existing_cluster=True,
    adaptive_scaling=False,      # fixed pool of worker_max workers
    wait_for_cluster=False,
    worker_profile='Small Worker',
    propagate_env=True,          # push AWS env vars to the workers
)
Region: us-west-2 Existing Dask clusters: Cluster Index c_idx: 0 / Name: dev.98152e636cb8438ba887b847102dba65 ClusterStatus.RUNNING Starting new cluster. {} Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2 Setting Fixed Scaling workers=10 Reconnect client to clear cache client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub): https://nebari.esipfed.org/gateway/clusters/dev.10b3c3f217d94a8dae4475b046d7fff1/status Propagating environment variables to workers Using environment: users/users-pangeo
client  # display the Dask client summary (cluster type and dashboard link)
Client-20624e14-9cc8-11ed-80f1-b6500310e0ca
Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster |
Dashboard: https://nebari.esipfed.org/gateway/clusters/dev.10b3c3f217d94a8dae4475b046d7fff1/status |
%%time
mzz = MultiZarrToZarr(json_list,
remote_protocol='s3',
remote_options={'anon':True},
target_options = {'anon':False},
concat_dims=['time0'], #this is the dimension along which the individual files will be merged
postprocess = postprocess
)
d = mzz.translate()
CPU times: user 30.8 s, sys: 5.38 s, total: 36.2 s Wall time: 3min 46s
%%time
d = auto_dask(
json_list,
single_driver=JustLoad,
single_kwargs={"storage_options": {"anon": False}},
mzz_kwargs={"postprocess": postprocess, "concat_dims": ["time0"]},
n_batches=worker_max, # give one batch to each worker
remote_protocol="s3",
remote_options={"anon": True}
)
CPU times: user 3.73 s, sys: 749 ms, total: 4.48 s Wall time: 1min 19s
# Persist the combined reference set as a single sidecar JSON on S3.
combined_json = 's3://esip-qhub/testing/ERA5/combined.json'
payload = ujson.dumps(d).encode()
with fs2.open(combined_json, 'wb') as outfile:
    outfile.write(payload)
%%time
fs = fsspec.filesystem("reference", fo=combined_json, ref_storage_args={'skip_instance_cache':True},
remote_protocol='s3', remote_options={'anon':True})
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={})
print(ds)
<xarray.Dataset> Dimensions: (time0: 385704, lat: 721, lon: 1440) Coordinates: * lat (lat) float32 90.0 89.75 ... -89.75 -90.0 * lon (lon) float32 0.0 0.25 0.5 ... 359.5 359.8 * time0 (time0) datetime64[ns] 1979-01-01 ... 202... Data variables: air_pressure_at_mean_sea_level (time0, lat, lon) float32 dask.array<chunksize=(24, 100, 100), meta=np.ndarray> Attributes: institution: ECMWF source: Reanalysis title: ERA5 forecasts CPU times: user 5.95 s, sys: 950 ms, total: 6.9 s Wall time: 9.18 s
%%time
da = ds.air_pressure_at_mean_sea_level.sel(time0 = '1979-01-02T12:00:00', method='nearest').load()
da.hvplot.image(cmap = 'viridis', coastline=True, rasterize=True,
projection=ccrs.Orthographic(45, -10), global_extent=True)
CPU times: user 5.48 s, sys: 487 ms, total: 5.97 s Wall time: 27.7 s