Read a collection of GRIB2 files on AWS as a single dataset using the Zarr library, via fsspec's ReferenceFileSystem. This notebook also demonstrates how to generate the JSON file that fsspec uses, speeding up metadata extraction from each GRIB2 file by running Reference File Maker on a Dask cluster.
Requires development version of fsspec_reference_maker
pip install --user git+https://github.com/intake/fsspec-reference-maker
import xarray as xr
import hvplot.xarray
import datetime as dt
import dask
import json
import fsspec
from fsspec_reference_maker.grib2 import scan_grib
from fsspec_reference_maker.combine import MultiZarrToZarr
There is a new HRRR forecast every hour, so use forecast hour 1 from past forecasts and then append the latest forecast
# Anonymous S3 access to the public HRRR bucket; skip_instance_cache so the
# listing is fresh rather than served from a cached filesystem instance.
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
today = dt.datetime.utcnow().strftime('%Y%m%d')
files = fs.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/*wrfsfcf01.grib2')
# Cycle token (e.g. 't11z') parsed from the filename of the newest f01 file.
newest = files[-1]
latest = newest.split('/')[3].split('.')[1]
print(latest)
t11z
# All forecast-hour files of the latest cycle; skip the first two entries
# (presumably f00 and f01 — f01 is already in `files` from the glob above;
# verify against the bucket's naming if the pattern changes).
latest_files = fs.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/hrrr.{latest}.wrfsfc*.grib2')
files.extend(latest_files[2:])
This is for the ESIP qhub: you will need to modify to work elsewhere.
# ESIP qhub-specific credentials and Dask Gateway cluster startup.
import os
import sys

sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
# Single credentials call — the original called set_credentials twice; the
# first call (profile only) was immediately superseded by this fuller one.
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)

worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         environment='pangeo', worker_profile='Pangeo Worker',
                                         propagate_env=True)
/home/conda/store/3d745bdbbc77faf1b06381a24d6e593eeec3375ed3ddf4003aa2fc578a214eab-pangeo/lib/python3.9/site-packages/dask_gateway/client.py:21: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead. from distributed.utils import LoopRunner, format_bytes
No Cluster running. Starting new cluster. Setting Fixed Scaling workers=30 Reconnect client to clear cache client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub): https://jupyter.qhub.esipfed.org/gateway/clusters/dev.ab10096f62044b71afd9beb36587a782/status Propagating environment variables to workers
# cfgrib-style filter: keep only fields at 2 m height above ground.
afilter = dict(typeOfLevel='heightAboveGround', level=2)
# Storage options for anonymous S3 reads with readahead caching.
so = dict(anon=True, default_cache_type='readahead')
# Coordinate variables shared by every GRIB2 message we scan.
common = ['time', 'step', 'latitude', 'longitude', 'valid_time']
def gen_json(u):
    """Scan one GRIB2 file at URL *u* and write its fsspec reference JSON to json_dir."""
    parts = u.split('/')
    run_date = parts[3].split('.')[1]        # e.g. '20210903' from 'hrrr.20210903'
    tokens = parts[5].split('.')[1:3]        # e.g. ['t11z', 'wrfsfcf01']
    target = f'{json_dir}{run_date}.{tokens[0]}.{tokens[1]}.json'
    # NOTE(review): 'inline_threashold' is the (misspelled) keyword accepted by
    # this version of fsspec_reference_maker.scan_grib — do not "correct" it.
    refs = scan_grib(u, common, so, inline_threashold=100, filter=afilter)
    with fs2.open(target, "w") as jf:
        jf.write(json.dumps(refs))
json_dir = 's3://esip-qhub/noaa/hrrr/jsons/'
# Authenticated filesystem for writing the reference JSONs.
fs2 = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)
# Best-effort cleanup of JSONs from a previous run; the directory may simply
# not exist yet, which is fine. (Original used a bare `except:` that would
# also have hidden permission errors.)
try:
    fs2.rm(json_dir, recursive=True)
except FileNotFoundError:
    pass
urls = [f's3://{file}' for file in files]
#so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
%%time
# Fan the per-file JSON generation out across the Dask workers; retries guard
# against transient S3 read failures.
_ = dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);
CPU times: user 292 ms, sys: 99.9 ms, total: 392 ms Wall time: 3min 58s
Use `MultiZarrToZarr()` to combine into a single reference
flist2 = fs2.ls(json_dir)
# Sorted s3:// URLs of the per-file reference JSONs.
furls = sorted(f's3://{f}' for f in flist2)
print(furls[0])
print(furls[-1])
s3://esip-qhub/noaa/hrrr/jsons/20210903.t00z.wrfsfcf01.json s3://esip-qhub/noaa/hrrr/jsons/20210903.t11z.wrfsfcf18.json
# mzz = MultiZarrToZarr(furls,
# storage_options={'anon':False},
# remote_protocol='s3',
# remote_options={'anon' : 'True'}, #JSON files
# xarray_open_kwargs={
# 'decode_cf' : False,
# 'mask_and_scale' : False,
# 'decode_times' : False,
# 'use_cftime' : False,
# 'drop_variables': ['reference_time', 'crs'],
# 'decode_coords' : False
# },
# xarray_concat_args={
# # "data_vars": "minimal",
# # "coords": "minimal",
# # "compat": "override",
# "join": "override",
# "combine_attrs": "override",
# "dim": "time"
# }
# )
# Combine the per-file references into one logical dataset, concatenating
# along valid_time. storage_options authenticates reads of our JSONs;
# remote_options is for the public HRRR bucket the references point into.
mzz = MultiZarrToZarr(furls,
storage_options={'anon':False},
remote_protocol='s3',
remote_options={'anon': True},
xarray_concat_args={'dim': 'valid_time'})
%%time
#%%prun -D multizarr_profile
# Write the combined reference set to a local JSON file.
mzz.translate('hrrr_best.json')
CPU times: user 1.58 s, sys: 350 ms, total: 1.93 s Wall time: 10.6 s
# Publish the combined reference file to the public bucket.
rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
fs2.put_file('hrrr_best.json', rpath)
rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
s_opts = dict(requester_pays=True, skip_instance_cache=True)   # for reading the JSON itself
r_opts = dict(anon=True)                                       # for the public HRRR data it points to
# Open the combined references as a (read-only) filesystem; note this rebinds
# the module-level `fs` used for the earlier globs.
fs = fsspec.filesystem(
    "reference",
    fo=rpath,
    ref_storage_args=s_opts,
    remote_protocol='s3',
    remote_options=r_opts,
)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))
# Inspect the variables exposed by the combined reference dataset.
ds2.data_vars
Data variables: d2m (valid_time, y, x) float32 ... pt (valid_time, y, x) float32 ... r2 (valid_time, y, x) float32 ... sh2 (valid_time, y, x) float32 ... t2m (valid_time, y, x) float32 ... unknown (valid_time, y, x) float32 ...
# 2 metre temperature, concatenated along valid_time.
ds2.t2m
<xarray.DataArray 't2m' (valid_time: 29, y: 1059, x: 1799)> [55249089 values with dtype=float32] Coordinates: heightAboveGround float64 2.0 latitude (y, x) float64 ... longitude (y, x) float64 ... step timedelta64[ns] 01:00:00 time (valid_time) datetime64[ns] 2021-09-03 ... NaT * valid_time (valid_time) datetime64[us] 2021-09-03T01:00:00 ... 20... Dimensions without coordinates: y, x Attributes: (12/33) GRIB_DxInMetres: 3000.0 GRIB_DyInMetres: 3000.0 GRIB_LaDInDegrees: 38.5 GRIB_Latin1InDegrees: 38.5 GRIB_Latin2InDegrees: 38.5 GRIB_LoVInDegrees: 262.5 ... ... GRIB_stepUnits: 1 GRIB_typeOfLevel: heightAboveGround GRIB_units: K long_name: 2 metre temperature standard_name: air_temperature units: K
[55249089 values with dtype=float32]
array(2.)
[1905141 values with dtype=float64]
[1905141 values with dtype=float64]
array(3600000000000, dtype='timedelta64[ns]')
array(['2021-09-03T00:00:00.000000000', '2021-09-03T01:00:00.000000000', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT', 'NaT'], dtype='datetime64[ns]')
array(['2021-09-03T01:00:00.000000', '2021-09-03T02:00:00.000000', '2021-09-03T03:00:00.000000', '2021-09-03T04:00:00.000000', '2021-09-03T05:00:00.000000', '2021-09-03T06:00:00.000000', '2021-09-03T07:00:00.000000', '2021-09-03T08:00:00.000000', '2021-09-03T09:00:00.000000', '2021-09-03T10:00:00.000000', '2021-09-03T11:00:00.000000', '2021-09-03T12:00:00.000000', '2021-09-03T13:00:00.000000', '2021-09-03T14:00:00.000000', '2021-09-03T15:00:00.000000', '2021-09-03T16:00:00.000000', '2021-09-03T17:00:00.000000', '2021-09-03T18:00:00.000000', '2021-09-03T19:00:00.000000', '2021-09-03T20:00:00.000000', '2021-09-03T21:00:00.000000', '2021-09-03T22:00:00.000000', '2021-09-03T23:00:00.000000', '2021-09-04T00:00:00.000000', '2021-09-04T01:00:00.000000', '2021-09-04T02:00:00.000000', '2021-09-04T03:00:00.000000', '2021-09-04T04:00:00.000000', '2021-09-04T05:00:00.000000'], dtype='datetime64[us]')
Hvplot wants lon [-180,180], not [0,360]:
# Wrap longitudes from [0, 360) into [-180, 180) for hvplot.
ds2 = ds2.assign_coords(longitude=(((ds2.longitude + 180) % 360) - 180))
ds2.t2m.hvplot.quadmesh(x='longitude', y='latitude', rasterize=True, geo=True,
tiles='OSM', cmap='turbo')
We are reading GRIB2 files, which compress the entire spatial domain as a single chunk. Therefore reading all the time values at a single point actually needs to load and uncompress all the data for that variable. But with a lot of cores, it doesn't take terribly long
%%time
# Time series at one grid point: each GRIB2 chunk is a full 2D field, so every
# time step's whole spatial domain must be read and decompressed for this.
ds2.t2m[:,500,500].hvplot(x='valid_time', grid=True)
CPU times: user 17.2 s, sys: 1.65 s, total: 18.8 s Wall time: 20 s
# Release the Dask client and tear down the cluster now that we're done.
client.close()
cluster.shutdown()