Create a list with the input file names.
inputfile = "/afs/cern.ch/user/e/etejedor/public/0.root"
numfiles = 2
files = [inputfile for _ in xrange(numfiles)]
print "Will be processing", numfiles, "files"
Will be processing 2 files
ROOT imports.
import ROOT
from DistROOT import DistTree
Welcome to JupyROOT 6.07/09
Define the mapper and reducer functions in C++.
%%cpp -d
TH1F* fillCpp(TTreeReader& reader) {
  TTreeReaderValue<std::vector<ROOT::Math::PxPyPzEVector>> tracksRV(reader, "tracks");
  TH1F *h = new TH1F("hpt", "Pt histogram", 64, 0, 50);
  while (reader.Next()) {
    auto tracks = *tracksRV;
    for (auto&& track : tracks) {
      auto pt = track.Pt();
      h->Fill(pt);
    }
  }
  return h;
}

TH1F* mergeCpp(TH1F *h1, const TH1F *h2) {
  h1->Add(h2);
  return h1;
}
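The contract between the two functions is simple: the mapper turns a chunk of entries into a histogram, and the reducer combines two histograms into one. A minimal ROOT-free Python sketch of that contract, where a plain dict of bin counts stands in for the TH1F (the `fill_py` and `merge_py` names are illustrative, not part of DistROOT):

```python
def fill_py(pt_values, nbins=64, lo=0.0, hi=50.0):
    """Mapper: histogram a sequence of pt values into fixed-width bins."""
    width = (hi - lo) / nbins
    h = {}
    for pt in pt_values:
        if lo <= pt < hi:
            b = int((pt - lo) / width)
            h[b] = h.get(b, 0) + 1
    return h

def merge_py(h1, h2):
    """Reducer: add the bin contents of h2 into h1 and return h1."""
    for b, n in h2.items():
        h1[b] = h1.get(b, 0) + n
    return h1
```

Like `mergeCpp`, the reducer mutates and returns its first argument, so it can be folded over any number of partial results.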
Create a distributed tree from the list of file names, the name of the tree and the number of logical partitions. The environment was previously configured via CVMFS to use the hadalytic cluster.
dTree = DistTree(filelist = files,
treename = "events",
npartitions = 4)
16/11/07 10:12:49 INFO SparkContext: Running Spark version 1.6.0
[... Spark driver and YARN client startup log omitted ...]
16/11/07 10:12:56 INFO YarnClientImpl: Submitted application application_1478008051180_2668
16/11/07 10:13:06 INFO YarnClientSchedulerBackend: Application application_1478008051180_2668 has started running.
16/11/07 10:13:16 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
Check the tree partitions.
print "Tree entries partitioning:", dTree.GetPartitions()
Tree entries partitioning: [(0,499), (500,999), (1000,1499), (1500,1999)]
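The even split reported above (2 files of 1000 entries each, divided into 4 ranges of 500) can be reproduced with a short sketch. This is a simplified guess at the logic; DistROOT's actual partitioning also has to respect file boundaries:

```python
def partition_entries(nentries, npartitions):
    """Split [0, nentries) into npartitions contiguous (first, last) ranges."""
    base, extra = divmod(nentries, npartitions)
    parts, start = [], 0
    for i in range(npartitions):
        # Spread any remainder over the first `extra` partitions.
        size = base + (1 if i < extra else 0)
        parts.append((start, start + size - 1))
        start += size
    return parts

print(partition_entries(2000, 4))
# -> [(0, 499), (500, 999), (1000, 1499), (1500, 1999)]
```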
Request the processing of the tree. This will fire up a map-reduce chain with fillCpp as mapper and mergeCpp as reducer. The final result is a histogram.
h = dTree.ProcessAndMerge(ROOT.fillCpp, ROOT.mergeCpp)
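Conceptually, ProcessAndMerge maps the fill function over the partitions and folds the partial results with the merge function. A ROOT-free sketch of that shape (in the real call the mapping runs on the Spark cluster, not locally):

```python
from functools import reduce

def process_and_merge(partitions, fill, merge):
    """Apply fill to each partition, then fold the results with merge."""
    return reduce(merge, (fill(p) for p in partitions))

# Toy stand-ins: each "partition" is a list of values, and the
# "histogram" is just their count.
total = process_and_merge([[1, 2], [3], [4, 5, 6]],
                          fill=len,
                          merge=lambda a, b: a + b)
print(total)  # -> 6
```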
Plot the resulting histogram.
%jsroot on
c = ROOT.TCanvas()
h.Draw()
c.Draw()