#!/usr/bin/env python # coding: utf-8 # In[1]: # replace this with a Spark history log of your own or parameterize with Papermill! metrics_file = "metrics/application_1601392010735_0030" wide_output_file = "output.db" output_file = "output.db" transform_structure_prefixes = True use_structure_prefixes = False driver_memory = '8g' executor_memory = '8g' master = 'local[*]' debug_me = False interactive = True store_parquet = True # In[2]: import pandas as pd if debug_me: pd.options.display.max_columns = None pd.options.display.max_rows = None # In[3]: import pyspark import pyspark.sql.functions as F from pyspark import SparkConf import json import eventlog spark = pyspark.sql.SparkSession.\ builder.\ master(master).\ config("spark.ui.showConsoleProgress", False).\ config("spark.driver.memory", driver_memory).\ config("spark.executor.memory", executor_memory).\ getOrCreate() sc = spark.sparkContext sc.setLogLevel("OFF") logger = sc._jvm.org.apache.log4j logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR) # In[4]: metrics = spark.read.json(metrics_file) eventlog.init_eventlog(metrics, use_structure_prefixes=use_structure_prefixes, transform_structure_prefixes=transform_structure_prefixes, debug_me=debug_me) # In[5]: from eventlog import * app_id, app_name = metrics.select("App ID", "App Name").dropna().collect()[0] # In[6]: jobs_to_stages = all_stage_meta(metrics).select('Job ID', 'Stage ID').distinct() # In[7]: plan_nodes, accumulable_nodes = plan_dfs(metrics) # In[8]: sql_info_df = with_appmeta(sql_info(metrics)) # In[9]: metric_meta = spark.createDataFrame(data=eventlog.metric_metas) task_metrics = tidy_tasks(metrics) # In[10]: tasks_to_plans = with_appmeta(task_metrics.join(accumulable_nodes, "accumulatorId").join(plan_nodes, "plan_node")) # In[11]: import altair as alt alt.data_transformers.disable_max_rows() # # Shuffle metrics # In[12]: task_byte_metrics = tidy_tasks(metrics).join( metric_meta.withColumnRenamed("MetricName", "Metric Name"), "Metric Name", how="outer" ).where(F.col("unit") == "bytes").groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value").withColumnRenamed("sum(Metric Value)", "Metric Value").toPandas() task_shuffle_metrics = task_byte_metrics[task_byte_metrics['Metric Name'].str.contains('internal.metrics.shuffle')].sort_values('Task ID') shuffle_replacer = lambda match: "Shuffle %s" % match.group('metric') task_shuffle_metrics['Metric Name'] = task_shuffle_metrics['Metric Name'].str.replace(r'internal\.metrics\.shuffle\.(?Pread|write).(?P.*)$', shuffle_replacer) # In[13]: stage_and_task_charts(task_shuffle_metrics, "bytes") # # Executor time metrics # In[14]: task_metrics = tidy_tasks(metrics).join( metric_meta.withColumnRenamed("MetricName", "Metric Name"), "Metric Name", how="outer" ).withColumn("Metric Value", F.col("Metric Value").cast("float")) task_ms_metrics = task_metrics.where(F.col("unit") == "ms").groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value").withColumnRenamed("sum(Metric Value)", "Metric Value") task_ns_metrics = task_metrics.where(F.col("unit") == "ns").groupBy("Stage ID", "Task ID", "Metric Name").sum("Metric Value").withColumnRenamed("sum(Metric Value)", "Metric Value").withColumn("Metric Value", F.col("Metric Value").cast("float") / 1000000) task_time_metrics = task_ms_metrics.union(task_ns_metrics).toPandas() stage_and_task_charts(task_time_metrics[~task_time_metrics["Metric Name"].str.contains("internal.metrics")]) # In[15]: stage_and_task_charts( task_time_metrics[task_time_metrics["Metric Name"].str.contains("CPU") | task_time_metrics["Metric Name"].str.contains("GPU") | task_time_metrics["Metric Name"].str.contains("JVM GC") ]) # # Plotting wall-clock vs CPU time with layered charts # # This gives us some sense of the relationship between CPU time and system time. # In[16]: cputime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorCpuTime')] runtime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorRunTime')] layered_stage_and_task_charts([runtime, cputime]) # # Memory and spill metrics # In[17]: stage_and_task_charts(task_byte_metrics[task_byte_metrics['Metric Name'].str.contains(' memory') | task_byte_metrics['Metric Name'].str.contains('size') | task_byte_metrics['Metric Name'].str.contains('Bytes Spilled')], "bytes") # In[18]: stage_and_task_charts(task_byte_metrics, "bytes") # # Task metrics and metadata # In[19]: task_metrics_table, task_meta_table = split_metrics(task_metrics) task_all_spark = task_metrics_table.join(task_meta_table, ["Task ID", "Stage ID", "Application ID", "Application Name"]).join(jobs_to_stages, "Stage ID") # # Query plan node metrics # In[20]: plan_metrics = plan_nodes.join(accumulable_nodes, ["plan_node", "Application ID", "Application Name"]).join(task_all_spark, ["accumulatorId", "Application ID", "Application Name"]) plan_metrics_full = plan_metrics_rollup(plan_metrics) # # # Configuration information # In[21]: configs = meltconfig(metrics, ["SparkListenerEnvironmentUpdate","SparkListenerJobStart"]) # # Exporting tabular data # In[22]: import sqlite3 conn = sqlite3.Connection(output_file) wide_conn = sqlite3.Connection(wide_output_file) # In[23]: if wide_output_file != output_file: with_appmeta(configs).toPandas().to_sql("configs", wide_conn, index=False, if_exists='append') with_appmeta(configs).toPandas().to_sql("configs", conn, index=False, if_exists='append') # In[24]: task_metrics_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_metrics', conn, index=False, if_exists='append') task_meta_table.join(jobs_to_stages, "Stage ID").toPandas().to_sql('task_meta', conn, index=False, if_exists='append') # In[25]: task_all = task_all_spark.toPandas() # In[26]: project_columns = ['Application ID', 'Application Name', 'Attempt', 'Executor ID', 'Failed', 'Finish Time', 'Getting Result Time', 'Host', 'Index', 'Killed', 'Launch Time', 'Locality', 'Metric Name', 'Metric Value', 'Speculative', 'Job ID', 'Stage ID', 'Task ID'] index_columns = ['Application ID','Application Name','Attempt','Executor ID','Failed','Finish Time','Getting Result Time','Host','Index','Killed','Launch Time','Locality','Speculative','Job ID', 'Stage ID','Task ID'] wide_tasks = task_all[ project_columns ].pivot_table(index=index_columns, columns="Metric Name", values="Metric Value").reset_index().rename_axis(None, axis=1) # wide_tasks.to_sql('wide_tasks', conn, index=False, if_exists='append') if wide_output_file != output_file: safe_write(wide_tasks, 'wide_tasks', wide_conn, index=False) safe_write(wide_tasks, 'wide_tasks', conn, index=False) # ## Exporting query plans # In[27]: ppn = plan_nodes.toPandas() pan = accumulable_nodes.toPandas() sqli = with_appmeta(sql_info_df).toPandas() sqli.to_sql('sql_info', conn, index=False, if_exists='append') ppn.to_sql('plans', conn, index=False, if_exists='append') pan.to_sql('accumulables', conn, index=False, if_exists='append') if wide_output_file != output_file: sqli.to_sql('sql_info', wide_conn, index=False, if_exists='append') ppn.to_sql('plans', wide_conn, index=False, if_exists='append') pan.to_sql('accumulables', wide_conn, index=False, if_exists='append') # In[28]: metric_names = metric_names_for(plan_metrics) wide_plan_metrics = plan_metrics_full.groupBy(["plan_node", "accumulatorId", "Application ID", "Application Name", "Task ID", "Stage ID", "Job ID"]).pivot("Metric Name", metric_names).agg(F.sum("Metric Value")) # In[29]: pmf = plan_metrics_full.toPandas() wpm = wide_plan_metrics.toPandas() if wide_output_file != output_file: safe_write(pmf, 'plan_metrics', wide_conn, index=False) safe_write(wpm, 'wide_plans', wide_conn, index=False) safe_write(pmf, 'plan_metrics', conn, index=False) safe_write(wpm, 'wide_plans', conn, index=False) # ## Miscellaneous metadata # In[31]: for table in [job_info, stage_meta, stage_parents, stage_rddmeta, stage_rddparents]: raw = table(metrics) the_df = with_appmeta(raw).toPandas() the_df.to_sql(table.__name__, conn, index=False, if_exists='append') # In[32]: if wide_output_file != output_file: wide_conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])') wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_apps on plans ([Application ID])') wide_conn.execute('CREATE INDEX IF NOT EXISTS plan_metrics_apps on plan_metrics ([Application ID])') wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])') wide_conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])') conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])') conn.execute('CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])') conn.execute('CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])') conn.execute('CREATE INDEX IF NOT EXISTS task_metric_apps on task_metrics ([Application ID])') conn.execute('CREATE INDEX IF NOT EXISTS task_metric_names on task_metrics ([Metric Name])') conn.execute('CREATE INDEX IF NOT EXISTS task_metric_kinds on task_metrics ([kind])') conn.execute('CREATE INDEX IF NOT EXISTS task_metric_agg on task_metrics ([Task ID], [Metric Name])') conn.execute('CREATE INDEX IF NOT EXISTS stage_metric_agg on task_metrics ([Stage ID], [Metric Name])') # In[33]: conn.close() wide_conn.close() # In[34]: if not interactive: spark.stop() # In[ ]: