In [ ]:
# Replace this with a Spark event log of your own, or parameterize it with Papermill!

metrics_file = "metrics/application_1601392010735_0030"

# When wide_output_file differs from output_file, wide tables also go to a separate database.
wide_output_file = "output.db"
output_file = "output.db"

transform_structure_prefixes = True
use_structure_prefixes = False
driver_memory = '8g'
executor_memory = '8g'
master = 'local[*]'

debug_me = False
interactive = True
store_parquet = True
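
These parameters can be injected with Papermill, as the comment above suggests. A minimal driver sketch, run from outside this notebook (the notebook filenames are placeholders):

In [ ]:
# Hypothetical Papermill driver; the notebook paths are placeholders.
import papermill as pm

pm.execute_notebook(
    "eventlog_analysis.ipynb",
    "eventlog_analysis_output.ipynb",
    parameters={
        "metrics_file": "metrics/application_1601392010735_0030",
        "output_file": "output.db",
    },
)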
In [ ]:
import pandas as pd

if debug_me:
    pd.options.display.max_columns = None
    pd.options.display.max_rows = None
In [ ]:
import pyspark
import pyspark.sql.functions as F
from pyspark import SparkConf

import json
import eventlog

spark = (
    pyspark.sql.SparkSession.builder
    .master(master)
    .config("spark.ui.showConsoleProgress", False)
    .config("spark.driver.memory", driver_memory)
    .config("spark.executor.memory", executor_memory)
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("OFF")

# Quiet JVM-side log4j output as well (via a private handle into the JVM).
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
In [ ]:
metrics = spark.read.json(metrics_file)
eventlog.init_eventlog(
    metrics,
    use_structure_prefixes=use_structure_prefixes,
    transform_structure_prefixes=transform_structure_prefixes,
    debug_me=debug_me,
)
In [ ]:
from eventlog import *

app_id, app_name = metrics.select("App ID", "App Name").dropna().collect()[0]
In [ ]:
app_name
In [ ]:
accumulator_updates = driver_accumulator_updates(metrics)
In [ ]:
appevents = drop_null_cols(metrics.where(
    (metrics.Event == "SparkListenerApplicationEnd")
    | (metrics.Event == "SparkListenerApplicationStart")
))
In [ ]:
if debug_me:
    appevents.toPandas()
In [ ]:
app_runtime = app_timeline(metrics)
In [ ]:
if debug_me:
    app_runtime.toPandas()
In [ ]:
job_runtime = job_timeline(metrics)
In [ ]:
sql_runtime = sql_timeline(metrics)
In [ ]:
jobs_to_stages = all_stage_meta(metrics).select('Job ID', 'Stage ID').distinct()
In [ ]:
plan_nodes, accumulable_nodes, metadata_nodes = plan_dfs(metrics)
In [ ]:
if debug_me:
    accumulable_nodes.groupBy("name", "accumulatorId", "execution_id").count().where(F.col("count") > 1).orderBy("name").show()
In [ ]:
if debug_me:
    accumulable_nodes.groupBy("name", "accumulatorId", "execution_id").count().orderBy(-F.col("count"), F.col("accumulatorId")).toPandas()
In [ ]:
if debug_me:
    accumulator_updates.orderBy("accumulator").toPandas()
In [ ]:
sql_info_df = with_appmeta(sql_info(metrics))
In [ ]:
metric_meta = spark.createDataFrame(data=eventlog.metric_metas)
task_metrics = tidy_tasks(metrics)
In [ ]:
tasks_to_plans = with_appmeta(task_metrics.join(accumulable_nodes, "accumulatorId").join(plan_nodes, "plan_node"))
In [ ]:
import altair as alt

# Lift Altair's default 5,000-row limit; task-level data can easily exceed it.
alt.data_transformers.disable_max_rows()

Shuffle metrics

In [ ]:
task_byte_metrics = tidy_tasks(metrics).join(
    metric_meta.withColumnRenamed("MetricName", "Metric Name"),
    "Metric Name",
    how="leftouter"
).where(F.col("unit") == "bytes") \
    .groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value") \
    .withColumnRenamed("sum(Metric Value)", "Metric Value").toPandas()

# Rewrite "internal.metrics.shuffle.<read|write>.<metric>" as "Shuffle <metric>".
task_shuffle_metrics = task_byte_metrics[task_byte_metrics['Metric Name'].str.contains('internal.metrics.shuffle')].sort_values('Task ID').copy()

shuffle_replacer = lambda match: "Shuffle %s" % match.group('metric')
task_shuffle_metrics['Metric Name'] = task_shuffle_metrics['Metric Name'].str.replace(
    r'internal\.metrics\.shuffle\.(?P<kind>read|write)\.(?P<metric>.*)$',
    shuffle_replacer,
    regex=True,
)
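
To spot-check the renaming, the distinct metric names can be inspected (an optional check in the spirit of the debug_me cells above):

In [ ]:
if debug_me:
    # Expect names like "Shuffle remoteBytesRead" after the rewrite above.
    print(sorted(task_shuffle_metrics['Metric Name'].unique()))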
In [ ]:
stage_and_task_charts(task_shuffle_metrics, "bytes")

Executor time metrics

In [ ]:
task_metrics = tidy_tasks(metrics).join(
    metric_meta.withColumnRenamed("MetricName", "Metric Name"),
    "Metric Name",
    how="leftouter"
).withColumn("Metric Value", F.col("Metric Value").cast("float"))

task_ms_metrics = task_metrics.where(F.col("unit") == "ms") \
    .groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value") \
    .withColumnRenamed("sum(Metric Value)", "Metric Value")

# Convert nanosecond metrics to milliseconds so both sets share a unit.
task_ns_metrics = task_metrics.where(F.col("unit") == "ns") \
    .groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value") \
    .withColumnRenamed("sum(Metric Value)", "Metric Value") \
    .withColumn("Metric Value", F.col("Metric Value") / 1000000)

task_time_metrics = task_ms_metrics.union(task_ns_metrics).toPandas()

stage_and_task_charts(task_time_metrics[~task_time_metrics["Metric Name"].str.contains("internal.metrics")])
In [ ]:
task_metrics.where(F.col("unit").isin("ms", "ns")).select("Metric Name", "unit").distinct().toPandas()
In [ ]:
stage_and_task_charts(
    task_time_metrics[task_time_metrics["Metric Name"].str.contains("CPU", case=False) | 
                      task_time_metrics["Metric Name"].str.contains("GPU", case=False) |
                      task_time_metrics["Metric Name"].str.contains("JVM GC", case=False) |
                      task_time_metrics["Metric Name"].str.contains("JVMGC", case=False)
                     ])

Plotting wall-clock vs CPU time with layered charts

This gives us some sense of the relationship between CPU time and overall wall-clock run time.

In [ ]:
cputime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorCpuTime')]
runtime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorRunTime')]
layered_stage_and_task_charts([runtime, cputime])
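
To quantify the relationship, we can also compute a per-task CPU utilization ratio from the same two slices (a sketch reusing the runtime and cputime frames built above):

In [ ]:
# Sketch: ratio of CPU time to wall-clock run time per task.
# Values near 1.0 suggest CPU-bound tasks; low values suggest time
# spent waiting on I/O, shuffle, or scheduling.
utilization = runtime.merge(
    cputime, on=["Stage ID", "Task ID"], suffixes=(" run", " cpu")
)
utilization["cpu_fraction"] = (
    utilization["Metric Value cpu"] / utilization["Metric Value run"]
)
if debug_me:
    print(utilization[["Stage ID", "Task ID", "cpu_fraction"]].head())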

Memory and spill metrics

In [ ]:
stage_and_task_charts(
    task_byte_metrics[
        task_byte_metrics['Metric Name'].str.contains(' memory', case=False)
        | task_byte_metrics['Metric Name'].str.contains('size')
        | task_byte_metrics['Metric Name'].str.contains('Bytes Spilled')
    ],
    "bytes"
)
In [ ]:
stage_and_task_charts(task_byte_metrics, "bytes")

Task metrics and metadata

In [ ]:
task_metrics_table, task_meta_table = split_metrics(task_metrics)
task_all_spark = task_metrics_table.join(
    task_meta_table, ["Task ID", "Stage ID", "Application ID", "Application Name"]
).join(jobs_to_stages, "Stage ID")

Query plan node metrics

In [ ]:
plan_metrics = plan_nodes.join(
    accumulable_nodes, ["plan_node", "Application ID", "Application Name"]
).join(task_all_spark, ["accumulatorId", "Application ID", "Application Name"])

plan_metrics_full = plan_metrics_rollup(plan_metrics)

Configuration information

In [ ]:
configs = meltconfig(metrics, ["SparkListenerEnvironmentUpdate", "SparkListenerJobStart"])
In [ ]:
configs.toPandas()

Exporting tabular data

In [ ]:
import sqlite3
conn = sqlite3.connect(output_file)
wide_conn = sqlite3.connect(wide_output_file)
In [ ]:
if wide_output_file != output_file:
    with_appmeta(configs).toPandas().to_sql("configs", wide_conn, index=False, if_exists='append')
    
with_appmeta(configs).toPandas().to_sql("configs", conn, index=False, if_exists='append')
In [ ]:
task_metrics_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_metrics', conn, index=False, if_exists='append')
task_meta_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_meta', conn, index=False, if_exists='append')
In [ ]:
task_all = task_all_spark.toPandas()
In [ ]:
project_columns = [
    'Application ID', 'Application Name', 'Attempt', 'Executor ID', 'Failed',
    'Finish Time', 'Getting Result Time', 'Host', 'Index', 'Killed',
    'Launch Time', 'Locality', 'Metric Name', 'Metric Value', 'Speculative',
    'Job ID', 'Stage ID', 'Task ID'
]

# The pivot index is every projected column except the metric name/value pair.
index_columns = [c for c in project_columns if c not in ('Metric Name', 'Metric Value')]

wide_tasks = task_all[project_columns].pivot_table(
    index=index_columns, columns="Metric Name", values="Metric Value"
).reset_index().rename_axis(None, axis=1)

if wide_output_file != output_file:
    safe_write(wide_tasks, 'wide_tasks', wide_conn, index=False)

safe_write(wide_tasks, 'wide_tasks', conn, index=False)
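
For intuition, the pivot above turns long-form rows (one row per task per metric) into one row per task with a column per metric. A toy illustration with synthetic data, not taken from the log:

In [ ]:
# Synthetic example of the long-to-wide pivot used for wide_tasks.
toy = pd.DataFrame({
    "Task ID": [0, 0, 1, 1],
    "Metric Name": ["executorRunTime", "executorCpuTime"] * 2,
    "Metric Value": [100.0, 80.0, 120.0, 90.0],
})
toy.pivot_table(index="Task ID", columns="Metric Name", values="Metric Value")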

Exporting query plans

In [ ]:
ppn = plan_nodes.toPandas()
pan = accumulable_nodes.toPandas()
sqli = with_appmeta(sql_info_df).toPandas()

sqli.to_sql('sql_info', conn, index=False, if_exists='append')
ppn.to_sql('plans', conn, index=False, if_exists='append')
pan.to_sql('accumulables', conn, index=False, if_exists='append')

if wide_output_file != output_file:
    sqli.to_sql('sql_info', wide_conn, index=False, if_exists='append')
    ppn.to_sql('plans', wide_conn, index=False, if_exists='append')
    pan.to_sql('accumulables', wide_conn, index=False, if_exists='append')
In [ ]:
metric_names = metric_names_for(plan_metrics)
wide_plan_metrics = plan_metrics_full.groupBy(
    ["plan_node", "accumulatorId", "Application ID", "Application Name",
     "Task ID", "Stage ID", "Job ID"]
).pivot("Metric Name", metric_names).agg(F.sum("Metric Value"))
In [ ]:
pmf = plan_metrics_full.toPandas()
wpm = wide_plan_metrics.toPandas()

if wide_output_file != output_file:
    safe_write(pmf, 'plan_metrics', wide_conn, index=False)
    safe_write(wpm, 'wide_plans', wide_conn, index=False)
    
safe_write(pmf, 'plan_metrics', conn, index=False)
safe_write(wpm, 'wide_plans', conn, index=False)
In [ ]:
safe_write(plan_nodes.toPandas(), 'plan_nodes', conn, index=False)
safe_write(accumulable_nodes.toPandas(), 'accumulable_nodes', conn, index=False)
safe_write(metadata_nodes.toPandas(), 'metadata_nodes', conn, index=False)

Miscellaneous metadata

In [ ]:
# Export the remaining metadata tables, one SQL table per extractor function.
for table in [job_info, stage_meta, stage_parents, stage_rddmeta,
              stage_rddparents, app_timeline, sql_timeline,
              driver_accumulator_updates]:
    raw = table(metrics)
    the_df = with_appmeta(raw).toPandas()
    the_df.to_sql(table.__name__, conn, index=False, if_exists='append')
In [ ]:
if wide_output_file != output_file:
    wide_conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_apps on plans ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_metrics_apps on plan_metrics ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])')
    
conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])')
conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_apps on task_metrics ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_names on task_metrics ([Metric Name])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_kinds on task_metrics ([kind])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_agg on task_metrics ([Task ID], [Metric Name])')
conn.execute('CREATE INDEX IF NOT EXISTS stage_metric_agg on task_metrics ([Stage ID], [Metric Name])')
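
Before closing the connections, a quick sanity check that the exports landed (an optional step; it just counts rows in two of the tables written above):

In [ ]:
if debug_me:
    # Optional sanity check against the freshly written database.
    print(pd.read_sql_query("SELECT COUNT(*) AS n FROM task_metrics", conn))
    print(pd.read_sql_query("SELECT COUNT(*) AS n FROM wide_tasks", conn))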
In [ ]:
conn.close()
wide_conn.close()
In [ ]:
if not interactive:
    spark.stop()