Pipeline for the topology classifier with Apache Spark

2. Event Filtering and Feature Engineering

In this stage we prepare the input files for the three classifier models: starting from the output of the previous stage (data ingestion), we produce the test and training datasets in Apache Parquet format.

To run this notebook we used the following configuration:

  • Software stack: Spark 3.0.0-SNAPSHOT
  • Platform: CentOS 7, Python 3.6
  • Spark cluster: Analytix
In [1]:
# pip install pyspark, or use your favorite way to set SPARK_HOME; here we use findspark
# Note: I am using Spark 3.0.0 (built from master) to optimize DF.orderBy(rand()),
# as in Spark 2.4.3 this operation appears to pass data back to the driver,
# eventually requiring an increase of spark.driver.maxResultSize; to be investigated
import findspark
findspark.init('/home/luca/Spark/spark-3.0.0-SNAPSHOT-bin-20190617_hadoop32') #set path to SPARK_HOME
In [2]:
# Configure according to your environment
pyspark_python = "<path to python>/bin/python"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("2-Feature Preparation") \
        .master("yarn") \
        .config("spark.driver.memory","8g") \
        .config("spark.executor.memory","14g") \
        .config("spark.executor.cores","4") \
        .config("spark.executor.instances","50") \
        .config("spark.dynamicAllocation.enabled","false") \
        .config("spark.pyspark.python",pyspark_python) \
        .config("spark.eventLog.enabled","false") \
        .config("spark.speculation", "true") \
        .getOrCreate()
In [3]:
# Check if Spark Session has been created correctly
spark
Out[3]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.0.0-SNAPSHOT
Master
yarn
AppName
2-Feature Preparation

This loads into a Spark DataFrame the Parquet files produced in the previous data ingestion step.

In [4]:
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV"

data = spark.read \
            .format("parquet") \
            .load(dataset_path)

events = data.count()
print("There are {} events".format(events))
There are 25468470 events

We can also have a look at the distribution across classes after the filtering.

In [5]:
labels = ['QCD', 'tt', 'W+jets']
counts = data.groupBy('label').count().collect()

qcd_events = 0
tt_events = 0 
wjets_events = 0

print('There are:')
for i in range(3):
    print('\t* {} {} events (frac = {:.3f})'
          .format(
              counts[i][1],
              labels[counts[i].label],
              counts[i][1]*1.0/events
          ))
    if counts[i].label==0:
        qcd_events = counts[i][1]
    elif counts[i].label==1:
        tt_events = counts[i][1] 
    elif counts[i].label==2:
        wjets_events = counts[i][1]
There are:
	* 14265397 tt events (frac = 0.560)
	* 9776447 W+jets events (frac = 0.384)
	* 1426626 QCD events (frac = 0.056)

The dataset is imbalanced, so we may need to undersample it.
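
As an aside, a single-pass alternative to the per-class filter/sample/union approach used later in this notebook is stratified sampling with DataFrame.sampleBy. This is only a minimal sketch (not executed here), reusing the per-class counts computed above:

# Sketch only: undersample all classes to roughly the QCD size in one pass.
# The fractions dictionary maps label -> sampling fraction.
fractions = {0: 1.0,
             1: qcd_events * 1.0 / tt_events,
             2: qcd_events * 1.0 / wjets_events}
dataUndersampledAlt = data.sampleBy('label', fractions, seed=42)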

Feature preparation

In the Parquet files produced in the previous step we have three columns:

  1. hfeatures containing the 14 High Level Features
  2. lfeatures containing the Low Level Features (a list of 801 particles, each with 19 features)
  3. label identifying the sample
In [6]:
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)

We can begin by preparing the input for the HLF classifier, which simply requires scaling the features and encoding the label. To use Spark's MinMaxScaler we need to convert the input into dense vectors.

In [7]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

vector_dense_udf = udf(lambda r : Vectors.dense(r),VectorUDT())
data = data.withColumn('hfeatures_dense',vector_dense_udf('hfeatures'))

We can now build the pipeline to scale the HLF and encode the labels.

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler

## One-Hot-Encode
# Use OneHotEncoderEstimator for Spark 2.x and OneHotEncoder for Spark 3.x
encoder = OneHotEncoder(inputCols=["label"],
                        outputCols=["encoded_label"],
                        dropLast=False)

## Scale feature vector
scaler = MinMaxScaler(inputCol="hfeatures_dense",
                      outputCol="HLF_input")

pipeline = Pipeline(stages=[encoder, scaler])

%time fitted_pipeline = pipeline.fit(data)
CPU times: user 383 ms, sys: 316 ms, total: 698 ms
Wall time: 1min 10s
In [9]:
# Apply the pipeline to data
data = fitted_pipeline.transform(data)

New columns have been created; if we want to drop some of them we can use

data = data.drop("col-name")
In [10]:
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)
 |-- hfeatures_dense: vector (nullable = true)
 |-- encoded_label: vector (nullable = true)
 |-- HLF_input: vector (nullable = true)

Moving on to the particle-sequence classifier, we need to sort the particles in each event by decreasing $\Delta R$ distance from the isolated lepton, where $$\Delta R = \sqrt{\Delta \eta^2 + \Delta \phi^2}$$

From the production of the low-level features we know that the isolated lepton is the first particle in the list and that the 19 features are

features = [
    'Energy', 'Px', 'Py', 'Pz', 'Pt', 'Eta', 'Phi',
    'vtxX', 'vtxY', 'vtxZ', 'ChPFIso', 'GammaPFIso', 'NeuPFIso', 
    'isChHad', 'isNeuHad', 'isGamma', 'isEle', 'isMu', 'Charge'
]

therefore we need features 5 ($\eta$) and 6 ($\phi$) to compute $\Delta R$.

In [11]:
import math

class lepAngularCoordinates():
    """
    This class is used to store the lepton and compute DeltaR 
    from the other particles
    """
    def __init__(self, eta, phi):
        self.Eta = eta
        self.Phi = phi
    
    def DeltaR(self, eta, phi):
        deta = self.Eta - eta
        
        dphi = self.Phi - phi       
        pi = math.pi
        while dphi >  pi: dphi -= 2*pi
        while dphi < -pi: dphi += 2*pi
            
        return math.sqrt(deta*deta + dphi*dphi)
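
A quick usage example (illustrative values, not taken from the dataset) shows that the $\phi$ wrap-around is handled correctly:

lep = lepAngularCoordinates(eta=0.0, phi=3.1)
# The two phi values are ~0.08 apart across the +pi/-pi boundary,
# so DeltaR should be ~0.08 rather than ~6.2
print(lep.DeltaR(eta=0.0, phi=-3.1))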
In [12]:
from pyspark.sql.types import ArrayType, DoubleType
# scikit-learn is used inside the UDF below, so it must also be available on the executors
from sklearn.preprocessing import StandardScaler

@udf(returnType=ArrayType(ArrayType(DoubleType())))
def transform(particles):
    ## The isolated lepton is the first particle in the list
    ISOlep = lepAngularCoordinates(particles[0][5], particles[0][6])
    
    ## Sort the particles by decreasing DeltaR distance from the isolated lepton
    particles.sort(key = lambda part: ISOlep.DeltaR(part[5], part[6]),
                   reverse=True)
    
    ## Standardize
    particles = StandardScaler().fit_transform(particles).tolist()
    
    return particles
In [13]:
data = data.withColumn('GRU_input', transform('lfeatures'))
In [14]:
data.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- label: integer (nullable = true)
 |-- hfeatures_dense: vector (nullable = true)
 |-- encoded_label: vector (nullable = true)
 |-- HLF_input: vector (nullable = true)
 |-- GRU_input: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
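
As a quick sanity check (a hypothetical snippet, it triggers a small Spark job), we could look at the shape of GRU_input for a single event and expect 801 particles with 19 features each:

sample_row = data.select('GRU_input').first()
print(len(sample_row.GRU_input), len(sample_row.GRU_input[0]))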

Undersample the dataset

In [15]:
qcd = data.filter('label=0')
tt = data.filter('label=1')
wjets = data.filter('label=2')
In [16]:
# Create the undersampled dataframes
# False means to sample without replacement
tt = tt.sample(False, qcd_events*1.0/tt_events) 
wjets = wjets.sample(False, qcd_events*1.0/wjets_events)

dataUndersampled = qcd.union(tt).union(wjets)
In [17]:
dataUndersampled.groupBy('label').count().show()
+-----+-------+
|label|  count|
+-----+-------+
|    1|1427475|
|    2|1425736|
|    0|1426626|
+-----+-------+

Shuffle the dataset

Because of how the dataset has been created, it is made of "three blocks" obtained from the union of the three samples. Therefore we need to shuffle the dataset. We split it into train/test and shuffle the training dataset.

In [18]:
from pyspark.sql.functions import rand 
trainUndersampled, testUndersampled = dataUndersampled.randomSplit([0.8, 0.2], seed=42)
trainUndersampled = trainUndersampled.orderBy(rand(seed=42))

Notice that the whole pipeline will be triggered by the action of saving to the Parquet files.
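
Up to this point only lazy transformations have been defined; if we want to inspect the execution plan before triggering it, we can optionally call explain() on the DataFrames, for example:

# Optional: print the physical plan of the training DataFrame before the write action
trainUndersampled.explain()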

Save the datasets as Apache Parquet files

In [19]:
PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/"

numTestPartitions = 800

%time testUndersampled.coalesce(numTestPartitions).write.parquet(PATH + 'testUndersampled.parquet')
CPU times: user 305 ms, sys: 171 ms, total: 476 ms
Wall time: 46min 55s
In [20]:
numTrainPartitions = 800

%time trainUndersampled.coalesce(numTrainPartitions).write.parquet(PATH + 'trainUndersampled.parquet')
CPU times: user 4.07 s, sys: 3.33 s, total: 7.4 s
Wall time: 1h 18min 42s
In [21]:
spark.stop()