#!/usr/bin/env python
# coding: utf-8

# # Use Kerchunk to open a GEFS forecast collection as an xarray-datatree
#
# The data contained in these grib2 files is described here: https://www.nco.ncep.noaa.gov/pmb/products/gens/

# In[2]:

get_ipython().run_line_magic('xmode', 'minimal')


# In[1]:

import os
from datetime import datetime, timedelta
from pathlib import Path

import fsspec
import ujson
import xarray as xr

import kerchunk
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr
from datatree import DataTree


# In[3]:

xr.__version__


# Create a set of fsspec file systems to read the latest GEFS forecast and write the reference files locally

# In[5]:

fs_local = fsspec.filesystem('', skip_instance_cache=True)
fs_s3 = fsspec.filesystem('s3', anon=True)


# The GEFS grib2 files follow this naming pattern:
#
# **s3://noaa-gefs-pds/gefs.{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}.t{cycle}.pgrb2a.0p50.f{forecast_hour}**

# In[9]:

dates = sorted(fs_s3.glob("s3://noaa-gefs-pds/gefs.*"))
print(dates[:2])
print(dates[-1])


# In[15]:

Path(dates[0]).name


# In[16]:

date = Path(dates[-2]).name  # use yesterday ([-2]) to ensure the run is complete, without more complex logic
date


# In[18]:

cycle = '00'  # using the 00z cycle (cycles run at 00z, 06z, 12z and 18z)


# In[19]:

flist = fs_s3.glob(f's3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gep*.t00z.pgrb2a.0p50.f000')
np_members = len(flist)
np_members


# In[ ]:

flist = fs_s3.glob(f's3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gep01.t00z.pgrb2a.0p50.f*')
flist


# In[ ]:

flist = fs_s3.glob(f's3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gep11.t*z.pgrb2a.0p50.f000')
flist


# There are 30 perturbation members in the pgrb2ap5 forecast + 1 control run

# In[23]:

n_members = np_members + 1


# In[24]:

members = []
for i in range(n_members):
    members.append(str(i).zfill(2))


# In[25]:

print(members[0])
print(members[-1])


# In[27]:

# the same list, built more compactly with an f-string format spec
members = [f'{i:02d}' for i in range(n_members)]


# In[28]:

print(members[0])
print(members[-1])


# In[29]:

s3_so = {
    'anon': True,
    'skip_instance_cache': True
}


# Create a unique name for each reference file

# In[30]:

def make_json_name(file_url, message_number):
    date = file_url.split('/')[3].split('.')[1]
    member = file_url.split('/')[-1].split('.')[0]
    name = ''.join(file_url.split('/')[-1].split('.')[-3:])
    return f'{json_dir}{date}_{member}_{name}_message{str(message_number).zfill(2)}.json'


# Generate kerchunk references for all ensemble members across all time steps

# In[31]:

json_dir = 'individual/'


# In[ ]:

fs_local.rm(json_dir, recursive=True)
fs_local.mkdirs(json_dir, exist_ok=True)


# In[32]:

try:
    fs_local.rm(json_dir, recursive=True)
except:
    pass
fs_local.mkdir(json_dir)


# Build up the list of grib2 files to process

# In[34]:

get_ipython().run_cell_magic('time', '', 'flist = []\nfor i, ensemble_member in enumerate(members):\n    if ensemble_member == \'00\':\n        files = fs_s3.glob(f"s3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gec{ensemble_member}*")\n    else:\n        files = fs_s3.glob(f"s3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}*")\n    files = [f for f in files if f.split(\'.\')[-1] != \'idx\']\n    files = sorted([\'s3://\'+f for f in files])\n    #print(i, len(files))\n    flist.extend(files[:2])\n')


# Try scan_grib on one grib file

# In[38]:

flist[0]


# In[39]:

get_ipython().run_cell_magic('time', '', 'out = scan_grib(flist[0], storage_options=s3_so)\n')


# In[49]:

n_messages = len(out)
n_messages


# In[50]:

type(out[0])


# In[ ]:

out[0]


# In[51]:

make_json_name(flist[0], 0)


# In[ ]:

flist[0:10]
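# As a quick, optional check (a hedged sketch, not part of the original workflow): each element of `out`
# is a kerchunk reference dict (a `version` key plus a `refs` mapping of zarr keys), so a single message
# can be opened directly with fsspec's `reference` filesystem to see which variable it holds. This
# assumes message 0 opens cleanly; some messages fail to open, as seen later in the notebook.

# In[ ]:

# Hedged sketch: open the first GRIB message's in-memory reference as a zarr store
fs_msg = fsspec.filesystem(
    "reference", fo=out[0], remote_protocol="s3", remote_options={"anon": True}
)
ds_msg = xr.open_dataset(
    fs_msg.get_mapper(""), engine="zarr", backend_kwargs=dict(consolidated=False)
)
ds_msg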
# Define a function to generate the single-message JSONs

# In[52]:

def gen_json(file_url):
    out = scan_grib(file_url, storage_options=s3_so)  # scan_grib outputs a list containing one reference per grib message
    for i, message in enumerate(out):
        out_file_name = make_json_name(file_url, i)  # get name
        with fs_local.open(out_file_name, "w") as f:
            f.write(ujson.dumps(message))  # write to file


# Serial approach to generating JSONs

# In[ ]:

get_ipython().run_cell_magic('time', '', 'for file in flist:\n    gen_json(file)\n')


# Use dask to create JSONs in parallel

# In[53]:

import dask


# We haven't created a Dask client, so we are not using dask.distributed, but we are still using Dask!

# In[54]:

get_ipython().run_cell_magic('time', '', '_ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])\n')


# Use dask.distributed to create JSONs in parallel

# In[55]:

from dask.distributed import LocalCluster, Client, performance_report


# In[56]:

client = Client()
client


# In[58]:

get_ipython().run_cell_magic('time', '', '_ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])\n')


# In[ ]:

get_ipython().run_cell_magic('time', '', 'with performance_report(filename="dask-report.html"):\n    _ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])\n')


# In[ ]:

client


# In[ ]:

cluster = LocalCluster(n_workers=10, threads_per_worker=8)
client = Client(cluster)


# Now we combine the messages along the time dimension

# In[59]:

json_dir = 'combined/'
try:
    fs_local.rm(json_dir, recursive=True)
except:
    pass
fs_local.mkdirs(json_dir, exist_ok=True)


# In[60]:

def combine_time(json_urls, i):
    mzz = MultiZarrToZarr(json_urls, concat_dims=['step'])
    name = f"combined/{'_'.join(json_urls[0].split('/')[-1].split('_')[:-2])}_message{str(i).zfill(2)}_combined.json"
    with fs_local.open(name, 'w') as f:
        f.write(ujson.dumps(mzz.translate()))


# Serial approach

# In[61]:

get_ipython().run_cell_magic('time', '', 'for ensemble_member in members:\n    for i in range(n_messages):  # there are 71 GRIB messages in a GEFS file\n        jsons = fs_local.glob(f"individual/*gep{ensemble_member}*_message{str(i).zfill(2)}.json") + fs_local.glob(f"individual/*gec{ensemble_member}*_message{str(i).zfill(2)}.json")\n        combine_time(jsons, i)\n')


# Dask approach

# In[65]:

get_ipython().run_cell_magic('time', '', 'def do_task(i, ensemble_member):\n    perturbed = fs_local.glob(f"individual/*gep{ensemble_member}*_message{str(i).zfill(2)}.json")\n    controls = fs_local.glob(f"individual/*gec{ensemble_member}*_message{str(i).zfill(2)}.json")\n    combine_time(perturbed + controls, i)\n\nfor ensemble_member in members:\n    _ = dask.compute(*[dask.delayed(do_task)(i, ensemble_member) for i in range(n_messages)])\n')


# Next we can combine these across the ensemble members

# In[66]:

get_ipython().run_cell_magic('time', '', "json_dir = 'ensemble/'\n\ntry:\n    fs_local.rm(json_dir, recursive=True)\nexcept:\n    pass\nfs_local.mkdirs(json_dir)\n")


# In[67]:

def combine_ensemble(json_urls, i):
    mzz = MultiZarrToZarr(json_urls,
                          remote_protocol='s3',
                          remote_options={'anon': True},
                          concat_dims=['number'],
                          identical_dims=['valid_time', 'longitude', 'latitude', 'step', 'time'])
    name = f"ensemble/{json_urls[0].split('/')[-1].split('_')[0]}_message{str(i).zfill(2)}_combined.json"
    with fs_local.open(name, 'w') as f:
        f.write(ujson.dumps(mzz.translate()))


# In[68]:

get_ipython().run_cell_magic('time', '', 'for i in range(n_messages):\n    jsons = fs_local.glob(f"combined/*_message{str(i).zfill(2)}_combined.json")\n    combine_ensemble(jsons, i)\n')
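# Optional sanity check (a hedged sketch, not in the original workflow): open one of the
# ensemble-combined references with fsspec's `reference` filesystem, the same pattern used by
# `return_ds` below, and confirm that the `number` dimension now spans the control run plus the
# 30 perturbation members.

# In[ ]:

# Hedged sketch: inspect the dimension sizes of a single ensemble-combined message
check_ref = fs_local.glob("ensemble/*_message00_combined.json")[0]
fs_check = fsspec.filesystem(
    "reference", fo=check_ref, remote_protocol="s3", remote_options={"anon": True}
)
ds_check = xr.open_dataset(
    fs_check.get_mapper(""), engine="zarr", backend_kwargs=dict(consolidated=False)
)
ds_check.sizes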
# Now we need to determine which messages can be combined into a single dataset

# In[69]:

def return_ds(d):
    fs_ = fsspec.filesystem("reference", fo=d, remote_protocol='s3', remote_options={'anon': True})
    m = fs_.get_mapper("")
    return xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))


# In[70]:

get_ipython().run_cell_magic('time', '', 'jsons = fs_local.glob("ensemble/*")\n')


# Here we open each reference message and determine what type of vertical level it contains. We will use this later to combine the messages along these levels

# In[71]:

get_ipython().run_cell_magic('time', '', "typeoflevel = {}\nfor i, ref in enumerate(jsons):\n    try:\n        ds = return_ds(ref)\n        dim = [dim for dim in list(ds.dims) if dim not in ['valid_time', 'x', 'y', 'step', 'time', 'latitude', 'longitude', 'number']]  # manually exclude the dims common to all messages\n        typeoflevel[i] = dim[0]\n    except:\n        print(i, 'not included')\n        pass\n")


# In[72]:

get_ipython().run_cell_magic('time', '', 'groups = {}\nfor key, value in sorted(typeoflevel.items()):\n    groups.setdefault(value, []).append(key)\n')


# In[73]:

# pin the message grouping from a previous run so the rest of the notebook is reproducible
groups = {'isobaricInhPa': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
                            26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49,
                            50, 51, 52, 53, 54, 55],
          'surface': [56, 60, 61],
          'depthBelowLandLayer': [58],
          'heightAboveGround': [63, 64, 65, 66]}


# We can now use this groups dictionary to combine the compatible messages

# In[75]:

json_dir = 'groups/'
try:
    fs_local.rm(json_dir, recursive=True)
except:
    pass
fs_local.mkdirs(json_dir)


# In[76]:

def combine_groups(jsons, level):
    mzz = MultiZarrToZarr(jsons, concat_dims=level)  # merge messages along this level
    ref = mzz.translate()
    date = jsons[0].split('/')[-1].split('_')[0]
    with fs_local.open(f"groups/GEFS_{date}_{level}.json", "w") as f:
        f.write(ujson.dumps(ref))  # write to file


# In[77]:

get_ipython().run_cell_magic('time', '', 'for level in groups:\n    jsons = []\n    for message in groups[level]:\n        jsons.append(fs_local.glob(f"ensemble/*_message{str(message).zfill(2)}_combined.json")[0])\n    combine_groups(jsons, level)\n')


# This leaves us with 4 reference files, which we can now use to open the GEFS data as an xarray-datatree

# In[78]:

jsons = fs_local.glob("groups/*.json")
jsons


# In[79]:

datasets = {}
for level in groups:
    datasets[level] = return_ds(fs_local.glob(f"groups/*{level}*")[0])


# In[80]:

dt = DataTree.from_dict(datasets)


# In[83]:

dt.groups


# In[84]:

dt['isobaricInhPa']


# In[85]:

dt['isobaricInhPa']['t']


# In[87]:

import hvplot.xarray


# In[ ]:

dt['isobaricInhPa'].longitude


# In[89]:

ds = dt['isobaricInhPa']


# In[92]:

da = ds['t']


# In[94]:

# convert 0..360 longitudes to -180..180 and sort so the map plots as a continuous field
da = da.assign_coords(longitude=(((ds.longitude + 180) % 360) - 180)).sortby('longitude')


# In[96]:

da.sel(step='03:00:00').hvplot(x='longitude', y='latitude', rasterize=True, alpha=1.0, cmap='turbo',
                               global_extent=True, geo=True, tiles='OSM')
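# As a closing example (a hedged sketch, not part of the original notebook): once the datatree is
# assembled, ordinary xarray reductions work per group, e.g. an ensemble mean and spread of
# temperature over the `number` (ensemble member) dimension. Values are only read from S3 when the
# result is computed or plotted.

# In[ ]:

# Hedged sketch: lazy ensemble statistics on the isobaricInhPa group
t = dt['isobaricInhPa']['t']
t_mean = t.mean(dim='number')   # ensemble mean
t_spread = t.std(dim='number')  # ensemble spread
t_mean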