#!/usr/bin/env python
# coding: utf-8

# # Reading node feature in Python
#
# We read the BHSA feature `g_word_utf8`, which maps nearly half a million
# integers to Hebrew word occurrences in the Hebrew Bible.
#
# We measure the execution time of a second run of the last cell, so that we
# do not count warming up effects.

# In[1]:

import os
import sys
from typing import Dict

# # Choice of test feature

# In[2]:

# Location of the BHSA data in Text-Fabric format (version c) and the
# feature under test.
base = '~/text-fabric-data/etcbc/bhsa/tf/c'  # was an f-string with no placeholders
feature = 'g_word_utf8'
featurePath = f'{os.path.expanduser(base)}/{feature}.tf'

# # Auxiliary functions for reading a TF feature

# In[3]:


def error(msg):
    """Write a newline-terminated message to stderr."""
    sys.stderr.write(f'{msg}\n')


# In[4]:


def showResults(errors, data):
    """Print a summary of a feature read: size, last node, sample values.

    data is either a dict (node -> value) or a list (node n at index n - 1).
    """
    if errors == 0:
        isDict = isinstance(data, dict)
        maxNode = max(data.keys()) if isDict else len(data)
        print(f'{len(data)} results, last node {maxNode}')
        # NOTE(review): for list data these two indices show nodes 2 and 3,
        # not 1 and 2 as in the dict case — kept as-is.
        print(data[1])
        print(data[2])
        print(data[maxNode if isDict else maxNode - 1])
    else:
        print(f'{errors} errors')


# In[5]:


def valueFromTf(tf):
    """Convert a TF-escaped value into a plain string.

    TF escapes: '\\\\' -> '\\', '\\t' -> tab, '\\n' -> newline.
    Splitting on the double backslash first prevents '\\\\t' from being
    misread as an escaped tab.
    """
    return '\\'.join(
        part.replace('\\t', '\t').replace('\\n', '\n')
        for part in tf.split('\\\\')
    )


def setFromSpec(spec):
    """Expand a node spec such as '1-3,7' into the set {1, 2, 3, 7}.

    Comma-separated items are single numbers or inclusive 'b-e' ranges;
    reversed bounds like '5-2' are accepted and normalized.
    """
    covered = set()
    for r_str in spec.split(','):
        bounds = r_str.split('-')
        if len(bounds) == 1:
            covered.add(int(r_str))
        else:
            b = int(bounds[0])
            e = int(bounds[1])
            if e < b:
                (b, e) = (e, b)
            covered.update(range(b, e + 1))  # set.update beats a manual add-loop
    return covered


# Just reading a TF feature from disk: get through the metadata and deliver
# all lines in memory, plus the starting index of the data.
#
# The whole file gets slurped.

# In[6]:


def readFile(path):
    """Slurp a TF feature file into memory.

    Returns (lines, i) where `lines` holds every line of the file without
    trailing newlines and `i` is the index in `lines` of the first data
    line (equal to the 1-based number of the blank separator line).
    Returns False when the file is missing or the blank line after the
    metadata is absent.
    """
    if not os.path.exists(path):
        error(f'TF reading: feature file "{path}" does not exist')
        return False
    with open(path, encoding='utf8') as fh:
        contents = fh.read()
    lines = contents.split('\n')
    if lines[-1] == '':
        lines.pop()  # drop the empty tail produced by a final newline
    i = 0
    for line in lines:
        i += 1
        if line.startswith('@'):
            continue
        if line != '':
            error(f'Line {i}: missing blank line after metadata')
            return False
        break
    return (lines, i)


# The readTf function as done in Text-Fabric.
# In[7]:


def readTf(path):
    """Read a TF feature file and return its data part as a dict.

    Skips the metadata header (lines starting with '@'), checks for the
    blank separator line, then delegates to readDataTf.

    Returns (errors, data), or False when the file is missing or the
    blank line after the metadata is absent.
    """
    if not os.path.exists(path):
        error(f'TF reading: feature file "{path}" does not exist')
        return False
    # The context manager closes the handle on every path, including an
    # exception inside readDataTf (the original leaked fh in that case).
    with open(path, encoding='utf8') as fh:
        i = 0
        for line in fh:
            i += 1
            text = line.rstrip()
            if text.startswith('@'):
                continue
            if text != '':
                error(f'Line {i}: missing blank line after metadata')
                return False
            break
        return readDataTf(fh, i)


# Reading the data part of a feature and storing it in a dict.

# In[8]:


def readDataTf(fh, firstI):
    """Parse the data lines of an already-opened TF feature file.

    fh     -- file handle positioned just after the blank separator line
    firstI -- 1-based line number of that separator (for error messages)

    Returns (errors, data) where data maps node number -> value.
    Lines without an explicit node spec are assigned to `implicit_node`,
    which advances to max(assigned nodes) + 1 after every line.
    """
    i = firstI
    implicit_node = 1
    data = {}
    normFields = 2  # a full data line is: node-spec TAB value
    isNum = False  # g_word_utf8 holds strings; flag kept for parity with TF
    errors = 0
    for line in fh:
        i += 1
        fields = line.rstrip('\n').split('\t')
        lfields = len(fields)
        if lfields > normFields:
            error(f'{i}: wrongFields')
            errors += 1
            continue
        if lfields == normFields:
            nodes = setFromSpec(fields[0])
            valTf = fields[-1]
        else:
            nodes = {implicit_node}
            valTf = fields[0] if lfields == 1 else ''
        implicit_node = max(nodes) + 1
        value = (
            int(valTf) if isNum and valTf != '' else
            None if isNum else
            '' if valTf == '' else
            valueFromTf(valTf)
        )
        for n in nodes:
            if value is not None:
                data[n] = value
    return (errors, data)


