Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3
import os
import fsspec
import ujson # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import dask
import hvplot.xarray
# Anonymous (public-bucket) S3 filesystem; skip_instance_cache forces a fresh
# instance so directory listings are not served from a stale cache.
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
# Forecast hour and variable to extract from the NWM short-range output.
best_hour='f001'
var = 'channel_rt'
Globbing all the files takes a long time (> 5 minutes), so instead we just read the dates and generate 24 file names for each date — which of course assumes no files are missing.
#%%time
#flist = fs.glob(f'noaa-nwm-pds/nwm.*/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc')
# List only the daily directories (fast) instead of globbing every file (slow).
days = fs.glob(f'noaa-nwm-pds/nwm.*')
print(days[0])
print(days[-1])
noaa-nwm-pds/nwm.20210626 noaa-nwm-pds/nwm.20210726
# Construct the 24 expected hourly file names for each complete day rather
# than globbing them; days[2:-2] trims partial days at both ends of the
# archive. Assumes no cycle is missing.
flist = [
    f'{day}/short_range/nwm.t{hour:02d}z.short_range.{var}.{best_hour}.conus.nc'
    for day in days[2:-2]
    for hour in range(24)
]
# The newest day is still filling in, so glob whatever actually exists there.
flist.extend(fs.glob(f'{days[-1]}/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc'))
fs.size(flist[0])/1e6
13.689784
# Open one file to inspect the on-disk chunking/compression of streamflow.
ds = xr.open_dataset(fs.open(flist[0]))
ds.streamflow.encoding
{'chunksizes': (925580,), 'fletcher32': False, 'shuffle': True, 'zlib': True, 'complevel': 2, 'source': '<File-like object S3FileSystem, noaa-nwm-pds/nwm.20210628/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc>', 'original_shape': (2776738,), 'dtype': dtype('int32'), 'missing_value': array([-999900], dtype=int32), '_FillValue': array([-999900], dtype=int32), 'scale_factor': array([0.01], dtype=float32), 'add_offset': array([0.], dtype=float32)}
ds.nbytes/1e6
144.390393
# Sanity-check the first and last generated paths.
print(flist[0])
print(flist[-1])
noaa-nwm-pds/nwm.20210628/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f001.conus.nc
# Directory holding the most recent file; os.path.dirname already returns a
# str, so the original f-string wrapper was redundant.
last_dir = os.path.dirname(flist[-1])
last_dir
'noaa-nwm-pds/nwm.20210726/short_range'
# Dot-separated fields of the newest filename:
# ['nwm', 'tHHz', 'short_range', var, 'fNNN', 'conus', 'nc']
last_file = os.path.basename(flist[-1]).split('.')
last_file
# Every forecast hour (f001, f002, ...) issued in that newest cycle.
last_files = fs.glob(f'{last_dir}/{".".join(last_file[:3])}.{var}.*.conus.nc')
last_files
['noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f001.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f002.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f003.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f004.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f005.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f006.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f007.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f008.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f009.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f010.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f011.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f012.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f013.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f014.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f015.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f016.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f017.conus.nc', 'noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f018.conus.nc']
Skip the first of the last_files since it's a duplicate:
# Skip last_files[0]: it is the f001 file for this cycle, already in flist.
flist.extend(last_files[1:])
print(flist[0])
print(flist[-1])
noaa-nwm-pds/nwm.20210628/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f018.conus.nc
We need to include the "s3://" prefix in the list of files so that fsspec will recognize that these NetCDF files are on S3. There is no separate "storage_options" argument here, so the protocol prefix on each URL is the only way to tell fsspec where the files live.
urls = ["s3://" + f for f in flist]
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
print(urls[0])
print(urls[-1])
s3://noaa-nwm-pds/nwm.20210628/short_range/nwm.t00z.short_range.channel_rt.f001.conus.nc s3://noaa-nwm-pds/nwm.20210726/short_range/nwm.t15z.short_range.channel_rt.f018.conus.nc
fs.size(urls[10])
13717114
import os
import sys
# ebdpy is a site-local helper library shared on the ESIP qhub deployment.
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
ebd.set_credentials(profile='esip-qhub')
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
# Re-set credentials with explicit region/endpoint so the Dask workers
# inherit them via environment variables.
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
# Start (or reattach to) a Dask Gateway cluster with fixed scaling.
client,cluster = ebd.start_dask_cluster(profile=profile,worker_max=worker_max,
region=region, use_existing_cluster=True,
adaptive_scaling=False, wait_for_cluster=False,
environment='pangeo', worker_profile='Pangeo Worker',
propagate_env=True)
/home/conda/store/578732b5a39dadc3cadc71a29a33e58ded03e8a9d5c888edac6151d80eda2868-pangeo/lib/python3.8/site-packages/dask_gateway/client.py:21: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead. from distributed.utils import LoopRunner, format_bytes
Existing Dask clusters: Cluster Index c_idx: 0 / Name: dev.ce7242a88a1847958d0b4ac42dad0850 ClusterStatus.RUNNING Using existing cluster [0]. Setting Fixed Scaling workers=30 Reconnect client to clear cache client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub): https://jupyter.qhub.esipfed.org/gateway/clusters/dev.ce7242a88a1847958d0b4ac42dad0850/status Propagating environment variables to workers
We passed AWS credentials to the Dask workers via environment variables above, and the dask workers don't have the AWS credentials file with profiles defined, so we don't define a profile here, we just set anon=False
and let the workers find the credentials via the environment variables:
fs2 = fsspec.filesystem('s3', anon=False)
If the directory exists, remove it (and all the files)
# Destination for the per-file reference JSONs; remove any previous run's
# output. Catch only FileNotFoundError (directory absent on a fresh run) —
# the original bare `except:` would also have hidden permission errors.
json_dir = 's3://esip-qhub/usgs/nwm_forecast/jsons/'
try:
    fs2.rm(json_dir, recursive=True)
