Anatomy of a Device

In this notebook you will:

  • Understand the various methods of an ophyd Signal
  • Learn how to group Signals into Devices.
  • Learn how to specific "pseudopositions" that expose real axes (corresponding to physical hardware) and pseudoaxes, that move real axes via some mathematical transformation.

Recommended Prerequisites:

Configuration

Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace status with restart all and run again.

In [ ]:
!supervisorctl -c supervisor/supervisord.conf status
In [ ]:
%run scripts/beamline_configuration.py
In [ ]:
import time
from ophyd import Device, Signal, Component as Cpt, DeviceStatus
from ophyd.sim import SynSignal, SynPeriodicSignal

Interface to Signal

In [ ]:
sig = Signal(name='sig', value=3)
sig

Methods that require no communication with the IOC

In [ ]:
sig.name
In [ ]:
sig.parent is None

Methods that ask the IOC to tell us something it already 'knows'

In [ ]:
sig.connected
In [ ]:
sig.limits
In [ ]:
sig.read()
In [ ]:
sig.describe()

Monitoring (subscribing for updates asynchronously)

In [ ]:
def cb(value, old_value, **a_whole_bunch_of_junk):
    print(f'changed from {old_value} to {value}')
    
sig.subscribe(cb)
# The act of subscribing always generates one reading immediately...

If this were an EpicsSignal instead of a Signal, cb would be called from a thread every time pyepics receives a new update about the value of sig. In this case, we have to update it manually.

In [ ]:
sig.put(5)
In [ ]:
sig.put(10)

Or we can connect to the random_walk IOC which publishes a new updates at a regular interval.

In [ ]:
from ophyd import EpicsSignal

rand = EpicsSignal('random_walk:x', name='rand')
token = rand.subscribe(cb)
In [ ]:
rand.unsubscribe(token)

Methods that ask the IOC to take a (potentially lengthy) action

In [ ]:
def cb():
    print("finished at t =", time.time())

status = sig.set(5)
status.add_callback(cb)
In [ ]:
status
In [ ]:
status.done
In [ ]:
status = sig.trigger()
status.add_callback(cb)

Interface of a Status object

In [ ]:
status = DeviceStatus(sig)
In [ ]:
status.done
In [ ]:
status.success
In [ ]:
def cb():
    print("BOOM")

status.add_callback(cb)
In [ ]:
status.callbacks
In [ ]:
status.device  # the Device or Signal that the Status pertains to
In [ ]:
status._finished()
In [ ]:
status.done
In [ ]:
status.success
In [ ]:
# Failure looks like this:
status = DeviceStatus(sig)
status.add_callback(cb)
status._finished(success=False)
status.success

We'll see later how to actually use this in practice.

Interface to Device

In [ ]:
# This encodes the _structure_ of a kind of Device.
# Real examples include EpicsMotor, EpicsScaler or user-defined
# combinations of these, such as a platform that can move in X and Y.

class Platform(Device):
    x = Cpt(Signal, value=3)
    y = Cpt(Signal, value=4)
    
p1 = Platform(name='p1')
p2 = Platform(name='p2')

Names and relationships

In [ ]:
p1
In [ ]:
p1.component_names
In [ ]:
p1.x
In [ ]:
p1.y
In [ ]:
p1.name
In [ ]:
p1.x.name
In [ ]:
p1.x.parent is p1

Reading the parent combines the readings of its children

In [ ]:
p1.read()
In [ ]:
p1.x.read()

and describe works exactly the same way:

In [ ]:
p1.describe()
In [ ]:
p1.x.describe()

Components are sorted into categories:

  • OMITTED -- not read (exposed for debugging only)
  • NORMAL / read_attrs -- things to read once per Event (i.e. row in the table)
  • CONFIG / configuration_attrs -- things to read once per Event Descriptor (which usually means one per run)
  • things ommitted from data collection entirely, but available for debugging etc.
  • HINTED -- subset of NORMAL flagged as interesting
