Device

In this notebook you will:

  • Encapsulate multiple Signals in a Device

Recommend Prerequisites:

Simulated Hardware

Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace status with restart all and run again.

In [ ]:
!supervisorctl -c supervisor/supervisord.conf status

Devices are a heirarchy

A Device is a hierarchy composed of Signals and other Devices. The components of a Device can be introspected by layers above ophyd and may be decomposed to, ultimately, the underlying Signals.

In [ ]:
from ophyd import EpicsSignal, EpicsSignalRO

x = EpicsSignal('random_walk:x')
dt = EpicsSignal('random_walk:dt')

It would be convenient if we could read these as a unit, instead of x.read(); dt.read().

In [ ]:
from ophyd import Device, Component

class RandomWalk(Device):
    x = Component(EpicsSignalRO, 'x')
    dt = Component(EpicsSignal, 'dt')
    
random_walk = RandomWalk('random_walk:', name='random_walk')
random_walk.wait_for_connection()
random_walk

The read() and describe() methods walk the hierarchy.

In [ ]:
random_walk.read()
In [ ]:
random_walk.x.read()
In [ ]:
random_walk.dt.read()
In [ ]:
random_walk.describe()

A Device embodies a certain "layout" of components. We can have multiple Devices with different PV prefixes but the same layout.

In [ ]:
another_random_walk = RandomWalk('random_walk:vert-', name='another_random_walk')
another_random_walk.wait_for_connection()
In [ ]:
another_random_walk.read()

A Device can be made of subdevices.

In [ ]:
class RandomWalks(Device):
    vert = Component(RandomWalk, 'vert-')
    horiz = Component(RandomWalk, 'horiz-')
    
random_walks = RandomWalks('random_walk:', name='random_walks')
random_walks.wait_for_connection()
random_walks
In [ ]:
random_walks.vert.x.pvname
In [ ]:
random_walks.read()
In [ ]:
random_walks.vert.read()
In [ ]:
random_walks.vert.x.read()

Exercise

Using the random_walks Device, set the PV random_walk:horiz-dt to 3.

In [ ]:
%load solutions/set_subcomponent_signal.py

Adding a set method to Device

Sometimes, setting a value to a Signal and knowing when it is "done" involves just one PV:

In [ ]:
status = random_walks.vert.dt.set(2)

In other cases it involves coordination across multiple PVs, such as a setpoint PV nd a readback PV, or a setpoint PV and a "done" PV. For those cases, we define a set method on the Device to manage the coordination across multiple Signals.

In [ ]:
from ophyd import DeviceStatus

class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            TOLERANCE = 1  # hard-coded; we'll make this configurable later on...
            if abs(value - setpoint) < TOLERANCE:
                status._finished()
                self.readback.clear_sub(callback)
            
        self.readback.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
decay.wait_for_connection()
decay
In [ ]:
decay.read()
In [ ]:
status = decay.set(115)

We can watch for completion either by registering a callback:

In [ ]:
def callback():
    print("DONE!")
    
status.add_callback(callback)

or by polling:

In [ ]:
status = decay.set(120)

import time
while not status.done:
    time.sleep(0.01)  # Make sure to sleep to avoid pinning CPU.
print("DONE!")

Make the tolerance configurable with a "soft" Signal

In [ ]:
from ophyd import Signal

class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    tolerance = Component(Signal, value=1)  # not associated with anything in EPICS---a pure ophyd construct
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            if abs(value - setpoint) < self.tolerance.get():
                status._finished()
                self.readback.clear_sub(callback)
            
        self.readback.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
status = decay.set(125)
status.add_callback(callback)
In [ ]:
decay.tolerance.set(2)
status = decay.set(130)
status.add_callback(callback)

Let the IOC tell us when it is done

Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the "tolerance" logic entirely if we want to and trust the IOC.

In [ ]:
class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the 'done' PV.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the done signal goes from low to high---that is, a positive edge.
        def callback(old_value, value, **kwargs):
            if old_value == 0 and value == 1:
                status._finished()
                self.done.clear_sub(callback)
            
        self.done.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
decay
In [ ]:
status = decay.set(135)
status.add_callback(callback)

PVPositioner

The pattern of readback, setpoint and done is pretty common, so ophyd has a special Device subclass that writes the set() method for you if you provide components with these particular names.

In [ ]:
from ophyd import PVPositioner

