This notebook demonstrates a "Cloud-native" analysis of Normalized Difference Vegetation Index (NDVI) using Landsat 8 data.
What is unique about this workflow is that no data is downloaded to our local computer! All calculations are performed in memory across many distributed machines on AWS.
This workflow is possible because the Landsat 8 data is stored in Cloud-Optimized Geotiff format, which can be accessed remotely via xarray and rasterio Python libraries. Distributed computing is enabled through a Pangeo JupyterHub deployment with Dask Kubernetes.
About Landsat 8: https://landsat.usgs.gov/landsat-8
About the Landsat archive: https://cloud.google.com/storage/docs/public-datasets/landsat
Date: August 30, 2018
Created by: Scott Henderson (scottyh@uw.edu), Daniel Rothenberg
Updated to use hvplot/holoviews by Ryan Abernathy and Rich Signell (rsignell@usgs.gov)
# Import required libraries
import os
import pandas as pd
import rasterio
import xarray as xr
import requests
from dask_kubernetes import KubeCluster
from dask.distributed import Client
from dask.distributed import wait, progress
import hvplot.xarray
import hvplot.pandas
import cartopy.crs as ccrs
import holoviews as hv
# Print package versions
print('xarray version: ', xr.__version__)
print('rasterio version: ', rasterio.__version__)
print('hvplot version: ', hvplot.__version__)
xarray version: 0.10.8 rasterio version: 1.0.3 hvplot version: 0.2.1
# Set environment variables for cloud-optimized-geotiffs efficiency
os.environ['GDAL_DISABLE_READDIR_ON_OPEN']='YES'
os.environ['CPL_VSIL_CURL_ALLOWED_EXTENSIONS']='TIF'
NASA CMR is a new unified way to search for remote sensing assests across many archive centers. If you prefer a graphical user interface, NASA Earthdata Search is built on top of CMR. CMR returns download links through the USGS (https://earthexplorer.usgs.gov), but the same archive is mirrored as a (Google Public Dataset)[https://cloud.google.com/storage/docs/public-datasets/landsat], so we'll make a function that queries CMR and returns URLs to the imagery stored on Google Cloud.
def query_cmr_landsat(collection='Landsat_8_OLI_TIRS_C1',tier='T1', path=47, row=27):
"""Query NASA CMR for Collection1, Tier1 Landsat scenes from a specific path and row."""
data = [f'short_name={collection}',
f'page_size=2000',
f'attribute[]=string,CollectionCategory,{tier}',
f'attribute[]=int,WRSPath,{path}',
f'attribute[]=int,WRSRow,{row}',
]
query = 'https://cmr.earthdata.nasa.gov/search/granules.json?' + '&'.join(data)
r = requests.get(query, timeout=100)
print(r.url)
df = pd.DataFrame(r.json()['feed']['entry'])
# Save results to a file
#print('Saved results to cmr-result.json')
#with open('cmr-result.json', 'w') as j:
# j.write(r.text)
return df
def make_google_archive(pids, bands):
"""Turn list of product_ids into pandas dataframe for NDVI analysis."""
path = pids[0].split('_')[2][1:3]
row = pids[0].split('_')[2][-2:]
baseurl = f'https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/0{path}/0{row}'
# baseurl = f's3://landsat-pds/c1/L8/0{path}/0{row}' # not sure there is an AWS equivalent here
dates = [pd.to_datetime(x.split('_')[3]) for x in pids]
df = pd.DataFrame(dict(product_id=pids, date=dates))
for band in bands:
df[band] = [f'{baseurl}/{x}/{x}_{band}.TIF' for x in pids]
return df
df = query_cmr_landsat()
https://cmr.earthdata.nasa.gov/search/granules.json?short_name=Landsat_8_OLI_TIRS_C1&page_size=2000&attribute[]=string,CollectionCategory,T1&attribute[]=int,WRSPath,47&attribute[]=int,WRSRow,27
pids = df.title.tolist()
df = make_google_archive(pids, ['B4', 'B5'])
df.head()
product_id | date | B4 | B5 | |
---|---|---|---|---|
0 | LC08_L1TP_047027_20130421_20170310_01_T1 | 2013-04-21 | https://storage.googleapis.com/gcp-public-data... | https://storage.googleapis.com/gcp-public-data... |
1 | LC08_L1TP_047027_20130523_20170310_01_T1 | 2013-05-23 | https://storage.googleapis.com/gcp-public-data... | https://storage.googleapis.com/gcp-public-data... |
2 | LC08_L1TP_047027_20130608_20170310_01_T1 | 2013-06-08 | https://storage.googleapis.com/gcp-public-data... | https://storage.googleapis.com/gcp-public-data... |
3 | LC08_L1TP_047027_20130624_20170309_01_T1 | 2013-06-24 | https://storage.googleapis.com/gcp-public-data... | https://storage.googleapis.com/gcp-public-data... |
4 | LC08_L1TP_047027_20130710_20180201_01_T1 | 2013-07-10 | https://storage.googleapis.com/gcp-public-data... | https://storage.googleapis.com/gcp-public-data... |
image_url = df.iloc[0]['B4']
image_url
'https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/047/027/LC08_L1TP_047027_20130421_20170310_01_T1/LC08_L1TP_047027_20130421_20170310_01_T1_B4.TIF'
This will allow us to distribute our analysis across many machines. In the default configuration for Pangeo Binder, each worker has 2 vCPUs and 7Gb of RAM. It may take several minutes to initialize these workers and make them available to Dask.
# Click on the 'Dashboard link' to monitor calculation progress
cluster = KubeCluster(n_workers=10)
cluster
VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n <style scoped>\n .…
# Attach Dask to the cluster
client = Client(cluster)
The rasterio library allows us to read Geotiffs on the web without downloading the entire image. Xarray has a built-in load_rasterio() function that allows us to open the file as a DataArray. Xarray also uses Dask for lazy reading, so we want to make sure the native block tiling of the image matches the dask "chunk size". These dask chunks are automatically distributed among all our workers when a computation is requested, so ideally they will fit in the worker memory. A chunk size of 2048x2048 with a float32 datatype implies a 16Mb array.
# Load with rasterio
image_url = df.iloc[0]['B4']
with rasterio.open(image_url) as src:
print(src.profile)
{'driver': 'GTiff', 'dtype': 'uint16', 'nodata': None, 'width': 7751, 'height': 7531, 'count': 1, 'crs': CRS({'init': 'epsg:32610'}), 'transform': Affine(30.0, 0.0, 356685.0, 0.0, -30.0, 5367615.0), 'blockxsize': 256, 'blockysize': 256, 'tiled': True, 'compress': 'lzw', 'interleave': 'band'}
# Note that the blocksize of the image is 256 by 256, so we want xarray to use some multiple of that
xchunk = 2048
ychunk = 2048
da = xr.open_rasterio(image_url, chunks={'band': 1, 'x': xchunk, 'y': ychunk})
da
<xarray.DataArray (band: 1, y: 7531, x: 7751)> dask.array<shape=(1, 7531, 7751), dtype=uint16, chunksize=(1, 2048, 2048)> Coordinates: * band (band) int64 1 * y (y) float64 5.368e+06 5.368e+06 5.368e+06 5.368e+06 5.367e+06 ... * x (x) float64 3.567e+05 3.567e+05 3.568e+05 3.568e+05 3.568e+05 ... Attributes: transform: (30.0, 0.0, 356685.0, 0.0, -30.0, 5367615.0, 0.0, 0.0, 1.0) crs: +init=epsg:32610 res: (30.0, 30.0) is_tiled: 1 nodatavals: (nan,)
# If we request to compute something or plot these arrays, the necessary data chunks will be accessed on cloud storage:
# Watch the KubeCluster dashboard to see the worker activity when this command is run:
# Note that no data is stored on the disk here, it's all in memory
band1 = da.sel(band=1).persist()
progress(band1)
VBox()
%%time
display(band1.hvplot(rasterize=True, width=600, height=400, cmap='viridis'))
CPU times: user 3.38 s, sys: 728 ms, total: 4.1 s Wall time: 3.17 s
# skip this because it takes minutes to run and doesn't use the dask workers.
# here we use the CRS info (epsg:32610 = UTM zone 10n) to plot in lon/lat
# rather than in projected coordinates.
if 0:
crs = ccrs.UTM(zone='10n')
display(band1.hvplot(crs=crs, rasterize=True, width=600, height=400, cmap='viridis'))
Often we want to analyze a time series of satellite imagery, but we are constrained by computational resources. So we either download all the images, extract a small subset and then do our analysis. Or, we coarsen the resolution of all our images so that the entire set fits into our computer RAM. Because this notebook is running on Google Cloud with access to many resources in our Kube Cluster, we no longer have to worry about the computational constraints, and can conduct our analysis at full resoution!
First we need to construct an xarray dataset object (which has data variables 'band4' and 'band5' in a n-dimensional array with x-coordinates representing UTM easting, y-coordinates representing UTM northing, and a time coordinate representing the image acquisition date).
There are different ways to go about this, but we will load our images with a timestamp index since each image is taken on a different date. Typically, this is a chore if our images are not on the same grid to begin with, but xarray knows how to automatically align images based on their georeferenced coordinates.
# Note that these landsat images are not necessarily the same shape or on the same grid:
for image_url in df.B4[:5]:
with rasterio.open(image_url) as src:
print(src.shape, src.bounds)
(7531, 7751) BoundingBox(left=356685.0, bottom=5141685.0, right=589215.0, top=5367615.0) (7541, 7751) BoundingBox(left=353385.0, bottom=5141685.0, right=585915.0, top=5367915.0) (7541, 7751) BoundingBox(left=354285.0, bottom=5141385.0, right=586815.0, top=5367615.0) (7531, 7751) BoundingBox(left=356385.0, bottom=5141685.0, right=588915.0, top=5367615.0) (7971, 7861) BoundingBox(left=353985.0, bottom=5135085.0, right=589815.0, top=5374215.0)
def create_multiband_dataset(row, bands=['B4','B5'], chunks={'band': 1, 'x': 2048, 'y': 2048}):
'''A function to load multiple landsat bands into an xarray dataset '''
# Each image is a dataset containing both band4 and band5
datasets = []
for band in bands:
url = row[band]
da = xr.open_rasterio(url, chunks=chunks)
da = da.squeeze().drop(labels='band')
ds = da.to_dataset(name=band)
datasets.append(ds)
DS = xr.merge(datasets)
return DS
# Merge all acquisitions into a single large Dataset
datasets = []
for i,row in df.iterrows():
try:
print('loading...', row.date)
ds = create_multiband_dataset(row)
datasets.append(ds)
except Exception as e:
print('ERROR loading, skipping acquistion!')
print(e)
loading... 2013-04-21 00:00:00 loading... 2013-05-23 00:00:00 loading... 2013-06-08 00:00:00 loading... 2013-06-24 00:00:00 loading... 2013-07-10 00:00:00 loading... 2013-07-26 00:00:00 loading... 2013-08-11 00:00:00 loading... 2013-08-27 00:00:00 loading... 2013-09-12 00:00:00 loading... 2013-10-14 00:00:00 loading... 2013-10-30 00:00:00 loading... 2014-01-02 00:00:00 loading... 2014-01-18 00:00:00 loading... 2014-02-03 00:00:00 loading... 2014-02-19 00:00:00 loading... 2014-03-07 00:00:00 loading... 2014-03-23 00:00:00 loading... 2014-04-08 00:00:00 loading... 2014-04-24 00:00:00 loading... 2014-05-10 00:00:00 loading... 2014-05-26 00:00:00 loading... 2014-06-11 00:00:00 loading... 2014-06-27 00:00:00 loading... 2014-07-13 00:00:00 loading... 2014-07-29 00:00:00 loading... 2014-08-14 00:00:00 loading... 2014-09-15 00:00:00 loading... 2014-10-01 00:00:00 loading... 2014-11-18 00:00:00 loading... 2015-01-21 00:00:00 loading... 2015-02-22 00:00:00 loading... 2015-03-10 00:00:00 loading... 2015-03-26 00:00:00 loading... 2015-04-11 00:00:00 loading... 2015-04-27 00:00:00 loading... 2015-05-29 00:00:00 loading... 2015-06-14 00:00:00 loading... 2015-06-30 00:00:00 loading... 2015-07-16 00:00:00 loading... 2015-08-01 00:00:00 loading... 2015-08-17 00:00:00 loading... 2015-09-02 00:00:00 loading... 2015-09-18 00:00:00 loading... 2015-10-04 00:00:00 loading... 2015-10-20 00:00:00 loading... 2015-11-05 00:00:00 loading... 2015-11-21 00:00:00 loading... 2015-12-23 00:00:00 loading... 2016-01-08 00:00:00 loading... 2016-01-24 00:00:00 loading... 2016-02-09 00:00:00 loading... 2016-02-25 00:00:00 loading... 2016-03-12 00:00:00 loading... 2016-03-28 00:00:00 loading... 2016-04-13 00:00:00 loading... 2016-04-29 00:00:00 loading... 2016-05-31 00:00:00 loading... 2016-06-16 00:00:00 loading... 2016-07-02 00:00:00 loading... 2016-07-18 00:00:00 loading... 2016-08-03 00:00:00 loading... 2016-08-19 00:00:00 loading... 2016-09-04 00:00:00 loading... 2016-09-20 00:00:00 loading... 2016-10-22 00:00:00 loading... 2016-11-07 00:00:00 loading... 2016-11-23 00:00:00 loading... 2016-12-25 00:00:00 loading... 2017-01-10 00:00:00 loading... 2017-01-26 00:00:00 loading... 2017-02-11 00:00:00 loading... 2017-03-31 00:00:00 loading... 2017-04-16 00:00:00 loading... 2017-05-02 00:00:00 loading... 2017-05-18 00:00:00 loading... 2017-06-03 00:00:00 loading... 2017-06-19 00:00:00 loading... 2017-07-05 00:00:00 loading... 2017-07-21 00:00:00 loading... 2017-08-06 00:00:00 loading... 2017-08-22 00:00:00 loading... 2017-09-07 00:00:00 loading... 2017-09-23 00:00:00 loading... 2017-10-09 00:00:00 loading... 2017-10-25 00:00:00 loading... 2017-11-10 00:00:00 loading... 2017-12-12 00:00:00 loading... 2018-01-13 00:00:00 loading... 2018-02-14 00:00:00 loading... 2018-03-02 00:00:00 loading... 2018-03-18 00:00:00 loading... 2018-04-03 00:00:00 loading... 2018-04-19 00:00:00 loading... 2018-05-05 00:00:00 loading... 2018-05-21 00:00:00 loading... 2018-06-06 00:00:00 loading... 2018-06-22 00:00:00 loading... 2018-07-08 00:00:00 loading... 2018-07-24 00:00:00 loading... 2018-08-09 00:00:00
%%time
DS = xr.concat(datasets, dim=pd.DatetimeIndex(df.date.tolist(), name='time'))
print('Dataset size (Gb): ', DS.nbytes/1e9)
Dataset size (Gb): 103.572115136 CPU times: user 4.8 s, sys: 124 ms, total: 4.92 s Wall time: 4.78 s
DS
<xarray.Dataset> Dimensions: (time: 100, x: 8121, y: 7971) Coordinates: * x (x) float64 3.492e+05 3.492e+05 3.493e+05 3.493e+05 3.493e+05 ... * y (y) float64 5.135e+06 5.135e+06 5.135e+06 5.135e+06 5.135e+06 ... * time (time) datetime64[ns] 2013-04-21 2013-05-23 2013-06-08 ... Data variables: B4 (time, y, x) float64 dask.array<shape=(100, 7971, 8121), chunksize=(1, 1607, 2048)> B5 (time, y, x) float64 dask.array<shape=(100, 7971, 8121), chunksize=(1, 1607, 2048)>
There is definitely some room for improvement here from a computational efficiency standpoint - in particular the dask chunks are no longer aligned with the image tiles. This is because each image starts at different coordinates and has different shapes, but xarray uses a single chunk size for the entire datasets. There will also be many zeros in this dataset, so future work could take advantage of sparse arrays.
These points aside, our KubeCluster will automatically parallelize our computations for us, so we can not worry too much about optimal efficiency and just go ahead and run our analysis!
%%time
display(DS['B4'].hvplot('x', 'y', groupby='time', rasterize=True, width=600, height=400, cmap='viridis'))
CPU times: user 7.27 s, sys: 1.11 s, total: 8.38 s Wall time: 28.1 s
Set up our NDVI dataset. Note that NDVI is not actually computed until we call the Dask compute(), persist(), or call other functions such as plot() that require actually operate on the data!
NDVI = (DS['B5'] - DS['B4']) / (DS['B5'] + DS['B4'])
NDVI
<xarray.DataArray (time: 100, y: 7971, x: 8121)> dask.array<shape=(100, 7971, 8121), dtype=float64, chunksize=(1, 1607, 2048)> Coordinates: * x (x) float64 3.492e+05 3.492e+05 3.493e+05 3.493e+05 3.493e+05 ... * y (y) float64 5.135e+06 5.135e+06 5.135e+06 5.135e+06 5.135e+06 ... * time (time) datetime64[ns] 2013-04-21 2013-05-23 2013-06-08 ...
Only data for a single landsat acquisition date is pulled from Cloud storage
day='2013-04-21'
s = NDVI.sel(time=day)
s
<xarray.DataArray (y: 7971, x: 8121)> dask.array<shape=(7971, 8121), dtype=float64, chunksize=(1607, 2048)> Coordinates: * x (x) float64 3.492e+05 3.492e+05 3.493e+05 3.493e+05 3.493e+05 ... * y (y) float64 5.135e+06 5.135e+06 5.135e+06 5.135e+06 5.135e+06 ... time datetime64[ns] 2013-04-21
del s['time']
s
<xarray.DataArray (y: 7971, x: 8121)> dask.array<shape=(7971, 8121), dtype=float64, chunksize=(1607, 2048)> Coordinates: * x (x) float64 3.492e+05 3.492e+05 3.493e+05 3.493e+05 3.493e+05 ... * y (y) float64 5.135e+06 5.135e+06 5.135e+06 5.135e+06 5.135e+06 ...
%%time
display(s.hvplot('x', 'y', rasterize=True, width=600, height=400, cmap='viridis', label=day))
CPU times: user 8.82 s, sys: 2.06 s, total: 10.9 s Wall time: 28.7 s
This example calculates the mean NDVI per-pixel (30m) for 2013-2014, storing result in local RAM. This takes a while...
ndvi = NDVI.sel(time=slice('2013-01-01', '2014-01-01')).mean(dim='time').persist()
progress(ndvi)
VBox()
ndvi
<xarray.DataArray (y: 7971, x: 8121)> dask.array<shape=(7971, 8121), dtype=float64, chunksize=(1607, 2048)> Coordinates: * x (x) float64 3.492e+05 3.492e+05 3.493e+05 3.493e+05 3.493e+05 ... * y (y) float64 5.135e+06 5.135e+06 5.135e+06 5.135e+06 5.135e+06 ...
%%time
display(ndvi.hvplot('x','y', rasterize=True, width=600, height=400, cmap='viridis'))
# Point of interest we'll extract a timeseries from
#plt.plot(562370, 5312519, 'ko')
CPU times: user 56 s, sys: 7.2 s, total: 1min 3s Wall time: 1min 29s
We expect to see higher NDVI values in the summer months, corresponding to dense vegetation
# https://www.geoplaner.com
# lat, lon to northing, easting
# 47.962940 --> 5312519
# -122.164483--> 562370
#+ 5 km buffer
#EPSG:32610 WGS 84 / UTM zone 10N
xcen = 562370
ycen = 5312519
buf = 5000 # look at point +/- 5km
ds = NDVI.sel(x=slice(xcen-buf,xcen+buf), y=slice(ycen-buf,ycen+buf))
timeseries = ds.resample(time='1MS').mean().persist()
timeseries
<xarray.DataArray (time: 65)> dask.array<shape=(65,), dtype=float64, chunksize=(1,)> Coordinates: * time (time) datetime64[ns] 2013-04-01 2013-05-01 2013-06-01 ...
s = timeseries.to_series()
(s.hvplot(width=700, height=300, legend=False) * s.hvplot.scatter(width=700, height=300, legend=False)).relabel('Mean NDVI')
ds2015 = ds.sel(time=slice('2015-01-01', '2016-01-01'))
nt = len(ds2015)
Getting small multiple plots going with hvplot
is a bit challenging. Need to use Layout
from holoviews
:
%%time
dss = ds.sel(time=slice('2015-01-01', '2016-01-01'))
ncols = 5
plots = []
for i in range(nt):
ds_panel = dss.isel(time=i)
#if last plot in row, add colorbar and make it wider
is_last_in_row = (i+1)%ncols == 0
height = 200
width = 150 if not is_last_in_row else 200
p = (ds_panel
.hvplot('x', 'y', rasterize=True, width=width, height=height, cmap='viridis',
xaxis=False, yaxis=False, colorbar=is_last_in_row)
.relabel(str(ds_panel.time.values)[:10]))
plots.append(p)
display(hv.Layout(plots).cols(ncols))
CPU times: user 12.9 s, sys: 692 ms, total: 13.6 s Wall time: 1min 28s
%%time
display(NDVI.hvplot('x','y',groupby='time', rasterize=True, width=700, height=500, cmap='viridis'))
CPU times: user 8.13 s, sys: 1.47 s, total: 9.6 s Wall time: 14.3 s