This notebook demonstrates how to work with the GOES-16 Rainfall Rate (RRQPE) product available as part of the AWS Public Dataset Program (https://registry.opendata.aws/noaa-goes/).
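Before connecting to a cluster, it can help to see how the bucket is laid out. The sketch below is illustrative rather than part of the original workflow: it anonymously lists one hour of full-disk RRQPE files, following the ABI-L2-RRQPEF/<year>/<day-of-year>/<hour>/ layout visible in the file URL used later in this notebook.
import s3fs

# anonymously list one hour of RRQPE full-disk files (day-of-year 181, hour 19)
fs = s3fs.S3FileSystem(anon=True)
for key in fs.ls('noaa-goes16/ABI-L2-RRQPEF/2020/181/19/'):
    print(key)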
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
import dask.array
from dask.distributed import performance_report
from dask.distributed import Client
font = {'family': 'sans-serif',
        'weight': 'normal',
        'size': 18}
matplotlib.rc('font', **font)
# connect to ECS and find the cluster hosting the Dask scheduler and workers
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster
'arn:aws:ecs:us-east-1:816257832715:cluster/era5-Fargate-Dask-Cluster'
# scale the Dask-Worker service up to 70 Fargate tasks and wait until stable
numWorkers = 70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
client = Client('Dask-Scheduler.local-dask:8786')
client
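ECS reporting the service as stable does not guarantee that every worker has registered with the scheduler yet. A minimal check using the distributed client API (the polling loop here is illustrative, not part of the original notebook):
import time

# poll the scheduler until all requested workers have joined
while len(client.scheduler_info()['workers']) < numWorkers:
    time.sleep(5)
print(len(client.scheduler_info()['workers']), 'workers connected')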
The netCDF files are internally chunked, and for maximum performance we want our Dask chunks to align with the on-disk chunking, so we open a sample file and inspect the encoding of the RRQPE variable.
url = 's3://noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc'
ncfile = fsspec.open(url)
ds = xr.open_dataset(ncfile.open())
ds.RRQPE.encoding
{'chunksizes': (226, 226), 'fletcher32': False, 'shuffle': False, 'zlib': True, 'complevel': 1, 'source': <File-like object S3FileSystem, noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc>, 'original_shape': (5424, 5424), 'dtype': dtype('int16'), '_Unsigned': 'true', '_FillValue': array([65535], dtype=uint16), 'scale_factor': array([0.00152602], dtype=float32), 'add_offset': array([0.], dtype=float32), 'coordinates': 'latitude retrieval_local_zenith_angle quantitative_local_zenith_angle solar_zenith_angle t y x'}
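The encoding above shows the variable is stored in (226, 226) chunks on disk. As a sketch of chunk-aligned reading (the multiple of 6 is a judgment call, not from the original), we can ask xarray for Dask chunks that are an integer multiple of the on-disk size:
# 1356 = 6 * 226, so each Dask chunk covers whole on-disk chunks
ds_chunked = xr.open_dataset(ncfile.open(), chunks={'y': 1356, 'x': 1356})
ds_chunked.RRQPE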
%%time
s3 = boto3.client("s3")
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/')

@dask.delayed
def s3open_data(path):
    # runs on a worker: open one file over S3 and return the raw RRQPE array
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    f = fs.open(path)
    ds = xr.open_dataset(f)
    return ds['RRQPE'].values

# build one delayed task per file for all of 2020 to date
files_mapper = [s3open_data("s3://noaa-goes16/" + file['Key']) for page in page_iterator for file in page['Contents']]
CPU times: user 6.23 s, sys: 105 ms, total: 6.34 s
Wall time: 10.2 s
# how many files did we load?
len(files_mapper)
41532
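The same paginator pattern narrows to any period by tightening the prefix. For example, restricting the listing to a single day (day-of-year 181) might look like this; the variable names are illustrative:
# a tighter prefix limits the listing to one day of files
page_iterator_day = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/181/')
day_keys = [f['Key'] for page in page_iterator_day for f in page['Contents']]
len(day_keys)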
%%time
# wrap each delayed read in a lazy Dask array of known shape and dtype,
# then stack them along a new leading time axis
shape = ds.RRQPE.shape
dtype = ds.RRQPE.dtype
data_mapper = [dask.array.from_delayed(f, shape, dtype=dtype) for f in files_mapper]
all_data = dask.array.stack(data_mapper)
all_data
CPU times: user 4.72 s, sys: 75.5 ms, total: 4.8 s
Wall time: 4.87 s
%%time
# sum rain rates over the time axis (NaN-aware) while capturing a performance report
with performance_report(filename="dask-report.html"):
    all_data_sum = dask.array.nansum(all_data, axis=0).compute()
CPU times: user 1min 1s, sys: 987 ms, total: 1min 2s
Wall time: 13min 20s
# RRQPE is an instantaneous rain rate in mm/hr sampled every 10 minutes,
# so each sample contributes 1/6 of an hour to the accumulation
all_data_sum = all_data_sum * (1.0 / 6.0)
fig, ax = plt.subplots(figsize=(12, 12), subplot_kw={'aspect': 'equal'})
# downsample by 4 for plotting speed; the negative row step flips the
# y-axis so the full disk renders north-up
pc = ax.pcolormesh(all_data_sum[::-4, ::4])
plt.colorbar(pc)
pc.set_clim([0, 10000])
plt.title('Precipitation Accumulation 2020 YTD')
Text(0.5, 1.0, 'Precipitation Accumulation 2020 YTD')
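The reduction above took over thirteen minutes of wall time, so it can be worth caching the result locally before experimenting with the plot. A sketch (the file name is arbitrary):
# persist the accumulated field; reload later with np.load
np.save('rrqpe_accum_2020_ytd.npy', all_data_sum)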
When we are temporarily done with the cluster, we can scale it down to zero workers to save on costs.
# scale the Dask-Worker service back down to zero and wait until stable
numWorkers = 0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])