#!/usr/bin/env python
# coding: utf-8

# # SWAN EP-SFT
#
# ## Integration of SWAN with Spark clusters
#
# The current setup allows you to execute PySpark operations on the CERN Hadoop and Spark clusters.
#
# This notebook illustrates the use of __Spark in SWAN to access CERN Accelerator Logging Service data__.

# ### Connect to the cluster (NXCals)
#
# *Env - bleeding edge python3 & nxcals*
#
# To connect to a cluster, click on the star button at the top and follow the instructions:
# * The star button only appears if you have selected a Spark cluster in the configuration
# * The star button becomes active once the notebook kernel is ready
# * Select the NXCALS configuration bundle
# * Access to the cluster and to NXCALS data is controlled by acc-logging-team; please write to acc-logging-team@cern.ch
#
# NXCals API - http://nxcals-docs.web.cern.ch/current/

# ### NXCals DataExtraction API

# ### 1) Extract data using device/property pairs

# In[1]:


# reference to the NXCALS API - http://nxcals-docs.web.cern.ch/current/
# source the nxcals python libs
from cern.nxcals.api.extraction.data.builders import *
from cern.nxcals.pyquery.builders import *

# build the query and load the data into a Spark DataFrame
df1 = DevicePropertyQuery \
    .builder(spark) \
    .system("CMW") \
    .startTime("2021-01-10 00:00:00.000") \
    .endTime("2021-01-11 00:00:00.000") \
    .entity() \
    .parameter("RADMON.PS-10/ExpertMonitoringAcquisition") \
    .buildDataset()


# In[2]:


df1.count()


# ### Inspect data

# In[3]:


df1.select('acqStamp', 'voltage_18V', 'current_18V', 'device', 'pt100Value').show()


# ### Draw a plot with matplotlib

# In[4]:


import matplotlib
import pandas as pd
get_ipython().run_line_magic('matplotlib', 'inline')


# In[5]:


p_df = df1.select('acqStamp', 'current_18V').toPandas()
p_df.plot('acqStamp', 'current_18V', figsize=(15, 5))
#p_df.sort_values(by='acqStamp').plot(pd.to_datetime(p_df['acqStamp'],unit='ns'),'current_18V',figsize=(15,5))


# ### 2) Extract data using variable names

# In[6]:


df2 = DataQuery.builder(spark).byVariables() \
    .system('CMW') \
    .startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
    .variable('LTB.BCT60:INTENSITY') \
    .build()


# In[7]:


df2.toPandas()[:10]


# ### 3) Extract data using key/value pairs

# In[8]:


df3 = DataQuery.builder(spark).byEntities().system('WINCCOA') \
    .startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
    .entity().keyValue('variable_name', 'MB.C16L2:U_HDS_3') \
    .build()


# In[10]:


df3.toPandas().to_csv('/eos/user/p/pkothuri/winccoa_hds.csv', index=False)
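

# ### Sketch: plot against a converted time axis
#
# The plotting cell In[5] above uses the raw `acqStamp` values on the x axis. As hinted by the
# commented-out line in that cell, `acqStamp` is a nanosecond epoch timestamp, so the plot is
# easier to read after converting it with pandas and sorting chronologically. The cell below is
# a minimal sketch of that idea, assuming the `p_df` DataFrame from cell In[5] is still in
# scope; it only uses standard pandas calls.

# In[ ]:


# convert the nanosecond epoch to datetime, sort chronologically and plot
p_df_time = p_df.assign(acqStamp=pd.to_datetime(p_df['acqStamp'], unit='ns')).sort_values('acqStamp')
p_df_time.plot('acqStamp', 'current_18V', figsize=(15, 5))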
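

# ### Sketch: aggregate on the cluster before collecting
#
# `toPandas()` pulls the full result into the notebook session, which can become expensive for
# longer time ranges. An alternative is to let Spark reduce the data first and only collect the
# summary. The cell below is an illustrative sketch using standard `pyspark.sql.functions` on
# the df1 columns shown earlier; the particular statistics are just an example, not part of the
# NXCALS API.

# In[ ]:


from pyspark.sql import functions as F

# compute simple statistics of the 18V current directly in Spark
df1.agg(
    F.min('current_18V').alias('min_current_18V'),
    F.max('current_18V').alias('max_current_18V'),
    F.avg('current_18V').alias('avg_current_18V'),
).show()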
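

# ### Sketch: cache a DataFrame that is used several times
#
# df1 is used by several actions above (count, show, toPandas), and each action re-runs the
# extraction. Caching the DataFrame after it is built can avoid the repeated work; this is plain
# Spark behaviour, not anything NXCALS-specific, and only pays off when the result fits in the
# executors' memory.

# In[ ]:


# keep df1 in executor memory so subsequent actions reuse it
df1.cache()
df1.count()  # the first action materialises the cache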