The current setup allows you to execute PySpark operations on the CERN Hadoop and Spark clusters.
This notebook illustrates the use of Spark in SWAN to access CERN Accelerator logging service data.
Env - bleeding edge python3 & nxcals
To connect to a cluster, click on the star button at the top and follow the instructions.
NXCals API - http://nxcals-docs.web.cern.ch/current/
# reference to NXCALS API - http://nxcals-docs.web.cern.ch/current/
# source the nxcals python libs
from cern.nxcals.api.extraction.data.builders import *
from cern.nxcals.pyquery.builders import *
# build the query and load data into spark dataframe
# Query NXCALS device/property data for one RADMON power supply over a
# 24-hour window and load it into a Spark DataFrame.
# NOTE(review): `spark` is the session injected by the SWAN/NXCALS
# environment — this cell assumes a cluster connection is already up.
df1 = DevicePropertyQuery \
.builder(spark) \
.system("CMW") \
.startTime("2021-01-10 00:00:00.000") \
.endTime("2021-01-11 00:00:00.000") \
.entity() \
.parameter("RADMON.PS-10/ExpertMonitoringAcquisition") \
.buildDataset()
# Trigger the (lazy) query and report how many rows were returned.
df1.count()
86398
# Preview a few monitoring columns (timestamp, 18V rail readings, device
# name, PT100 temperature); show() prints the first 20 rows by default.
df1.select('acqStamp','voltage_18V','current_18V','device','pt100Value').show()
+-------------------+------------------+------------------+------------+------------------+ | acqStamp| voltage_18V| current_18V| device| pt100Value| +-------------------+------------------+------------------+------------+------------------+ |1610236888815202000| null| 45.91139405965806|RADMON.PS-10| 108.86628699| |1610237136819218000| null| null|RADMON.PS-10|109.53605934000001| |1610237338822497000| null| 45.77429965138436|RADMON.PS-10|108.89605465000001| |1610237350822686000| 19.9731320142746| 45.98755761981011|RADMON.PS-10|109.17884742000001| |1610237381823188000|19.979225099086765|45.850463211536415|RADMON.PS-10| 109.68489764| |1610237980832878000| null|45.850463211536415|RADMON.PS-10|109.71466530000001| |1610238283837795000|19.982271641492847| 45.98755761981011|RADMON.PS-10|109.46164019000001| |1610238495841218000|19.974655285477642| 45.75906693935395|RADMON.PS-10|108.88117082000001| |1610238547842045000| null| 45.74383422732354|RADMON.PS-10|109.74443296000001| |1610238758845482000| null| 46.15511745214463|RADMON.PS-10|109.56582700000001| |1610238964848821000| 19.9731320142746| 45.78953236341477|RADMON.PS-10|109.71466530000001| |1610239208852774000|19.979225099086765| 46.18558287620545|RADMON.PS-10|109.19373125000001| |1610239232853163000| null| 45.71336880326272|RADMON.PS-10| 109.43187253| |1610239359855206000| null| null|RADMON.PS-10|108.92582231000002| |1610239414856100000| null| 46.09418660402299|RADMON.PS-10| 109.04489295| |1610239781862006000| null| 46.09418660402299|RADMON.PS-10| 109.05977678| |1610239810862505000| null| null|RADMON.PS-10|109.35745338000001| |1610239857863294000| null| 46.15511745214463|RADMON.PS-10|108.89605465000001| |1610240110867365000| null| null|RADMON.PS-10|109.16396359000001| |1610240466873131000| null| null|RADMON.PS-10| 109.4169887| +-------------------+------------------+------------------+------------+------------------+ only showing top 20 rows
import matplotlib
import pandas as pd
%matplotlib inline
p_df = df1.select('acqStamp','current_18V').toPandas()
p_df.plot('acqStamp','current_18V',figsize=(15,5))
#p_df.sort_values(by='acqStamp').plot(pd.to_datetime(p_df['acqStamp'],unit='ns'),'current_18V',figsize=(15,5))
<matplotlib.axes._subplots.AxesSubplot at 0x7fac9aa7b860>
# Query NXCALS by variable name: one day of LTB.BCT60 beam-intensity
# readings, returned as a Spark DataFrame.
df2 = DataQuery.builder(spark).byVariables() \
.system('CMW') \
.startTime('2018-04-29 00:00:00.000').endTime('2018-04-30 00:00:00.000') \
.variable('LTB.BCT60:INTENSITY') \
.build()
# Collect to pandas and display the first 10 rows.
# NOTE(review): toPandas() pulls the whole result to the driver before
# slicing — use df2.limit(10).toPandas() to avoid that for big results.
df2.toPandas()[:10]
nxcals_value | nxcals_entity_id | nxcals_timestamp | nxcals_variable_name | |
---|---|---|---|---|
0 | 1846.24 | 52034 | 1524960170465000000 | LTB.BCT60:INTENSITY |
1 | 1579.96 | 52034 | 1524960421265000000 | LTB.BCT60:INTENSITY |
2 | 1813.40 | 52034 | 1524960602465000000 | LTB.BCT60:INTENSITY |
3 | 2382.25 | 52034 | 1524960924065000000 | LTB.BCT60:INTENSITY |
4 | 7478.39 | 52034 | 1524960931265000000 | LTB.BCT60:INTENSITY |
5 | 2388.88 | 52034 | 1524961032065000000 | LTB.BCT60:INTENSITY |
6 | 2383.04 | 52034 | 1524961047665000000 | LTB.BCT60:INTENSITY |
7 | 2404.83 | 52034 | 1524961060865000000 | LTB.BCT60:INTENSITY |
8 | 2400.61 | 52034 | 1524961938065000000 | LTB.BCT60:INTENSITY |
9 | 2390.86 | 52034 | 1524962168465000000 | LTB.BCT60:INTENSITY |
# Query the WINCCOA system by entity key/value (variable_name) over a
# two-day window for one magnet heater-discharge voltage signal.
df3 = DataQuery.builder(spark).byEntities().system('WINCCOA') \
.startTime('2018-06-15 00:00:00.000').endTime('2018-06-17 00:00:00.000') \
.entity().keyValue('variable_name', 'MB.C16L2:U_HDS_3') \
.build()
# Collect to pandas and write a CSV to the user's EOS home area.
# NOTE(review): the path is hardcoded to one user's EOS directory —
# adjust to your own /eos/user/<initial>/<username>/ before running.
df3.toPandas().to_csv('/eos/user/p/pkothuri/winccoa_hds.csv', index=False)