class Decay(PVPositioner):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')
    # actuate = Component(EpicsSignal, ...)  # the "Go" button, not applicable to this IOC, but sometimes needed
    
decay = Decay('decay', name='decay')
status = decay.set(140)
status.add_callback(callback)

Adding a trigger method to Device

Like Device.set, Device.trigger can coordinate across multiple PVs to trigger and detector and tell when it is done triggering.

When a bluesky plan obtains a reading from some device it typically:

  • Calls device.trigger() and receives back a status object
  • Waits for that status object to complete (while potentially doing other things, like triggering other detectors in parallel)
  • Calls device.read()

Some detectors don't need to be triggered and may be summarily read at any time, such as random_walk.x. In that case, device.trigger() simply has no effect and returns a Status object that is immediately "done". But other detector require a specific signal or sequence of signals to acquire a new reading, such as in this example.

In [ ]:
class TriggeredDetector(Device):
    """
    A detector that requires triggering
    """
    gain = Component(EpicsSignal, ':gain')
    exposure_time = Component(EpicsSignal, ':exposure_time')
    reading = Component(EpicsSignalRO, ':reading')
    acquire = Component(EpicsSignal, ':acquire')
    enabled = Component(EpicsSignal, ':enabled')

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when we see the state flip from "acquiring" to "not acquiring"---
        # that is, a negative edge.
        def callback(old_value, value, **kwargs):
            if old_value == 1 and value == 0:
                status._finished()
                self.acquire.clear_sub(callback)
            
        self.acquire.subscribe(callback)
        
        # Now 'put' 1 to the acquire signal.
        self.acquire.put(1)        

        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
    
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
In [ ]:
status = triggered_detector.trigger()

This status object is exactly the same as the one we got from set(). We can check completion by registering a callback or polling.

In [ ]:
def callback():
    print("ACQUISITION COMPLETE")

status.add_callback(callback)
In [ ]:
while not status.done:
    time.sleep(0.01)
print("ACQUISITION COMPLETE!")

Using put-completion

If the IOC in question implements correct "put-completion" on the triggering PV, we can rely on that, and a simpler solution is possible. But not all IOCs do this so it is good to know the more general solution above.

In [ ]:
class TriggeredDetector(Device):
    gain = Component(EpicsSignal, ':gain')
    exposure_time = Component(EpicsSignal, ':exposure_time')
    reading = Component(EpicsSignalRO, ':reading')
    acquire = Component(EpicsSignal, ':acquire', put_complete=True)  # Rely on the IOC to signal done-ness.
    enabled = Component(EpicsSignal, ':enabled')

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        self.acquire.put(1, callback=status._finished)
        return status

triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
In [ ]:
status = triggered_detector.trigger()
while not status.done:
    time.sleep(0.01)

Status objects

Status objects are like rich Futures. They know whether they are done, whether their action finished in success or not, and they hold a reference to the device that they came from, which can be useful to debugging failures. The status.watch() may be used to subscribe to incremental progress updates and is used by bluesky to display progress bars during sets and triggers.

In [ ]:
status
In [ ]:
status.success
In [ ]:
status.device

Sorting components into "kinds"

In [ ]:
triggered_detector.read()
In [ ]:
class TriggeredDetector(Device):
    """
    A detector that requires triggering
    """
    gain = Component(EpicsSignal, ':gain', kind='config')
    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
    reading = Component(EpicsSignalRO, ':reading', kind='normal')
    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
    enabled = Component(EpicsSignal, ':enabled', kind='omitted')

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        self.acquire.put(1, callback=status._finished)
        return status
    
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
In [ ]:
triggered_detector.read()
In [ ]:
triggered_detector.read_configuration()
In [ ]:
triggered_detector.reading.kind
In [ ]:
triggered_detector.reading.kind = 'HINTED'
triggered_detector.reading.kind
In [ ]:
triggered_detector.hints

Staging and unstaging

Above we said that when bluesky obtains a reading from some device it typically:

  • Calls device.trigger() and receives back a status object
  • Waits for that status object to complete (while potentially doing other things, like triggering other detectors in parallel)
  • Calls device.read()

If it obtains multiple readings in sequence, it repeats this trigger/wait/read cycle. Sometimes, before triggering, there is some choreographed sequence of steps necessary to make device ready for use and some corresponding sequence to put in back safely into a resting state. To support this bluesky plans typically call device.stage() once before first using a device in a plan and then device.unstage() at the end. Even if a plan is interrupted by the user or by an exception begin raise, device.unstage() will be called.

