In this Jupyter notebook, we will build an SVM classifier for Snow/Ice detection using Spark for the Proba-V 100m Top Of Atmosphere (TOA) Radiometry data.
The Radiometry data is stored as a GeoTIFF file containing 4 raster bands: RED, NIR, BLUE and SWIR.
Each raster band is a $Y \times X$ pixel grid, where each value in the grid is the TOA reflectance of that band at that pixel.
There is also a status map file containing a single band. For each pixel in that band, the value represents the class of that pixel.
In our case, the flag we are interested in is the Snow/Ice bit, coded in binary as $100$ (i.e. the third bit of the mask): when it is set, the corresponding pixel in the Radiometry file is snow or ice, as documented in the Proba-V User Manual.
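As a minimal sketch of how such a bitmask is tested (the helper name below is ours, not from the notebook; an equivalent is_snow function is defined later):

# Hypothetical helper: test whether the Snow/Ice bit (0b100) is set in a
# status map value. The value 251 (0b11111011) appears in the outputs
# further below; its third bit is clear, so that pixel is not snow/ice.
def has_snow_ice_flag(sm_value):
    return bool(sm_value & 0b100)

print(has_snow_ice_flag(251))  # False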
import numpy as np
import requests
from osgeo import gdal  # GDAL's Python bindings live in the osgeo package
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
%matplotlib inline
import pyspark
In order to access our Spark cluster, we first need to set up our Spark context. The snippet below shows how to do this and how to modify the default configuration; note that the configuration must be set before the context is created.
from pyspark.conf import SparkConf

conf = SparkConf()
conf.set('spark.yarn.executor.memoryOverhead', '1024')
conf.set('spark.executor.memory', '8g')
conf.set('spark.executor.cores', '2')
conf.set('spark.executor.instances', '10')

sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.SQLContext(sc)
The files are stored in a shared folder, as defined below. The complete list of files can be requested through an API, but in our case we will focus on a tile covering the Alps, which has plenty of snow/ice areas.
The Radiometry files end with _RADIOMETRY.tif, whereas the status map files end with _SM.tif.
files = [
    "/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101.tif"
]

bands = [
    "RED",
    "NIR",
    "BLUE",
    "SWIR"
]
def radiometry_file(filename):
    return filename[:-4] + "_RADIOMETRY.tif"

def status_file(filename):
    return filename[:-4] + "_SM.tif"
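As a quick illustration (the printed paths match the chunk output shown further below), these helpers map the base tile filename to the two derived filenames:

print(radiometry_file(files[0]))  # .../PROBAV_S5_TOA_X18Y02_20151121_100M_V101_RADIOMETRY.tif
print(status_file(files[0]))      # .../PROBAV_S5_TOA_X18Y02_20151121_100M_V101_SM.tif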
In order to read the files in a parallel manner, we need to instruct Spark to parallelize our list of files.
data_files = sc.parallelize([(status_file(f), radiometry_file(f)) for f in files]).cache()
data_files
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:175
Because our files are so large, we don't want to read the complete file in a single run. Instead, we will split each file into chunks, read each chunk and then combine the chunks.
This will also enable us to distribute the reading of our files to different Spark Executors, so that each Spark Executor reads a specific part of each file.
def makeSplits(files, splits=100):
    statusmap, radiometry = files
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    # Read the raster dimensions before releasing the dataset; a band
    # object becomes unusable once its parent dataset is destroyed
    XSize = status.XSize
    YSize = status.YSize
    del dataset
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, XSize, chunksize[0]):
        for y in range(0, YSize, chunksize[1]):
            chunks.append({
                'statusmap': statusmap,
                'radiometry': radiometry,
                # (offset, size); the last chunk on each axis may be smaller
                'x': (x, min(XSize - x, chunksize[0])),
                'y': (y, min(YSize - y, chunksize[1]))
            })
    return chunks
chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
[{'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_RADIOMETRY.tif', 'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_SM.tif', 'x': (800, 100), 'y': (6200, 100)}]
chunks.count()
10201
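The count of $10201 = 101^2$ chunks is consistent with a $10080 \times 10080$ pixel tile: with splits=100 the chunk size is $\lfloor 10080 / 100 \rfloor = 100$ pixels, so range(0, 10080, 100) yields 101 offsets per axis, with the last chunk on each axis covering only the remaining 80 pixels.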
We can now define the functions to read each chunk. The goal is to end up with a list in which each element is a single pixel from the GeoTIFF files, pairing the pixel's status map bitmask with its reflectance values for the four individual bands.
def parseTargets(statusmap, x, y):
    dataset = gdal.Open(statusmap)
    status = dataset.GetRasterBand(1)
    # ReadAsArray(xoff, yoff, xsize, ysize) returns an array of shape (Y, X)
    ret = status.ReadAsArray(x[0], y[0], x[1], y[1])
    del dataset
    # Flatten in Fortran (column-major) order so the pixel order matches
    # the feature rows produced below
    return np.array(ret).flatten(order='F').tolist()

def parseFeatures(radiometry, x, y):
    raster = gdal.Open(radiometry)
    raster_bands = [raster.GetRasterBand(i).ReadAsArray(x[0], y[0], x[1], y[1])
                    for i in range(1, raster.RasterCount + 1)]
    # shape (4, Y, X)
    del raster
    raster_bands = np.transpose(raster_bands)
    # shape (X, Y, 4)
    raster_bands = raster_bands.reshape((len(raster_bands) * len(raster_bands[0]),
                                         len(raster_bands[0][0])))
    # shape (X*Y, 4): one row of 4 band values per pixel
    return raster_bands.tolist()

def parseChunk(chunk):
    return zip(
        parseTargets(chunk['statusmap'], chunk['x'], chunk['y']),
        parseFeatures(chunk['radiometry'], chunk['x'], chunk['y'])
    )
dataset = chunks.flatMap(parseChunk)
dataset.take(5)
[(251, [1099, 1154, 1225, 567]), (251, [1057, 1117, 1178, 576]), (251, [1056, 1106, 1195, 609]), (251, [1116, 1162, 1244, 632]), (251, [1154, 1207, 1276, 642])]
Some pixels contain invalid data. In such cases, one of the reflectance values will be equal to $-1$. Since those pixels contain incomplete data, we might as well filter them out.
def is_valid(row):
    # a pixel is valid only if none of its band values is -1
    for v in row[1]:
        if v == -1:
            return False
    return True
dataset = dataset.filter(is_valid).repartition(100)
dataset.take(5)
[(243, [1656, 1813, 1449, 1129]), (243, [1700, 1873, 1450, 1174]), (243, [1751, 1937, 1450, 1206]), (243, [1830, 2011, 1451, 1227]), (243, [1900, 2090, 1451, 1235])]
As mentioned earlier, the Snow/Ice mask is the $100$ bit. Since we are only interested in that flag, we can define a function that converts the complete bitmask into a single bit: 1 if the pixel is snow/ice and 0 otherwise.
def is_snow(row):
    # keep only the Snow/Ice bit (0b100) of the status mask as the label
    return (int(row[0] & 0b100 != 0), row[1])
dataset = dataset.map(is_snow).cache()
dataset.take(5)
[(0, [1356, 1459, 1449, 536]), (0, [1324, 1430, 1404, 521]), (0, [1290, 1381, 1354, 511]), (0, [1239, 1340, 1306, 499]), (0, [1194, 1281, 1264, 487])]
Since this is a dataset we will be using very often, we might as well cache it. The following snippet instructs Spark to do exactly that.
dataset = dataset.cache()
dataset.countByKey()
defaultdict(int, {0: 101558714, 1: 47685})
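The counts show a heavily imbalanced dataset: only 47,685 of the roughly 101.6 million valid pixels, about 0.05%, carry the snow/ice flag.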
Now we are ready to do some visualizations. Before we go further, let's take a balanced sample from our dataset, meaning about the same number of positive (snow/ice) and negative samples.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
def parseSample(row):
    return Row(label=row[0], features=Vectors.dense(row[1]))

def sample(size):
    sizes = dataset.countByKey()
    # sample each class at a rate that yields roughly size/2 samples per class
    sample_fractions = {
        0.0: float(size / 2) / sizes[0.0],
        1.0: float(size / 2) / sizes[1.0]
    }
    print(sample_fractions)
    samples = dataset.sampleByKey(
        withReplacement=False,
        fractions=sample_fractions
    ).map(parseSample).cache()
    return samples
samples = sample(500)
# flatten our samples into one column per band
samples_df = samples.map(lambda r: Row(
    snow=r.label,
    RED=float(r.features[0]),
    NIR=float(r.features[1]),
    BLUE=float(r.features[2]),
    SWIR=float(r.features[3])
)).toDF()
df = samples_df.toPandas()
df
{0.0: 2.461630225053854e-06, 1.0: 0.005242738806752648}
 | BLUE | NIR | RED | SWIR | snow |
---|---|---|---|---|---|
0 | 1053.0 | 734.0 | 765.0 | 89.0 | 1 |
1 | 851.0 | 577.0 | 577.0 | 265.0 | 0 |
2 | 1751.0 | 1836.0 | 1740.0 | 418.0 | 1 |
3 | 640.0 | 606.0 | 489.0 | 478.0 | 0 |
4 | 717.0 | 594.0 | 555.0 | 204.0 | 0 |
5 | 1016.0 | 832.0 | 834.0 | 232.0 | 1 |
6 | 462.0 | 648.0 | 258.0 | 425.0 | 0 |
7 | 957.0 | 972.0 | 960.0 | 279.0 | 1 |
8 | 342.0 | 323.0 | 246.0 | 198.0 | 0 |
9 | 1324.0 | 1868.0 | 1638.0 | 824.0 | 0 |
10 | 822.0 | 700.0 | 651.0 | 62.0 | 1 |
11 | 499.0 | 491.0 | 379.0 | 387.0 | 0 |
12 | 1412.0 | 1484.0 | 1350.0 | 245.0 | 1 |
13 | 1359.0 | 1613.0 | 1507.0 | 260.0 | 1 |
14 | 1268.0 | 1569.0 | 1525.0 | 412.0 | 1 |
15 | 1436.0 | 1610.0 | 1436.0 | 783.0 | 0 |
16 | 1093.0 | 1012.0 | 937.0 | 88.0 | 1 |
17 | 1157.0 | 1534.0 | 1490.0 | 304.0 | 1 |
18 | 1038.0 | 1087.0 | 961.0 | 560.0 | 0 |
19 | 963.0 | 757.0 | 744.0 | 157.0 | 1 |
20 | 1054.0 | 1167.0 | 988.0 | 476.0 | 0 |
21 | 821.0 | 825.0 | 739.0 | 148.0 | 1 |
22 | 1097.0 | 1045.0 | 1012.0 | 378.0 | 1 |
23 | 1364.0 | 1679.0 | 1555.0 | 305.0 | 1 |
24 | 1106.0 | 938.0 | 911.0 | 524.0 | 0 |
25 | 1130.0 | 1033.0 | 980.0 | 332.0 | 0 |
26 | 1249.0 | 1222.0 | 1149.0 | 184.0 | 1 |
27 | 1303.0 | 1211.0 | 1168.0 | 326.0 | 0 |
28 | 1444.0 | 1554.0 | 1428.0 | 390.0 | 0 |
29 | 835.0 | 873.0 | 701.0 | 763.0 | 0 |
... | ... | ... | ... | ... | ... |
457 | 1063.0 | 1348.0 | 1090.0 | 937.0 | 0 |
458 | 405.0 | 649.0 | 232.0 | 557.0 | 0 |
459 | 2048.0 | 2130.0 | 2101.0 | 322.0 | 1 |
460 | 1379.0 | 1557.0 | 1450.0 | 161.0 | 1 |
461 | 258.0 | 80.0 | 93.0 | 50.0 | 0 |
462 | 987.0 | 720.0 | 749.0 | 171.0 | 1 |
463 | 1232.0 | 1245.0 | 1140.0 | 411.0 | 0 |
464 | 1091.0 | 1040.0 | 979.0 | 452.0 | 0 |
465 | 972.0 | 722.0 | 720.0 | 75.0 | 1 |
466 | 1122.0 | 1046.0 | 970.0 | 427.0 | 0 |
467 | 401.0 | 398.0 | 201.0 | 328.0 | 0 |
468 | 388.0 | 541.0 | 185.0 | 337.0 | 0 |
469 | 1089.0 | 1027.0 | 949.0 | 557.0 | 0 |
470 | 1063.0 | 912.0 | 900.0 | 145.0 | 1 |
471 | 1427.0 | 1709.0 | 1592.0 | 271.0 | 1 |
472 | 1323.0 | 1516.0 | 1305.0 | 1053.0 | 0 |
473 | 2074.0 | 1473.0 | 1411.0 | 351.0 | 1 |
474 | 748.0 | 732.0 | 579.0 | 365.0 | 0 |
475 | 1352.0 | 1257.0 | 1198.0 | 211.0 | 1 |
476 | 759.0 | 663.0 | 631.0 | 349.0 | 0 |
477 | 1090.0 | 1044.0 | 960.0 | 626.0 | 0 |
478 | 733.0 | 984.0 | 726.0 | 612.0 | 0 |
479 | 740.0 | 854.0 | 734.0 | 326.0 | 1 |
480 | 1489.0 | 1691.0 | 1466.0 | 760.0 | 0 |
481 | 823.0 | 622.0 | 544.0 | 628.0 | 0 |
482 | 1499.0 | 1818.0 | 1698.0 | 335.0 | 1 |
483 | 413.0 | 338.0 | 252.0 | 209.0 | 0 |
484 | 1192.0 | 1277.0 | 1215.0 | 155.0 | 1 |
485 | 861.0 | 638.0 | 650.0 | 211.0 | 1 |
486 | 987.0 | 959.0 | 909.0 | 373.0 | 0 |
487 rows × 5 columns
We can now visualize our sample.
The following box plots show that there are indeed notable differences between the snow and non-snow distributions, confirming that building a classifier should be feasible.
for band in bands:
    sns.boxplot(x='snow', y=band, order=[0, 1], data=df)
    plt.show()
Another way to look at the data is with scatter plots, which let us check whether the bands are correlated and whether the snow class interacts differently with the bands. The pair plots below show the two classes forming largely distinct clusters, which again confirms that a classifier should be able to separate them.
sns.pairplot(df, hue='snow', vars=bands, hue_order=[0, 1])
<seaborn.axisgrid.PairGrid at 0x7fb529f8f410>
First, we need some preprocessing before we can build our classifier: we expand the features with degree-2 polynomial combinations (to capture interactions between bands) and standardize them to zero mean and unit variance. So let's do just that, as follows:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
def transform(data):
    # Expand the 4 bands into all degree-2 monomials, capturing
    # interactions between bands
    polyExpansion = PolynomialExpansion(
        inputCol="features",
        outputCol="polyFeatures",
        degree=2
    )
    # Standardize the expanded features to zero mean and unit variance
    scaler = StandardScaler(
        withMean=True,
        withStd=True,
        inputCol="polyFeatures",
        outputCol="scaledFeatures"
    )
    pipeline = Pipeline(stages=[polyExpansion, scaler])
    X = data.toDF()
    transformer = pipeline.fit(X)
    X = transformer.transform(X)
    return (transformer, X.select('label', col('scaledFeatures').alias('features')))
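Note that transform returns the fitted pipeline together with the transformed data: we will reuse this transformer near the end of the notebook, so that the full image is expanded and scaled with exactly the statistics learned here.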
transformer, dataset_df = transform(sample(10000))
dataset_df = dataset_df.cache()
dataset_df.take(1)
{0.0: 4.923260450107708e-05, 1.0: 0.10485477613505295}
[Row(label=0, features=DenseVector([-0.5164, -0.6487, -0.3164, -0.5642, -0.4701, -0.7729, -0.7521, -0.6646, -0.7856, 0.5405, -0.0306, 0.1045, -0.1247, 0.238]))]
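The 14-dimensional feature vector is what a degree-2 polynomial expansion of 4 bands should produce: $\binom{4+2}{2} - 1 = 14$ terms, namely the 4 original bands, their 4 squares and the 6 pairwise products.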
dataset_df.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[-0.5164386994949...|
|    1|[-0.2580713275324...|
|    1|[1.30585152576273...|
|    1|[1.11379082448086...|
|    1|[0.29295997019287...|
|    0|[-0.3701067366135...|
|    1|[-0.5530216902153...|
|    1|[1.09092645528064...|
|    1|[-0.7405095176571...|
|    1|[1.86145569732814...|
|    1|[-0.7016400900168...|
|    1|[-0.6650570992964...|
|    1|[0.51931722527508...|
|    0|[-1.7282502671067...|
|    1|[1.04291127996017...|
|    1|[1.27841428272247...|
|    1|[-0.6833485946566...|
|    1|[-0.5598810009754...|
|    1|[0.72052367423704...|
|    1|[-0.3243779982131...|
+-----+--------------------+
only showing top 20 rows
We will train our classifier on 75% of the dataset and test it on the remaining 25% (randomSplit gives an approximate split, hence the sample count printed below). So let's split it first:
train_data, test_data = dataset_df.randomSplit([0.75, 0.25])
train_data = train_data.cache()
test_data = test_data.cache()
Finally, we can train and test our model.
from sklearn.metrics import precision_recall_fscore_support
from pyspark.ml.classification import LinearSVC
def train(training_data, iterations, regParam, step):
    # Note: `step` is unused here; spark.ml's LinearSVC does not expose
    # a step-size parameter
    svm = LinearSVC(maxIter=iterations, regParam=regParam)
    model = svm.fit(training_data)
    return model

def evaluate(model, train_data, test_data):
    train_predictions = model.transform(train_data).select('label', 'prediction').toPandas()
    test_predictions = model.transform(test_data).select('label', 'prediction').toPandas()
    # average='binary' reports the F1 score of the positive (snow) class
    _, _, train_f, _ = precision_recall_fscore_support(
        train_predictions['label'], train_predictions['prediction'], average='binary')
    _, _, test_f, _ = precision_recall_fscore_support(
        test_predictions['label'], test_predictions['prediction'], average='binary')
    return (train_f, test_f)

def train_evaluate(train_data, test_data, iterations, regParam, step):
    print("Training with", train_data.count(), "samples")
    print("Params: ", iterations, regParam, step)
    model = train(train_data, iterations, regParam, step)
    train_f, test_f = evaluate(model, train_data, test_data)
    print("Train F1", train_f)
    print("Test F1", test_f)
    print("")
    return (model, (train_data.count(), iterations, regParam, step, train_f, test_f))
model, results = train_evaluate(train_data,
                                test_data,
                                iterations=40,
                                step=1.0,
                                regParam=0.)
Training with 7580 samples
Params: 40 0.0 1.0
Train F1 0.923511224627
Test F1 0.936677631579
We can draw the GeoTIFF using matplotlib, but the files are so big that we also need to reduce their resolution. We can reuse the chunking mechanism from before and plot the average value of each chunk instead. However, we will need to keep track of the position of each chunk, so we add that to our function.
def makeSplits(files, splits=100):
    statusmap, radiometry = files
    sm = gdal.Open(statusmap)
    status = sm.GetRasterBand(1)
    # Read the dimensions before releasing the dataset (see earlier note)
    XSize = status.XSize
    YSize = status.YSize
    del sm
    chunks = []
    chunksize = (int(XSize / float(splits)), int(YSize / float(splits)))
    for x in range(0, splits):
        for y in range(0, splits):
            chunks.append({
                'statusmap': statusmap,
                'radiometry': radiometry,
                # position of the chunk in the chunk grid
                'chunk': (x, y),
                'x_range': (x * chunksize[0], chunksize[0]),
                'y_range': (y * chunksize[1], chunksize[1])
            })
    return chunks
chunks = data_files.flatMap(makeSplits).repartition(100)
chunks.take(1)
[{'chunk': (8, 70), 'radiometry': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_RADIOMETRY.tif', 'statusmap': '/data/MTDA/TIFFDERIVED/PROBAV_L3_S5_TOA_100M/2015/20151121/PROBAV_S5_TOA_20151121_100M_V101/PROBAV_S5_TOA_X18Y02_20151121_100M_V101_SM.tif', 'x_range': (800, 100), 'y_range': (7000, 100)}]
Now that our chunks have a position, we need to propagate that position down to the pixel level. We then take the average for every chunk and plot it.
def is_snow_mask(mask):
    return int(mask & 0b100 != 0)

def parseChunk(chunk):
    # Convert the raw bitmasks to 0/1 snow labels; materialize the lazy
    # iterators as lists so Spark can serialize the result under Python 3
    statusmap = list(map(is_snow_mask, parseTargets(chunk['statusmap'], chunk['x_range'], chunk['y_range'])))
    features = parseFeatures(chunk['radiometry'], chunk['x_range'], chunk['y_range'])
    return (chunk['chunk'], [parseSample(p) for p in zip(statusmap, features)])
all_data = chunks.map(parseChunk)
def average_snow(data):
    # fraction of snow pixels in the chunk
    return np.mean(list(map(lambda x: x.label, data)))
averaged_by_chunk = all_data.map(lambda x: (x[0], average_snow(x[1]))).cache()
averaged_by_chunk.count()
10000
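Unlike the first version, this makeSplits iterates over chunk indices rather than pixel offsets, so it produces exactly $100^2 = 10000$ chunks and drops the edge pixels that do not fit into a full chunk.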
img_flat = averaged_by_chunk.collect()
Here is what our original data looked like.
img = np.array(img_flat)[:, 1].reshape(100, 100, order='F').astype('float')
plt.imshow(img, cmap='winter')
plt.colorbar()
<matplotlib.colorbar.Colorbar at 0x7fb584494410>
Now, instead of drawing the original Snow/Ice flag of each pixel, we will use our classifier to predict that value and draw the predictions.
all_df = all_data.flatMap(
    lambda x: [{'x': x[0][0], 'y': x[0][1], 'features': p.features, 'label': p.label}
               for p in x[1]]
).toDF()
/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/session.py:366: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead warnings.warn("Using RDD of dict to inferSchema is deprecated. "
predictions = model.transform(transformer.transform(all_df).select('label',col('scaledFeatures').alias('features'),'x','y'))
predicted_snow_mask = predictions.select("x","y","prediction").cache()
predicted_snow_mask.show()
+---+---+----------+
|  x|  y|prediction|
+---+---+----------+
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
|  8| 70|       0.0|
+---+---+----------+
only showing top 20 rows
averaged_by_chunk_df = predicted_snow_mask.groupBy("x", "y").agg(avg("prediction"))
averaged_by_chunk_df.show()
+---+---+---------------+
|  x|  y|avg(prediction)|
+---+---+---------------+
| 51| 18|         0.0414|
| 86|  9|         0.0024|
|  3|  0|         0.0032|
| 17| 93|            0.0|
|  1| 94|          0.181|
| 11| 97|            0.0|
| 94| 68|            0.0|
| 62| 15|         0.0033|
| 82| 10|         0.0052|
| 92| 10|            0.0|
| 27|  0|            0.0|
| 61| 63|            0.0|
| 16| 59|         0.1365|
| 21| 50|         1.0E-4|
| 81| 56|         0.4316|
| 52| 72|            0.0|
| 50| 64|            0.0|
| 30| 70|            0.0|
| 60| 73|         0.5916|
| 83| 25|            0.0|
+---+---+---------------+
only showing top 20 rows
averaged_by_chunk = averaged_by_chunk_df.toPandas()
img = np.zeros((100, 100))
for row in np.array(averaged_by_chunk):
    x, y, v = row
    img[int(x)][int(y)] = v
Here is how our classifier estimated snow. The result looks roughly the same as the original, except that the classifier appears to be slightly more permissive for the snow class.
plt.imshow(img, cmap='winter')
plt.colorbar()
<matplotlib.colorbar.Colorbar at 0x7fb57b318b50>