This notebook demonstrates how to work with the ECMWF ERA5 reanalysis available as part of the AWS Public Dataset Program (https://registry.opendata.aws/ecmwf-era5/).
This notebook uses Amazon SageMaker and AWS Fargate to provide an environment with a Jupyter notebook and a Dask cluster. An example AWS CloudFormation template is available at https://github.com/awslabs/amazon-asdi/tree/main/examples/dask for quickly creating this environment in your own AWS account so you can run this notebook.
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
from dask.distributed import performance_report
from dask.distributed import Client
font = {'family': 'sans-serif',
        'weight': 'normal',
        'size': 18}
matplotlib.rc('font', **font)
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster
If more than one cluster was listed above, update the cluster value used in the scaling commands below to match your Dask cluster's ARN.
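Alternatively, you can select the cluster by name rather than taking the first entry. The sketch below assumes the Dask cluster's ARN contains the substring 'dask'; adjust the filter to match your own cluster's name.
# select the Dask cluster by name; the 'dask' substring is an assumed naming convention
dask_clusters = [c for c in clusters if 'dask' in c.lower()]
if dask_clusters:
    cluster = dask_clusters[0]
cluster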
numWorkers=70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
client = Client('Dask-Scheduler.local-dask:8786')
client
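As an optional check, you can block until the requested workers have registered with the scheduler. This is a minimal sketch; the 600-second timeout is an arbitrary choice.
# optionally wait until all requested workers have joined the scheduler
client.wait_for_workers(n_workers=numWorkers, timeout=600)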
For maximum performance we want to chunk our reads in a way that matches how the files are chunked on disk, so we first inspect the encoding of a single file.
url = 's3://era5-pds/2010/01/data/air_temperature_at_2_metres.nc'
ncfile = fsspec.open(url)
ds = xr.open_dataset(ncfile.open())
ds.air_temperature_at_2_metres.encoding
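The encoding above typically includes the on-disk HDF5 chunk sizes; a minimal sketch of pulling them out directly (the 'chunksizes' key is only present when the variable was written with chunking):
# on-disk chunk sizes for the variable, if stored with HDF5 chunking
ds.air_temperature_at_2_metres.encoding.get('chunksizes')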
start_year = 2010
end_year = 2020
years = list(np.arange(start_year, end_year+1, 1))
months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
file_pattern = 's3://era5-pds/{year}/{month}/data/air_temperature_at_2_metres.nc'
@dask.delayed
def s3open(path):
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    return fs.open(path)
files_mapper = [s3open(file_pattern.format(year=year,month=month)) for year in years for month in months]
%%time
ds = xr.open_mfdataset(files_mapper, engine='h5netcdf', chunks={'lon':200,'lat':200,'time0':720}, concat_dim='time0', combine='nested', coords='minimal', compat='override', parallel=True)
print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info
The ds.info output above shows us that there are three dimensions to the data: lat, lon, and time0; and one data variable: air_temperature_at_2_metres.
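One quick way to confirm that layout is to inspect the dimensions and shape of the temperature variable directly:
# dimension names and shape of the lazily-loaded temperature variable
ds.air_temperature_at_2_metres.dims, ds.air_temperature_at_2_metres.shape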
ds['air_temperature_at_2_metres'] = (ds.air_temperature_at_2_metres - 273.15) * 9.0 / 5.0 + 32.0
ds.air_temperature_at_2_metres.attrs['units'] = 'F'
# calculates the mean along the time dimension
temp_mean = ds['air_temperature_at_2_metres'].mean(dim='time0')
The expressions above didn't actually compute anything. They just build the Dask task graph. To do the computations, we call the compute method:
%%time
with performance_report(filename="dask-report.html"):
    temp_mean = temp_mean.compute()
temp_mean.plot(figsize=(20, 10))
plt.title('2010-2020 Mean 2-m Air Temperature')
temp_std = ds['air_temperature_at_2_metres'].std(dim='time0')
%time temp_std = temp_std.compute()
temp_std.plot(figsize=(20, 10))
plt.title('2010-2020 Standard Deviation 2-m Air Temperature')
# location coordinates
locs = [
    {'name': 'Santa Barbara', 'lon': -119.70, 'lat': 34.42},
    {'name': 'Colorado Springs', 'lon': -104.82, 'lat': 38.83},
    {'name': 'Honolulu', 'lon': -157.84, 'lat': 21.29},
    {'name': 'Seattle', 'lon': -122.33, 'lat': 47.61},
]
# ERA5 stores longitude in degrees east (0-360), so convert westward longitudes
for l in locs:
    if l['lon'] < 0:
        l['lon'] = 360 + l['lon']
locs
ds_locs = xr.Dataset()
air_temp_ds = ds
# iterate through the locations and create a dataset
# containing the temperature values for each location
for l in locs:
    name = l['name']
    lon = l['lon']
    lat = l['lat']
    var_name = name
    ds2 = air_temp_ds.sel(lon=lon, lat=lat, method='nearest')
    lon_attr = '%s_lon' % name
    lat_attr = '%s_lat' % name
    ds2.attrs[lon_attr] = ds2.lon.values.tolist()
    ds2.attrs[lat_attr] = ds2.lat.values.tolist()
    ds2 = ds2.rename({'air_temperature_at_2_metres' : var_name}).drop(('lat', 'lon'))
    ds_locs = xr.merge([ds_locs, ds2])
ds_locs.data_vars
df_f = ds_locs.to_dataframe()
df_f.describe()
ax = df_f.plot(figsize=(20, 10), title="ERA5", grid=1)
ax.set(xlabel='Date', ylabel='2-m Air Temperature (deg F)')
plt.show()
When we are temporarily done with the cluster, we can scale it down to zero workers to save on costs.
numWorkers=0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
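As a sketch of one way to confirm the scale-down took effect (reusing the same cluster and service names as above):
# confirm the Dask-Worker service has scaled down
resp = ecs.describe_services(cluster=cluster, services=['Dask-Worker'])
resp['services'][0]['runningCount']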