Integration of SWAN with Spark clusters

The current setup allows you to execute PySpark operations on CERN Hadoop and Spark clusters.

This notebook illustrates how to use Spark in SWAN to analyze the monitoring data available on HDFS (analytix) and to plot a heatmap of loadAvg across the machines of a particular service.

Connect to the cluster (analytix)

To connect to a cluster, click the star button at the top and follow the instructions.

  • The star button only appears if you have selected a Spark cluster in the configuration
  • The star button becomes active once the notebook kernel is ready

Import the necessary Spark and Python modules

In [1]:
from pyspark.sql.functions import col, from_json, from_unixtime, when
from pyspark.sql.types import DoubleType, StructField, StructType
In [2]:
%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

Select the data (the April 2018 monitoring archive on HDFS)


In [3]:
df = spark.read.json("/project/itmon/archive/lemon/hadoop_ng/2018-04/")
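By default `spark.read.json` expects JSON Lines (one JSON object per line) and, in its default PERMISSIVE mode, stores any line it cannot parse in a `_corrupt_record` column, which is why that column shows up in the schema. The plain-Python sketch below imitates that behaviour on a few hypothetical lines; it only illustrates the idea and is not how Spark implements it. Note that `body` is itself a JSON string inside each record.

```python
import json


def read_json_lines(lines):
    """Parse JSON Lines, keeping unparseable lines in a _corrupt_record field.

    A rough imitation of Spark's PERMISSIVE mode, for illustration only.
    """
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            obj = json.loads(line)
            if not isinstance(obj, dict):
                raise ValueError("not a JSON object")
            records.append(obj)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            records.append({"_corrupt_record": line})
    return records


# Hypothetical input lines (not real monitoring data)
sample = [
    '{"metric_id": "20002", "body": "{\\"LoadAvg\\": 1.5}"}',
    '{"metric_id": "20002", "body":',  # truncated line
]
rows = read_json_lines(sample)
```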

Check the structure

In [4]:
df.printSchema()
root
 |-- _corrupt_record: string (nullable = true)
 |-- aggregated: string (nullable = true)
 |-- body: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- metric_id: string (nullable = true)
 |-- metric_name: string (nullable = true)
 |-- producer: string (nullable = true)
 |-- submitter_environment: string (nullable = true)
 |-- submitter_host: string (nullable = true)
 |-- submitter_hostgroup: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- toplevel_hostgroup: string (nullable = true)
 |-- type: string (nullable = true)
 |-- version: string (nullable = true)
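Every field, including `timestamp`, is read as a string. The query further down divides `timestamp` by 1000, which implies epoch milliseconds, and `from_unixtime` then renders it in the Spark session time zone (`spark.sql.session.timeZone`). The sketch below performs the same conversion in plain Python to show how the hour and day-of-month values used by the query are derived; it assumes UTC, whereas the cluster may be configured with a different session time zone.

```python
from datetime import datetime, timezone


def hour_and_day(timestamp_ms: str, tz=timezone.utc):
    """Convert an epoch-millisecond string to (hour, day of month).

    Assumes UTC by default; Spark uses the session time zone instead.
    """
    seconds = int(timestamp_ms) / 1000  # same scaling as timestamp / 1000 in the query
    dt = datetime.fromtimestamp(seconds, tz=tz)
    return dt.hour, dt.day


# 2018-04-22 13:30:00 UTC expressed in milliseconds, as a string like the raw column
ts = str(int(datetime(2018, 4, 22, 13, 30, tzinfo=timezone.utc).timestamp() * 1000))
print(hour_and_day(ts))  # (13, 22)
```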

In [5]:
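# body is a JSON string; declare only the field needed here
body_schema = StructType([StructField("LoadAvg", DoubleType())])
# Keep the loadAvg metric (metric_id 20002) and parse body into a struct
df_loadAvg = df.where(col("metric_id") == "20002").withColumn("body", from_json("body", body_schema))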
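The cell above keeps only records whose `metric_id` is 20002 and uses `from_json` to turn the `body` string into a struct with a single `LoadAvg` double, so that the SQL query can refer to `body.LoadAvg`; fields not listed in the schema are not kept. A plain-Python sketch of this filter-and-parse step on hypothetical records follows. How Spark represents an unparseable body differs between versions, so in this sketch it simply becomes a missing (`None`) value.

```python
import json


def parse_load_avg(records, metric_id="20002"):
    """Keep records for one metric and extract LoadAvg from the JSON body string.

    Loosely mirrors df.where(col("metric_id") == "20002") followed by
    from_json("body", body_schema) with a single LoadAvg field.
    """
    out = []
    for rec in records:
        if rec.get("metric_id") != metric_id:
            continue  # drop other metrics
        try:
            body = json.loads(rec.get("body"))
            load = body.get("LoadAvg")
            load = float(load) if load is not None else None
        except (ValueError, TypeError, AttributeError):
            load = None  # in this sketch, an unparseable body yields a missing value
        # Only the declared field survives, as with the schema passed to from_json
        out.append({**rec, "body": {"LoadAvg": load}})
    return out


# Hypothetical records (not real monitoring data)
records = [
    {"metric_id": "20002", "submitter_host": "hostA", "body": '{"LoadAvg": 0.75, "other": 1}'},
    {"metric_id": "12345", "submitter_host": "hostA", "body": '{"Foo": 3}'},
    {"metric_id": "20002", "submitter_host": "hostB", "body": "not json"},
]
parsed = parse_load_avg(records)
```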

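Create a temporary view named loadAvg so the data can be queried with Spark SQL (the commented line shows how the body schema could instead be inferred from the data)

In [6]:
# body_schema = spark.read.json(df.where(col("metric_id") == "20002").rdd.map(lambda row: row.body)).schema
df_loadAvg.createOrReplaceTempView("loadAvg")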

Do the heavy lifting in Spark and collect the aggregated view into a pandas DataFrame

In [7]:
# Average loadAvg per host and hour for the nxcals hostgroup on 22 April,
# then collect the small aggregated result to the driver as a pandas DataFrame
df_loadAvg_pandas = spark.sql("""
    SELECT substring(submitter_host, 7, length(submitter_host)) AS host,
           avg(body.LoadAvg) AS avg,
           hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) AS hr
    FROM loadAvg
    WHERE submitter_hostgroup LIKE 'hadoop_ng/nxcals%'
      AND dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 22
    GROUP BY hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), submitter_host
""").toPandas()
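To see what this query computes without a cluster, here is an analogue that runs on Python's built-in `sqlite3` with a few hypothetical rows. SQLite stands in for Spark SQL purely for illustration: `strftime(..., 'unixepoch')` replaces `from_unixtime`/`hour`/`dayofmonth` and always works in UTC, and the `body.LoadAvg` struct field is flattened into a hypothetical `load_avg` column. SQL `substring`/`substr` positions are 1-based, so starting at position 7 drops the first six characters of the host name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE loadAvg (submitter_host TEXT, submitter_hostgroup TEXT, "
    "timestamp TEXT, load_avg REAL)"
)

# Hypothetical rows: host names, hostgroups and values are made up for illustration
rows = [
    ("prefixhost01", "hadoop_ng/nxcals/node", "1524403800000", 1.0),  # 2018-04-22 13:30 UTC
    ("prefixhost01", "hadoop_ng/nxcals/node", "1524404400000", 3.0),  # 2018-04-22 13:40 UTC
    ("prefixhost02", "hadoop_ng/nxcals/node", "1524403800000", 2.0),
    ("prefixhost03", "hadoop_ng/other", "1524403800000", 9.0),        # excluded by the hostgroup filter
]
conn.executemany("INSERT INTO loadAvg VALUES (?, ?, ?, ?)", rows)

# Same shape as the Spark query: strip a 6-character prefix, average per host and hour
query = """
SELECT substr(submitter_host, 7, length(submitter_host)) AS host,
       avg(load_avg) AS avg,
       CAST(strftime('%H', CAST(timestamp AS INTEGER) / 1000, 'unixepoch') AS INTEGER) AS hr
FROM loadAvg
WHERE submitter_hostgroup LIKE 'hadoop_ng/nxcals%'
  AND CAST(strftime('%d', CAST(timestamp AS INTEGER) / 1000, 'unixepoch') AS INTEGER) = 22
GROUP BY hr, submitter_host
ORDER BY host, hr
"""
result = conn.execute(query).fetchall()
print(result)
```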

Visualize with seaborn

In [9]:
# heatmap of loadAvg
plt.figure(figsize=(12, 8))
ax = sns.heatmap(df_loadAvg_pandas.pivot(index='host', columns='hr', values='avg'), cmap="Blues")
ax.set_title("Heatmap of loadAvg for NXCals service on 22nd April 2018")
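`DataFrame.pivot(index='host', columns='hr', values='avg')` reshapes the long (host, hr, avg) result into a host × hour matrix, which is what `sns.heatmap` draws. pandas raises a `ValueError` if a (host, hr) pair occurs more than once, and hours with no data for a host become NaN, which seaborn masks automatically. The dependency-free sketch below reproduces that reshaping for illustration only; the `pivot` helper is hypothetical and not part of pandas.

```python
import math


def pivot(rows, index, columns, values):
    """Reshape a list of dicts from long to wide format, like pandas.DataFrame.pivot.

    Missing cells are filled with NaN; duplicate (index, column) pairs raise ValueError.
    """
    row_keys = sorted({r[index] for r in rows})
    col_keys = sorted({r[columns] for r in rows})
    cells = {}
    for r in rows:
        key = (r[index], r[columns])
        if key in cells:
            raise ValueError(f"duplicate entry for {key}")
        cells[key] = r[values]
    matrix = [[cells.get((i, c), math.nan) for c in col_keys] for i in row_keys]
    return row_keys, col_keys, matrix


# Hypothetical aggregated rows, shaped like the (host, hr, avg) query result
data = [
    {"host": "host01", "hr": 13, "avg": 2.0},
    {"host": "host01", "hr": 14, "avg": 1.5},
    {"host": "host02", "hr": 13, "avg": 2.0},
]
hosts, hours, grid = pivot(data, index="host", columns="hr", values="avg")
```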