Data contained in these grib2 files described here: https://www.nco.ncep.noaa.gov/pmb/products/gens/
%xmode minimal
Exception reporting mode: Minimal
import os
import fsspec
from datetime import datetime, timedelta
import xarray as xr
import ujson
import kerchunk
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr
from datatree import DataTree
from pathlib import Path
xr.__version__
'2023.3.0'
Create a set of fsspec file systems to read the latest GEFS forecast and write the reference files locally
fs_local = fsspec.filesystem('', skip_instance_cache = True)
fs_s3 = fsspec.filesystem('s3', anon = True)
s3://noaa-gefs-pds/gefs.{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}.t{cycle}.pgrb2a.0p50.f{forecast_hour}
dates = sorted(fs_s3.glob(f"s3://noaa-gefs-pds/gefs.*"))
print(dates[:2])
print(dates[-1])
['noaa-gefs-pds/gefs.20170101', 'noaa-gefs-pds/gefs.20170102'] noaa-gefs-pds/gefs.20230411
Path(dates[0]).name
'gefs.20170101'
date = Path(dates[-2]).name # yesterday [-2] to ensure run is always complete without more complex logic
date
'gefs.20230410'
cycle = '00' # using the 00z cycle (cycles at 00z, 06z, 12z, 18z)
flist = fs_s3.glob(f's3://noaa-gefs-pds/gefs.20230410/{cycle}/atmos/pgrb2ap5/gep*.t00z.pgrb2a.0p50.f000')
np_members = len(flist)
np_members
30
flist = fs_s3.glob(f's3://noaa-gefs-pds/gefs.20230410/{cycle}/atmos/pgrb2ap5/gep01.t00z.pgrb2a.0p50.f*')
flist
flist = fs_s3.glob(f's3://noaa-gefs-pds/gefs.20230410/{cycle}/atmos/pgrb2ap5/gep11.t*z.pgrb2a.0p50.f000')
flist
There are 30 perturbation members in the pgrb2ap5 forecast + 1 control run
n_members = np_members + 1
# Zero-padded ensemble-member labels: '00' (control) through '30' (perturbations).
# Comprehension form, consistent with the equivalent cell below.
members = [str(i).zfill(2) for i in range(n_members)]
print(members[0])
print(members[-1])
00 30
members = [f'{i:02d}' for i in range(n_members)]
print(members[0])
print(members[-1])
00 30
s3_so = {
'anon': True,
'skip_instance_cache': True
}
Create a unique name for each reference file
def make_json_name(file_url, message_number, out_dir=None):
    """Build a unique local JSON reference-file name for one GRIB message.

    Parameters
    ----------
    file_url : str
        Full S3 URL of the grib2 file, e.g.
        's3://noaa-gefs-pds/gefs.20230410/00/atmos/pgrb2ap5/gec00.t00z.pgrb2a.0p50.f000'.
        Note: the date extraction at index 3 assumes the 's3://' prefix is present.
    message_number : int
        Zero-based index of the GRIB message within the file (zero-padded to 2 digits).
    out_dir : str, optional
        Output directory prefix. Defaults to the notebook-level ``json_dir``
        global, preserving the original behavior; pass it explicitly to avoid
        the hidden global dependency.

    Returns
    -------
    str
        e.g. 'individual/20230410_gec00_pgrb2a0p50f000_message00.json'
    """
    if out_dir is None:
        out_dir = json_dir  # fall back to notebook global (original behavior)
    parts = file_url.split('/')
    # parts[3] is 'gefs.YYYYMMDD' when the URL starts with 's3://'
    date = parts[3].split('.')[1]
    fname = parts[-1].split('.')
    member = fname[0]           # e.g. 'gec00' (control) or 'gep01' (perturbation)
    name = ''.join(fname[-3:])  # e.g. 'pgrb2a0p50f000'
    return f'{out_dir}{date}_{member}_{name}_message{str(message_number).zfill(2)}.json'
Generate kerchunk reference for all ensemble members across all time steps
json_dir = 'individual/'
fs_local.rm(json_dir, recursive = True)
fs_local.mkdirs(json_dir, exist_ok=True)
# Start from a clean output directory: best-effort removal (the bare except
# deliberately ignores a missing directory), then recreate it.
try:
    fs_local.rm(json_dir, recursive = True)
