Snow Detection Using Spark

Introduction

In this Jupyter notebook, we build an SVM classifier for Snow/Ice detection on the Proba-V 100m Top Of Atmosphere (TOA) Radiometry data, using Spark for distributed processing.

Data

Radiometry Data

The Radiometry data is delivered as a GeoTIFF file containing 4 raster bands:

  1. RED
  2. NIR
  3. BLUE
  4. SWIR

Each raster band is a $Y \times X$ pixel grid in which each value is the TOA reflectance of that band at that pixel.

Status Map

There is also a status map file containing a single band; for each pixel, the value of that band is a bitmask describing the classification of that pixel.

In our case, the flag we are interested in is the one coded in binary as $100$ (bit 2): when that bit is set, the corresponding pixel in the Radiometry file is Snow or Ice, as documented in the Proba-V User Manual.
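
As a quick illustration, here is a minimal sketch of how that flag can be tested with a bit mask; the value 251 is simply taken from the sample output shown later in this notebook:

sm_value = 251                            # example status map value, 0b11111011
is_snow_or_ice = (sm_value & 0b100) != 0  # test bit 2, the Snow/Ice flag
print(is_snow_or_ice)                     # False: bit 2 is not set in 251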

Reading the files

Imports

We will be using the following libraries / frameworks:

  1. numpy: for numerical processing
  2. gdal: for reading GeoTIFF files
  3. matplotlib / seaborn: for plotting
  4. pandas: for handling small DataFrames
  5. pyspark: for distributed processing on the Spark cluster

In [1]:
import numpy as np
import requests
import gdal

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
%matplotlib inline

import pyspark

Setting up the Spark Context

In order to access our Spark cluster, we first need to set up a Spark context. The snippet below shows how to do this and how to override the default configuration.

In [2]:
from pyspark.conf import SparkConf
conf = SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', 1024)
conf.set('spark.executor.memory', '8g')
conf.set('spark.executor.cores', '2')
conf.set('spark.executor.instances',10)
sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.SQLContext(sc)

The files are stored in a shared folder, as defined below. The complete list of files can be requested through an API, but here we focus on a tile covering the Alps, where Snow/Ice areas are plentiful.

The Radiometry files end with _RADIOMETRY.tif, whereas the status map files end with _SM.tif.

In [3]:
files = [
    "/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101.tif"
]

bands = [
    "RED",
    "NIR",
    "BLUE",
    "SWIR"
]

def radiometry_file(filename):
    return filename[:-4] + "_RADIOMETRY.tif"

def status_file(filename):
    return filename[:-4] + "_SM.tif"

Processing the files

In order to read the files in parallel, we first instruct Spark to parallelize our list of files.

In [4]:
data_files = sc.parallelize([(status_file(f), radiometry_file(f)) for f in files]).cache()
data_files
Out[4]:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175

Because our files are so large, we don't want to read a complete file in a single pass. Instead, we split each file into chunks, read each chunk separately and then combine the results.

This also lets us distribute the reading across the Spark executors, so that each executor reads a specific part of each file.

In [5]:
def makeSplits( files, splits=100 ):
    statusmap, radiometry = files
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    del dataset
    XSize = status.XSize
    YSize = status.YSize
    
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, XSize, chunksize[0]):
        for y in range(0, YSize, chunksize[1]):
            chunks.append({
                    'statusmap': statusmap,
                    'radiometry': radiometry,
                    'x': (x, min(XSize - x, chunksize[0])),
                    'y': (y, min(YSize - y, chunksize[1]))
                })
    
    return chunks

chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
Out[5]:
[{'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_RADIOMETRY.tif',
  'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_SM.tif',
  'x': (800, 100),
  'y': (6200, 100)}]
In [6]:
chunks.count()
Out[6]:
10201

Note that we get 10201 = 101 × 101 chunks rather than 100 × 100: the integer chunk size does not divide the raster dimensions exactly, so each axis yields 101 offsets, with the last chunk covering the remainder.

We can now define the functions to read each chunk.

What we want to end up with is a list in which each element is a single pixel from the GeoTIFF files, carrying both the corresponding bitmask from the status map and the reflectance of each individual band.

In [7]:
def parseTargets(statusmap, x, y):
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    ret = status.ReadAsArray(x[0], y[0], x[1], y[1])
    del dataset
    return np.array(ret).flatten(order='F').tolist()
    
def parseFeatures( radiometry, x, y ):
    raster = gdal.Open(radiometry)
    raster_bands = [ raster.GetRasterBand(i).ReadAsArray(x[0], y[0], x[1], y[1]) for i in xrange(1, raster.RasterCount + 1) ]
    # shape: 4 * Y * X (one 2D array per band)

    del raster
    raster_bands = np.transpose(raster_bands)
    # shape: X * Y * 4 (np.transpose reverses all axes)

    raster_bands = raster_bands.reshape((len(raster_bands) * len(raster_bands[0]), len(raster_bands[0][0])))
    # shape: (X * Y) * 4 -- one row of 4 band values per pixel,
    # in the same order as the Fortran-flattened status map
    return raster_bands.tolist()

def parseChunk(chunk):
    return zip(
        parseTargets(chunk['statusmap'], chunk['x'], chunk['y']), 
        parseFeatures(chunk['radiometry'], chunk['x'], chunk['y'])
    )

dataset = chunks.flatMap(parseChunk)
dataset.take(5)
Out[7]:
[(251, [1099, 1154, 1225, 567]),
 (251, [1057, 1117, 1178, 576]),
 (251, [1056, 1106, 1195, 609]),
 (251, [1116, 1162, 1244, 632]),
 (251, [1154, 1207, 1276, 642])]

Some pixels contain invalid data; in such cases, at least one of the reflectance values is equal to $-1$. Since those pixels are incomplete, we simply filter them out.

In [8]:
def is_valid(row):
    for v in row[1]:
        if v == -1:
            return False
    return True

dataset = dataset.filter(is_valid).repartition(100)
dataset.take(5)
Out[8]:
[(243, [1656, 1813, 1449, 1129]),
 (243, [1700, 1873, 1450, 1174]),
 (243, [1751, 1937, 1450, 1206]),
 (243, [1830, 2011, 1451, 1227]),
 (243, [1900, 2090, 1451, 1235])]

As mentioned earlier, the Snow/Ice flag is the binary mask $100$. Since this is the only flag we are interested in, we define a function that converts the full bitmask into a single label, equal to 1 if the pixel is Snow/Ice and 0 otherwise.

In [9]:
def is_snow(row):
    return (int(row[0] & 0b100 != 0) , row[1])

dataset = dataset.map(is_snow).cache()
dataset.take(5)
Out[9]:
[(0, [1356, 1459, 1449, 536]),
 (0, [1324, 1430, 1404, 521]),
 (0, [1290, 1381, 1354, 511]),
 (0, [1239, 1340, 1306, 499]),
 (0, [1194, 1281, 1264, 487])]

Since we will be reusing this dataset very often, we might as well cache it. The following snippet instructs Spark to do exactly that.

In [10]:
dataset = dataset.cache()
In [11]:
dataset.countByKey()
Out[11]:
defaultdict(int, {0: 101558714, 1: 47685})
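
For reference, the imbalance can be quantified directly from these counts; a minimal sketch that simply re-derives the ratio from the countByKey output above:

counts = dataset.countByKey()
# Snow/Ice pixels make up well under 0.1% of the valid data
print(float(counts[1]) / (counts[0] + counts[1]))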

Visualizing the data

Now we are ready for some visualizations. The class counts above show that Snow/Ice pixels are vastly outnumbered, so before going further, let's take a balanced sample from our dataset, i.e. about the same number of positive (Snow/Ice) and negative samples.

In [12]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors


def parseSample(row):
    return Row(label=row[0], features=Vectors.dense(row[1]))

def sample(size):
    sizes = dataset.countByKey()
    sample_fractions = {
        0.0: float(size / 2) / sizes[0.0],
        1.0: float(size / 2) / sizes[1.0] 
    }
    print(sample_fractions)
    
    samples = dataset.sampleByKey( 
        withReplacement = False, 
        fractions = sample_fractions
    ).map(parseSample).cache()
    
    return samples

samples = sample(500)
#flatten our samples
samples_df = samples.map(lambda r: Row(snow = r.label, RED=float(r.features[0]),NIR=float(r.features[1]),BLUE=float(r.features[2]),SWIR=float(r.features[3]))).toDF()
df = samples_df.toPandas()
df
{0.0: 2.461630225053854e-06, 1.0: 0.005242738806752648}
Out[12]:
BLUE NIR RED SWIR snow
0 1053.0 734.0 765.0 89.0 1
1 851.0 577.0 577.0 265.0 0
2 1751.0 1836.0 1740.0 418.0 1
3 640.0 606.0 489.0 478.0 0
4 717.0 594.0 555.0 204.0 0
5 1016.0 832.0 834.0 232.0 1
6 462.0 648.0 258.0 425.0 0
7 957.0 972.0 960.0 279.0 1
8 342.0 323.0 246.0 198.0 0
9 1324.0 1868.0 1638.0 824.0 0
10 822.0 700.0 651.0 62.0 1
11 499.0 491.0 379.0 387.0 0
12 1412.0 1484.0 1350.0 245.0 1
13 1359.0 1613.0 1507.0 260.0 1
14 1268.0 1569.0 1525.0 412.0 1
15 1436.0 1610.0 1436.0 783.0 0
16 1093.0 1012.0 937.0 88.0 1
17 1157.0 1534.0 1490.0 304.0 1
18 1038.0 1087.0 961.0 560.0 0
19 963.0 757.0 744.0 157.0 1
20 1054.0 1167.0 988.0 476.0 0
21 821.0 825.0 739.0 148.0 1
22 1097.0 1045.0 1012.0 378.0 1
23 1364.0 1679.0 1555.0 305.0 1
24 1106.0 938.0 911.0 524.0 0
25 1130.0 1033.0 980.0 332.0 0
26 1249.0 1222.0 1149.0 184.0 1
27 1303.0 1211.0 1168.0 326.0 0
28 1444.0 1554.0 1428.0 390.0 0
29 835.0 873.0 701.0 763.0 0
... ... ... ... ... ...
457 1063.0 1348.0 1090.0 937.0 0
458 405.0 649.0 232.0 557.0 0
459 2048.0 2130.0 2101.0 322.0 1
460 1379.0 1557.0 1450.0 161.0 1
461 258.0 80.0 93.0 50.0 0
462 987.0 720.0 749.0 171.0 1
463 1232.0 1245.0 1140.0 411.0 0
464 1091.0 1040.0 979.0 452.0 0
465 972.0 722.0 720.0 75.0 1
466 1122.0 1046.0 970.0 427.0 0
467 401.0 398.0 201.0 328.0 0
468 388.0 541.0 185.0 337.0 0
469 1089.0 1027.0 949.0 557.0 0
470 1063.0 912.0 900.0 145.0 1
471 1427.0 1709.0 1592.0 271.0 1
472 1323.0 1516.0 1305.0 1053.0 0
473 2074.0 1473.0 1411.0 351.0 1
474 748.0 732.0 579.0 365.0 0
475 1352.0 1257.0 1198.0 211.0 1
476 759.0 663.0 631.0 349.0 0
477 1090.0 1044.0 960.0 626.0 0
478 733.0 984.0 726.0 612.0 0
479 740.0 854.0 734.0 326.0 1
480 1489.0 1691.0 1466.0 760.0 0
481 823.0 622.0 544.0 628.0 0
482 1499.0 1818.0 1698.0 335.0 1
483 413.0 338.0 252.0 209.0 0
484 1192.0 1277.0 1215.0 155.0 1
485 861.0 638.0 650.0 211.0 1
486 987.0 959.0 909.0 373.0 0

487 rows × 5 columns

We can now visualize our sample.

The following box plots show that there are indeed notable differences between the distributions of the snow and non-snow classes, suggesting that building a classifier should be possible.

In [13]:
for band in bands:
    sns.boxplot(x='snow', y=band, order=[0, 1], data=df)
    plt.show()
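
As a numeric complement to the box plots, here is a small sketch of per-class medians of the sampled reflectances (the exact values vary from run to run, since the sample is random):

# Median reflectance per band, split by the snow label
print(df.groupby('snow')[bands].median())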

Another way to look at the data is through scatter plots, both to see whether there is any correlation between the different bands and to see whether there is any interaction between the bands for the snow class.

The following pair plots show that:

  1. There is a high correlation between RED, NIR and BLUE.
  2. There is a clear cutoff around RED > 500 combined with SWIR < 500 (a quick numeric check of this cutoff follows the pair plot below).

This suggests the following:

  1. The classes are separable, but
  2. a linear boundary requires interaction features between the bands (hence the polynomial expansion used later).

In [14]:
sns.pairplot(df, hue='snow', vars=bands, hue_order=[0, 1])
Out[14]:
<seaborn.axisgrid.PairGrid at 0x7fb529f8f410>
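
As a rough sanity check of the cutoff observed in the pair plots (a rule of thumb only, not the classifier built below), we can count how often the simple rule RED > 500 and SWIR < 500 agrees with the snow label in our sample:

# Fraction of sampled pixels where the naive cutoff matches the label
rule = (df['RED'] > 500) & (df['SWIR'] < 500)
print((rule.astype(int) == df['snow']).mean())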

Building the classifier

First, we need to do some preprocessing before we can build our classifier.

  1. SVMs work better when the data is rescaled.
  2. We need to introduce interaction variables, e.g. by building a polynomial expansion (a degree-2 expansion of the 4 bands yields 14 features: the original bands plus their squares and pairwise products).
  3. SVMs generally require a balanced dataset (or class weights). Since we still have plenty of positive samples to work with, we simply balance the dataset by undersampling the negative class.

So let's do just that, as follows:

In [15]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

def transform(data):
    polyExpansion = PolynomialExpansion(
        inputCol="features",
        outputCol="polyFeatures",
        degree=2
    )

    scaler = StandardScaler(
        withMean=True,
        withStd=True,
        inputCol="polyFeatures",
        outputCol="scaledFeatures"
    )

    pipeline = Pipeline(stages=[polyExpansion, scaler])

    X = data.toDF()
    transformer = pipeline.fit(X)
    X = transformer.transform(X)
    return (transformer, X.select('label',col('scaledFeatures').alias('features')))

transformer, dataset_df = transform(sample(10000))
dataset_df = dataset_df.cache()
dataset_df.take(1)
{0.0: 4.923260450107708e-05, 1.0: 0.10485477613505295}
Out[15]:
[Row(label=0, features=DenseVector([-0.5164, -0.6487, -0.3164, -0.5642, -0.4701, -0.7729, -0.7521, -0.6646, -0.7856, 0.5405, -0.0306, 0.1045, -0.1247, 0.238]))]
In [16]:
dataset_df.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[-0.5164386994949...|
|    1|[-0.2580713275324...|
|    1|[1.30585152576273...|
|    1|[1.11379082448086...|
|    1|[0.29295997019287...|
|    0|[-0.3701067366135...|
|    1|[-0.5530216902153...|
|    1|[1.09092645528064...|
|    1|[-0.7405095176571...|
|    1|[1.86145569732814...|
|    1|[-0.7016400900168...|
|    1|[-0.6650570992964...|
|    1|[0.51931722527508...|
|    0|[-1.7282502671067...|
|    1|[1.04291127996017...|
|    1|[1.27841428272247...|
|    1|[-0.6833485946566...|
|    1|[-0.5598810009754...|
|    1|[0.72052367423704...|
|    1|[-0.3243779982131...|
+-----+--------------------+
only showing top 20 rows

We will train our classifier using 75% of the dataset and test it on the remaining 25%. So let's split it first:

In [17]:
train_data, test_data = dataset_df.randomSplit([0.75, 0.25])
train_data = train_data.cache()
test_data = test_data.cache()

Finally, we can train and test our model.

In [18]:
from sklearn.metrics import precision_recall_fscore_support
from pyspark.ml.classification import LinearSVC

def train(training_data, iterations, regParam, step):
    svm = LinearSVC(maxIter=iterations, regParam=regParam)
    model = svm.fit(training_data)
    return model

def evaluate(model, train_data, test_data):    
    train_predictions = model.transform(train_data).select('label','prediction').toPandas()
    test_predictions = model.transform(test_data).select('label','prediction').toPandas()
    
    _, _, train_f, _ = precision_recall_fscore_support(train_predictions['label'], train_predictions['prediction'], average='binary')
    _, _, test_f, _ = precision_recall_fscore_support(test_predictions['label'], test_predictions['prediction'], average='binary')
    
    return (train_f, test_f)

def train_evaluate(train_data, test_data, iterations, regParam, step):    
    print "Training with", train_data.count(), "samples"
    print "Params: ", iterations, regParam, step
    model = train(train_data, iterations, regParam, step)
    train_f, test_f = evaluate(model, train_data, test_data)
    
    print "Train F1", train_f
    print "Test F1", test_f
    print ""
    return (model, (train_data.count(), iterations, regParam, step, train_f, test_f))

model, results = train_evaluate(train_data,
                         test_data,
                         iterations=40, 
                         step=1.0, 
                         regParam=0.)
Training with 7580 samples
Params:  40 0.0 1.0
Train F1 0.923511224627
Test F1 0.936677631579
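
Beyond the F1 scores, a fuller breakdown can be obtained with scikit-learn; the following optional sketch reuses the trained model and the test set from above:

from sklearn.metrics import classification_report, confusion_matrix

# Confusion matrix and per-class precision/recall on the held-out test set
test_pred = model.transform(test_data).select('label', 'prediction').toPandas()
print(confusion_matrix(test_pred['label'], test_pred['prediction']))
print(classification_report(test_pred['label'], test_pred['prediction']))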

Visualising the output

We can draw the GeoTIFF using matplotlib, but the files are so big that we also need to reduce their resolution. We can reuse the chunking mechanism from before and plot the average value of every chunk instead.

However, we now need to keep track of the position of each chunk, so we add that to our function.

In [19]:
def makeSplits( files, splits=100 ):
    statusmap, radiometry = files
    sm = gdal.Open(statusmap)
    status = sm.GetRasterBand(1)
    del sm
    XSize = status.XSize
    YSize = status.YSize
    
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, splits):
        for y in range(0, splits):
            chunks.append({
                    'statusmap': statusmap,
                    'radiometry': radiometry,
                    'chunk': (x, y),
                    'x_range': (x * chunksize[0], chunksize[0]),
                    'y_range': (y * chunksize[1], chunksize[1])
                })
    
    return chunks

chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
Out[19]:
[{'chunk': (8, 70),
  'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_RADIOMETRY.tif',
  'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_SM.tif',
  'x_range': (800, 100),
  'y_range': (7000, 100)}]

Now that our chunks have a position, we need to propagate that position down to each pixel. We then take the average Snow/Ice value of every chunk and plot it.

In [20]:
def is_snow_mask(mask):
    return (int(mask & 0b100 != 0))

def parseChunk(chunk):
    statusmap = map(is_snow_mask, parseTargets(chunk['statusmap'], chunk['x_range'], chunk['y_range']))
    features = parseFeatures(chunk['radiometry'], chunk['x_range'], chunk['y_range'])
    return (chunk['chunk'], map(parseSample, zip(statusmap, features)))

all_data = chunks.map(parseChunk)

def average_snow(data):
    return np.mean(map(lambda x: x.label, data))

averaged_by_chunk = all_data.map(lambda x: (x[0], average_snow(x[1]))).cache()
averaged_by_chunk.count()
Out[20]:
10000
In [21]:
img_flat = averaged_by_chunk.collect()

Here is what the original data looks like.

In [22]:
img = np.array(img_flat)[:, 1].reshape(100, 100, order='F').astype('float')
plt.imshow(img, cmap='winter')
plt.colorbar()
Out[22]:
<matplotlib.colorbar.Colorbar at 0x7fb584494410>

Now, instead of drawing the original Snow/Ice flag of each pixel, we use our classifier to predict that value and draw the prediction instead.

In [23]:
all_df = all_data.flatMap(lambda x: [{'x': x[0][0], 'y': x[0][1], 'features': p.features, 'label': p.label} for p in x[1]]).toDF()
/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
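
The warning suggests building the DataFrame from pyspark.sql.Row objects rather than plain dicts; here is a sketch of the equivalent cell, in case the dict-based schema inference is removed in a later Spark version:

from pyspark.sql import Row

all_df = all_data.flatMap(
    lambda c: [Row(x=c[0][0], y=c[0][1], features=p.features, label=p.label) for p in c[1]]
).toDF()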
In [24]:
predictions = model.transform(transformer.transform(all_df).select('label',col('scaledFeatures').alias('features'),'x','y'))

predicted_snow_mask = predictions.select("x","y","prediction").cache()
predicted_snow_mask.show()
+---+---+----------+
|  x|  y|prediction|
+---+---+----------+
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
+---+---+----------+
only showing top 20 rows

In [25]:
averaged_by_chunk_df = predicted_snow_mask.groupBy("x", "y").agg(avg("prediction"))
averaged_by_chunk_df.show()
+---+---+---------------+
|  x|  y|avg(prediction)|
+---+---+---------------+
| 51| 18|         0.0414|
| 86|  9|         0.0024|
|  3|  0|         0.0032|
| 17| 93|            0.0|
|  1| 94|          0.181|
| 11| 97|            0.0|
| 94| 68|            0.0|
| 62| 15|         0.0033|
| 82| 10|         0.0052|
| 92| 10|            0.0|
| 27|  0|            0.0|
| 61| 63|            0.0|
| 16| 59|         0.1365|
| 21| 50|         1.0E-4|
| 81| 56|         0.4316|
| 52| 72|            0.0|
| 50| 64|            0.0|
| 30| 70|            0.0|
| 60| 73|         0.5916|
| 83| 25|            0.0|
+---+---+---------------+
only showing top 20 rows

In [26]:
averaged_by_chunk = averaged_by_chunk_df.toPandas()
In [27]:
img = np.zeros((100, 100))
for row in np.array(averaged_by_chunk):
    x,y, v = row
    img[int(x)][int(y)] = v

Final result

Here is the snow map estimated by our classifier. It looks roughly the same as the original, though slightly more permissive towards the snow class.

In [28]:
plt.imshow(img, cmap='winter')
plt.colorbar()
Out[28]:
<matplotlib.colorbar.Colorbar at 0x7fb57b318b50>
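
To compare the two maps more directly, here is an optional sketch that draws them side by side. It rebuilds the reference image from img_flat using the same construction as the earlier cell, since the img variable was reused for the prediction:

# Reference snow map from the status map bits, rebuilt as before
img_ref = np.array(img_flat)[:, 1].reshape(100, 100, order='F').astype('float')

fig, axes = plt.subplots(1, 2, figsize=(12, 5))
for ax, data, title in zip(axes, [img_ref, img], ['Status map', 'SVM prediction']):
    im = ax.imshow(data, cmap='winter', vmin=0, vmax=1)
    ax.set_title(title)
fig.colorbar(im, ax=list(axes))
plt.show()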