#!/usr/bin/env python
# coding: utf-8

# # Create combined JSON for each year in parallel
# Read all individual referenceFileSystem JSON files and create a combined
# JSON per year for the entire dataset, fanned out over a Dask cluster.

# In[1]:

import fsspec
import xarray as xr
import hvplot.xarray
import metpy

# In[2]:

import ujson  # fast json
from kerchunk.combine import MultiZarrToZarr
import kerchunk

# In[3]:

# Location of the per-timestep kerchunk reference JSONs on S3.
json_dir = 's3://esip-qhub/noaa/nwm/grid1km/json'

# For file systems where files are changing, you want `skip_instance_cache=True`
# or else you won't see the changed files.

# In[4]:

fs_json = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)

# In[5]:

year_list = range(1979, 2021)

# In[6]:


def year_combine(year):
    """Combine all per-timestep reference JSONs for one year into a single JSON.

    Globs the individual reference files for `year`, merges them along the
    `time` dimension with MultiZarrToZarr (treating x/y/crs as identical
    across inputs and dropping the redundant `reference_time` variable),
    then writes the combined reference JSON back to S3.
    """
    json_list = fs_json.glob(f'{json_dir}/{year}*.json')
    json_list = [f's3://{json}' for json in json_list]

    combined_json = f's3://esip-qhub/noaa/nwm/grid1km/combined_{year}.json'

    mzz = MultiZarrToZarr(json_list,
                          remote_protocol='s3',
                          remote_options=dict(anon=True),
                          concat_dims=['time'],
                          identical_dims=["x", "y", "crs"],
                          preprocess=kerchunk.combine.drop("reference_time"))

    # Create a dict from the mzz object and persist it to S3.
    d = mzz.translate()
    with fs_json.open(combined_json, 'wb') as f:
        f.write(ujson.dumps(d).encode())


# In[7]:

import sys

libDir = r'/shared/users/lib'
if libDir not in sys.path:
    sys.path.append(libDir)

# In[ ]:

# Activate logging
# import logging
# logging.basicConfig(level=logging.INFO, force=True)

# In[ ]:

import ebdpy as ebd
import os

os.environ['AWS_PROFILE'] = 'esip-qhub'

client, cluster = ebd.start_dask_cluster(
    profile=os.environ['AWS_PROFILE'],
    worker_max=20,
    region='us-west-2',
    use_existing_cluster=True,
    adaptive_scaling=True,
    wait_for_cluster=False,
    propagate_env=True)

# In[ ]:

import dask.bag as db

# In[ ]:

# One partition per year keeps the work evenly spread (42 years, 40 partitions).
b = db.from_sequence(year_list, npartitions=40)

# In[ ]:

b1 = b.map(year_combine)

# In[ ]:

get_ipython().run_cell_magic('time', '', 'from dask.distributed import performance_report\nwith performance_report(filename="dask-report-whole.html"):\n    b1.compute(retries=10)\n')
# #### Examine one of the combined kerchunked datasets

# In[8]:

year = 2005
combined_json = f's3://esip-qhub/noaa/nwm/grid1km/combined_{year}.json'

# In[9]:

# Open the combined reference JSON as a virtual Zarr store via fsspec's
# "reference" filesystem, then load it lazily with xarray.
get_ipython().run_cell_magic('time', '', 's_opts = {\'requester_pays\':True, \'skip_instance_cache\':True}\nr_opts = {\'anon\':True}\nfs = fsspec.filesystem("reference", fo=combined_json, ref_storage_args=s_opts,\n                       remote_protocol=\'s3\', remote_options=r_opts)\nm = fs.get_mapper("")\nds = xr.open_dataset(m, engine="zarr", chunks={}, backend_kwargs=dict(consolidated=False))\n')

# In[10]:

ds

# In[11]:

# Subset to the variables of interest (plus the grid-mapping variable).
ds = ds[['ACCET', 'SNEQV', 'FSNO', 'crs']]

# In[12]:

# Let MetPy parse the CF conventions so the CRS can be extracted below.
ds = ds.metpy.parse_cf()

# In[13]:

crs = ds['ACCET'].metpy.cartopy_crs

# In[14]:

get_ipython().run_cell_magic('time', '', 'da = ds.ACCET.isel(time=500).load()\n')

# In[15]:

da.plot()

# In[16]:

# Sanity-check how many combined yearly JSONs now exist on S3.
json_list = fs_json.glob(f's3://esip-qhub/noaa/nwm/grid1km/combined_????.json')
print(len(json_list))
print(json_list[0])
print(json_list[-1])

# In[ ]:

da.hvplot(x='x', y='y', rasterize=True, cmap='turbo',
          tiles='OSM', alpha=0.7)

# In[ ]: