read all individual referenceFileSystem JSON files and create combined JSON for entire dataset
import fsspec
import xarray as xr
import hvplot.xarray
import metpy
import ujson # fast json
from kerchunk.combine import MultiZarrToZarr
from kerchunk.df import refs_to_dataframe
import kerchunk
json_dir = 's3://esip-qhub/noaa/nwm/grid1km/json'
kerchunk.__version__
'0.1.0+59.g33b00d6.dirty'
For file systems where files are changing, you want skip_instance_cache=True
or else you won't see the changed files
fs_json = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)
Create a dict from the mzz object
year_list = fs_json.glob('esip-qhub/noaa/nwm/grid1km/combined_????.json')
year_list = [f's3://{y}' for y in year_list]
year_list
['s3://esip-qhub/noaa/nwm/grid1km/combined_1979.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1980.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1981.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1982.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1983.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1984.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1985.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1986.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1987.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1988.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1989.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1990.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1991.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1992.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1993.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1994.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1995.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1996.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1997.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1998.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_1999.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2001.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2002.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2003.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2004.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2005.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2006.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2007.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2008.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2009.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2010.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2011.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2012.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2013.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2014.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2015.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2016.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2017.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2019.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_2020.json']
print(len(year_list))
40
year_dict={}
year_dict['a'] = year_list[:10]
year_dict['b'] = year_list[10:20]
year_dict['c'] = year_list[20:30]
year_dict['d'] = year_list[30:40]
def key_combine(key):
combined_json = f's3://esip-qhub/noaa/nwm/grid1km/combined_{key}.json'
mzz = MultiZarrToZarr(year_dict[key],
remote_protocol = 's3',
remote_options = dict(anon=True),
concat_dims = ['time'],
identical_dims=["x", "y", "crs"],
preprocess = kerchunk.combine.drop("reference_time"))
d = mzz.translate()
with fs_json.open(combined_json, 'wb') as f:
f.write(ujson.dumps(d).encode());
#year_dict['martin'] = year_list[:13]
%%time
#key_combine('martin')
%%time
#key_combine('b')
%%time
#key_combine('c')
%%time
#key_combine('d')
First list the decadal JSONs
p_list = fs_json.glob('esip-qhub/noaa/nwm/grid1km/combined_?.json')
p_list = [f's3://{y}' for y in p_list]
p_list
['s3://esip-qhub/noaa/nwm/grid1km/combined_a.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_b.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_c.json', 's3://esip-qhub/noaa/nwm/grid1km/combined_d.json']
fs_json.size(p_list[0])/1e9
2.421433087
year_dict['zz'] = p_list
%%time
key_combine('zz')
CPU times: user 15min 37s, sys: 1min 41s, total: 17min 19s Wall time: 30min 14s
refs = 's3://esip-qhub/noaa/nwm/grid1km/combined_zz.json'
fs_json.info(refs)
{'ETag': '"dc8d55582023ec236cc0507dcffc0518-1865"', 'LastModified': datetime.datetime(2023, 3, 28, 9, 42, 35, tzinfo=tzutc()), 'size': 9780943804, 'name': 'esip-qhub/noaa/nwm/grid1km/combined_zz.json', 'type': 'file', 'StorageClass': 'STANDARD', 'VersionId': None, 'ContentType': 'binary/octet-stream'}
refs_to_dataframe(fs_json.open(refs), fs_json.open('s3://esip-qhub/noaa/nwm/grid1km/parquet/refs_test', mode='wb'))
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) Input In [28], in <cell line: 1>() ----> 1 refs_to_dataframe(fs_json.open(refs), fs_json.open('s3://esip-qhub/noaa/nwm/grid1km/parquet/refs_test', mode='wb')) File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/kerchunk/df.py:138, in refs_to_dataframe(refs, url, storage_options, record_size, categorical_threshold, **kwargs) 135 if "refs" in refs: 136 refs = refs["refs"] --> 138 fs, _ = fsspec.core.url_to_fs(url) 139 fs.makedirs(url, exist_ok=True) 140 fields = get_variables(refs, consolidated=True) File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/core.py:353, in url_to_fs(url, **kwargs) 333 def url_to_fs(url, **kwargs): 334 """ 335 Turn fully-qualified and potentially chained URL into filesystem instance 336 (...) 351 The file-systems-specific URL for ``url``. 352 """ --> 353 chain = _un_chain(url, kwargs) 354 inkwargs = {} 355 # Reverse iterate the chain, creating a nested target_* structure File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/core.py:306, in _un_chain(path, kwargs) 302 def _un_chain(path, kwargs): 303 x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word 304 bits = ( 305 [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")] --> 306 if "::" in path 307 else [path] 308 ) 309 # [[url, protocol, kwargs], ...] 310 out = [] File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/spec.py:1754, in AbstractBufferedFile.__next__(self) 1753 def __next__(self): -> 1754 out = self.readline() 1755 if out: 1756 return out File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/spec.py:1751, in AbstractBufferedFile.readline(self) 1745 def readline(self): 1746 """Read until first occurrence of newline character 1747 1748 Note that, because of character encoding, this is not necessarily a 1749 true line ending. 1750 """ -> 1751 return self.readuntil(b"\n") File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/spec.py:1734, in AbstractBufferedFile.readuntil(self, char, blocks) 1732 while True: 1733 start = self.tell() -> 1734 part = self.read(blocks or self.blocksize) 1735 if len(part) == 0: 1736 break File ~/miniconda3/envs/pangeo/lib/python3.9/site-packages/fsspec/spec.py:1694, in AbstractBufferedFile.read(self, length) 1692 length = -1 if length is None else int(length) 1693 if self.mode != "rb": -> 1694 raise ValueError("File not in read mode") 1695 if length < 0: 1696 length = self.size - self.loc ValueError: File not in read mode