Distributed processing of a tree with Spark, ROOT and C++ on IT clusters

Create a list with the input file names.

In [1]:
inputfile = "/afs/cern.ch/user/e/etejedor/public/0.root"
numfiles = 2
files = [inputfile for _ in xrange(numfiles)]
print "Will be processing", numfiles, "files"
Will be processing 2 files

ROOT imports.

In [2]:
import ROOT
from DistROOT import DistTree
Welcome to JupyROOT 6.07/09

Define the mapper and reducer functions in C++.

In [3]:
%%cpp -d
TH1F* fillCpp(TTreeReader& reader) {
  TTreeReaderValue<std::vector<ROOT::Math::PxPyPzEVector>> tracksRV(reader, "tracks");
  TH1F *h = new TH1F("hpt", "Pt histogram", 64, 0, 50);

  while (reader.Next()) {
    auto tracks = *tracksRV;
    for (auto&& track : tracks) {
      auto pt = track.Pt();
      h->Fill(pt);
    }
  } 

  return h;
}

TH1F* mergeCpp(TH1F *h1, const TH1F *h2) {
  h1->Add(h2);
  return h1;
}
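To make the contract of these two functions explicit: the mapper turns one partition of the tree into a partial result, and the reducer merges two partial results into one. Below is a minimal pure-Python analogue of that contract, using a plain dict as a stand-in for the `TH1F`; `fill_py` and `merge_py` are hypothetical names for illustration only, not part of DistROOT.

```python
def fill_py(entries):
    # Mapper analogue: count entries per integer "bin",
    # a stand-in for filling a TH1F with track pT values.
    result = {}
    for pt in entries:
        b = int(pt)  # trivial binning
        result[b] = result.get(b, 0) + 1
    return result

def merge_py(h1, h2):
    # Reducer analogue: merge two partial histograms in place,
    # a stand-in for TH1F::Add, returning the merged result.
    for b, n in h2.items():
        h1[b] = h1.get(b, 0) + n
    return h1
```

The key property the reducer must have, in both the C++ and the Python sketch, is associativity: partial results can be merged in any grouping and still yield the same final histogram.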

Create a distributed tree from the list of input file names, the name of the tree and the number of logical partitions. The environment was previously configured via CVMFS to use the hadalytic cluster.

In [4]:
dTree = DistTree(filelist = files,
                 treename = "events",
                 npartitions = 4)
16/11/07 10:12:49 INFO SparkContext: Running Spark version 1.6.0
16/11/07 10:12:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/07 10:12:50 INFO SecurityManager: Changing view acls to: etejedor
16/11/07 10:12:50 INFO SecurityManager: Changing modify acls to: etejedor
16/11/07 10:12:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(etejedor); users with modify permissions: Set(etejedor)
16/11/07 10:12:50 INFO Utils: Successfully started service 'sparkDriver' on port 36923.
16/11/07 10:12:50 INFO Slf4jLogger: Slf4jLogger started
16/11/07 10:12:50 INFO Remoting: Starting remoting
16/11/07 10:12:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:43819]
16/11/07 10:12:51 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 43819.
16/11/07 10:12:51 INFO SparkEnv: Registering MapOutputTracker
16/11/07 10:12:51 INFO SparkEnv: Registering BlockManagerMaster
16/11/07 10:12:51 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-01566d2d-a74f-4e16-881c-b88c71ad59f8
16/11/07 10:12:51 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/11/07 10:12:51 INFO SparkEnv: Registering OutputCommitCoordinator
16/11/07 10:12:51 INFO Server: jetty-8.y.z-SNAPSHOT
16/11/07 10:12:51 INFO AbstractConnector: Started [email protected]:4040
16/11/07 10:12:51 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/11/07 10:12:51 INFO SparkUI: Started SparkUI at http://188.184.91.42:4040
16/11/07 10:12:52 INFO RMProxy: Connecting to ResourceManager at p01001532067275.cern.ch/128.142.35.237:8032
16/11/07 10:12:52 INFO Client: Requesting a new application from cluster with 14 NodeManagers
16/11/07 10:12:52 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
16/11/07 10:12:52 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
16/11/07 10:12:52 INFO Client: Setting up container launch context for our AM
16/11/07 10:12:52 INFO Client: Setting up the launch environment for our AM container
16/11/07 10:12:52 INFO Client: Preparing resources for our AM container
16/11/07 10:12:53 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/11/07 10:12:53 INFO YarnSparkHadoopUtil: getting token for namenode: hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668
16/11/07 10:12:53 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 2629586 for etejedor on 128.142.35.237:8020
16/11/07 10:12:54 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/spark-assembly-1.6.0-hadoop2.6.0.jar
16/11/07 10:12:55 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/python/lib/pyspark.zip -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/pyspark.zip
16/11/07 10:12:55 INFO Client: Uploading resource file:/cvmfs/sft.cern.ch/lcg/releases/spark/1.6.0-95ed9/x86_64-slc6-gcc49-opt/python/lib/py4j-0.9-src.zip -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/py4j-0.9-src.zip
16/11/07 10:12:56 INFO Client: Uploading resource file:/tmp/spark-fec88f51-128c-448c-9c4a-939f582df7ae/__spark_conf__4669209039456569521.zip -> hdfs://p01001532067275.cern.ch/user/etejedor/.sparkStaging/application_1478008051180_2668/__spark_conf__4669209039456569521.zip
16/11/07 10:12:56 INFO DFSClient: Exception in createBlockOutputStream
java.net.NoRouteToHostException: No route to host
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
	at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)
16/11/07 10:12:56 INFO DFSClient: Abandoning BP-2054394024-128.142.35.237-1427464967087:blk_1139996772_66875988
16/11/07 10:12:56 INFO DFSClient: Excluding datanode 128.142.210.232:1004
16/11/07 10:12:56 INFO SecurityManager: Changing view acls to: etejedor
16/11/07 10:12:56 INFO SecurityManager: Changing modify acls to: etejedor
16/11/07 10:12:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(etejedor); users with modify permissions: Set(etejedor)
16/11/07 10:12:56 INFO Client: Submitting application 2668 to ResourceManager
16/11/07 10:12:56 INFO YarnClientImpl: Submitted application application_1478008051180_2668
16/11/07 10:12:57 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:12:57 INFO Client: 
	 client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
	 diagnostics: N/A
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: root.standard
	 start time: 1478509976232
	 final status: UNDEFINED
	 tracking URL: http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668/
	 user: etejedor
16/11/07 10:12:58 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:12:59 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:00 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:01 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:02 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:03 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:04 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:05 INFO Client: Application report for application_1478008051180_2668 (state: ACCEPTED)
16/11/07 10:13:05 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
16/11/07 10:13:05 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> p01001532067275.cern.ch, PROXY_URI_BASES -> http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668), /proxy/application_1478008051180_2668
16/11/07 10:13:05 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/11/07 10:13:06 INFO Client: Application report for application_1478008051180_2668 (state: RUNNING)
16/11/07 10:13:06 INFO Client: 
	 client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
	 diagnostics: N/A
	 ApplicationMaster host: 128.142.210.235
	 ApplicationMaster RPC port: 0
	 queue: root.standard
	 start time: 1478509976232
	 final status: UNDEFINED
	 tracking URL: http://p01001532067275.cern.ch:8088/proxy/application_1478008051180_2668/
	 user: etejedor
16/11/07 10:13:06 INFO YarnClientSchedulerBackend: Application application_1478008051180_2668 has started running.
16/11/07 10:13:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40031.
16/11/07 10:13:06 INFO NettyBlockTransferService: Server created on 40031
16/11/07 10:13:06 INFO BlockManagerMaster: Trying to register BlockManager
16/11/07 10:13:06 INFO BlockManagerMasterEndpoint: Registering block manager 188.184.91.42:40031 with 511.1 MB RAM, BlockManagerId(driver, 188.184.91.42, 40031)
16/11/07 10:13:06 INFO BlockManagerMaster: Registered BlockManager
16/11/07 10:13:06 INFO EventLoggingListener: Logging events to hdfs:///user/spark/applicationHistory/application_1478008051180_2668
16/11/07 10:13:06 INFO DFSClient: Exception in createBlockOutputStream
java.net.NoRouteToHostException: No route to host
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
	at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)
16/11/07 10:13:06 INFO DFSClient: Abandoning BP-2054394024-128.142.35.237-1427464967087:blk_1139996774_66875990
16/11/07 10:13:06 INFO DFSClient: Excluding datanode 128.142.210.232:1004
16/11/07 10:13:15 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (itrac1508.cern.ch:37986) with ID 2
16/11/07 10:13:15 WARN TableMapping: /etc/hadoop/conf/topology.table.file cannot be read.
java.io.FileNotFoundException: /etc/hadoop/conf/topology.table.file (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at java.io.FileReader.<init>(FileReader.java:58)
	at org.apache.hadoop.net.TableMapping$RawTableMapping.load(TableMapping.java:101)
	at org.apache.hadoop.net.TableMapping$RawTableMapping.resolve(TableMapping.java:134)
	at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
	at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
	at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
	at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:292)
	at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$1.apply(TaskSchedulerImpl.scala:284)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:284)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:196)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:167)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:104)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/11/07 10:13:15 WARN TableMapping: Failed to read topology table. /default-rack will be used for all nodes.
16/11/07 10:13:15 INFO BlockManagerMasterEndpoint: Registering block manager itrac1508.cern.ch:52262 with 511.5 MB RAM, BlockManagerId(2, itrac1508.cern.ch, 52262)
16/11/07 10:13:16 INFO YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (itrac1501.cern.ch:55824) with ID 1
16/11/07 10:13:16 INFO BlockManagerMasterEndpoint: Registering block manager itrac1501.cern.ch:43424 with 511.5 MB RAM, BlockManagerId(1, itrac1501.cern.ch, 43424)
16/11/07 10:13:16 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8

Check the tree partitions.

In [5]:
print "Tree entries partitioning:", dTree.GetPartitions()
Tree entries partitioning: [(0,499), (500,999), (1000,1499), (1500,1999)]
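The printed partitioning splits the 2000 total entries into four inclusive `(start, end)` ranges. A small sketch of how such ranges could be computed follows; this is an assumption about the partitioning scheme suggested by the output above, not the actual `DistTree` implementation.

```python
def partition_entries(total_entries, npartitions):
    """Split [0, total_entries) into npartitions inclusive (start, end)
    ranges, distributing any remainder over the first partitions."""
    base, rem = divmod(total_entries, npartitions)
    ranges, start = [], 0
    for i in range(npartitions):
        size = base + (1 if i < rem else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges

# partition_entries(2000, 4) reproduces the ranges printed above:
# [(0, 499), (500, 999), (1000, 1499), (1500, 1999)]
```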

Request the processing of the tree. This fires up a map-reduce chain with fillCpp as the mapper and mergeCpp as the reducer. The final result is a histogram.

In [6]:
h = dTree.ProcessAndMerge(ROOT.fillCpp, ROOT.mergeCpp)
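The map-reduce chain fired by this call can be sketched in a few lines of pure Python: apply the mapper to every partition, then fold the partial results with the reducer. In the real case Spark distributes the map step across the cluster; `process_and_merge` below is a hypothetical local emulation, not the DistROOT implementation.

```python
from functools import reduce

def process_and_merge(partitions, mapper, reducer):
    """Local sketch of the ProcessAndMerge chain: map each partition
    to a partial result, then reduce the partials to a single value."""
    partials = [mapper(p) for p in partitions]
    return reduce(reducer, partials)

# Toy usage with numeric partitions: the mapper sums a partition,
# the reducer adds two partial sums.
partitions = [[1, 2], [3, 4], [5]]
total = process_and_merge(partitions, sum, lambda a, b: a + b)
# total == 15
```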

Plot the resulting histogram.

In [7]:
%jsroot on
c = ROOT.TCanvas()
h.Draw()
c.Draw()