#!/usr/bin/env python
# coding: utf-8
#
# # Processing HDF5 files with Spark
#
# This notebook demonstrates accessing and processing HDF5 files
# ***Author: Prasanth Kothuri***
# ***Contact: Prasanth Kothuri***
#
# To run this notebook we used the following configuration:
# * *Software stack*: LCG_96 Python3
# * *Platform*: centos7-gcc8
# * *Spark cluster*: cloud containers
# In[3]:
# Ensure h5py package is available
import os
import io
import h5py
print(h5py.__version__)
prefix = "root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/"
HDF5Files = [
    "1541897545335000000_162_1.h5",
    "1541902534935000000_163_1.h5"
]
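# In[ ]:
# The map function in the next cell assumes the AWAKE file layout, where the
# event metadata lives under the "AwakeEventInfo" group. With a local copy of
# one file, that layout can be verified as follows (a sketch; "sample.h5" is
# a placeholder name, not a file from the original notebook):
def dumpLayout(path):
    with h5py.File(path, "r") as f:
        # print every group/dataset name together with its h5py type
        f.visititems(lambda name, obj: print(name, "->", type(obj).__name__))
#dumpLayout("sample.h5")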
# In[ ]:
# map function to extract summary metadata from each HDF5 file
def extractHDF5(hdf5file):
    # sc.binaryFiles yields (path, content) pairs
    path = hdf5file[0]
    content = hdf5file[1]
    f = h5py.File(io.BytesIO(content), "r")
    info = f['AwakeEventInfo']
    return (path,
            int(info['configurationVersionNumber'][()]),
            int(info['eventNumber'][()]),
            int(info['runNumber'][()]),
            int(info['timestamp'][()]))
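# In[ ]:
# Optional sanity check outside Spark: sc.binaryFiles yields (path, bytes)
# pairs, so the same shape can be fed to extractHDF5 from a local copy of
# one file ("sample.h5" is a placeholder, not a file from the original notebook):
#with open("sample.h5", "rb") as fh:
#    print(extractHDF5(("sample.h5", fh.read())))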
# In[ ]:
# build the full list of file paths
files = []
for HDF5File in HDF5Files:
    files.append(prefix + HDF5File)
print(files)
# RDD representing tuples of file path and corresponding file content
inputData = sc.binaryFiles(','.join(files))
#inputData = sc.binaryFiles("root://eospublic.cern.ch//eos/experiment/awake/event_data/2018/11/11")
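# optional peek at one element to confirm the files were picked up;
# each element is a (path, bytes) pair, so only the size is printed here
print(inputData.map(lambda kv: (kv[0], len(kv[1]))).first())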
# apply the map function to every (path, content) pair
hdf5_reduced_collection = inputData.map(extractHDF5)
# convert the RDD to a DataFrame with named columns
df = spark.createDataFrame(hdf5_reduced_collection) \
    .toDF("filename", "configurationVersionNumber", "eventNumber", "runNumber", "timestamp")
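# In[ ]:
# optional: inspect the schema and a few rows before writing out
df.printSchema()
df.show(5, truncate = False)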
# In[4]:
# save the result to EOS as a single CSV file
try:
    df.coalesce(numPartitions = 1) \
        .write \
        .option(key = "header", value = "true") \
        .option(key = "sep", value = ",") \
        .option(key = "encoding", value = "UTF-8") \
        .option(key = "compression", value = "none") \
        .mode(saveMode = "overwrite") \
        .csv(path = "root://eosuser.cern.ch//eos/user/p/pkothuri/result/")
except Exception as e:
    # a known bug in the xrootd-connector delete path; safe to ignore
    if "ch.cern.eos.XRootDFileSystem.delete" in str(e):
        pass
    else:
        raise
# In[5]:
# read the result back to verify
csvOutput = spark.read.option("header", "true").csv("root://eosuser.cern.ch//eos/user/p/pkothuri/result/")
print(csvOutput.count())
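# In[ ]:
# The 19-digit timestamps (also visible in the file names) look like
# nanoseconds since the Unix epoch. Assuming that interpretation -- a
# sketch, not part of the original analysis -- a readable event time
# can be derived directly from the DataFrame built above:
from pyspark.sql.functions import col, from_unixtime
df.withColumn("eventTime", from_unixtime((col("timestamp") / 1000000000).cast("long"))) \
    .select("eventNumber", "runNumber", "eventTime") \
    .show(truncate = False)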