# Relative path (resolved against the current working directory) of the log
# parsed below: one JSON object per line, each carrying an "Event" field.
# NOTE(review): the file name looks like a Spark application ID — confirm.
metrics_file = "metrics/app-20201104211738-0276"
from glom import glom
import json
from collections import defaultdict
# Parsed log records grouped by unqualified event name; an unseen name starts
# out as an empty list. Passing ``list`` itself as the factory is equivalent to
# ``lambda: list()`` but keeps the defaultdict picklable (lambdas are not).
evset = defaultdict(list)
def dequalify_event(ev):
    """Return the part of an event name after its last ``"."``.

    A dotted name such as ``"pkg.mod.Name"`` yields ``"Name"``; a name that
    contains no dot is handed back unchanged.
    """
    if "." not in ev:
        return ev
    last_dot = ev.rfind(".")
    return ev[last_dot + 1:]
# Read the log (one JSON object per line) and group each parsed record under
# its unqualified "Event" name.
# Explicit encoding so decoding does not depend on the platform's locale
# default (assumes the log is UTF-8 — confirm).
with open(metrics_file, "r", encoding="utf-8") as mf:
    for line in mf:
        if not line.strip():
            # Skip blank lines (e.g. a trailing empty line) instead of
            # failing in json.loads.
            continue
        # Parse once; the same record is used for the key and the stored value.
        js = json.loads(line)
        event = dequalify_event(glom(js, "Event"))
        evset[event].append(js)
# Event names present in the log (iterating a dict yields its keys).
list(evset)
# First job-start record, displayed for inspection.
job0 = evset["SparkListenerJobStart"][0]
job0
# Every job-start record; the glom specs that follow are applied to this list.
jobs = evset["SparkListenerJobStart"]
len(jobs)
# Two views built side by side from the same job list: every job's ID, and
# (per job) its stages reduced to stage ID plus parent stage IDs.
# Despite its key name, 'stage_ids' holds per-stage dicts, not bare IDs.
spec = dict(
    job_ids=['Job ID'],
    stage_ids=[('Stage Infos', [dict(stage_id='Stage ID', parents='Parent IDs')])],
)
glom(jobs, spec)
# One record per job: the job ID plus its stages (stage ID and parent IDs).
spec = [dict(
    job_id='Job ID',
    stages=('Stage Infos', [dict(stage_id='Stage ID', parents='Parent IDs')]),
)]
glom(jobs, spec)
from glom import S, T, glom, Assign, Spec

# Per job, the list of its stages, each tagged with the owning job's ID.
# S(job_id=...) stores the job ID in glom's scope before descending into the
# stages; S['job_id'] reads it back inside each stage record.
spec = [(
    S(job_id=T['Job ID']),
    dict(stage=('Stage Infos', [dict(
        stage_id='Stage ID',
        job_id=S['job_id'],
        parents='Parent IDs',
    )])),
)]
glom(jobs, spec)
from glom import flatten

# Stage records for every job, flattened into one list. Each record carries
# the stage's attempt, name, details and accumulables. It also carries the
# owning job's ID, which S(...) stashes in glom's scope before the stages are
# walked. The trailing [-1] shows only the last record.
spec = [(
    S(job_id=T['Job ID']),
    ('Stage Infos', [dict(
        stage_id='Stage ID',
        attempt_id='Stage Attempt ID',
        name='Stage Name',
        details='Details',
        accumulables='Accumulables',
        job_id=S['job_id'],
        parents='Parent IDs',
    )]),
)]
flatten(glom(jobs, spec))[-1]
from glom import flatten, merge

# Per job, per stage: the stage ID alongside that stage's raw "RDD Info" list.
# A plain dict spec is evaluated by glom against each stage, so no scope
# variables are needed. (glom's ``merge`` is an eager reduction: calling it
# while *building* a spec merges the spec dicts themselves, not the data.)
spec = [
    ('Stage Infos',
     [{'stage_id': 'Stage ID', 'rdd_info': 'RDD Info'}])
]
glom(jobs, spec)
from glom import flatten, merge

# Per job, per stage: that stage's "RDD Info" entries, each copied into a new
# dict with the stage's ID under 'stage_id' (an entry's own keys win on clash).
# The per-stage work must happen while glom walks the data, so it lives in a
# callable spec, which glom calls with each stage. A list comprehension written
# directly in the spec literal would instead run once while the spec is built,
# over glom's S/T placeholder objects, never over any stage data.
spec = [
    ('Stage Infos',
     [lambda stage: [{'stage_id': stage['Stage ID'], **rddi}
                     for rddi in stage['RDD Info']]])
]
glom(jobs, spec)
from glom import flatten, merge

# Per job, per stage: the stage's "RDD Info" entries with 'stage_id' set to
# the stage's ID (overriding any existing 'stage_id' key).
# This builds new dicts rather than calling dict.update on the originals.
# update() mutates the source records inside ``jobs`` and returns None, so a
# spec built on it would produce lists of None. The comprehension sits inside
# a callable spec so glom evaluates it per stage at glom() time. Written
# directly in the spec literal, it would run once at build time over the T/S
# placeholder objects instead.
spec = [
    ('Stage Infos',
     [lambda stage: [{**rddi, 'stage_id': stage['Stage ID']}
                     for rddi in stage['RDD Info']]])
]
glom(jobs, spec)
from glom import flatten, Merge, Iter

# One row per (job, property) pair. Each entry of a job's "Properties"
# mapping becomes {'job_id', 'property', 'value'}, and flatten() joins the
# per-job lists into a single list of rows.
spec = [(
    S(job_id=T['Job ID']),  # remember the job ID for the per-property rows
    'Properties',
    T.items(),              # (key, value) pairs of the mapping
    Iter().map({'job_id': S['job_id'], 'property': T[0], 'value': T[1]}).all(),
)]
flatten(glom(jobs, spec))
# Interactive scratch: print the documentation for dict.update.
# (dict.update mutates the dict in place and returns None.)
help({'a': 1, 'b': 2}.update)