# Replace metrics_file below with a Spark history log of your own, or parameterize with Papermill!
metrics_file = "metrics/application_1601392010735_0030"
wide_output_file = "output.db"
output_file = "output.db"
transform_structure_prefixes = True
use_structure_prefixes = False
driver_memory = '8g'
executor_memory = '8g'
master = 'local[*]'
debug_me = False
interactive = True
store_parquet = True
import pandas as pd
if debug_me:
    pd.options.display.max_columns = None
    pd.options.display.max_rows = None
import pyspark
import pyspark.sql.functions as F
from pyspark import SparkConf
import json
import eventlog
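# Start a local Spark session sized by the parameters above. In local mode
# everything runs in the driver JVM, so driver memory is the setting that
# matters most here.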
spark = (
    pyspark.sql.SparkSession.builder
    .master(master)
    .config("spark.ui.showConsoleProgress", False)
    .config("spark.driver.memory", driver_memory)
    .config("spark.executor.memory", executor_memory)
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("OFF")
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
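# A Spark event log is newline-delimited JSON, one listener event per line,
# so it can be read directly as a JSON datasource.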
metrics = spark.read.json(metrics_file)
eventlog.init_eventlog(metrics, use_structure_prefixes=use_structure_prefixes, transform_structure_prefixes=transform_structure_prefixes, debug_me=debug_me)
from eventlog import *
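# Pull out the application's identity and build the core lookup tables:
# a job-to-stage mapping, the SQL plan nodes with their accumulators, the
# metric metadata, and a tidy (long-form) frame of per-task metrics.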
app_id, app_name = metrics.select("App ID", "App Name").dropna().collect()[0]
jobs_to_stages = all_stage_meta(metrics).select('Job ID', 'Stage ID').distinct()
plan_nodes, accumulable_nodes = plan_dfs(metrics)
metric_meta = spark.createDataFrame(data=eventlog.metric_metas)
task_metrics = tidy_tasks(metrics)
tasks_to_plans = with_appmeta(task_metrics.join(accumulable_nodes, "accumulatorId").join(plan_nodes, "plan_node"))
import altair as alt
alt.data_transformers.disable_max_rows()
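# Join each task metric with its metadata (which carries the unit), keep the
# byte-valued metrics, and sum them per task for charting.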
task_byte_metrics = (
    tidy_tasks(metrics)
    .join(metric_meta.withColumnRenamed("MetricName", "Metric Name"), "Metric Name", how="outer")
    .where(F.col("unit") == "bytes")
    .groupBy("Stage ID", "Task ID", "Metric Name")
    .sum("Metric Value")
    .withColumnRenamed("sum(Metric Value)", "Metric Value")
    .toPandas()
)
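# Shuffle traffic: select the internal shuffle accumulators and strip the
# 'internal.metrics.shuffle.<read|write>.' prefix down to a friendlier label,
# e.g. 'internal.metrics.shuffle.read.remoteBytesRead' -> 'Shuffle remoteBytesRead'.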
task_shuffle_metrics = task_byte_metrics[
    task_byte_metrics['Metric Name'].str.contains('internal.metrics.shuffle', regex=False)
].sort_values('Task ID')
shuffle_replacer = lambda match: "Shuffle %s" % match.group('metric')
task_shuffle_metrics['Metric Name'] = task_shuffle_metrics['Metric Name'].str.replace(
    r'internal\.metrics\.shuffle\.(?P<kind>read|write)\.(?P<metric>.*)$',
    shuffle_replacer,
    regex=True,
)
stage_and_task_charts(task_shuffle_metrics, "bytes")
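# Time metrics are reported in two units; sum each per task, converting the
# nanosecond metrics to milliseconds so both sets chart on one scale.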
task_metrics = (
    tidy_tasks(metrics)
    .join(metric_meta.withColumnRenamed("MetricName", "Metric Name"), "Metric Name", how="outer")
    .withColumn("Metric Value", F.col("Metric Value").cast("float"))
)
task_ms_metrics = (task_metrics.where(F.col("unit") == "ms").groupBy("Stage ID", "Task ID", "Metric Name")
                   .sum("Metric Value").withColumnRenamed("sum(Metric Value)", "Metric Value"))
task_ns_metrics = (task_metrics.where(F.col("unit") == "ns").groupBy("Stage ID", "Task ID", "Metric Name")
                   .sum("Metric Value").withColumnRenamed("sum(Metric Value)", "Metric Value")
                   .withColumn("Metric Value", F.col("Metric Value").cast("float") / 1000000))
task_time_metrics = task_ms_metrics.union(task_ns_metrics).toPandas()
stage_and_task_charts(task_time_metrics[~task_time_metrics["Metric Name"].str.contains("internal.metrics", regex=False)])
stage_and_task_charts(
    task_time_metrics[
        task_time_metrics["Metric Name"].str.contains("CPU")
        | task_time_metrics["Metric Name"].str.contains("GPU")
        | task_time_metrics["Metric Name"].str.contains("JVM GC")
    ]
)
# This gives us some sense of the relationship between CPU time and overall (wall-clock) task run time.
cputime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorCpuTime')]
runtime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorRunTime')]
layered_stage_and_task_charts([runtime, cputime])
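# A derived view worth inspecting (a minimal sketch): per-task CPU utilization,
# i.e. the fraction of wall-clock run time spent on-CPU. Column names follow
# the tidy frames built above; 'util' and 'cpu_fraction' are names introduced here.
util = runtime.merge(cputime, on=["Stage ID", "Task ID"], suffixes=(" run", " cpu"))
util["cpu_fraction"] = util["Metric Value cpu"] / util["Metric Value run"]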
stage_and_task_charts(
    task_byte_metrics[task_byte_metrics['Metric Name'].str.contains(' memory')
                      | task_byte_metrics['Metric Name'].str.contains('size')
                      | task_byte_metrics['Metric Name'].str.contains('Bytes Spilled')],
    "bytes")
stage_and_task_charts(task_byte_metrics, "bytes")
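# Tie SQL plan nodes back to the tasks that executed them: plan nodes link to
# accumulators, and tasks report values for those accumulator IDs.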
task_metrics_table, task_meta_table = split_metrics(task_metrics)
task_all_spark = task_metrics_table.join(task_meta_table, ["Task ID", "Stage ID", "Application ID", "Application Name"]).join(jobs_to_stages, "Stage ID")
plan_metrics = plan_nodes.join(accumulable_nodes, ["plan_node", "Application ID", "Application Name"]).join(task_all_spark, ["accumulatorId", "Application ID", "Application Name"])
plan_metrics_full = plan_metrics_rollup(plan_metrics)
configs = meltconfig(metrics, ["SparkListenerEnvironmentUpdate","SparkListenerJobStart"])
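# Persist everything to SQLite. 'conn' always gets the full set of tables; if
# a separate wide output file is configured, the wide tables are copied there too.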
import sqlite3
conn = sqlite3.connect(output_file)
wide_conn = sqlite3.connect(wide_output_file)
if wide_output_file != output_file:
    with_appmeta(configs).toPandas().to_sql("configs", wide_conn, index=False, if_exists='append')
with_appmeta(configs).toPandas().to_sql("configs", conn, index=False, if_exists='append')
task_metrics_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_metrics', conn, index=False, if_exists='append')
task_meta_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_meta', conn, index=False, if_exists='append')
task_all = task_all_spark.toPandas()
project_columns = ['Application ID', 'Application Name', 'Attempt', 'Executor ID', 'Failed', 'Finish Time', 'Getting Result Time', 'Host', 'Index', 'Killed', 'Launch Time', 'Locality', 'Metric Name', 'Metric Value', 'Speculative', 'Job ID', 'Stage ID', 'Task ID']
index_columns = ['Application ID','Application Name','Attempt','Executor ID','Failed','Finish Time','Getting Result Time','Host','Index','Killed','Launch Time','Locality','Speculative','Job ID', 'Stage ID','Task ID']
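# Pivot the long-form task metrics into one row per task attempt with one
# column per metric name.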
wide_tasks = task_all[
project_columns
].pivot_table(index=index_columns, columns="Metric Name", values="Metric Value").reset_index().rename_axis(None, axis=1)
if wide_output_file != output_file:
    safe_write(wide_tasks, 'wide_tasks', wide_conn, index=False)
safe_write(wide_tasks, 'wide_tasks', conn, index=False)
ppn = plan_nodes.toPandas()
pan = accumulable_nodes.toPandas()
ppn.to_sql('plans', conn, index=False, if_exists='append')
pan.to_sql('accumulables', conn, index=False, if_exists='append')
if wide_output_file != output_file:
    ppn.to_sql('plans', wide_conn, index=False, if_exists='append')
    pan.to_sql('accumulables', wide_conn, index=False, if_exists='append')
metric_names = metric_names_for(plan_metrics)
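# Pivot the plan-node metrics the same way: one row per (plan node, task),
# one column per metric name.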
wide_plan_metrics = (
    plan_metrics_full
    .groupBy(["plan_node", "accumulatorId", "Application ID", "Application Name", "Task ID", "Stage ID", "Job ID"])
    .pivot("Metric Name", metric_names)
    .agg(F.sum("Metric Value"))
)
pmf = plan_metrics_full.toPandas()
wpm = wide_plan_metrics.toPandas()
if wide_output_file != output_file:
    safe_write(pmf, 'plan_metrics', wide_conn, index=False)
    safe_write(wpm, 'wide_plans', wide_conn, index=False)
safe_write(pmf, 'plan_metrics', conn, index=False)
safe_write(wpm, 'wide_plans', conn, index=False)
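# Write the remaining job, stage, and RDD metadata tables; each extractor
# function's name doubles as its table name.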
for table in [job_info, stage_meta, stage_parents, stage_rddmeta, stage_rddparents]:
    raw = table(metrics)
    the_df = with_appmeta(raw).toPandas()
    the_df.to_sql(table.__name__, conn, index=False, if_exists='append')
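# Index the columns that downstream queries filter and join on most often.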
if wide_output_file != output_file:
    wide_conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_apps on plans ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_metrics_apps on plan_metrics ([Application ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])')
    wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])')
conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_apps on task_metrics ([Application ID])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_names on task_metrics ([Metric Name])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_kinds on task_metrics ([kind])')
conn.execute('CREATE INDEX IF NOT EXISTS task_metric_agg on task_metrics ([Task ID], [Metric Name])')
conn.execute('CREATE INDEX IF NOT EXISTS stage_metric_agg on task_metrics ([Stage ID], [Metric Name])')
conn.close()
wide_conn.close()
if not interactive:
    spark.stop()
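# Example downstream use (a minimal sketch, assuming the cells above have run
# and the output database exists): read the widened task table back out with
# pandas. 'check_conn' and 'sample' are names introduced here.
check_conn = sqlite3.connect(output_file)
sample = pd.read_sql_query("SELECT * FROM wide_tasks LIMIT 5", check_conn)
check_conn.close()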