Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs may already be running in the background. Run this command to ensure that they are running. In the event of a problem, edit this command to replace status
with restart all
and run again.
!./supervisor/start_supervisor.sh status
A Device
is a hierarchy composed of Signals and other Devices. The components of a Device can be introspected by layers above ophyd and may be decomposed to, ultimately, the underlying Signals.
from ophyd import EpicsSignal, EpicsSignalRO
x = EpicsSignal('random_walk:x')
dt = EpicsSignal('random_walk:dt')
It would be convenient if we could read these as a unit, instead of x.read(); dt.read()
.
from ophyd import Device, Component
class RandomWalk(Device):
x = Component(EpicsSignalRO, 'x')
dt = Component(EpicsSignal, 'dt')
random_walk = RandomWalk('random_walk:', name='random_walk')
random_walk.wait_for_connection()
random_walk
The read()
and describe()
methods walk the hierarchy.
random_walk.read()
random_walk.x.read()
random_walk.dt.read()
random_walk.describe()
A Device embodies a certain "layout" of components. We can have multiple Devices with different PV prefixes but the same layout.
another_random_walk = RandomWalk('random_walk:vert-', name='another_random_walk')
another_random_walk.wait_for_connection()
another_random_walk.read()
A Device can be made of subdevices.
class RandomWalks(Device):
vert = Component(RandomWalk, 'vert-')
horiz = Component(RandomWalk, 'horiz-')
random_walks = RandomWalks('random_walk:', name='random_walks')
random_walks.wait_for_connection()
random_walks
random_walks.vert.x.pvname
random_walks.read()
random_walks.vert.read()
random_walks.vert.x.read()
Using the random_walks
Device, set the PV random_walk:horiz-dt
to 3
.
%load solutions/set_subcomponent_signal.py
Device
¶Sometimes, setting a value to a Signal and knowing when it is "done" involves just one PV:
status = random_walks.vert.dt.set(2)
In other cases it involves coordination across multiple PVs, such as a setpoint PV nd a readback PV, or a setpoint PV and a "done" PV. For those cases, we define a set
method on the Device to manage the coordination across multiple Signals.
from ophyd import DeviceStatus
class Decay(Device):
"""
A device with a setpoint and readback that decays exponentially toward the setpoint.
"""
readback = Component(EpicsSignalRO, ':I')
setpoint = Component(EpicsSignal, ':SP')
def set(self, setpoint):
"""
Set the setpoint and return a Status object that monitors the readback.
"""
status = DeviceStatus(self)
# Wire up a callback that will mark the status object as finished
# when the readback approaches within some tolerance of the setpoint.
def callback(old_value, value, **kwargs):
TOLERANCE = 1 # hard-coded; we'll make this configurable later on...
if abs(value - setpoint) < TOLERANCE:
status._finished()
self.readback.clear_sub(callback)
self.readback.subscribe(callback)
# Now 'put' the value.
self.setpoint.put(setpoint)
# And return the Status object, which the caller can use to
# tell when the action is complete.
return status
decay = Decay('decay', name='decay')
decay.wait_for_connection()
decay
decay.read()
status = decay.set(115)
We can watch for completion either by registering a callback:
def callback():
print("DONE!")
status.add_callback(callback)
or by polling:
status = decay.set(120)
import time
while not status.done:
time.sleep(0.01) # Make sure to sleep to avoid pinning CPU.
print("DONE!")
from ophyd import Signal
class Decay(Device):
"""
A device with a setpoint and readback that decays exponentially toward the setpoint.
"""
readback = Component(EpicsSignalRO, ':I')
setpoint = Component(EpicsSignal, ':SP')
tolerance = Component(Signal, value=1) # not associated with anything in EPICS---a pure ophyd construct
def set(self, setpoint):
"""
Set the setpoint and return a Status object that monitors the readback.
"""
status = DeviceStatus(self)
# Wire up a callback that will mark the status object as finished
# when the readback approaches within some tolerance of the setpoint.
def callback(old_value, value, **kwargs):
if abs(value - setpoint) < self.tolerance.get():
status._finished()
self.readback.clear_sub(callback)
self.readback.subscribe(callback)
# Now 'put' the value.
self.setpoint.put(setpoint)
# And return the Status object, which the caller can use to
# tell when the action is complete.
return status
decay = Decay('decay', name='decay')
status = decay.set(125)
status.add_callback(callback)
decay.tolerance.set(2)
status = decay.set(130)
status.add_callback(callback)
Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the "tolerance" logic entirely if we want to and trust the IOC.
class Decay(Device):
"""
A device with a setpoint and readback that decays exponentially toward the setpoint.
"""
readback = Component(EpicsSignalRO, ':I')
setpoint = Component(EpicsSignal, ':SP')
done = Component(EpicsSignalRO, ':done')
def set(self, setpoint):
"""
Set the setpoint and return a Status object that monitors the 'done' PV.
"""
status = DeviceStatus(self)
# Wire up a callback that will mark the status object as finished
# when the done signal goes from low to high---that is, a positive edge.
def callback(old_value, value, **kwargs):
if old_value == 0 and value == 1:
status._finished()
self.done.clear_sub(callback)
self.done.subscribe(callback)
# Now 'put' the value.
self.setpoint.put(setpoint)
# And return the Status object, which the caller can use to
# tell when the action is complete.
return status
decay = Decay('decay', name='decay')
decay
status = decay.set(135)
status.add_callback(callback)
PVPositioner
¶The pattern of readback
, setpoint
and done
is pretty common, so ophyd has a special Device
subclass that writes the set()
method for you if you provide components with these particular names.
from ophyd import PVPositioner
class Decay(PVPositioner):
"""
A device with a setpoint and readback that decays exponentially toward the setpoint.
"""
readback = Component(EpicsSignalRO, ':I')
setpoint = Component(EpicsSignal, ':SP')
done = Component(EpicsSignalRO, ':done')
# actuate = Component(EpicsSignal, ...) # the "Go" button, not applicable to this IOC, but sometimes needed
decay = Decay('decay', name='decay')
status = decay.set(140)
status.add_callback(callback)
trigger
method to Device
¶Like Device.set
, Device.trigger
can coordinate across multiple PVs to trigger and detector and tell when it is done triggering.
When a bluesky plan obtains a reading from some device
it typically:
device.trigger()
and receives back a status objectdevice.read()
Some detectors don't need to be triggered and may be summarily read at any time, such as random_walk.x
. In that case, device.trigger()
simply has no effect and returns a Status object that is immediately "done". But other detector require a specific signal or sequence of signals to acquire a new reading, such as in this example.
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain')
exposure_time = Component(EpicsSignal, ':exposure_time')
reading = Component(EpicsSignalRO, ':reading')
acquire = Component(EpicsSignal, ':acquire')
enabled = Component(EpicsSignal, ':enabled')
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
# Wire up a callback that will mark the status object as finished
# when we see the state flip from "acquiring" to "not acquiring"---
# that is, a negative edge.
def callback(old_value, value, **kwargs):
if old_value == 1 and value == 0:
status._finished()
self.acquire.clear_sub(callback)
self.acquire.subscribe(callback)
# Now 'put' 1 to the acquire signal.
self.acquire.put(1)
# And return the Status object, which the caller can use to
# tell when the action is complete.
return status
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
status = triggered_detector.trigger()
This status
object is exactly the same as the one we got from set()
. We can check completion by registering a callback or polling.
def callback():
print("ACQUISITION COMPLETE")
status.add_callback(callback)
while not status.done:
time.sleep(0.01)
print("ACQUISITION COMPLETE!")
If the IOC in question implements correct "put-completion" on the triggering PV, we can rely on that, and a simpler solution is possible. But not all IOCs do this so it is good to know the more general solution above.
class TriggeredDetector(Device):
gain = Component(EpicsSignal, ':gain')
exposure_time = Component(EpicsSignal, ':exposure_time')
reading = Component(EpicsSignalRO, ':reading')
acquire = Component(EpicsSignal, ':acquire', put_complete=True) # Rely on the IOC to signal done-ness.
enabled = Component(EpicsSignal, ':enabled')
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
status = triggered_detector.trigger()
while not status.done:
time.sleep(0.01)
Status objects are like rich Futures. They know whether they are done
, whether their action finished in success
or not, and they hold a reference to the device
that they came from, which can be useful to debugging failures. The status.watch()
may be used to subscribe to incremental progress updates and is used by bluesky to display progress bars during sets and triggers.
status
status.success
status.device
triggered_detector.read()
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='normal')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
triggered_detector.read()
triggered_detector.read_configuration()
triggered_detector.reading.kind
triggered_detector.reading.kind = 'HINTED'
triggered_detector.reading.kind
triggered_detector.hints
Above we said that when bluesky obtains a reading from some device
it typically:
device.trigger()
and receives back a status objectdevice.read()
If it obtains multiple readings in sequence, it repeats this trigger/wait/read cycle. Sometimes, before triggering, there is some choreographed sequence of steps necessary to make device
ready for use and some corresponding sequence to put in back safely into a resting state. To support this bluesky plans typically call device.stage()
once before first using a device in a plan and then device.unstage()
at the end. Even if a plan is interrupted by the user or by an exception begin raise, device.unstage()
will be called.
Like device.trigger()
, is hook is optional and does not apply to all devices.
from ophyd import set_and_wait
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='normal')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
def stage(self):
self.initial_enabled_state = self.enabled.get()
set_and_wait(self.enabled, 1)
return super().stage()
def unstage(self):
ret = super().unstage()
set_and_wait(self.enabled, self.initial_enabled_state)
return ret
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
triggered_detector.enabled.put(0)
triggered_detector.enabled.get()
triggered_detector.stage()
triggered_detector.enabled.get()
status = triggered_detector.trigger()
while not status.done:
time.sleep(0.01)
triggered_detector.read()
status = triggered_detector.trigger()
while not status.done:
time.sleep(0.01)
triggered_detector.read()
triggered_detector.unstage()
triggered_detector.enabled.get()
stage_sigs
¶from ophyd import set_and_wait
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='hinted')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stage_sigs['enabled'] = 1 # OrderedDict mapping component name to desired state
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
Try staging the device twice in a row. Then try unstaging it twice in a row.
stop
, resume
, pause
¶These optional methods can be used to further customize a Device's cleanup:
stop
-- called by bluesky when a plan is paused or exits (successfully or in error)pause
-- called when the RunEngine is pausedresume
-- called when the RunEngine resumes from a pauseclass TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='hinted')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stage_sigs['enabled'] = 1 # OrderedDict mapping component name to desired state
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
def resume(self):
...
def pause(self):
...
def stop(self, success=False):
...
The payoff of adhering to the Device
interfaces is that you can scan devices with very different ways of indicating when they are done (put completion or not, one or multiple PVs), very different hardware (motor, diffractometer, temperature controller, power supply voltage, anything you want to "scan" as part of an experiment). The details of each case are encapsulated in Device
and do not leak into the scripts higher up.
%matplotlib widget
import matplotlib.pyplot as plt
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.plans import scan
RE = RunEngine()
bec = BestEffortCallback()
RE.subscribe(bec)
%run scripts/eurotherm.py
eurotherm = Eurotherm('thermo:', name='eurotherm')
eurotherm.readback.kind = 'hinted'
eurotherm.hints
# Creating a figure explicitly in advance helps with the
# top-to-bottom flow of this notebook, but it is not necessary.
# If this is omitted, bluesky will cause a figure to appear
# during the RE(...) execution below.
plt.figure('triggered_detector_reading vs eurotherm_readback')
RE(scan([triggered_detector], eurotherm, 100, 130, 6))
plt.gcf() # Display a snapshot of the current state of the figure.