1. Data Ingestion is the first stage of the pipeline. Here we read the ROOT files from HDFS into a Spark DataFrame using the Spark-ROOT reader and then create the Low Level Features (LLF) and High Level Features (HLF) datasets.
To run this notebook we used the following configuration:
# pip install pyspark or use your favorite way to set Spark Home, here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-2.4.3-bin-hadoop2.7') #set path to SPARK_HOME
# Configure according to your environment
pyspark_python = "<path to python>/bin/python"
spark_root_jar="https://github.com/diana-hep/spark-root/blob/master/jars/spark-root_2.11-0.1.17.jar?raw=true"
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("1-Data Ingestion") \
    .master("yarn") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "14g") \
    .config("spark.executor.cores", "8") \
    .config("spark.executor.instances", "50") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.jars", spark_root_jar) \
    .config("spark.jars.packages", "org.diana-hep:root4j:0.1.6") \
    .config("spark.pyspark.python", pyspark_python) \
    .config("spark.eventLog.enabled", "false") \
    .getOrCreate()
# Check if Spark Session has been created correctly
spark
SparkSession - in-memory
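The session above targets a YARN cluster with substantial resources. For readers without such a cluster, a minimal local-mode sketch (an illustrative assumption, not the configuration used to produce the outputs below) would look like:
from pyspark.sql import SparkSession

# Scaled-down local-mode session; reuses the same Spark-ROOT jar and package
spark = SparkSession.builder \
    .appName("1-Data Ingestion") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.jars", spark_root_jar) \
    .config("spark.jars.packages", "org.diana-hep:root4j:0.1.6") \
    .getOrCreate()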
# Add a file containing functions that we will use later
spark.sparkContext.addPyFile("utilFunctions.py")
PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/lepFilter_rawData/"
samples = ["qcd_lepFilter_13TeV", "ttbar_lepFilter_13TeV", "Wlnu_lepFilter_13TeV"]
requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]
from pyspark.sql.functions import lit
dfList = []
for label, sample in enumerate(samples):
    print("Loading {} sample...".format(sample))
    tmpDF = spark.read \
        .format("org.dianahep.sparkroot.experimental") \
        .load(PATH + sample + "/*.root") \
        .select(requiredColumns) \
        .withColumn("label", lit(label))
    dfList.append(tmpDF)
Loading qcd_lepFilter_13TeV sample...
Loading ttbar_lepFilter_13TeV sample...
Loading Wlnu_lepFilter_13TeV sample...
# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)
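union matches columns by position, which is safe here because every tmpDF was built with the same select. As a more defensive alternative (not what this notebook uses), the DataFrames could be merged by column name:
from functools import reduce

# Equivalent merge that matches columns by name rather than by position
df = reduce(lambda a, b: a.unionByName(b), dfList)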
Let's have a look at how many events there are for each class. Keep in mind that the labels follow the order of the samples list (the label is the sample's index from enumerate):
* 0: qcd_lepFilter_13TeV
* 1: ttbar_lepFilter_13TeV
* 2: Wlnu_lepFilter_13TeV
# Print the number of events per sample and the total (label=null)
df.rollup("label").count().orderBy("label", ascending=False).show()
+-----+--------+
|label|   count|
+-----+--------+
|    2|26335315|
|    1|13780026|
|    0|14354796|
| null|54470137|
+-----+--------+
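Note that rollup adds the grand-total row (displayed with label null) on top of the per-class counts; a plain groupBy, shown below for comparison, would return only the per-class rows:
# Per-label counts only, without the rollup grand-total (label=null) row
df.groupBy("label").count().orderBy("label").show()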
Let's print the schema of one of the required columns. This shows that the
schema is complex and nested (the full schema is even more complex)
df.select("EFlowTrack").printSchema()
root
 |-- EFlowTrack: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |-- fBits: integer (nullable = true)
 |    |    |-- PID: integer (nullable = true)
 |    |    |-- Charge: integer (nullable = true)
 |    |    |-- PT: float (nullable = true)
 |    |    |-- Eta: float (nullable = true)
 |    |    |-- Phi: float (nullable = true)
 |    |    |-- EtaOuter: float (nullable = true)
 |    |    |-- PhiOuter: float (nullable = true)
 |    |    |-- X: float (nullable = true)
 |    |    |-- Y: float (nullable = true)
 |    |    |-- Z: float (nullable = true)
 |    |    |-- T: float (nullable = true)
 |    |    |-- XOuter: float (nullable = true)
 |    |    |-- YOuter: float (nullable = true)
 |    |    |-- ZOuter: float (nullable = true)
 |    |    |-- TOuter: float (nullable = true)
 |    |    |-- Dxy: float (nullable = true)
 |    |    |-- SDxy: float (nullable = true)
 |    |    |-- Xd: float (nullable = true)
 |    |    |-- Yd: float (nullable = true)
 |    |    |-- Zd: float (nullable = true)
 |    |    |-- EFlowTrack_Particle: struct (nullable = true)
 |    |    |    |-- TObject: struct (nullable = true)
 |    |    |    |    |-- fUniqueID: integer (nullable = true)
 |    |    |    |    |-- fBits: integer (nullable = true)
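Individual fields of this nested structure can be inspected directly from the DataFrame; for example, a quick optional peek at the track kinematics (not part of the original notebook):
from pyspark.sql.functions import explode

# Flatten the EFlowTrack array into one row per track and show a few fields
df.select(explode("EFlowTrack").alias("trk")) \
  .select("trk.PT", "trk.Eta", "trk.Phi") \
  .show(3)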
Now we will create the LLF and HLF datasets. This is done by the function convert below, which takes as input an event (i.e. the list of particles present in that event) and performs the following steps:
1. applies the trigger requirement implemented in the function selection (one isolated electron or muon with pT > 23 GeV)
2. builds the list of low level features, one entry per selected particle
3. computes the high level features
# Import Lorentz Vector and other functions for pTmaps
from utilFunctions import *
def selection(event, TrkPtMap, NeuPtMap, PhotonPtMap, PTcut=23., ISOcut=0.45):
    """
    This function simulates the trigger selection.
    For each event, the presence of one isolated muon or electron
    with pT > 23 GeV is required.
    """
    if event.Electron_size == 0 and event.MuonTight_size == 0:
        return False, False, False
    foundMuon = None
    foundEle = None
    l = LorentzVector()
    for ele in event.Electron:
        if ele.PT <= PTcut: continue
        l.SetPtEtaPhiM(ele.PT, ele.Eta, ele.Phi, 0.)
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle is None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele.Charge)]
    for muon in event.MuonTight:
        if muon.PT <= PTcut: continue
        l.SetPtEtaPhiM(muon.PT, muon.Eta, muon.Phi, 0.)
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon is None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon.Charge)]
    if foundEle is not None and foundMuon is not None:
        # If both leptons pass, return the higher-pT one first (element 4 is pT;
        # the original compared element 5, i.e. Eta, which appears to be a bug)
        if foundEle[4] > foundMuon[4]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle is not None: return True, foundEle, foundMuon
    if foundMuon is not None: return True, foundMuon, foundEle
    return False, None, None
import math
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

def convert(event):
    """
    This function takes as input an event, applies the trigger selection
    and creates the LLF and HLF datasets
    """
    q = LorentzVector()
    particles = []
    TrkPtMap = ChPtMapp(0.3, event)
    NeuPtMap = NeuPtMapp(0.3, event)
    PhotonPtMap = PhotonPtMapp(0.3, event)
    # Return an empty Row (dropped downstream) if any pT map is empty
    if TrkPtMap.shape[0] == 0: return Row()
    if NeuPtMap.shape[0] == 0: return Row()
    if PhotonPtMap.shape[0] == 0: return Row()
    #
    # Get leptons
    #
    selected, lep, otherlep = selection(event, TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    particles.append(lep)
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])
    #
    # Select Tracks
    #
    nTrk = 0
    for h in event.EFlowTrack:
        if nTrk >= 450: continue
        if h.PT <= 0.5: continue
        q.SetPtEtaPhiM(h.PT, h.Eta, h.Phi, 0.)
        # Skip the track matched to the selected lepton
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                              h.PT, h.Eta, h.Phi, h.X, h.Y, h.Z,
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., float(np.sign(h.PID))])
            nTrk += 1
    #
    # Select Photons
    #
    nPhoton = 0
    for h in event.EFlowPhoton:
        if nPhoton >= 150: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])
        nPhoton += 1
    #
    # Select Neutrals
    #
    nNeu = 0
    for h in event.EFlowNeutralHadron:
        if nNeu >= 200: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])
        nNeu += 1
    # Zero-pad each particle category to a fixed length (19 features per particle)
    for iTrk in range(nTrk, 450):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iPhoton in range(nPhoton, 150):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iNeu in range(nNeu, 200):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])
    #
    # High Level Features
    #
    myMET = event.MissingET[0]
    MET = myMET.MET
    phiMET = myMET.Phi
    # Note: this stores the squared transverse mass, MT^2 = 2*MET*pT*(1-cos(dphi))
    MT = 2. * MET * lepMomentum.Pt() * (1 - math.cos(lepMomentum.Phi() - phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jet:
        if jet.PT > 30 and abs(jet.Eta) < 2.6:
            nJets += 1
            HT += jet.PT
            if jet.BTag > 0:
                nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    hlf = Vectors.dense([HT, MET, phiMET, MT, nJets, nBjets, LepPt, LepEta, LepPhi,
                         LepIsoCh, LepIsoGamma, LepIsoNeu, LepCharge, LepIsEle])
    #
    # Return the Row of low level features and high level features
    #
    return Row(lfeatures=particles, hfeatures=hlf, label=event.label)
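Before converting the full dataset, convert can be smoke-tested on a single event fetched to the driver (a quick optional check, not part of the original notebook; it assumes utilFunctions is importable on the driver):
# Run convert on one event locally; prints an empty Row() if the
# event fails the preselection
sample_event = df.rdd.take(1)[0]
print(convert(sample_event))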
Finally, apply the function convert to all events:
features = df.rdd \
    .map(convert) \
    .filter(lambda row: len(row) > 0) \
    .toDF()
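The filter step above relies on the fact that convert returns an empty Row() for events failing the preselection; a minimal illustration of why len(row) > 0 keeps exactly the converted events:
from pyspark.sql import Row

# Rejected events come back as Row(), which has length 0 and is dropped
assert len(Row()) == 0
assert len(Row(lfeatures=[], hfeatures=None, label=0)) == 3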
features.printSchema()
root
 |-- hfeatures: vector (nullable = true)
 |-- label: long (nullable = true)
 |-- lfeatures: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV"
num_partitions = 3000 # used in DataFrame coalesce operation to limit number of output files
%time features.coalesce(num_partitions).write.partitionBy("label").parquet(dataset_path)
CPU times: user 1.07 s, sys: 700 ms, total: 1.77 s
Wall time: 3h 13min 36s
print("Number of events written to Parquet:", spark.read.parquet(dataset_path).count())
Number of events written to Parquet: 25468470
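Since the output is partitioned by label, it is also easy to verify the per-class breakdown of the written dataset (an optional check, not in the original notebook):
# Per-class breakdown of the dataset written to Parquet
spark.read.parquet(dataset_path).groupBy("label").count().orderBy("label").show()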
spark.stop()