except:
    pass
fs_local.mkdir(json_dir)
Build up list of grib2 files to process
%%time
flist = []
# Collect this date/cycle's grib2 files for every ensemble member.
for i, ensemble_member in enumerate(members):
    if ensemble_member == '00':
        # member 00 is the control run, named gec00.*
        files = fs_s3.glob(f"s3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gec{ensemble_member}*")
    else:
        # perturbation members are named gep01.* .. gep30.*
        files = fs_s3.glob(f"s3://noaa-gefs-pds/{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}*")
    # drop the GRIB index sidecar files (*.idx)
    files = [f for f in files if f.split('.')[-1] != 'idx']
    files = sorted(['s3://'+f for f in files])
    #print(i, len(files))
    # keep only the first two forecast lead times per member (sorted, so the
    # earliest hours) to keep this demo fast
    flist.extend(files[:2])
CPU times: user 389 ms, sys: 192 µs, total: 389 ms Wall time: 385 ms
Try scan_grib on one grib file
flist[0]
's3://noaa-gefs-pds/gefs.20230410/00/atmos/pgrb2ap5/gec00.t00z.pgrb2a.0p50.f000'
%%time
out = scan_grib(flist[0], storage_options= s3_so)
CPU times: user 2.45 s, sys: 54.7 ms, total: 2.51 s Wall time: 4.27 s
n_messages = len(out)
n_messages
71
type(out[0])
dict
out[0]
make_json_name(flist[0],0)
'individual/20230410_gec00_pgrb2a0p50f000_message00.json'
flist[0:10]
Define function to generate single JSONs
def gen_json(file_url):
    """Scan one grib2 file and write a kerchunk reference JSON per message."""
    # scan_grib returns a list of reference dicts, one per GRIB message
    messages = scan_grib(file_url, storage_options=s3_so)
    for message_number, reference in enumerate(messages):
        target = make_json_name(file_url, message_number)
        with fs_local.open(target, "w") as outfile:
            outfile.write(ujson.dumps(reference))
Serial approach to generating jsons
%%time
for file in flist:
gen_json(file)
Use dask to create jsons in parallel
import dask
We haven't created a Dask client, so we are not using dask distributed, but we are still using Dask!
%%time
_ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])
CPU times: user 2min 52s, sys: 5.43 s, total: 2min 58s Wall time: 1min 1s
Use dask.distributed to create Jsons in parallel
from dask.distributed import LocalCluster, Client, performance_report
client = Client()
client
2023-04-11 14:38:17,784 - distributed.diskutils - INFO - Found stale lock file and directory '/home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-6y0rqvgu', purging 2023-04-11 14:38:17,787 - distributed.diskutils - INFO - Found stale lock file and directory '/home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-icpyyqwn', purging 2023-04-11 14:38:17,790 - distributed.diskutils - INFO - Found stale lock file and directory '/home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-y41hywju', purging 2023-04-11 14:38:17,793 - distributed.diskutils - INFO - Found stale lock file and directory '/home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-ctzjmg9r', purging
Client-64891e02-d8a0-11ed-b163-000101000601
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: /proxy/8787/status |
c1d0ef76
Dashboard: /proxy/8787/status | Workers: 10 |
Total threads: 80 | Total memory: 187.58 GiB |
Status: running | Using processes: True |
Scheduler-77f0eb9d-78bf-43eb-805b-408236a3c561
Comm: tcp://127.0.0.1:39599 | Workers: 10 |
Dashboard: /proxy/8787/status | Total threads: 80 |
Started: Just now | Total memory: 187.58 GiB |
Comm: tcp://127.0.0.1:45165 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:40347 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-e850usvr |
Comm: tcp://127.0.0.1:38879 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:33971 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-wfxwt03m |
Comm: tcp://127.0.0.1:45615 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:36155 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-ggxdo8a_ |
Comm: tcp://127.0.0.1:44549 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:43331 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-hxa9ta13 |
Comm: tcp://127.0.0.1:40535 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:33227 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-5rsg1yxv |
Comm: tcp://127.0.0.1:41179 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:42599 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-agaqdhhp |
Comm: tcp://127.0.0.1:45305 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:39083 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-vc7i9n09 |
Comm: tcp://127.0.0.1:43495 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:39467 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-oyuzn0g3 |
Comm: tcp://127.0.0.1:46811 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:37413 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-2tzolors |
Comm: tcp://127.0.0.1:46133 | Total threads: 8 |
Dashboard: /proxy/8787/status | Memory: 18.76 GiB |
Nanny: tcp://127.0.0.1:38839 | |
Local directory: /home/rsignell/EarthMap/Projects/notebooks/gefs/dask-worker-space/worker-tm7gh3lr |
%%time
_ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])
CPU times: user 2.5 s, sys: 396 ms, total: 2.9 s Wall time: 20 s
%%time
with performance_report(filename="dask-report.html"):
_ = dask.compute(*[dask.delayed(gen_json)(f) for f in flist])
client
cluster = LocalCluster(n_workers=10, threads_per_worker=8)
client = Client(cluster)
Now we combine the messages along the time dimension
json_dir = 'combined/'
# Recreate the 'combined/' output directory from scratch; the bare except
# deliberately ignores a missing directory on first run.
try:
    fs_local.rm(json_dir, recursive = True)
