This notebook demonstrates how to work with the GOES-16 rainfall rate product available as part of the AWS Public Dataset Program (https://registry.opendata.aws/noaa-goes/).
The notebook uses Amazon SageMaker and AWS Fargate to provide an environment with a Jupyter notebook and a Dask cluster. An example AWS CloudFormation template for quickly creating this environment in your own AWS account, so you can run this notebook yourself, is available at https://github.com/awslabs/amazon-asdi/tree/main/examples/dask.
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
import dask.array  # needed for dask.array.from_delayed / stack / nansum below
from dask.distributed import performance_report
from dask.distributed import Client

font = {'family': 'sans-serif',
        'weight': 'normal',
        'size': 18}

matplotlib.rc('font', **font)
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster
'arn:aws:ecs:us-east-1:816257832715:cluster/dask2-Fargate-Dask-Cluster'
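If the account contains more than one ECS cluster, one option is to select the Dask cluster by name rather than taking the first entry. This is a minimal sketch that assumes the cluster ARN contains 'Fargate-Dask-Cluster', as in the example stack above.
# Hypothetical: pick the cluster whose ARN contains 'Fargate-Dask-Cluster'
# (the name used by the example CloudFormation template) instead of clusters[0]
dask_clusters = [c for c in clusters if 'Fargate-Dask-Cluster' in c]
if dask_clusters:
    cluster = dask_clusters[0]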
# Scale the Fargate Dask worker service up and wait for the service to stabilize
numWorkers = 70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
# Connect to the Dask scheduler running in the Fargate cluster
client = Client('Dask-Scheduler.local-dask:8786')
client
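Before submitting work, it is worth confirming that the requested workers have actually joined the scheduler. This is a minimal sketch using the distributed client; the timeout value is an assumption.
# Block until the expected number of workers have registered with the scheduler
# (raises if they do not appear within the timeout)
client.wait_for_workers(n_workers=numWorkers, timeout=600)
print(len(client.scheduler_info()['workers']), "workers connected")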
We open a single file to see how the data are chunked on disk; for maximum performance we want Dask to chunk the same way (see the sketch after the encoding output below).
url = 's3://noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc'
ncfile = fsspec.open(url)
ds = xr.open_dataset(ncfile.open())
ds.RRQPE.encoding
{'chunksizes': (226, 226), 'fletcher32': False, 'shuffle': False, 'zlib': True, 'complevel': 1, 'source': '<File-like object S3FileSystem, noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc>', 'original_shape': (5424, 5424), 'dtype': dtype('int16'), '_Unsigned': 'true', '_FillValue': array([65535], dtype=uint16), 'scale_factor': array([0.00152602], dtype=float32), 'add_offset': array([0.], dtype=float32), 'coordinates': 'latitude retrieval_local_zenith_angle quantitative_local_zenith_angle solar_zenith_angle t y x'}
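The encoding shows the file is stored with 226 x 226 internal chunks. As a minimal sketch, we could ask xarray to open the file lazily with Dask chunks aligned to those boundaries (the chunk sizes below are taken from the encoding above; ds_chunked is just an illustrative name).
# Open lazily with Dask chunks that match the file's internal 226 x 226 chunking
ds_chunked = xr.open_dataset(fsspec.open(url).open(), chunks={'y': 226, 'x': 226})
ds_chunked.RRQPE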
%%time
s3 = boto3.client("s3")
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/')

@dask.delayed
def s3open_data(path):
    # Open one GOES-16 file anonymously over S3 and return the rainfall-rate array
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    f = fs.open(path)
    ds = xr.open_dataset(f)
    return ds['RRQPE'].values

files_mapper = [s3open_data("s3://noaa-goes16/" + file['Key']) for page in page_iterator for file in page['Contents']]
CPU times: user 7.42 s, sys: 214 ms, total: 7.64 s
Wall time: 12.4 s
# how many files did we load?
len(files_mapper)
52507
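Listing the whole year yields over 50,000 objects. To test the pipeline cheaply first, the prefix can be narrowed to a single day; this is a hypothetical example reusing the same paginator and delayed function.
# Hypothetical: list only day-of-year 181 (roughly 144 full-disk files at a 10-minute cadence)
test_iterator = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/181/')
test_mapper = [s3open_data("s3://noaa-goes16/" + f['Key'])
               for page in test_iterator for f in page['Contents']]
len(test_mapper)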
%%time
# Every file has the same shape and dtype, so wrap each delayed read in a dask array
# and stack them along a new leading (time) axis
shape = ds.RRQPE.shape
dtype = ds.RRQPE.dtype
data_mapper = [dask.array.from_delayed(f, shape, dtype=dtype) for f in files_mapper]
all_data = dask.array.stack(data_mapper)
all_data
CPU times: user 5.62 s, sys: 77.6 ms, total: 5.69 s
Wall time: 5.74 s
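Before triggering any computation, a quick sanity check on the stacked array is cheap, since its shape and size are known without reading the data. A minimal sketch:
# One (5424, 5424) chunk per file, stacked along a new leading time axis
print(all_data.shape, all_data.dtype)
print(all_data.nbytes / 1e12, "TB uncompressed")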
%%time
# Sum the rain rate over the time axis across the whole year, recording a Dask performance report
with performance_report(filename="dask-report.html"):
    all_data_sum = dask.array.nansum(all_data, axis=0).compute()
CPU times: user 1min 14s, sys: 1.18 s, total: 1min 15s
Wall time: 17min 23s
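The same stacked array can drive other reductions along the time axis. For example, a minimal sketch of the peak instantaneous rain rate seen at each pixel over the year (max_rate is an illustrative name):
# Maximum rain rate (mm/h) observed at each pixel, ignoring missing values
max_rate = dask.array.nanmax(all_data, axis=0).compute()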
# Each file is a 10-minute snapshot of rain rate in mm/h, so multiplying the summed
# rates by 1/6 h converts them to an accumulation in mm
all_data_sum = all_data_sum * (1.0 / 6.0)
from matplotlib.colors import LogNorm

# Quick-look plot: downsample by a factor of 4 and reverse the row order so the
# full disk is displayed right side up
fig, ax = plt.subplots(figsize=(12, 12), subplot_kw={'aspect': 'equal'})
pc = ax.pcolormesh(all_data_sum[::-4, ::4])
plt.colorbar(pc)
pc.set_clim([0, 10000])
plt.title('Precipitation Accumulation 2020')
Text(0.5, 1.0, 'Precipitation Accumulation 2020')
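Since LogNorm was imported above, an alternative is to draw the accumulation on a logarithmic color scale so that low totals are not washed out. This is a minimal sketch; non-positive values are masked to keep the log norm valid, and the vmin/vmax values are assumptions.
# Log-scaled quick-look plot; mask values <= 0 so LogNorm does not fail
fig, ax = plt.subplots(figsize=(12, 12), subplot_kw={'aspect': 'equal'})
masked = np.ma.masked_less_equal(all_data_sum[::-4, ::4], 0)
pc = ax.pcolormesh(masked, norm=LogNorm(vmin=1, vmax=10000))
plt.colorbar(pc)
plt.title('Precipitation Accumulation 2020 (log scale)')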
When we are temporarily done with the cluster, we can scale it down to zero workers to save on costs.
numWorkers=0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])