except FileNotFoundError:
    pass
def gen_json(u):
    """Kerchunk one NetCDF file on S3 and write its reference JSON to json_dir.

    Parameters
    ----------
    u : str
        Full "s3://bucket/nwm.YYYYMMDD/short_range/<fname>" URL of the file.

    Side effects: writes "<json_dir><date>.<fname>.json" via fs2 and prints
    the output path.
    """
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        p = u.split('/')
        # Index from the end of the path so the result is independent of the
        # "s3://" prefix (the original hard-coded p[3]/p[5] only worked with
        # exactly one protocol prefix present).
        date = p[-3]
        fname = p[-1]
        outf = f'{json_dir}{date}.{fname}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
%%time
# Fan one kerchunk task per file out to the Dask cluster; retries guard
# against transient S3 read failures.
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);
CPU times: user 318 ms, sys: 35.6 ms, total: 354 ms Wall time: 1min 7s
(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
# Collect the reference JSONs just written and convert them back to s3://
# URLs, sorted so concatenation order follows time order.
flist2 = fs2.ls(json_dir)
furls = sorted(f's3://{f}' for f in flist2)
furls[0]
's3://esip-qhub/usgs/nwm_forecast/jsons/nwm.20210628.nwm.t00z.short_range.channel_rt.f001.conus.nc.json'
len(furls)
681
fs.size(flist[0])
13689784
# Combine the most recent 240 hourly reference JSONs (10 days) into a single
# logical Zarr dataset, concatenated along "time".
mzz = MultiZarrToZarr(
    furls[-240:],
    storage_options={'anon': False},  # credentials needed to read the JSONs
    remote_protocol='s3',
    # BUG FIX: was the *string* 'True' (truthy, but the wrong type); the NWM
    # bucket is public, so pass the boolean True, matching the boolean style
    # of storage_options above.
    remote_options={'anon': True},  # NetCDF files
    xarray_open_kwargs={
        # Open the raw on-disk values; decoding is deferred to the consumer.
        'decode_cf': False,
        'mask_and_scale': False,
        'decode_times': False,
        'use_cftime': False,
        'drop_variables': ['reference_time', 'crs'],
        'decode_coords': False
    },
    xarray_concat_args={
        # "data_vars": "minimal",
        # "coords": "minimal",
        # "compat": "override",
        "join": "override",
        "combine_attrs": "override",
        "dim": "time"
    }
)
%%time
#%%prun -D multizarr_profile
# Write the combined reference set to a local JSON file.
mzz.translate('nwm.json')
CPU times: user 15.5 s, sys: 1.69 s, total: 17.2 s Wall time: 53.3 s
# Upload the combined reference JSON to S3.
rpath = 's3://esip-qhub/usgs/forecast/nwm.json'
fs2.put_file(lpath='nwm.json', rpath=rpath)
# Reading the reference JSON from this bucket requires requester-pays;
# the chunks it points at are on the public (anon) NWM bucket.
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}
# NOTE: rebinds `fs` (previously the anonymous S3 filesystem) to the
# reference filesystem for the rest of the notebook.
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
# Open the whole 240-hour collection as one zarr-backed dataset.
ds = xr.open_dataset(m, engine="zarr")
ds
<xarray.Dataset> Dimensions: (feature_id: 2776738, time: 240) Coordinates: * feature_id (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09 * time (time) datetime64[ns] 2021-07-16T10:00:00 ... 2021-07-27T... Data variables: nudge (time, feature_id) float64 ... qBtmVertRunoff (time, feature_id) float64 ... qBucket (time, feature_id) float64 ... qSfcLatRunoff (time, feature_id) float64 ... streamflow (time, feature_id) float64 ... velocity (time, feature_id) float64 ... Attributes: (12/20) Conventions: CF-1.6 NWM_version_number: v2.1 TITLE: OUTPUT FROM NWM v2.1 _NCProperties: version=2,netcdf=4.7.4,hdf5=1.10.6, cdm_datatype: Station code_version: v5.2.0-beta2 ... ... model_output_type: channel_rt model_output_valid_time: 2021-07-16_10:00:00 model_total_valid_times: 18 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
array([1.010000e+02, 1.790000e+02, 1.810000e+02, ..., 1.180002e+09, 1.180002e+09, 1.180002e+09])
array(['2021-07-16T10:00:00.000000000', '2021-07-16T11:00:00.000000000', '2021-07-16T12:00:00.000000000', ..., '2021-07-27T07:00:00.000000000', '2021-07-27T08:00:00.000000000', '2021-07-27T09:00:00.000000000'], dtype='datetime64[ns]')
[666417120 values with dtype=float64]
[666417120 values with dtype=float64]
[666417120 values with dtype=float64]
[666417120 values with dtype=float64]
[666417120 values with dtype=float64]
[666417120 values with dtype=float64]
%%time
# Time series of streamflow at a single reach (feature index 1000).
ds.streamflow[:,1000].hvplot(x='time', grid=True)
# Tear down the Dask cluster and close the client.
cluster.shutdown(); client.close()