except:
    pass
fs_local.mkdirs(json_dir, exist_ok=True)
def combine_time(json_urls, i):
    """Concatenate one message's per-timestep reference JSONs along 'step'.

    Writes the merged reference to combined/<date>_<member>_messageNN_combined.json.
    """
    combined = MultiZarrToZarr(json_urls, concat_dims=['step']).translate()
    # Reuse the '<date>_<member>' prefix from the first input file's name
    stem = '_'.join(json_urls[0].split('/')[-1].split('_')[:-2])
    out_name = f"combined/{stem}_message{str(i).zfill(2)}_combined.json"
    with fs_local.open(out_name, 'w') as sink:
        sink.write(ujson.dumps(combined))
Serial approach
%%time
# Serial combine: for each member and each of the 71 GRIB messages, gather that
# message's per-timestep JSONs (gep* perturbations plus gec* control; only one
# glob matches for a given member) and concatenate along 'step'.
for ensemble_member in members:
    for i in range(n_messages): #there are 71 GRIB messages in a GEFS file
        jsons = fs_local.glob(f"individual/*gep{ensemble_member}*_message{str(i).zfill(2)}.json") + fs_local.glob(f"individual/*gec{ensemble_member}*_message{str(i).zfill(2)}.json")
        combine_time(jsons, i)
KeyboardInterrupt
Dask approach
%%time
def do_task(i, ensemble_member):
    """Combine all timestep JSONs for one (message, member) pair along 'step'.

    Parameters
    ----------
    i : int
        GRIB message index (0..n_messages-1).
    ensemble_member : str
        Zero-padded member label, e.g. '00' or '17'.

    Note: the original local variable was named ``members``, shadowing the
    module-level list of ensemble-member labels iterated by the caller;
    renamed here to avoid that confusion.
    """
    # Perturbation runs are named gep*, the control run gec00; for any given
    # member only one of these globs returns a non-empty list.
    perturbed = fs_local.glob(f"individual/*gep{ensemble_member}*_message{str(i).zfill(2)}.json")
    control = fs_local.glob(f"individual/*gec{ensemble_member}*_message{str(i).zfill(2)}.json")
    combine_time(perturbed + control, i)
for ensemble_member in members:
_ = dask.compute(*[dask.delayed(do_task)(i,ensemble_member) for i in range(n_messages)])
CPU times: user 11.2 s, sys: 1.58 s, total: 12.7 s Wall time: 1min 35s
Next we can combine these across the ensemble members
%%time
json_dir = 'ensemble/'
# Recreate the 'ensemble/' output directory from scratch; the bare except
# deliberately ignores a missing directory on first run.
try:
    fs_local.rm(json_dir, recursive = True)
except:
    pass
fs_local.mkdirs(json_dir)
CPU times: user 2.11 ms, sys: 4.42 ms, total: 6.53 ms Wall time: 16 ms
def combine_ensemble(json_urls, i):
    """Merge one message's per-member references along the 'number' dimension.

    Writes the merged reference to ensemble/<date>_messageNN_combined.json.
    """
    merged = MultiZarrToZarr(
        json_urls,
        remote_protocol='s3',
        remote_options={'anon': True},
        concat_dims=['number'],
        identical_dims=['valid_time', 'longitude', 'latitude', 'step', 'time'],
    ).translate()
    date_tag = json_urls[0].split('/')[-1].split('_')[0]
    out_name = f"ensemble/{date_tag}_message{str(i).zfill(2)}_combined.json"
    with fs_local.open(out_name, 'w') as sink:
        sink.write(ujson.dumps(merged))