# A variant: read a TF feature and store it in a list.
# In[9]:


def readTfList(path):
    """Like readTf, but the data part is collected into a list.

    Returns (errors, data) with data a list (node n at index n - 1),
    or False when the file is missing or malformed.
    """
    if not os.path.exists(path):
        error(f'TF reading: feature file "{path}" does not exist')
        return False
    # Same fix as in readTf: the context manager closes the handle even
    # when readDataTfList raises.
    with open(path, encoding='utf8') as fh:
        i = 0
        for line in fh:
            i += 1
            text = line.rstrip()
            if text.startswith('@'):
                continue
            if text != '':
                error(f'Line {i}: missing blank line after metadata')
                return False
            break
        return readDataTfList(fh, i)


# In[10]:


def readDataTfList(fh, firstI):
    """Parse the data lines of an open TF feature file into a list.

    Mirrors readDataTf, but appends values in reading order instead of
    keying a dict; one value is appended per covered node.
    """
    i = firstI
    implicit_node = 1
    data = []
    normFields = 2  # a full data line is: node-spec TAB value
    isNum = False  # string feature; flag kept for parity with readDataTf
    errors = 0
    for line in fh:
        i += 1
        fields = line.rstrip('\n').split('\t')
        lfields = len(fields)
        if lfields > normFields:
            error(f'{i}: wrongFields')
            errors += 1
            continue
        if lfields == normFields:
            nodes = setFromSpec(fields[0])
            valTf = fields[-1]
        else:
            nodes = {implicit_node}
            valTf = fields[0] if lfields == 1 else ''
        implicit_node = max(nodes) + 1
        value = (
            int(valTf) if isNum and valTf != '' else
            None if isNum else
            '' if valTf == '' else
            valueFromTf(valTf)
        )
        # `n` itself is unused: len(nodes) copies of value are appended.
        for n in nodes:
            if value is not None:
                data.append(value)
    return (errors, data)


# Read a TF feature by slurping.
# In[11]:


def readTfSlurp(path):
    """Slurp a TF feature file and parse it into a dict.

    The original duplicated readFile's existence check and metadata scan
    verbatim; delegating to readFile removes that duplication without
    changing behavior.

    Returns (errors, data), or False when readFile rejects the file.
    """
    res = readFile(path)
    if res is False:
        return False
    (lines, first) = res
    return readDataTfSlurp(lines, first)


# In[12]:


def readDataTfSlurp(lines, firstI):
    """Parse slurped data lines into {node: value}.

    lines  -- all lines of the file, already stripped of newlines
    firstI -- index in `lines` of the first data line

    Returns (errors, data) where data maps node number -> value.
    """
    i = firstI - 1  # counter used only in error messages
    implicit_node = 1
    data = {}
    normFields = 2  # a full data line is: node-spec TAB value
    isNum = False  # feature under test is string-valued
    errors = 0
    for line in lines[firstI:]:
        i += 1
        fields = line.split('\t')  # no rstrip needed: lines are newline-free
        lfields = len(fields)
        if lfields > normFields:
            error(f'{i}: wrongFields')
            errors += 1
            continue
        if lfields == normFields:
            nodes = setFromSpec(fields[0])
            valTf = fields[-1]
        else:
            nodes = {implicit_node}
            valTf = fields[0] if lfields == 1 else ''
        implicit_node = max(nodes) + 1
        value = (
            int(valTf) if isNum and valTf != '' else
            None if isNum else
            '' if valTf == '' else
            valueFromTf(valTf)
        )
        for n in nodes:
            if value is not None:
                data[n] = value
    return (errors, data)


# In[27]:


def readDataTfSlurpOpt(lines, firstI):
    """Same algorithm as readDataTfSlurp (plus a dict annotation).

    Kept as a separate function so the notebook can time both variants
    independently.
    """
    i = firstI - 1  # counter used only in error messages
    implicit_node = 1
    data: Dict[int, str] = dict()
    normFields = 2
    isNum = False
    errors = 0
    for line in lines[firstI:]:
        i += 1
        fields = line.split('\t')
        lfields = len(fields)
        if lfields > normFields:
            error(f'{i}: wrongFields')
            errors += 1
            continue
        if lfields == normFields:
            nodes = setFromSpec(fields[0])
            valTf = fields[-1]
        else:
            nodes = {implicit_node}
            valTf = fields[0] if lfields == 1 else ''
        implicit_node = max(nodes) + 1
        value = (
            int(valTf) if isNum and valTf != '' else
            None if isNum else
            '' if valTf == '' else
            valueFromTf(valTf)
        )
        for n in nodes:
            if value is not None:
                data[n] = value
    return (errors, data)


# # Test: straight TF reading

# In[14]:

(errors, data) = readTf(featurePath)

# Execution time: around 1.2s

# In[15]:

showResults(errors, data)
# # Test: TF reading as list

# In[16]:

(errors, data) = readTfList(featurePath)

# Execution time: around 1.2s

# In[17]:

showResults(errors, data)

# # Test: TF slurping

# In[18]:

(errors, data) = readTfSlurp(featurePath)

# In[19]:

showResults(errors, data)

# Execution time: around 1.1s

# # Test: slurping and then optimized TF processing

# In[30]:

# Split the timing: first the raw file slurp ...
(lines, first) = readFile(featurePath)

# Execution time: around 0.1s

# In[31]:

# ... then the parse of the already-slurped lines.
(errors, data) = readDataTfSlurpOpt(lines, first)

# Execution time: around 1.0s

# In[29]:

showResults(errors, data)

# In[ ]: