#!/usr/bin/env python
# coding: utf-8
#
# # Integration of SWAN with Spark clusters
#
# The current setup allows executing PySpark operations on CERN Hadoop and Spark clusters.
#
# This notebook illustrates the use of __Spark in SWAN to access CERN Accelerator logging service data__.
# ### Connect to the cluster (NXCALS)
# *Environment: bleeding-edge Python 3 & NXCALS*
#
# To connect to a cluster, click on the star button at the top and follow the instructions:
# * The star button only appears if you have selected a Spark cluster in the configuration
# * The star button becomes active once the notebook kernel is ready
# * Select the NXCALS configuration bundle
# * Access to the cluster and NXCALS data is controlled by acc-logging-team; please write to acc-logging-team@cern.ch
#
# NXCALS API documentation: http://nxcals-docs.web.cern.ch/current/
# ### NXCALS DataExtraction API
# ### 1) Extract data using device/property pairs
# In[1]:
# import the NXCALS query builders used below (see the API docs linked above)
from cern.nxcals.api.extraction.data.builders import DataQuery, DevicePropertyQuery
# build the query and load the data into a Spark DataFrame
df1 = DevicePropertyQuery \
    .builder(spark) \
    .system("CMW") \
    .startTime("2021-01-10 00:00:00.000") \
    .endTime("2021-01-11 00:00:00.000") \
    .entity() \
    .parameter("RADMON.PS-10/ExpertMonitoringAcquisition") \
    .buildDataset()
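# The query returns a regular Spark DataFrame, so the standard DataFrame API applies.
# As a quick sanity check (not part of the original notebook), the inferred schema
# of the device/property fields can be inspected with:
df1.printSchema()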
# In[2]:
df1.count()
# ### Inspect data
# In[3]:
df1.select('acqStamp','voltage_18V','current_18V','device','pt100Value').show()
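# Aggregations can be pushed to the cluster before any data is brought back to the
# driver. A minimal sketch (not from the original notebook) computing summary
# statistics for the current_18V column with the standard pyspark.sql functions:
from pyspark.sql import functions as F

df1.agg(
    F.min('current_18V').alias('min_current'),
    F.max('current_18V').alias('max_current'),
    F.avg('current_18V').alias('avg_current'),
).show()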
# ### Draw a plot with matplotlib
# In[4]:
import matplotlib
import pandas as pd
get_ipython().run_line_magic('matplotlib', 'inline')
# In[5]:
p_df = df1.select('acqStamp','current_18V').toPandas()
# acqStamp is in nanoseconds since epoch; convert to datetime and sort before plotting
p_df['acqStamp'] = pd.to_datetime(p_df['acqStamp'], unit='ns')
p_df.sort_values(by='acqStamp').plot('acqStamp','current_18V',figsize=(15,5))
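# toPandas() collects the full result set to the driver; for larger time windows it
# can be safer to downsample on the cluster first. A sketch, assuming a 10% random
# sample is representative enough for a quick look:
p_sample = df1.select('acqStamp','current_18V').sample(fraction=0.1, seed=42).toPandas()
p_sample['acqStamp'] = pd.to_datetime(p_sample['acqStamp'], unit='ns')
p_sample.sort_values(by='acqStamp').plot('acqStamp','current_18V',figsize=(15,5))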
# ### 2) Extract data using variable names
# In[6]:
df2 = DataQuery.builder(spark).byVariables() \
    .system('CMW') \
    .startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
    .variable('LTB.BCT60:INTENSITY') \
    .build()
# In[7]:
# collect only the first 10 rows to pandas for a quick preview
df2.limit(10).toPandas()
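# The variable dataset can also be summarised on the cluster before any conversion.
# A sketch, assuming the standard NXCALS result columns nxcals_timestamp
# (nanoseconds since epoch) and nxcals_value:
from pyspark.sql import functions as F

df2.agg(
    F.count('nxcals_value').alias('points'),
    F.avg('nxcals_value').alias('avg_intensity'),
).show()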
# ### 3) Extract data using key/value pairs
# In[8]:
df3 = DataQuery.builder(spark).byEntities().system('WINCCOA') \
    .startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
    .entity().keyValue('variable_name', 'MB.C16L2:U_HDS_3') \
    .build()
# In[10]:
# write the result to CSV on EOS (adjust the path to your own EOS home directory)
df3.toPandas().to_csv('/eos/user/p/pkothuri/winccoa_hds.csv', index=False)
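# Going through pandas collects everything to the driver first. For larger
# extractions, a sketch of letting Spark write the CSV directly from the executors
# (the output directory below is illustrative; it assumes the same EOS area as above
# and that EOS is reachable from the cluster):
df3.write.mode('overwrite').option('header', True).csv('/eos/user/p/pkothuri/winccoa_hds_csv')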