In [ ]:
p1.read_attrs
In [ ]:
p1.configuration_attrs
In [ ]:
# dumb example...

class Platform(Device):
    x = Cpt(Signal, value=3)
    y = Cpt(Signal, value=4)
    motion_compensation = Cpt(Signal, value=1, kind='CONFIG')  # a boolean
    
p1 = Platform(name='p1')
p2 = Platform(name='p2')
In [ ]:
p1.read_attrs
In [ ]:
p1.configuration_attrs
In [ ]:
p1.read_configuration()
In [ ]:
p1.describe_configuration()

The data from configuration_attrs isn't displayed by the built-in callbacks...

In [ ]:
RE(count([p1]))

... but the data is saved, and it can accessed conveniently like so:

In [ ]:
h = db[-1]
h.config_data('p1')
In [ ]:
p1.summary()

Hints are meant to help downstream consumers of the data correctly infer user intent and automatically construct useful views on the data. They are only a suggestion. They do not affect what is saved.

In [ ]:
# dumb example...

class Platform(Device):
    x = Cpt(Signal, value=3, kind='hinted')
    y = Cpt(Signal, value=4, kind='hinted')
    motion_compensation = Cpt(Signal, value=1, kind='config')  # a boolean
    
p1 = Platform(name='p1')
p1.hints
In [ ]:
p1.summary()

'Staging' -- a hook for putting a device into a controlled state for data collection (and then putting it back)

In [ ]:
class Platform(Device):
    _default_configuration_attrs = ('motion_compensation',)
    _default_read_attrs = ('x', 'y')
    x = Cpt(Signal, value=3)
    y = Cpt(Signal, value=4)
    motion_compensation = Cpt(Signal, value=1)  # a boolean
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stage_sigs['motion_compensation'] = 1
        

p1 = Platform(name='p1')
In [ ]:
p1.motion_compensation.get()
In [ ]:
p1.motion_compensation.put(0)

Device.stage() stashes the current state of the signals in stage_sigs and then puts the device into the desired state.

In [ ]:
p1.stage()
In [ ]:
p1.motion_compensation.get()

Device.unstage() uses that stashed stage to put everything back.

In [ ]:
p1.unstage()
In [ ]:
p1.motion_compensation.get()

Staging twice is illegal:

In [ ]:
p1.stage()
In [ ]:
# THIS IS EXPECTED TO CREATE AN ERROR.

p1.stage()

But unstaging is indempotent:

In [ ]:
p1.unstage()
p1.unstage()
p1.unstage()

Pseudopositioners

In [ ]:
from ophyd import (PseudoPositioner, PseudoSingle)
from ophyd.pseudopos import (pseudo_position_argument,
                             real_position_argument)
from ophyd import SoftPositioner
C = Cpt

class SPseudo3x3(PseudoPositioner):
    pseudo1 = C(PseudoSingle, limits=(-10, 10), egu='a')
    pseudo2 = C(PseudoSingle, limits=(-10, 10), egu='b')
    pseudo3 = C(PseudoSingle, limits=None, egu='c')
    
    real1 = C(SoftPositioner, init_pos=0.)
    real2 = C(SoftPositioner, init_pos=0.)
    real3 = C(SoftPositioner, init_pos=0.)

    sig = C(Signal, value=0)

    @pseudo_position_argument
    def forward(self, pseudo_pos):
        # logger.debug('forward %s', pseudo_pos)
        return self.RealPosition(real1=-pseudo_pos.pseudo1,
                                    real2=-pseudo_pos.pseudo2,
                                    real3=-pseudo_pos.pseudo3)

    @real_position_argument
    def inverse(self, real_pos):
        # logger.debug('inverse %s', real_pos)
        return self.PseudoPosition(pseudo1=-real_pos.real1,
                                    pseudo2=-real_pos.real2,
                                    pseudo3=-real_pos.real3)
    

p3 = SPseudo3x3(name='p3')
In [ ]:
from ophyd.sim import det

RE(scan([det, p3], p3.pseudo2, -1, 1, 5))
In [ ]: