#!/usr/bin/env python
# coding: utf-8

# SWAN
# EP-SFT

# # Integration of SWAN with Spark clusters
#
# The current setup allows executing PySpark operations on CERN Hadoop and Spark clusters.
#
# This notebook illustrates the use of __Spark in SWAN to analyze the monitoring data available on HDFS (analytix)__ and plots a heatmap of loadAvg across the machines of a particular service.

# ### Connect to the cluster (analytix)
# *Env - LCG_97a & analytix*
#
# To connect to a cluster, click on the star button at the top and follow the instructions:
# * The star button only appears if you have selected a Spark cluster in the configuration
# * The star button is active after the notebook kernel is ready

# ### Import the necessary Spark and Python libraries

# In[1]:

from pyspark.sql.functions import from_unixtime, from_json, when, col
from pyspark.sql.types import *

# In[2]:

get_ipython().run_line_magic('matplotlib', 'inline')
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# ### Select the data
# *path_on_hdfs_to_your_data*

# In[3]:

# read one day of collectd load metrics from HDFS
df = spark.read.parquet("/project/monitoring/collectd/load/2020/10/14/")

# ### Check the structure

# In[4]:

df.printSchema()

# ### Create a temporary table view

# In[5]:

df.createOrReplaceTempView("loadAvg")

# ### Do the heavy lifting in Spark and collect the aggregated view into a pandas DataFrame

# In[6]:

# average loadAvg per host and per hour for the NXCALS production hostgroup
df_loadAvg_pandas = spark.sql("SELECT host, \
                               avg(value) as avg, \
                               hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) as hr \
                               FROM loadAvg \
                               WHERE submitter_hostgroup like 'hadoop_ng/nxcals_prod%' \
                               AND dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14 \
                               GROUP BY hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')), host") \
                         .toPandas()

# ### Visualize with seaborn

# In[12]:

# heatmap of loadAvg (hosts vs. hour of day)
plt.figure(figsize=(12, 8))
ax = sns.heatmap(df_loadAvg_pandas.pivot(index='host', columns='hr', values='avg'), cmap="Blues")
ax.set_title("Heatmap of loadAvg for NXCals cluster on 14th October 2020", fontsize=20)

# ### Create a histogram of uptime for the servers in the datacenter

# In[13]:

# create the dataframe from the collectd uptime metrics
df = spark.read.parquet("/project/monitoring/collectd/uptime/2020/10/14/")

# In[14]:

# create a temporary view
df.createOrReplaceTempView("uptime")

# In[15]:

# processing in Spark: maximum uptime per host at noon, converted from seconds to days
df_uptime_pandas = spark.sql("SELECT host, round(max(value)/60/60/24) as days \
                              FROM uptime \
                              WHERE dayofmonth(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 14 \
                              AND hour(from_unixtime(timestamp / 1000, 'yyyy-MM-dd HH:mm:ss')) = 12 \
                              GROUP BY host") \
                        .toPandas()

# In[16]:

# visualize with seaborn
# histogram of uptime (time since last reboot), with log-scaled counts
plt.figure(figsize=(12, 8))
ax = sns.distplot(df_uptime_pandas['days'], kde=False, color='red', bins=range(0, 1800, 20))
ax.set_title("Histogram of uptime (last reboot) for servers in datacenter", fontsize=20)
ax.set_yscale('log')
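# ### Alternative: histogram with the newer seaborn API
# `distplot` above works in the LCG_97a stack but is deprecated from seaborn 0.11 onwards. If your software stack ships seaborn >= 0.11 (an assumption, not guaranteed for LCG_97a), the same figure can be drawn with `histplot` — a minimal sketch, not part of the original analysis:

# In[ ]:

# histogram of uptime with sns.histplot (requires seaborn >= 0.11)
plt.figure(figsize=(12, 8))
ax = sns.histplot(df_uptime_pandas['days'], bins=range(0, 1800, 20), color='red')
ax.set_title("Histogram of uptime (last reboot) for servers in datacenter", fontsize=20)
ax.set_yscale('log')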