Data Ingestion and Filtering - pipeline for the topology classifier with Apache Spark

1. Data Ingestion is the first stage of the pipeline. Here we will read the ROOT files from HDFS into a Spark dataframe using the Spark-ROOT reader and then we will create the Low Level Features (LLF) and High Level Features (HLF) datasets.

To run this notebook we used the following configuration:

  • Software stack: Spark 2.4.3
  • Platform: CentOS 7, Python 3.6
  • Spark cluster: Analytix
In [1]:
# pip install pyspark or use your favorite way to set Spark Home, here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-2.4.3-bin-hadoop2.7') #set path to SPARK_HOME
In [2]:
# Configure according to your environment
pyspark_python = "<path to python>/bin/python"
spark_root_jar="https://github.com/diana-hep/spark-root/blob/master/jars/spark-root_2.11-0.1.17.jar?raw=true"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("1-Data Ingestion") \
        .master("yarn") \
        .config("spark.driver.memory","8g") \
        .config("spark.executor.memory","14g") \
        .config("spark.executor.cores","8") \
        .config("spark.executor.instances","50") \
        .config("spark.dynamicAllocation.enabled","false") \
        .config("spark.jars",spark_root_jar) \
        .config("spark.jars.packages","org.diana-hep:root4j:0.1.6") \
        .config("spark.pyspark.python",pyspark_python) \
        .config("spark.eventLog.enabled","false") \
        .getOrCreate()
In [3]:
# Check if Spark Session has been created correctly
spark
Out[3]:

SparkSession - in-memory

SparkContext
Spark UI
Version: v2.4.3
Master: yarn
AppName: 1-Data Ingestion
In [4]:
# Add a file containing functions that we will use later
spark.sparkContext.addPyFile("utilFunctions.py")

Read the data from HDFS

As a first step, we will read the samples into a Spark dataframe using Spark-ROOT. We will select only a subset of the columns present in the original files.

In [5]:
PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/lepFilter_rawData/"

samples = ["qcd_lepFilter_13TeV, "ttbar_lepFilter_13TeV", "Wlnu_lepFilter_13TeV"]

requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]
In [6]:
from pyspark.sql.functions import lit

dfList = []

for label,sample in enumerate(samples):
    print("Loading {} sample...".format(sample))
    tmpDF = spark.read \
                .format("org.dianahep.sparkroot.experimental") \
                .load(PATH + sample + "/*.root") \
                .select(requiredColumns) \
                .withColumn("label", lit(label))
    dfList.append(tmpDF)
Loading qcd_lepFilter_13TeV sample...
Loading ttbar_lepFilter_13TeV sample...
Loading Wlnu_lepFilter_13TeV sample...
In [7]:
# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)
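The same merge can also be written more compactly with functools.reduce (an equivalent alternative, not used in the original notebook):

from functools import reduce

# Union all per-sample dataframes into a single dataframe
df = reduce(lambda a, b: a.union(b), dfList)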

Let's have a look at how many events there are for each class. Keep in mind that the labels are mapped as follows:

  • $0=\text{QCD}$
  • $1=\text{t}\bar{\text{t}}$
  • $2=\text{W}+\text{jets}$
In [9]:
# Print the number of events per sample and the total (label=null)
df.rollup("label").count().orderBy("label", ascending=False).show()
+-----+--------+
|label|   count|
+-----+--------+
|    2|26335315|
|    1|13780026|
|    0|14354796|
| null|54470137|
+-----+--------+
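Note that rollup("label") also produces the grand-total row with label=null. A plain per-class count, without the total row, would be the following quick equivalent check:

# Per-class counts only, without the rollup grand total
df.groupBy("label").count().orderBy("label").show()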

Let's print the schema of one of the required columns. This shows that the schema is complex and nested (the full schema is even more complex).

In [8]:
df.select("EFlowTrack").printSchema()
root
 |-- EFlowTrack: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |-- fBits: integer (nullable = true)
 |    |    |-- PID: integer (nullable = true)
 |    |    |-- Charge: integer (nullable = true)
 |    |    |-- PT: float (nullable = true)
 |    |    |-- Eta: float (nullable = true)
 |    |    |-- Phi: float (nullable = true)
 |    |    |-- EtaOuter: float (nullable = true)
 |    |    |-- PhiOuter: float (nullable = true)
 |    |    |-- X: float (nullable = true)
 |    |    |-- Y: float (nullable = true)
 |    |    |-- Z: float (nullable = true)
 |    |    |-- T: float (nullable = true)
 |    |    |-- XOuter: float (nullable = true)
 |    |    |-- YOuter: float (nullable = true)
 |    |    |-- ZOuter: float (nullable = true)
 |    |    |-- TOuter: float (nullable = true)
 |    |    |-- Dxy: float (nullable = true)
 |    |    |-- SDxy: float (nullable = true)
 |    |    |-- Xd: float (nullable = true)
 |    |    |-- Yd: float (nullable = true)
 |    |    |-- Zd: float (nullable = true)
 |    |    |-- EFlowTrack_Particle: struct (nullable = true)
 |    |    |    |-- TObject: struct (nullable = true)
 |    |    |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |    |    |-- fBits: integer (nullable = true)

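To give an idea of how such nested structures can be accessed, here is a minimal sketch (not part of the original pipeline) that explodes the EFlowTrack array and selects a few leaf fields:

from pyspark.sql.functions import explode, col

# One row per track, then pick a few leaf fields from the nested struct
tracks = (df.select(explode("EFlowTrack").alias("trk"))
            .select(col("trk.PT"), col("trk.Eta"), col("trk.Phi")))
tracks.show(5)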
Create derived datasets

Now we will create the LLF and HLF datasets. This is done by the function convert below, which takes as input an event (i.e. the list of particles present in that event) and performs the following steps:

  1. Select the events with at least one isolated electron/muon (implemented in selection)
  2. Create the list of 801 particles and the 19 low level features for each of them
  3. Compute the high level features
In [9]:
# Import Lorentz Vector and other functions for pTmaps
from utilFunctions import *

def selection(event, TrkPtMap, NeuPtMap, PhotonPtMap, PTcut=23., ISOcut=0.45):
    """
    This function simulates the trigger selection.
    For each event, the presence of an isolated electron or muon with pT > 23 GeV is required.
    """
    if event.Electron_size == 0 and event.MuonTight_size == 0: 
        return False, False, False
    
    foundMuon = None 
    foundEle =  None 
    
    l = LorentzVector()
    
    for ele in event.Electron:
        if ele.PT <= PTcut: continue
        l.SetPtEtaPhiM(ele.PT, ele.Eta, ele.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle is None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele.Charge)]
    
    for muon in event.MuonTight:
        if muon.PT <= PTcut: continue
        l.SetPtEtaPhiM(muon.PT, muon.Eta, muon.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon is None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon.Charge)]
            
    if foundEle is not None and foundMuon is not None:
        if foundEle[5] > foundMuon[5]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle is not None: return True, foundEle, foundMuon
    if foundMuon is not None: return True, foundMuon, foundEle
    
    return False, None, None
In [10]:
import math
import numpy as np
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

def convert(event):
    """
    This function takes as input an event, applies the trigger selection
    and creates the LLF and HLF features
    """
    q = LorentzVector()
    particles = []
    TrkPtMap = ChPtMapp(0.3, event)
    NeuPtMap = NeuPtMapp(0.3, event)
    PhotonPtMap = PhotonPtMapp(0.3, event)
    if TrkPtMap.shape[0] == 0: return Row()
    if NeuPtMap.shape[0] == 0: return Row()
    if PhotonPtMap.shape[0] == 0: return Row()
    
    #
    # Get leptons
    #
    selected, lep, otherlep = selection(event, TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    particles.append(lep)
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])
    
    #
    # Select Tracks
    #
    nTrk = 0
    for h in event.EFlowTrack:
        if nTrk>=450: continue
        if h.PT<=0.5: continue
        q.SetPtEtaPhiM(h.PT, h.Eta, h.Phi, 0.)
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                              h.PT, h.Eta, h.Phi, h.X, h.Y, h.Z,
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., float(np.sign(h.PID))])
            nTrk += 1
    
    #
    # Select Photons
    #
    nPhoton = 0
    for h in event.EFlowPhoton:
        if nPhoton >= 150: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])
        nPhoton += 1
    
    #
    # Select Neutrals
    #
    nNeu = 0
    for h in event.EFlowNeutralHadron:
        if nNeu >= 200: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])
        nNeu += 1
        
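    # Zero-pad so that every event has a fixed-size particle list:
    # 1 lepton + up to 450 tracks + 150 photons + 200 neutral hadrons = 801 candidates,
    # each described by 19 features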
    for iTrk in range(nTrk, 450):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iPhoton in range(nPhoton, 150):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iNeu in range(nNeu, 200):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])        
    #
    # High Level Features
    #
    myMET = event.MissingET[0]
    MET = myMET.MET
    phiMET = myMET.Phi
    MT = 2.*MET*lepMomentum.Pt()*(1-math.cos(lepMomentum.Phi()-phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jet:
        if jet.PT > 30 and abs(jet.Eta)<2.6:
            nJets += 1
            HT += jet.PT
            if jet.BTag>0: 
                nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    hlf = Vectors.dense([HT, MET, phiMET, MT, nJets, nBjets, LepPt, LepEta, LepPhi,
           LepIsoCh, LepIsoGamma, LepIsoNeu, LepCharge, LepIsEle])     
    #
    # return the Row of low level features and high level features
    #
    return Row(lfeatures=particles, hfeatures=hlf, label=event.label)

Finally, apply the function to all the events.

In [11]:
features = df.rdd \
            .map(convert) \
            .filter(lambda row: len(row) > 0) \
            .toDF()
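As a side note, when developing convert it can be handy to first test it on a handful of events, for example with a quick check along these lines (not part of the original pipeline):

# Quick sanity check of convert on a few events
test_rows = df.limit(5).rdd.map(convert).filter(lambda row: len(row) > 0).collect()
print("Selected {} of 5 test events".format(len(test_rows)))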
In [12]:
features.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- label: long (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)

Save the datasets as Parquet files

In [13]:
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV"
num_partitions = 3000 # used in DataFrame coalesce operation to limit number of output files

%time features.coalesce(num_partitions).write.partitionBy("label").parquet(dataset_path)
CPU times: user 1.07 s, sys: 700 ms, total: 1.77 s
Wall time: 3h 13min 36s
In [14]:
print("Number of events written to Parquet:", spark.read.parquet(dataset_path).count())
Number of events written to Parquet: 25468470
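Since the output is partitioned by label, the per-class counts can also be checked directly from the saved dataset, for example (an optional verification, not in the original notebook):

# Count events per class in the saved Parquet dataset
spark.read.parquet(dataset_path).groupBy("label").count().orderBy("label").show()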
In [15]:
spark.stop()