%%time
for i in range(n_messages):
jsons = fs_local.glob(f"combined/*_message{str(i).zfill(2)}_combined.json")
combine_ensemble(jsons, i)
CPU times: user 7.95 s, sys: 3.53 s, total: 11.5 s Wall time: 10.6 s
Now we need to determine which messages can be combined into a single dataset
def return_ds(d):
    """Open a kerchunk reference (dict or JSON file path) as an xarray Dataset."""
    ref_fs = fsspec.filesystem(
        "reference", fo=d, remote_protocol='s3', remote_options={'anon': True}
    )
    mapper = ref_fs.get_mapper("")
    return xr.open_dataset(mapper, engine="zarr", backend_kwargs=dict(consolidated=False))
%%time
jsons = fs_local.glob(f"ensemble/*")
CPU times: user 0 ns, sys: 3.24 ms, total: 3.24 ms Wall time: 3.35 ms
Here we open each reference message and determine what type of vertical level it contains. We will use this later to combine the messages along these levels
%%time
typeoflevel = {}
# Open each combined-ensemble reference and record its one non-shared
# dimension, which names the GRIB vertical-level type (e.g. 'isobaricInhPa').
for i,ref in enumerate(jsons):
    try:
        ds = return_ds(ref)
        dim = [dim for dim in list(ds.dims) if dim not in ['valid_time', 'x', 'y', 'step', 'time','latitude', 'longitude', 'number']] #I manually figure out which dims are common
        typeoflevel[i] = dim[0]
    except:
        # best-effort: messages that fail to open or have no extra dimension
        # are reported and skipped (the bare except is deliberate here)
        print(i, 'not included')
        pass
/home/rsignell/miniconda3/envs/pangeo/lib/python3.9/site-packages/xarray/backends/plugins.py:71: RuntimeWarning: Engine 'rasterio' loading failed: libLerc.so.4: cannot open shared object file: No such file or directory warnings.warn(f"Engine {name!r} loading failed:\n{ex}", RuntimeWarning)
57 not included 59 not included 62 not included 67 not included 68 not included 69 not included 70 not included CPU times: user 1.73 s, sys: 295 ms, total: 2.03 s Wall time: 4.25 s
%%time
groups = {}
# Invert typeoflevel: map each level-type name to the sorted list of
# message indices that share it.
for key, value in sorted(typeoflevel.items()):
    groups.setdefault(value, []).append(key)
CPU times: user 25 µs, sys: 4 µs, total: 29 µs Wall time: 32.9 µs
groups = {'isobaricInhPa': [0,
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
13,
14,
15,
16,
17,
18,
19,
20,
21,
22,
23,
24,
25,
26,
27,
28,
29,
30,
31,
32,
33,
34,
35,
36,
37,
38,
39,
40,
41,
42,
43,
44,
45,
46,
47,
48,
49,
50,
51,
52,
53,
54,
55],
'surface': [56, 60, 61],
'depthBelowLandLayer': [58],
'heightAboveGround': [63, 64, 65, 66]}
We can now use this groups dictionary to combine the compatible messages
json_dir = 'groups/'
# Recreate the 'groups/' output directory from scratch; the bare except
# deliberately ignores a missing directory on first run.
try:
    fs_local.rm(json_dir, recursive = True)
except:
    pass
fs_local.mkdirs(json_dir)
def combine_groups(jsons, level):
    """Merge message references sharing a vertical-level type along that level.

    Writes the merged reference to groups/GEFS_<date>_<level>.json.
    """
    merged = MultiZarrToZarr(jsons, concat_dims=level).translate()
    date_tag = jsons[0].split('/')[-1].split('_')[0]
    out_path = f"groups/GEFS_{date_tag}_{level}.json"
    with fs_local.open(out_path, "w") as sink:
        sink.write(ujson.dumps(merged))
%%time
# For each vertical-level group, gather the combined-ensemble reference of
# every message in that group and merge them along the level dimension.
for level in groups:
    jsons = []
    for message in groups[level]:
        # exactly one ensemble/ file matches each message index
        jsons.append(fs_local.glob(f"ensemble/*_message{str(message).zfill(2)}_combined.json")[0])
    combine_groups(jsons, level)
