#!/usr/bin/env python # coding: utf-8 #

Processing HDF5 files with Spark

#
# This notebook demonstrates accessing and processing HDF5 files # ***Author: Prasanth Kothuri*** # ***Contact: Prasanth Kothuri*** # # To run this notebook we used the following configuration: # * *Software stack*: LCG_96 Python3 # * *Platform*: centos8-gcct # * *Spark Cluster: cloud containers # In[3]: # Ensure h5py package is available import os import h5py, io print (h5py.__version__) prefix = "root://eosuser.cern.ch//eos/user/p/pkothuri/HDF5/" HDF5Files = [ "1541897545335000000_162_1.h5", "1541902534935000000_163_1.h5", "1541902534935000000_163_1.h5" ] # In[ ]: # map to process HDF5 files def extractHDF5(hdf5file): prefix = hdf5file[0] content = hdf5file[1] f=h5py.File(io.BytesIO(content)) return hdf5file[0],int(f['AwakeEventInfo']['configurationVersionNumber'][()]),int(f['AwakeEventInfo']['eventNumber'][()]),int(f['AwakeEventInfo']['runNumber'][()]),int(f['AwakeEventInfo']['timestamp'][()]) # In[ ]: # build list of files files = [] for HDF5File in HDF5Files: files.append(prefix + HDF5File) print(files) # RDD representing tuples of file path and corresponding file content inputData = sc.binaryFiles(','.join(files)) #inputData = sc.binaryFiles("root://eospublic.cern.ch//eos/experiment/awake/event_data/2018/11/11") # Apply map function hdf5_reduced_collection = inputData.map(lambda x: extractHDF5(x)) # convert RDD to DF df = spark.createDataFrame(hdf5_reduced_collection).toDF("filename", "configurationVersionNumber", "eventNumber", "runNumber", "timestamp") # In[4]: # save to eos as csv try: df.coalesce(numPartitions = 1) \ .write \ .option(key = "header", value = "true") \ .option(key = "sep", value = ",") \ .option(key = "encoding", value = "UTF-8") \ .option(key = "compresion", value = "none") \ .mode(saveMode = "OVERWRITE") \ .csv(path = "root://eosuser.cern.ch//eos/user/p/pkothuri/result/") except Exception as e: # There is a bug in xrootd-connector and we can ignore it if "ch.cern.eos.XRootDFileSystem.delete" in str(e): pass else: raise Exception(e) # In[5]: # and reading back csvOutput = spark.read.csv("root://eosuser.cern.ch//eos/user/p/pkothuri/result/") print csvOutput.count()