"We'll Cross the Streams": Combining Asynchronous Data Streams

Ghostbusters reference

Ghostbusters reference

In this notebook you will:

  • Acquires multiple asynchronous "streams" of data.
  • Align them based on time and perform basic computations, like normalization.

Recommended Prerequisites:

Configuration

Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace status with restart all and run again.

In [ ]:
!supervisorctl -c supervisor/supervisord.conf status
In [ ]:
%run scripts/beamline_configuration.py

Monitor current asynchronously while counting the detector.

"Monitoring" is requesting updates from the device whenever something changes beyond some tolerance ("deadband"). This is different than the plans we have encountered so far because bluesky leaves the update rate up to the device, so the number of readings is not fixed or necessarily repeatable from one run to the next.

Monitoring can be done on a one-off basis, but it's typically set up in a semi-permanent way: you want to "set it and forget it." The sd object keeps track of things to monitor concurrently with other measurements. We'll add the beam current signal I to that list.

In [ ]:
sd
In [ ]:
sd.monitors.append(I)

Now while we scan a motor and read a detector, I will be monitored in the background.

In [ ]:
RE(scan([slit], motor_slit, -15, 15, 150))

Get the data as before. The dataset is large, so we'll use the .head() method to show just the first several rows.

In [ ]:
header = db[-1]
header.table().head()  # shows the 'primary' stream by default
In [ ]:
header.table(stream_name='primary').head()  # equivalent to the above

What other streams are there?

In [ ]:
header.stream_names

See the figure on this documentation page to visualize "streams."

In [ ]:
header.table('I_monitor').head()

Plot data streams together

We can plot them each against time.

In [ ]:
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
plt.figure()
plt.plot('time', 'slit_det', data=header.table(), marker='o', label='slit_det')
plt.plot('time', 'I', data=header.table(stream_name='I_monitor'), marker='x', label='I')
plt.legend()

We cannot plot slit_det vs I or normalize slit_det by I because the two were never measured at exactly the same time. We'll have to interpolate, downsample, or some combination to get the two measurements into one unified time series.

"Muxing" (combining into one time series) the streams

To start, we'll use a pandas function for concatenating the tables side by side. The result is a sort of "block matrix" of missing data (NaN).

In [ ]:
import pandas as pd
data = pd.concat([header.table('primary'), header.table('I_monitor')], axis=0, sort=False)
data
In [ ]:
# Make 'time' the index and sort on it.
sorted_data = data.set_index('time').sort_index()
sorted_data.head(20)

Crude but conceptually simple: interpolate everything with "forward-fill"

In [ ]:
ffilled_data = sorted_data.ffill()
ffilled_data.head(20)
In [ ]:
ffilled_data['normalized'] = ffilled_data['slit_det'] / ffilled_data['I'] * ffilled_data['I'].mean()
In [ ]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='interpolated and normalized')
plt.legend()

More accurate: Use linear interpolation instead of "forward-fill"

In [ ]:
interp_data = sorted_data.interpolate('linear')
interp_data['normalized'] = interp_data['slit_det'] / interp_data['I'] * interp_data['I'].mean()
In [ ]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.legend()

More accurate: First down-sample current with a "rolling" (windowed) mean

In [ ]:
sm_I = header.table('I_monitor').set_index('time')['I'].rolling(window=3).mean()
sm_data = pd.concat([header.table('primary').set_index('time'), pd.DataFrame({'I': sm_I})], axis=0, sort=False)
sorted_sm_data = sm_data.sort_index()
interp_sm_data = sorted_sm_data.interpolate('linear')
interp_sm_data['normalized'] = interp_sm_data['slit_det'] / interp_sm_data['I'] * interp_sm_data['I'].mean()

plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.plot('motor_slit', 'normalized', data=interp_sm_data, label='downsampled + linear')
plt.legend()

Histogram of time-spacing between current readings.

In [ ]:
header.table('I_monitor')['time']
current_reading_times = header.table('I_monitor')['time']
delta = current_reading_times.diff()
In [ ]:
delta.head()
In [ ]:
delta.dropna().head()

Matplotlib can't plot times. We must convert to integers (units of nanoseconds).

In [ ]:
plt.figure()
plt.hist(delta.dropna().astype(int))
plt.xlabel('nanoseconds between readings')
plt.ylabel('counts')

Exercises

Q1. Pandas Dataframes have a .diff() method, which computes the pairwise difference of rows. Example:

In [ ]:
df = pd.DataFrame({'a': [1, 2, 4, 8]})
df
In [ ]:
df.diff()

Use this (and something else you have already seen...) to compute the average time-spacing of current readings.

In [ ]:
# Type you answer here. Fill in the blanks.
# current_reading_times = header.table('I_monitor')['time']
# current_reading_times._____._____
In [ ]:
%load solutions/compute_mean_period.py

Q2. Above we used the interpolate method. It has several optional parameters. Use sorted_data.interpolate? to pull up the documentation and explore the options.

In [ ]:
# Type it here.