import warnings
warnings.simplefilter("ignore") # filter some warning messages
import xarray as xr
import hvplot.xarray
import datetime as dt
import pandas as pd
import dask
import panel as pn
import json
import fsspec
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr
import ujson
There is a new HRRR forecast every hour, so we use forecast hour 1 from each past forecast cycle and then append the remaining forecast hours of the latest cycle.
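For example, with the dates from the run below, the combined list of GRIB2 keys looks roughly like this (illustrative only):
# Illustrative "best time series" file list: f01 from each past hourly cycle,
# followed by the full run (f02-f18) of the latest cycle
best_time_series_example = [
    's3://noaa-hrrr-bdp-pds/hrrr.20230511/conus/hrrr.t20z.wrfsfcf01.grib2',  # past cycles: forecast hour 01
    # ... one f01 file per hourly cycle ...
    's3://noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf01.grib2',
    's3://noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf02.grib2',  # latest cycle: forecast hours 02-18
    # ...
    's3://noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf18.grib2',
]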
fs_read = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
tau = '01'   # forecast hour to take from each past cycle
try:
    today = dt.datetime.utcnow().strftime('%Y%m%d')
    last_file = fs_read.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/*wrfsfcf{tau}.grib2')[-2]
except IndexError:
    # fall back to the local date if no files exist yet for the current UTC date
    today = dt.datetime.now().strftime('%Y%m%d')
    last_file = fs_read.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/*wrfsfcf{tau}.grib2')[-2]
print(last_file)
noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf01.grib2
end_time = dt.datetime.strptime(last_file, f'noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf{tau}.grib2')
print(end_time)
2023-05-14 20:00:00
days_back = 3
start_time = end_time - dt.timedelta(days=days_back)
print(start_time)
print(last_file)
2023-05-11 20:00:00
noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf01.grib2
dates = pd.date_range(start=start_time, end=end_time, freq='1h')
files = [date.strftime('s3://noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf01.grib2') for date in dates]
glob_pattern_for_latest_forecast = end_time.strftime('s3://noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf*.grib2')
print(glob_pattern_for_latest_forecast)
s3://noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf*.grib2
latest_files = fs_read.glob(glob_pattern_for_latest_forecast)
latest_files = [f's3://{file}' for file in latest_files]
print(len(latest_files))
19
files.extend(latest_files[2:19])   # append forecast hours 02-18 of the latest cycle (its f01 is already in files)
print(files[0])
print(files[-1])
s3://noaa-hrrr-bdp-pds/hrrr.20230511/conus/hrrr.t20z.wrfsfcf01.grib2
s3://noaa-hrrr-bdp-pds/hrrr.20230514/conus/hrrr.t20z.wrfsfcf18.grib2
Unless you are using the ESIP Qhub, you will need to modify the following cell to create a Dask cluster and supply your Dask workers with the credentials needed to write to the S3 bucket, or modify it to use a local Dask cluster and write the JSON files to local disk.
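For a local run, a minimal sketch might look like the following (the worker count and local json_dir are illustrative placeholders, not part of the original workflow):
# Sketch of a local alternative: a local Dask cluster and reference JSONs on local disk.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4)     # placeholder worker count
client = Client(cluster)
fs_write = fsspec.filesystem('file')    # write the per-file reference JSONs locally
json_dir = 'hrrr_jsons/'                # local directory instead of the OSN bucket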
import os
import logging
import configparser
aws_profile = 'esip'
aws_s3_endpoint = 'https://ncsa.osn.xsede.org'
awsconfig = configparser.ConfigParser()
awsconfig.read(
    os.path.expanduser('~/.aws/credentials')  # default location... if yours is elsewhere, change this.
)
## NOTE: The default will be for the OSN / RENCI profile and endpoint. Override this
## by setting environment variables before executing this cell/notebook.
_profile_nm = os.environ.get('AWS_PROFILE', aws_profile)
_endpoint = os.environ.get('AWS_S3_ENDPOINT', aws_s3_endpoint)
# Set environment vars based on parsed awsconfig
try:
    os.environ['AWS_ACCESS_KEY_ID'] = awsconfig[_profile_nm]['aws_access_key_id']
    os.environ['AWS_SECRET_ACCESS_KEY'] = awsconfig[_profile_nm]['aws_secret_access_key']
    os.environ['AWS_S3_ENDPOINT'] = _endpoint
    os.environ['AWS_PROFILE'] = _profile_nm
    os.environ['AWS_DEFAULT_PROFILE'] = _profile_nm
    os.environ['AWS_S3_REGION'] = _profile_nm
except KeyError:
    logging.error("Problem parsing the AWS credentials file.")
%run ./Start_Dask_Cluster_Nebari.ipynb
The 'cluster' object can be used to adjust cluster behavior. i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster. i.e. 'client.submit(func)'
The link to view the client dashboard is:
> https://nebari-workshop.esipfed.org/gateway/clusters/dev.90747e6eb0fd4c73aec0d558f241f672/status
fs_write = fsspec.filesystem('s3', anon=False, skip_instance_cache=True, client_kwargs={'endpoint_url': 'https://ncsa.osn.xsede.org'})
json_dir = 's3://esip/rsignell/hrrr/jsons/'
afilter = {'typeOfLevel': 'heightAboveGround', 'level': [2, 10]}   # keep only the 2 m and 10 m above-ground fields
so = {"anon": True, "default_cache_type": "readahead"}             # storage options for reading the public HRRR bucket
common = ['time', 'step', 'latitude', 'longitude', 'valid_time']   # coordinates shared by every GRIB message
def gen_json(u):
    # derive an output name like {json_dir}YYYYMMDD.tHHz.wrfsfcfFF.json from the GRIB2 key
    date = u.split('/')[3].split('.')[1]
    name = u.split('/')[5].split('.')[1:3]
    outfname = f'{json_dir}{date}.{name[0]}.{name[1]}.json'
    out = scan_grib(u, common, so, inline_threshold=100, filter=afilter)
    with fs_write.open(outfname, "w") as f:
        f.write(json.dumps(out))
delete_json_dir = True
if delete_json_dir:
    try:
        fs_write.rm(json_dir, recursive=True)
    except Exception:
        # the directory may not exist yet
        pass
print(len(files))
90
%%time
_ = dask.compute(*[dask.delayed(gen_json)(u) for u in files], retries=10);
CPU times: user 149 ms, sys: 15.8 ms, total: 165 ms
Wall time: 2min 15s
json_list = fs_write.ls(json_dir)
json_list = sorted(['s3://'+f for f in json_list])
print(len(json_list))
print(json_list[0])
print(json_list[-1])
90
s3://esip/rsignell/hrrr/jsons/20230511.t20z.wrfsfcf01.json
s3://esip/rsignell/hrrr/jsons/20230514.t20z.wrfsfcf18.json
r_opts = {'anon':True }
s_opts = {'anon':True, 'skip_instance_cache':True, 'client_kwargs':{'endpoint_url': 'https://ncsa.osn.xsede.org'}}
fs = fsspec.filesystem("reference", fo=json_list[0], ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False),
                      chunks={'valid_time':1})
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[27], line 3
      1 r_opts = {'anon':True }
      2 s_opts = {'anon':True, 'skip_instance_cache':True, 'client_kwargs':{'endpoint_url': 'https://ncsa.osn.xsede.org'}}
----> 3 fs = fsspec.filesystem("reference", fo=json_list[0], ref_storage_args=s_opts,
      4                        remote_protocol='s3', remote_options=r_opts)
...
File .../fsspec/implementations/reference.py:626, in ReferenceFileSystem._process_references(self, references, template_overrides)
    625 def _process_references(self, references, template_overrides=None):
--> 626     vers = references.get("version", None)

AttributeError: 'list' object has no attribute 'get'
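The failure is expected: scan_grib returns a list of reference dicts (one per matching GRIB message), so each JSON we wrote holds a list rather than a single reference dict and cannot be opened directly as a reference filesystem. A quick check (a sketch reusing json_list, fs_write, and json from earlier cells):
# Sketch: confirm that each per-file JSON contains a *list* of reference dicts,
# which is why ReferenceFileSystem complains that a list has no .get()
with fs_write.open(json_list[0], 'r') as f:
    refs = json.load(f)
print(type(refs), len(refs))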
Use MultiZarrToZarr() to combine into a single reference

mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options=dict(anon=True),
                      concat_dims=['valid_time'],
                      identical_dims=['latitude', 'longitude', 'heightAboveGround'])
%%time
# combine the per-file references in mzz into a single reference dict
d = mzz.translate()
Alternatively, the combine can be done in parallel on the Dask cluster with kerchunk's auto_dask helper:
mzz_kwargs = dict(concat_dims=['valid_time'],
                  identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step'])
opts = dict(anon=True, client_kwargs={'endpoint_url': 'https://ncsa.osn.xsede.org'})
from kerchunk.combine import MultiZarrToZarr, auto_dask, JustLoad
worker_max = 30
%%time
d = auto_dask(
    json_list,
    single_driver=JustLoad,
    single_kwargs={"storage_options": opts},
    mzz_kwargs=mzz_kwargs,
    n_batches=worker_max,  # give one batch to each worker
    remote_protocol="s3",
    remote_options=opts
)
fs_local = fsspec.filesystem('file')
combined_json = 'hrrr_best.json'
with fs_local.open(combined_json, 'wb') as f:
    # serialize the combined reference dict to JSON and write it to local disk
    f.write(ujson.dumps(d).encode())
rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
fs_write.put_file(lpath=combined_json, rpath=rpath)
fs_write.size(rpath)
fs_local.size(combined_json)
# rpath = combined_json   # alternatively, open the local copy instead of the one on OSN
r_opts = {'anon':True}
s_opts = {'anon':True, 'skip_instance_cache':True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False),
                      chunks={'valid_time':1})
ds2.valid_time
fs.ls('')
var='t2m'
print(f'Size:{ds2[var].nbytes/1e9} GB')
ds2
ds2.valid_time[0:5]
hvplot wants longitude in [-180, 180], not [0, 360]:
ds2 = ds2.assign_coords(longitude=(((ds2.longitude + 180) % 360) - 180))
now = dt.datetime.utcnow().strftime('%Y-%m-%d %H:00:00')
print(now)
With a 30-worker cluster, this takes about 50 seconds to display and 15 seconds to change the time step. After closing the Dask client, it takes about 30 seconds to display and 8 seconds per time step.
ds2[var].sel(valid_time=now).hvplot.quadmesh(x='longitude', y='latitude', geo=True,
                                             rasterize=True, cmap='turbo', title=now)
hvplot provides a slider for the time steps, but we want a dropdown list, so we use Panel. Let's also add a tile layer from OpenStreetMap.
extra_dims = list(ds2[var].dims[:-2])
mesh = ds2[var].hvplot.quadmesh(x='longitude', y='latitude', rasterize=True, geo=True, tiles='OSM', cmap='turbo')
pn.panel(mesh, widgets={k: pn.widgets.Select for k in extra_dims})
We are reading GRIB2 files, which compress the entire spatial domain as a single chunk, so reading all the time values at a single point requires loading and decompressing all the data for that variable.
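A quick way to see this (a sketch; the exact chunk shape depends on the HRRR grid) is to inspect the Dask chunk size, which spans the full grid for each time step:
# Sketch: each Dask chunk covers the whole CONUS grid for a single valid_time,
# so even a single-point time series touches every chunk of the variable.
print(ds2[var].data.chunksize)   # e.g. (1, 1059, 1799)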
%%time
ds2[var][:,500,500].hvplot(x='valid_time', grid=True)
%%time
da = ds2[var].mean('valid_time').compute()
da.hvplot.quadmesh(x='longitude', y='latitude', rasterize=True, geo=True, tiles='OSM', cmap='turbo')
client.close(); cluster.shutdown()