CPU times: user 340 ms, sys: 107 ms, total: 447 ms Wall time: 452 ms
/home/rsignell/miniconda3/envs/pangeo/lib/python3.9/site-packages/kerchunk/combine.py:262: UserWarning: Concatenated coordinate 'surface' contains less than expectednumber of values across the datasets: [0] warnings.warn( /home/rsignell/miniconda3/envs/pangeo/lib/python3.9/site-packages/kerchunk/combine.py:262: UserWarning: Concatenated coordinate 'depthBelowLandLayer' contains less than expectednumber of values across the datasets: [0] warnings.warn(
This leaves us with 4 reference files which we can now use to open the GEFS data as an xarray-datatree
jsons = fs_local.glob(f"groups/*.json")
jsons
['/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/GEFS_20230410_depthBelowLandLayer.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/GEFS_20230410_heightAboveGround.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/GEFS_20230410_isobaricInhPa.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/GEFS_20230410_surface.json']
datasets = {}
# Open one dataset per vertical-level type and mount them as sibling groups
# of a single DataTree root.
for level in groups:
    datasets[level] = return_ds(fs_local.glob(f"groups/*{level}*")[0])
dt = DataTree.from_dict(datasets)
dt.groups
('/', '/isobaricInhPa', '/surface', '/depthBelowLandLayer', '/heightAboveGround')
dt['isobaricInhPa']
<xarray.DatasetView> Dimensions: (isobaricInhPa: 12, number: 31, step: 2, latitude: 361, longitude: 720, time: 1, valid_time: 1) Coordinates: * isobaricInhPa (isobaricInhPa) int64 10 50 100 200 250 ... 700 850 925 1000 * latitude (latitude) float64 90.0 89.5 89.0 88.5 ... -89.0 -89.5 -90.0 * longitude (longitude) float64 0.0 0.5 1.0 1.5 ... 358.5 359.0 359.5 * number (number) int64 0 1 2 3 4 5 6 7 8 ... 23 24 25 26 27 28 29 30 * step (step) timedelta64[ns] NaT 03:00:00 * time (time) datetime64[ns] 2023-04-10 * valid_time (valid_time) datetime64[ns] 2023-04-10 Data variables: gh (isobaricInhPa, number, step, latitude, longitude) float64 ... r (isobaricInhPa, number, step, latitude, longitude) float64 ... t (isobaricInhPa, number, step, latitude, longitude) float64 ... u (isobaricInhPa, number, step, latitude, longitude) float64 ... v (isobaricInhPa, number, step, latitude, longitude) float64 ... w (isobaricInhPa, number, step, latitude, longitude) float64 ... Attributes: centre: kwbc centreDescription: US National Weather Service - NCEP edition: 2 subCentre: 2
dt['isobaricInhPa']['t']
<xarray.DataArray 't' (isobaricInhPa: 12, number: 31, step: 2, latitude: 361, longitude: 720)> [193380480 values with dtype=float64] Coordinates: * isobaricInhPa (isobaricInhPa) int64 10 50 100 200 250 ... 700 850 925 1000 * latitude (latitude) float64 90.0 89.5 89.0 88.5 ... -89.0 -89.5 -90.0 * longitude (longitude) float64 0.0 0.5 1.0 1.5 ... 358.5 359.0 359.5 * number (number) int64 0 1 2 3 4 5 6 7 8 ... 23 24 25 26 27 28 29 30 * step (step) timedelta64[ns] NaT 03:00:00 Attributes: (12/19) NV: 0 cfName: air_temperature cfVarName: t dataDate: 20230410 dataTime: 0 dataType: cf ... ... shortName: t stepType: instant stepUnits: 1 totalNumber: 30 typeOfLevel: isobaricInhPa units: K
import hvplot.xarray
dt['isobaricInhPa'].longitude
ds = dt['isobaricInhPa']
da = ds['t']
da = da.assign_coords(longitude=(((ds.longitude + 180) % 360) - 180)).sortby('longitude')
da.sel(step='03:00:00').hvplot(x='longitude', y='latitude', rasterize=True, alpha=1.0, cmap='turbo',
global_extent=True, geo=True, tiles='OSM')