Like device.trigger(), is hook is optional and does not apply to all devices.

In [ ]:
from ophyd import set_and_wait

class TriggeredDetector(Device):
    """
    A detector that requires triggering
    """
    gain = Component(EpicsSignal, ':gain', kind='config')
    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
    reading = Component(EpicsSignalRO, ':reading', kind='normal')
    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
    enabled = Component(EpicsSignal, ':enabled', kind='omitted')

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        self.acquire.put(1, callback=status._finished)
        return status
    
    def stage(self):
        self.initial_enabled_state = self.enabled.get()
        set_and_wait(self.enabled, 1)
        return super().stage()
    
    def unstage(self):
        ret =  super().unstage()
        set_and_wait(self.enabled, self.initial_enabled_state)
        return ret
    
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
In [ ]:
triggered_detector.enabled.put(0)
triggered_detector.enabled.get()
In [ ]:
triggered_detector.stage()
In [ ]:
triggered_detector.enabled.get()
In [ ]:
status = triggered_detector.trigger()
In [ ]:
while not status.done:
    time.sleep(0.01)
In [ ]:
triggered_detector.read()
In [ ]:
status = triggered_detector.trigger()
In [ ]:
while not status.done:
    time.sleep(0.01)
In [ ]:
triggered_detector.read()
In [ ]:
triggered_detector.unstage()
In [ ]:
triggered_detector.enabled.get()

A convenient shorthand for common simple cases: stage_sigs

In [ ]:
from ophyd import set_and_wait

class TriggeredDetector(Device):
    """
    A detector that requires triggering
    """
    gain = Component(EpicsSignal, ':gain', kind='config')
    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
    reading = Component(EpicsSignalRO, ':reading', kind='hinted')
    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
    enabled = Component(EpicsSignal, ':enabled', kind='omitted')
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stage_sigs['enabled'] = 1  # OrderedDict mapping component name to desired state

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        self.acquire.put(1, callback=status._finished)
        return status
    
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')

Exercise

Try staging the device twice in a row. Then try unstaging it twice in a row.

In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 

Customizing cleanup via stop, resume, pause

These optional methods can be used to further customize a Device's cleanup:

  • stop -- called by bluesky when a plan is paused or exits (successfully or in error)
  • pause -- called when the RunEngine is paused
  • resume -- called when the RunEngine resumes from a pause
In [ ]:
class TriggeredDetector(Device):
    """
    A detector that requires triggering
    """
In [ ]:
    gain = Component(EpicsSignal, ':gain', kind='config')
    exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
    reading = Component(EpicsSignalRO, ':reading', kind='hinted')
    acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
    enabled = Component(EpicsSignal, ':enabled', kind='omitted')
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stage_sigs['enabled'] = 1  # OrderedDict mapping component name to desired state

    def trigger(self):
        """
        Trigger the detector and return a Status object.
        """
        status = DeviceStatus(self)
        self.acquire.put(1, callback=status._finished)
        return status
    
    def resume(self):
        ...
        
    def pause(self):
        ...
        
    def stop(self, success=False):
        ...

The Payoff

The payoff of adhering to the Device interfaces is that you can scan devices with very different ways of indicating when they are done (put completion or not, one or multiple PVs), very different hardware (motor, diffractometer, temperature controller, power supply voltage, anything you want to "scan" as part of an experiment). The details of each case are encapsulated in Device and do not leak into the scripts higher up.

In [ ]:
%matplotlib widget
import matplotlib.pyplot as plt
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.plans import scan

RE = RunEngine()
bec = BestEffortCallback()
RE.subscribe(bec)
In [ ]:
%run scripts/eurotherm.py

eurotherm = Eurotherm('thermo:', name='eurotherm')
eurotherm.readback.kind = 'hinted'
In [ ]:
eurotherm.hints
In [ ]:
# Creating a figure explicitly in advance helps with the
# top-to-bottom flow of this notebook, but it is not necessary.
# If this is omitted, bluesky will cause a figure to appear
# during the RE(...) execution below.
plt.figure('triggered_detector_reading vs eurotherm_readback')
In [ ]:
RE(scan([triggered_detector], eurotherm, 100, 130, 6))
In [ ]:
plt.gcf()  # Display a snapshot of the current state of the figure.
In [ ]: