This notebook demonstrates accessing and processing HDF5 files stored on EOS with PySpark.
*Author: Prasanth Kothuri*
*Contact: Prasanth Kothuri*
To run this notebook we used the following configuration:
# Ensure h5py package is available
import os
import h5py, io
print (h5py.__version__)
# XRootD URL prefix for the EOS directory holding the input HDF5 files
prefix = "root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/"
# File names to process, relative to `prefix`
# NOTE(review): the second and third entries are identical — confirm the
# duplicate is intentional (it produces a duplicate row downstream).
HDF5Files = [
"1541897545335000000_162_1.h5",
"1541902534935000000_163_1.h5",
"1541902534935000000_163_1.h5"
]
2.9.0
# map to process HDF5 files
# map function to process HDF5 files
def extractHDF5(hdf5file):
    """Extract key AwakeEventInfo fields from one (path, content) pair.

    Parameters
    ----------
    hdf5file : tuple
        ``(file path, raw file bytes)`` as produced by ``sc.binaryFiles``.

    Returns
    -------
    tuple
        ``(path, configurationVersionNumber, eventNumber, runNumber, timestamp)``
        with the four numeric fields cast to ``int``.
    """
    path, content = hdf5file
    # Open read-only from an in-memory buffer; the context manager guarantees
    # the HDF5 handle is closed even if a dataset lookup raises.
    with h5py.File(io.BytesIO(content), "r") as f:
        info = f["AwakeEventInfo"]
        return (
            path,
            int(info["configurationVersionNumber"][()]),
            int(info["eventNumber"][()]),
            int(info["runNumber"][()]),
            int(info["timestamp"][()]),
        )
# Build the list of fully-qualified XRootD URLs to read
files = [prefix + fname for fname in HDF5Files]
print(files)
# RDD of (file path, file content bytes) pairs read via the XRootD connector
inputData = sc.binaryFiles(','.join(files))
# Alternative source: read a whole EOS directory instead of an explicit list
#inputData = sc.binaryFiles("root://eospublic.cern.ch//eos/experiment/awake/event_data/2018/11/11")
# Apply the map function directly — wrapping it in `lambda x: extractHDF5(x)`
# is redundant; the function reference itself is the mapper.
hdf5_reduced_collection = inputData.map(extractHDF5)
# Convert the RDD of tuples to a DataFrame with named columns
df = spark.createDataFrame(hdf5_reduced_collection).toDF("filename", "configurationVersionNumber", "eventNumber", "runNumber", "timestamp")
['root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541897545335000000_162_1.h5', 'root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541902534935000000_163_1.h5', 'root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/1541902534935000000_163_1.h5']
# Save to EOS as a single CSV file (coalesce to 1 partition → one output file)
try:
    df.coalesce(numPartitions = 1) \
        .write \
        .option(key = "header", value = "true") \
        .option(key = "sep", value = ",") \
        .option(key = "encoding", value = "UTF-8") \
        .option(key = "compression", value = "none") \
        .mode(saveMode = "OVERWRITE") \
        .csv(path = "root://eosuser.cern.ch//eos/user/p/pkothuri/result/")
except Exception as e:
    # There is a bug in xrootd-connector's delete path during OVERWRITE
    # and we can ignore it; anything else is re-raised unchanged.
    if "ch.cern.eos.XRootDFileSystem.delete" in str(e):
        pass
    else:
        # Bare `raise` preserves the original exception type and traceback;
        # `raise Exception(e)` would discard both.
        raise
# Read the result back to verify the write succeeded.
# NOTE(review): no header option is set here, so the header line written above
# is counted as a data row (3 input files → count of 4); pass header=True to
# spark.read.csv if only data rows should be counted.
csvOutput = spark.read.csv("root://eosuser.cern.ch//eos/user/p/pkothuri/result/")
# Python 3 requires print as a function call; the original `print expr`
# statement form is a SyntaxError.
print(csvOutput.count())
4