In [ ]:
# Path of the metrics log that the loading cell opens; each line of the file is
# parsed there as one JSON document.
# NOTE(review): appears to be a Spark application event log (the notebook later
# reads 'SparkListenerJobStart' events from it) — confirm.
metrics_file = "metrics/app-20201104211738-0276"
In [ ]:
from glom import glom
In [ ]:
import json
from collections import defaultdict

# Maps an unqualified event name to the list of parsed records of that kind.
# Passing `list` directly is the idiomatic default factory (no lambda wrapper needed).
evset = defaultdict(list)

def dequalify_event(ev):
    """Strip any dotted qualifier from an event name.

    Returns the text following the last ``.`` in ``ev``; a name that
    contains no dot is returned as-is.
    """
    # rpartition yields ('', '', ev) when no dot is present, so the last
    # element is the right answer in both cases.
    _qualifier, _dot, short_name = ev.rpartition(".")
    return short_name

# Read the metrics log line by line and bucket every record under its
# unqualified event name.
with open(metrics_file, "r") as mf:
    for line in mf:
        # Skip blank lines (e.g. a trailing newline) rather than letting
        # json.loads raise on them.
        if not line.strip():
            continue
        # Parse once and reuse the result for both the name lookup and the
        # stored record.
        js = json.loads(line)
        event = dequalify_event(glom(js, "Event"))
        evset[event].append(js)
In [ ]:
# Show which event kinds were found in the log (iterating a dict yields its keys).
list(evset)
In [ ]:
# Peek at the first 'SparkListenerJobStart' record to inspect its structure.
# evset is a defaultdict of lists, so if no such event was loaded the lookup
# yields an empty list and the [0] index raises IndexError.
job0 = evset['SparkListenerJobStart'][0]
job0
In [ ]:
# All 'SparkListenerJobStart' records; this is the same list object stored in
# evset (not a copy), and it is the target of the glom specs in later cells.
jobs = evset["SparkListenerJobStart"]
In [ ]:
# Number of job-start records that were loaded.
len(jobs)
In [ ]:
# Per-stage projection: keep each stage's id and the ids of its parent stages.
stage_summary = {'stage_id': 'Stage ID', 'parents': 'Parent IDs'}

# The dict spec is applied to the whole list of job-start records, so every key
# collects across all jobs: 'job_ids' is a flat list of ids, while 'stage_ids'
# holds one list of stage summaries per job.
spec = {
    'job_ids': ['Job ID'],
    'stage_ids': [('Stage Infos', [stage_summary])],
}

glom(jobs, spec)
In [ ]:
# Per-stage projection: stage id plus the ids of its parent stages.
stage_projection = {'stage_id': 'Stage ID', 'parents': 'Parent IDs'}

# Per-job projection: the job id together with the projected list of its stages.
job_projection = {
    'job_id': 'Job ID',
    'stages': ('Stage Infos', [stage_projection]),
}

# One output record per job-start event.
spec = [job_projection]

glom(jobs, spec)
In [ ]:
from glom import S, T, glom, Assign, Spec

# Stage projection that also carries the owning job's id, read back from the
# glom scope where the per-job step below stores it.
stage_with_job = {
    'stage_id': 'Stage ID',
    'job_id': S['job_id'],
    'parents': 'Parent IDs',
}

# For each job: first remember its id in the scope, then project its stages.
per_job = (
    S(job_id=T['Job ID']),
    {'stage': ('Stage Infos', [stage_with_job])},
)

spec = [per_job]

glom(jobs, spec)
In [ ]:
from glom import flatten

# Row layout for one stage; 'job_id' comes from the scope variable that the
# per-job step below sets before descending into the stages.
stage_row = {
    'stage_id': 'Stage ID',
    'attempt_id': 'Stage Attempt ID',
    'name': 'Stage Name',
    'details': 'Details',
    'accumulables': 'Accumulables',
    'job_id': S['job_id'],
    'parents': 'Parent IDs',
}

# For each job: stash its id in the scope, then emit one row per stage.
spec = [(
    S(job_id=T['Job ID']),
    ('Stage Infos', [stage_row]),
)]

# glom returns one list of rows per job; flatten joins them into a single list.
stage_rows = flatten(glom(jobs, spec))
# Display only the last row as a sample.
stage_rows[-1]
In [ ]:
from glom import flatten, merge

# For every stage of every job, emit {'stage_id': ..., 'rdd_info': ...}.
#
# glom.merge() is the eager helper: it runs immediately, while the spec is being
# built, not per target. Called on two dicts of S[...] path objects it simply
# returned the combined dict, which then acted as an ordinary dict spec. The
# dict spec is written out directly here so the intent is explicit and does not
# depend on that coincidence.
spec = [
    ('Stage Infos',
     [
         (S(stage_id=T['Stage ID'], rdd_info=T['RDD Info']),
          {'stage_id': S['stage_id'], 'rdd_info': S['rdd_info']}
         )
     ]
    )
]
glom(jobs, spec)
In [ ]:
from glom import Invoke, flatten, merge

# For every stage of every job, emit one dict per RDD-info entry with the
# stage's id added under 'stage_id'.
#
# The per-RDD step has to be a glom spec. A Python comprehension over
# S['rdd_info'] runs while the spec is being *built* and iterates the S path
# object itself, never the actual RDD data. Instead: remember the stage id in
# the scope, descend into 'RDD Info', and for each entry build
# dict(entry, stage_id=<scoped id>) at glom time via Invoke.
spec = [
    ('Stage Infos',
     [
         (S(stage_id=T['Stage ID']),
          'RDD Info',
          [Invoke(dict).specs(T, stage_id=S['stage_id'])])
     ]
    )
]
glom(jobs, spec)
In [ ]:
from glom import Invoke, flatten, merge

# For every stage of every job, emit one dict per RDD-info entry tagged with
# the stage's id.
#
# Two problems are avoided here:
#   * a Python comprehension over T['RDD Info'] runs at spec-construction time
#     and iterates the T path object, not the RDD data;
#   * dict.update() returns None and mutates its receiver, so it can neither
#     produce the merged record nor leave the loaded events untouched.
# Invoke(dict) builds a *new* dict per entry at glom time instead.
spec = [
    ('Stage Infos',
     [
         (S(stage_id=T['Stage ID']),
          'RDD Info',
          [Invoke(dict).specs(T, stage_id=S['stage_id'])]
         )

     ]
    )
]
glom(jobs, spec)
In [ ]:
from glom import flatten, Merge, Iter

# Turn each job's 'Properties' mapping into rows of
# {'job_id', 'property', 'value'}, one row per key/value pair.
# Only the job id needs to live in the scope; the properties themselves are
# reached through the normal 'Properties' path step, so no extra scope
# binding is made for them.
spec = [(
    S(job_id=T['Job ID']),
    ('Properties',
             (
                # (key, value) pairs of the properties mapping
                T.items(),
                # T[0] is the key and T[1] the value of each pair
                Iter().map({'job_id': S['job_id'], 'property': T[0], 'value': T[1]}).all()
             )
    )
)]

# One list of rows per job -> a single flat list of rows.
flatten(glom(jobs, spec))
In [ ]:
# Scratch lookup: print the built-in help text for dict.update.
# NOTE(review): looks like leftover exploration; consider dropping it from the finished notebook.
help({'a': 1, 'b': 2}.update)
In [ ]: