Processing GOES-16 data with Dask & AWS Fargate

This notebook demonstrates how to work with the GOES-16 Rainfall Rate (RRQPE) product, available as part of the AWS Public Dataset Program (https://registry.opendata.aws/noaa-goes/).

This notebook uses Amazon SageMaker and AWS Fargate to provide an environment with a Jupyter notebook and a Dask cluster. An example AWS CloudFormation template for quickly creating this environment in your own AWS account is available at https://github.com/awslabs/amazon-asdi/tree/main/examples/dask.

Python Imports

In [1]:
%matplotlib inline
import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
import dask.array  # needed for dask.array.from_delayed / dask.array.stack below
from dask.distributed import performance_report
from dask.distributed import Client

font = {'family' : 'sans-serif',
        'weight' : 'normal',
        'size'   : 18}
matplotlib.rc('font', **font)

Scale out Dask Workers

In [2]:
ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster
Out[2]:
'arn:aws:ecs:us-east-1:816257832715:cluster/dask2-Fargate-Dask-Cluster'
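If the account contains more than one ECS cluster, taking clusters[0] may pick the wrong one. A small sketch of selecting the Dask cluster by name instead (the 'Fargate-Dask-Cluster' fragment is an assumption based on the ARN above; adjust it to match your own stack):

# Illustrative: pick the Dask cluster explicitly by a name fragment rather than by position.
cluster = next(arn for arn in clusters if 'Fargate-Dask-Cluster' in arn)
cluster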
In [3]:
numWorkers=70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])
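The same two calls can scale the service back down when the analysis is finished, so you are not billed for idle Fargate tasks (a sketch reusing the 'Dask-Worker' service name from above):

# When done, scale the worker service back to zero to stop incurring Fargate charges.
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=0)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])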

Set up the Dask Client to talk to our Fargate Dask Distributed Cluster

In [4]:
client = Client('Dask-Scheduler.local-dask:8786')
client
Out[4]:

Client
  • Scheduler: Dask-Scheduler.local-dask:8786

Cluster
  • Workers: 70
  • Cores: 70
  • Memory: 490.00 GB
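
If this cell runs before all of the Fargate tasks have registered with the scheduler, the worker count shown above will be lower than expected. One way to block until the full set of workers is available (a minimal sketch using the numWorkers value from the scaling cell):

# Block until the scheduler reports the expected number of workers, then re-display the client.
client.wait_for_workers(n_workers=numWorkers)
client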

Open an Example File and Check the Native Chunking

We want our Dask chunks to align with the file's native on-disk chunking for maximum read performance.

In [5]:
url = 's3://noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc'
ncfile = fsspec.open(url)
ds = xr.open_dataset(ncfile.open())

ds.RRQPE.encoding
Out[5]:
{'chunksizes': (226, 226),
 'fletcher32': False,
 'shuffle': False,
 'zlib': True,
 'complevel': 1,
 'source': '<File-like object S3FileSystem, noaa-goes16/ABI-L2-RRQPEF/2020/181/19/OR_ABI-L2-RRQPEF-M6_G16_s20201811900207_e20201811909515_c20201811910042.nc>',
 'original_shape': (5424, 5424),
 'dtype': dtype('int16'),
 '_Unsigned': 'true',
 '_FillValue': array([65535], dtype=uint16),
 'scale_factor': array([0.00152602], dtype=float32),
 'add_offset': array([0.], dtype=float32),
 'coordinates': 'latitude retrieval_local_zenith_angle quantitative_local_zenith_angle solar_zenith_angle t y x'}
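
The encoding shows the variable is stored in 226 x 226 on-disk chunks. If you wanted Dask to chunk a single file lazily along those boundaries (rather than reading each file whole, as the rest of this notebook does), you could pass a chunks argument that is a multiple of the native chunk size. This is only an illustration; the multiplier of 4 is an arbitrary choice:

# Illustrative: open the same file with Dask chunks aligned to the native 226 x 226 chunking.
ds_chunked = xr.open_dataset(ncfile.open(), chunks={'x': 226 * 4, 'y': 226 * 4})
ds_chunked.RRQPE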

Open all the data from 2020 as a single Dask array

In [6]:
%%time

s3 = boto3.client("s3")
paginator = s3.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(Bucket = 'noaa-goes16', Prefix = 'ABI-L2-RRQPEF/2020/')

@dask.delayed
def s3open_data(path):
    # Open one GOES-16 file anonymously from S3 and return its rainfall-rate array
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    f = fs.open(path)
    ds = xr.open_dataset(f)
    return ds['RRQPE'].values


files_mapper = [s3open_data("s3://noaa-goes16/" + file['Key']) for page in page_iterator for file in page['Contents']]
CPU times: user 7.42 s, sys: 214 ms, total: 7.64 s
Wall time: 12.4 s
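Listing the whole ABI-L2-RRQPEF/2020/ prefix returns tens of thousands of keys. For a quicker, smaller test you can narrow the Prefix to a single day-of-year, e.g. day 181 from the example file above (illustrative only; the rest of the notebook uses the full-year listing):

# Illustrative: restrict the listing to one day-of-year (181) instead of all of 2020.
page_iterator = paginator.paginate(Bucket='noaa-goes16', Prefix='ABI-L2-RRQPEF/2020/181/')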
In [7]:
# how many delayed file-open tasks did we create? (nothing has been read yet)

len(files_mapper)
Out[7]:
52507
In [8]:
%%time
shape = ds.RRQPE.shape
dtype = ds.RRQPE.dtype
data_mapper = [dask.array.from_delayed(f, shape, dtype=dtype) for f in files_mapper]
all_data = dask.array.stack(data_mapper)
all_data
CPU times: user 5.62 s, sys: 77.6 ms, total: 5.69 s
Wall time: 5.74 s
Out[8]:
        Array                 Chunk
Bytes   6.18 TB               117.68 MB
Shape   (52507, 5424, 5424)   (1, 5424, 5424)
Count   157521 Tasks          52507 Chunks
Type    float32               numpy.ndarray
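
From here the 6 TB lazy array can be reduced on the Fargate workers. The sketch below is not part of the original cells: it computes the 2020 mean rainfall-rate field while capturing a Dask performance report with the performance_report import from the top of the notebook. The filename and plot settings are arbitrary choices, and the computation reads roughly 6 TB from S3.

# Sketch: reduce the (time, y, x) stack to a single mean rainfall-rate field for 2020,
# ignoring NaN fill values, and record a performance report of the distributed run.
with performance_report(filename="dask-rrqpe-2020.html"):
    mean_rainfall = dask.array.nanmean(all_data, axis=0).compute()

plt.figure(figsize=(12, 12))
plt.imshow(mean_rainfall, cmap='viridis')
plt.colorbar(label='Mean rainfall rate')
plt.title('GOES-16 RRQPE mean, 2020')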