#!/usr/bin/env python
# coding: utf-8

# ## Data Ingestion and Filtering - pipeline for the topology classifier with Apache Spark
# 
# **1. Data Ingestion** is the first stage of the pipeline. Here we read the ROOT files from HDFS into a Spark dataframe using the [Spark-ROOT](https://github.com/diana-hep/spark-root) reader and then create the Low Level Features (LLF) and High Level Features (HLF) datasets.
# 
# To run this notebook we used the following configuration:
# * *Software stack*: Spark 2.4.3
# * *Platform*: CentOS 7, Python 3.6
# * *Spark cluster*: Analytix

# In[1]:

# pip install pyspark, or use your favorite way to set SPARK_HOME; here we use findspark
import findspark
findspark.init('/home/luca/Spark/spark-2.4.3-bin-hadoop2.7')  # set path to SPARK_HOME

# In[2]:

# Configure according to your environment
pyspark_python = "/bin/python"
spark_root_jar = "https://github.com/diana-hep/spark-root/blob/master/jars/spark-root_2.11-0.1.17.jar?raw=true"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("1-Data Ingestion") \
        .master("yarn") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "14g") \
        .config("spark.executor.cores", "8") \
        .config("spark.executor.instances", "50") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.jars", spark_root_jar) \
        .config("spark.jars.packages", "org.diana-hep:root4j:0.1.6") \
        .config("spark.pyspark.python", pyspark_python) \
        .config("spark.eventLog.enabled", "false") \
        .getOrCreate()
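# For a quick local test without access to a YARN cluster, a session with the same
# Spark-ROOT dependencies could be built in local mode; this is only a sketch under
# that assumption (the resource values are illustrative) and is left commented out
# so it does not interfere with the cluster session created above:
#
# spark = SparkSession.builder \
#         .appName("1-Data Ingestion (local test)") \
#         .master("local[*]") \
#         .config("spark.driver.memory", "4g") \
#         .config("spark.jars", spark_root_jar) \
#         .config("spark.jars.packages", "org.diana-hep:root4j:0.1.6") \
#         .getOrCreate()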
# In[3]:

# Check that the Spark Session has been created correctly
spark

# In[4]:

# Add a file containing functions that we will use later
spark.sparkContext.addPyFile("utilFunctions.py")

# ## Read the data from HDFS
# 
# As a first step we read the samples into a Spark dataframe using Spark-ROOT. We select only a subset of the columns present in the original files.

# In[5]:

PATH = "hdfs://analytix/Training/Spark/TopologyClassifier/lepFilter_rawData/"

samples = ["qcd_lepFilter_13TeV", "ttbar_lepFilter_13TeV", "Wlnu_lepFilter_13TeV"]

requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]

# In[6]:

from pyspark.sql.functions import lit

dfList = []

for label, sample in enumerate(samples):
    print("Loading {} sample...".format(sample))
    tmpDF = spark.read \
                .format("org.dianahep.sparkroot.experimental") \
                .load(PATH + sample + "/*.root") \
                .select(requiredColumns) \
                .withColumn("label", lit(label))
    dfList.append(tmpDF)

# In[7]:

# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)

# Let's have a look at how many events there are for each class. Keep in mind that the labels are mapped as follows:
# * $0=\text{QCD}$
# * $1=\text{t}\bar{\text{t}}$
# * $2=\text{W}+\text{jets}$

# In[9]:

# Print the number of events per sample and the total (label=null)
df.rollup("label").count().orderBy("label", ascending=False).show()

# Let's print the schema of one of the required columns. This shows that the
# **schema is complex and nested** (the full schema is even more complex).

# In[8]:

df.select("EFlowTrack").printSchema()

# ## Create derived datasets
# 
# Now we will create the LLF and HLF datasets. This is done by the function `convert` below, which takes as input an event (i.e. the list of particles present in that event) and performs the following steps:
# 1. Select the events with at least one isolated electron/muon (implemented in `selection`)
# 2. Create the list of 801 particles and the 19 low level features for each of them (see the note after this list)
# 3. Compute the high level features
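# 
# Concretely (as implemented in `convert`), the 801 particles per event are 1 isolated
# lepton, up to 450 charged tracks, up to 150 photons and up to 200 neutral hadrons,
# zero-padded to a fixed length. The 19 features per particle are the four-momentum
# (E, Px, Py, Pz), PT (or ET), Eta, Phi, the track position (X, Y, Z, zero for
# non-track particles), three isolation values (charged, photon, neutral), five
# one-hot particle-type flags and the charge.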
# In[9]:

# Import Lorentz Vector and other functions for pT maps
from utilFunctions import *

def selection(event, TrkPtMap, NeuPtMap, PhotonPtMap, PTcut=23., ISOcut=0.45):
    """
    This function simulates the trigger selection.
    For each event the presence of one isolated muon or electron with pT > 23 GeV is required.
    """
    if event.Electron_size == 0 and event.MuonTight_size == 0:
        return False, False, False

    foundMuon = None
    foundEle = None

    l = LorentzVector()

    # Lepton feature layout (same 19 features as in convert below):
    # [E, Px, Py, Pz, PT, Eta, Phi, X, Y, Z, isoCh, isoGamma, isoNeu,
    #  isChHad, isNeuHad, isPhoton, isEle, isMuon, charge]
    for ele in event.Electron:
        if ele.PT <= PTcut: continue
        l.SetPtEtaPhiM(ele.PT, ele.Eta, ele.Phi, 0.)
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle == None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut * ele.PT:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), ele.PT, ele.Eta, ele.Phi,
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele.Charge)]

    for muon in event.MuonTight:
        if muon.PT <= PTcut: continue
        l.SetPtEtaPhiM(muon.PT, muon.Eta, muon.Phi, 0.)
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon == None and (pfisoCh + pfisoNeu + pfisoGamma) < ISOcut * muon.PT:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), muon.PT, muon.Eta, muon.Phi,
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon.Charge)]

    if foundEle != None and foundMuon != None:
        if foundEle[5] > foundMuon[5]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle != None: return True, foundEle, foundMuon
    if foundMuon != None: return True, foundMuon, foundEle
    return False, None, None

# In[10]:

import math
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

def convert(event):
    """
    This function takes as input an event, applies the trigger selection
    and creates the LLF and HLF datasets.
    """
    q = LorentzVector()
    particles = []
    TrkPtMap = ChPtMapp(0.3, event)
    NeuPtMap = NeuPtMapp(0.3, event)
    PhotonPtMap = PhotonPtMapp(0.3, event)
    if TrkPtMap.shape[0] == 0: return Row()
    if NeuPtMap.shape[0] == 0: return Row()
    if PhotonPtMap.shape[0] == 0: return Row()

    #
    # Get leptons
    #
    selected, lep, otherlep = selection(event, TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    particles.append(lep)
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])

    #
    # Select tracks
    #
    nTrk = 0
    for h in event.EFlowTrack:
        if nTrk >= 450: continue
        if h.PT <= 0.5: continue
        q.SetPtEtaPhiM(h.PT, h.Eta, h.Phi, 0.)
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            particles.append([q.E(), q.Px(), q.Py(), q.Pz(), h.PT, h.Eta, h.Phi,
                              h.X, h.Y, h.Z, pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., float(np.sign(h.PID))])
            nTrk += 1

    #
    # Select photons
    #
    nPhoton = 0
    for h in event.EFlowPhoton:
        if nPhoton >= 150: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(), h.ET, h.Eta, h.Phi,
                          0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])
        nPhoton += 1

    #
    # Select neutral hadrons
    #
    nNeu = 0
    for h in event.EFlowNeutralHadron:
        if nNeu >= 200: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(), h.ET, h.Eta, h.Phi,
                          0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])
        nNeu += 1

    # Zero-padding up to the fixed sizes (450 tracks, 150 photons, 200 neutral hadrons)
    for iTrk in range(nTrk, 450):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iPhoton in range(nPhoton, 150):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iNeu in range(nNeu, 200):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0.])

    #
    # High Level Features
    #
    myMET = event.MissingET[0]
    MET = myMET.MET
    phiMET = myMET.Phi
    MT = 2. * MET * lepMomentum.Pt() * (1 - math.cos(lepMomentum.Phi() - phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jet:
        if jet.PT > 30 and abs(jet.Eta) < 2.6:
            nJets += 1
            HT += jet.PT
            if jet.BTag > 0:
                nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    hlf = Vectors.dense([HT, MET, phiMET, MT, nJets, nBjets,
                         LepPt, LepEta, LepPhi, LepIsoCh, LepIsoGamma, LepIsoNeu,
                         LepCharge, LepIsEle])

    #
    # Return the Row of low level features and high level features
    #
    return Row(lfeatures=particles, hfeatures=hlf, label=event.label)
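# As a quick sanity check, the conversion can be spot-checked on a few events collected on the driver before launching the full job. This is only a sketch, assuming the helpers imported from `utilFunctions.py` above are available on the driver as well; it can be skipped.

# In[ ]:

sampleRows = [convert(event) for event in df.rdd.take(5)]
print("Events passing the selection in this small sample:",
      sum(1 for row in sampleRows if len(row) > 0))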
# Finally, apply the function to all the events.

# In[11]:

features = df.rdd \
    .map(convert) \
    .filter(lambda row: len(row) > 0) \
    .toDF()

# In[12]:

features.printSchema()

# ## Save the datasets as Parquet files

# In[13]:

dataset_path = "hdfs://analytix/Training/Spark/TopologyClassifier/dataIngestion_full_13TeV"
num_partitions = 3000  # used in the DataFrame coalesce operation to limit the number of output files

get_ipython().run_line_magic('time', 'features.coalesce(num_partitions).write.partitionBy("label").parquet(dataset_path)')

# In[14]:

print("Number of events written to Parquet:", spark.read.parquet(dataset_path).count())

# In[15]:

spark.stop()

# In[ ]:
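# A possible additional check (a sketch; it would have to run before the
# spark.stop() call above) is to compare the per-label counts of the written
# Parquet files with the input counts shown earlier:
#
# spark.read.parquet(dataset_path).groupBy("label").count().orderBy("label").show()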