# "We'll Cross the Streams": Combining Asynchronous Data Streams¶

In this notebook you will:

• Acquires multiple asynchronous "streams" of data.
• Align them based on time and perform basic computations, like normalization.

Recommended Prerequisites:

## Configuration¶

Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace status with restart all and run again.

In [ ]:
!supervisorctl -c supervisor/supervisord.conf status

In [ ]:
%run scripts/beamline_configuration.py


## Monitor current asynchronously while counting the detector.¶

"Monitoring" is requesting updates from the device whenever something changes beyond some tolerance ("deadband"). This is different than the plans we have encountered so far because bluesky leaves the update rate up to the device, so the number of readings is not fixed or necessarily repeatable from one run to the next.

Monitoring can be done on a one-off basis, but it's typically set up in a semi-permanent way: you want to "set it and forget it." The sd object keeps track of things to monitor concurrently with other measurements. We'll add the beam current signal I to that list.

In [ ]:
sd

In [ ]:
sd.monitors.append(I)


Now while we scan a motor and read a detector, I will be monitored in the background.

In [ ]:
RE(scan([slit], motor_slit, -15, 15, 150))


Get the data as before. The dataset is large, so we'll use the .head() method to show just the first several rows.

In [ ]:
header = db[-1]

In [ ]:
header.table(stream_name='primary').head()  # equivalent to the above


What other streams are there?

In [ ]:
header.stream_names


See the figure on this documentation page to visualize "streams."

In [ ]:
header.table('I_monitor').head()


### Plot data streams together¶

We can plot them each against time.

In [ ]:
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
plt.figure()
plt.legend()


We cannot plot slit_det vs I or normalize slit_det by I because the two were never measured at exactly the same time. We'll have to interpolate, downsample, or some combination to get the two measurements into one unified time series.

## "Muxing" (combining into one time series) the streams¶

To start, we'll use a pandas function for concatenating the tables side by side. The result is a sort of "block matrix" of missing data (NaN).

In [ ]:
import pandas as pd
data

In [ ]:
# Make 'time' the index and sort on it.
sorted_data = data.set_index('time').sort_index()


### Crude but conceptually simple: interpolate everything with "forward-fill"¶

In [ ]:
ffilled_data = sorted_data.ffill()

In [ ]:
ffilled_data['normalized'] = ffilled_data['slit_det'] / ffilled_data['I'] * ffilled_data['I'].mean()

In [ ]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='interpolated and normalized')
plt.legend()


### More accurate: Use linear interpolation instead of "forward-fill"¶

In [ ]:
interp_data = sorted_data.interpolate('linear')
interp_data['normalized'] = interp_data['slit_det'] / interp_data['I'] * interp_data['I'].mean()

In [ ]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.legend()


### More accurate: First down-sample current with a "rolling" (windowed) mean¶

In [ ]:
sm_I = header.table('I_monitor').set_index('time')['I'].rolling(window=3).mean()
sm_data = pd.concat([header.table('primary').set_index('time'), pd.DataFrame({'I': sm_I})], axis=0, sort=False)
sorted_sm_data = sm_data.sort_index()
interp_sm_data = sorted_sm_data.interpolate('linear')
interp_sm_data['normalized'] = interp_sm_data['slit_det'] / interp_sm_data['I'] * interp_sm_data['I'].mean()

plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.plot('motor_slit', 'normalized', data=interp_sm_data, label='downsampled + linear')
plt.legend()


## Histogram of time-spacing between current readings.¶

In [ ]:
header.table('I_monitor')['time']

In [ ]:
delta.head()

In [ ]:
delta.dropna().head()


Matplotlib can't plot times. We must convert to integers (units of nanoseconds).

In [ ]:
plt.figure()
plt.hist(delta.dropna().astype(int))
plt.ylabel('counts')


## Exercises¶

Q1. Pandas Dataframes have a .diff() method, which computes the pairwise difference of rows. Example:

In [ ]:
df = pd.DataFrame({'a': [1, 2, 4, 8]})
df

In [ ]:
df.diff()


Use this (and something else you have already seen...) to compute the average time-spacing of current readings.

In [ ]:
# Type you answer here. Fill in the blanks.

%load solutions/compute_mean_period.py

Q2. Above we used the interpolate method. It has several optional parameters. Use sorted_data.interpolate? to pull up the documentation and explore the options.
# Type it here.