In this tutorial we will take a first look at how we might add an array detector device to Ophyd.
To allow us to focus purely on the Ophyd side of things, we've stripped out EPICS entirely here and kept all other complexity to a minimum.
You will need to define a function that integrates directly with the hardware to acquire an image and save it at a specified filepath.
This function must return the array shape (i.e. dimensions) of the image. The name of the function does not matter.
import numpy
from pathlib import Path
def acquire_image(filepath):
    """
    This function should integrate directly with the hardware.
    No concepts particular to ophyd are involved here.
    Just tell the hardware to take an image, however that works.
    This function should block until acquisition is complete or
    raise if acquisition fails.
    It will be run on a worker thread, so it will not block
    ophyd / the RunEngine.
    """
    # For this tutorial, just generate a random image.
    from PIL import Image
    image = numpy.random.randint(0, 255, (512, 512)).astype('uint8')
    # Ensure the directory exists.
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    # Save the image.
    Image.fromarray(image).save(filepath)
    return image.shape
acquire_image('test.jpg')
This sample function simply generated a random image in the current directory.
Let's have a quick look at it:
from IPython.display import Image
Image('test.jpg')
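The only contract that matters is the one described above: block until the file is written, raise on failure, and return the array shape. As a minimal sketch of that contract using only the standard library, the hypothetical function below (acquire_image_pgm and its width and height parameters are our own names, not part of ophyd) writes random pixels to a binary PGM file instead of a JPEG:

```python
import random
from pathlib import Path


def acquire_image_pgm(filepath, width=64, height=48):
    """Hypothetical stand-in for acquire_image that needs only the standard library."""
    # Generate random 8-bit grayscale pixel values, one byte per pixel.
    pixels = bytes(random.randrange(256) for _ in range(width * height))
    path = Path(filepath)
    # Ensure the directory exists.
    path.parent.mkdir(parents=True, exist_ok=True)
    # Binary PGM: a short text header followed by the raw pixel bytes.
    header = f"P5\n{width} {height}\n255\n".encode("ascii")
    path.write_bytes(header + pixels)
    # Return (rows, columns), the same ordering as a numpy array shape.
    return (height, width)
```

Any function with this behavior could be swapped in for acquire_image later on, provided the file handler is written to read the same format.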
Let's get some imports out of the way before we move on:
import os
import uuid
import threading
import itertools
import requests
from ophyd import Device, Component, Signal, DeviceStatus
from ophyd.areadetector.filestore_mixins import resource_factory
We will need to define a signal to help us reference the image file:
class ExternalFileReference(Signal):
    """
    A pure software signal pointing to data in an external file
    The parent device is intended to set the value of this Signal to a datum_id.
    """

    def __init__(self, *args, shape, **kwargs):
        super().__init__(*args, **kwargs)
        self.shape = shape

    def describe(self):
        res = super().describe()
        # Tell consumers that readings from this Signal point to "external" data,
        # data that is not in-line in the reading itself.
        res[self.name].update(dict(external="FILESTORE:", dtype="array", shape=self.shape))
        return res
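To see what the describe override does without needing ophyd, here is a small sketch using plain dictionaries. The helper mark_as_external is hypothetical, and the contents of base are only a placeholder for whatever super().describe() returns; the three keys being merged in are the ones used in the class above.

```python
def mark_as_external(description, name, shape):
    """Return a copy of `description` with the entry for `name` marked as external array data."""
    # Copy each entry so that the input dictionary is left untouched.
    updated = {key: dict(value) for key, value in description.items()}
    updated[name].update(external="FILESTORE:", dtype="array", shape=list(shape))
    return updated


# Placeholder for the parent class's description (illustrative values only).
base = {"camera_image": {"source": "SIM:camera_image", "dtype": "string", "shape": []}}
described = mark_as_external(base, "camera_image", (512, 512))
```

After this call, described["camera_image"] keeps the original source entry but reports an array dtype, the image shape, and the external marker.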
Our Camera device will use this ExternalFileReference and implement the bulk of the staging and acquisition logic:
class Camera(Device):
    """
    An ophyd device for a camera that acquires images and saves them in files.
    """

    # We initialize the shape to [] and update it below once we know the shape
    # of the array.
    image = Component(ExternalFileReference, value="", kind="normal", shape=[])

    def __init__(self, *args, root_path, **kwargs):
        super().__init__(*args, **kwargs)
        self._root_path = root_path
        # Use this lock to ensure that we only process one "trigger" at a time.
        # Generally bluesky should take care of this, so this is just an extra
        # precaution.
        self._acquiring_lock = threading.Lock()
        self._counter = None  # set to an itertools.count object when staged
        # Accumulate Resource and Datum documents in this cache.
        self._asset_docs_cache = []
        # This string is included in the Resource documents to indicate which
        # kind of reader ("handler") is needed to access the relevant data.
        self._SPEC = "MY_FORMAT_SPEC"

    def stage(self):
        # Set the filepath where we will be saving images.
        self._rel_path_template = f"images/{uuid.uuid4()}_%d.jpg"
        # Create a Resource document referring to this series of images that we
        # are about to take, and stash it in _asset_docs_cache.
        resource, self._datum_factory = resource_factory(
            self._SPEC, self._root_path, self._rel_path_template, {}, "posix")
        self._asset_docs_cache.append(("resource", resource))
        self._counter = itertools.count()
        return super().stage()

    def unstage(self):
        self._counter = None
        self._asset_docs_cache.clear()
        return super().unstage()

    def trigger(self):
        status = DeviceStatus(self)
        if self._counter is None:
            raise RuntimeError("Device must be staged before triggering.")
        i = next(self._counter)
        # Start a background thread to capture an image and write it to disk.
        thread = threading.Thread(target=self._capture, args=(status, i))
        thread.start()
        # Promptly return a status object, which will be marked "done" when the
        # capture completes.
        return status

    def _capture(self, status, i):
        "This runs on a background thread."
        # Acquire the lock before entering the try block so that the `finally`
        # clause never releases a lock this thread does not hold.
        if not self._acquiring_lock.acquire(blocking=False):
            status.set_exception(
                RuntimeError("Cannot trigger, currently triggering!"))
            return
        try:
            filepath = os.path.join(self._root_path, self._rel_path_template % i)
            # Kick off requests, or subprocess, or whatever with the result
            # that a file is saved at `filepath`.
            shape = acquire_image(filepath)
            self.image.shape = shape
            # Compose a Datum document referring to this specific image, and
            # stash it in _asset_docs_cache.
            datum = self._datum_factory({"index": i})
            self._asset_docs_cache.append(("datum", datum))
            self.image.set(datum["datum_id"]).wait()
        except Exception as exc:
            status.set_exception(exc)
        else:
            status.set_finished()
        finally:
            self._acquiring_lock.release()

    def collect_asset_docs(self):
        "Yield the documents from our cache, and reset it."
        yield from self._asset_docs_cache
        self._asset_docs_cache.clear()
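The trigger method above follows a common pattern: start the slow work on a background thread, return a status object right away, and let the worker mark that status as finished or failed. The sketch below reproduces the pattern with the standard library only. SimpleStatus is a hypothetical, heavily simplified stand-in for DeviceStatus (it borrows the set_finished, set_exception, and wait method names but none of the real class's other features), and the module-level trigger function is likewise our own illustration.

```python
import threading


class SimpleStatus:
    """Hypothetical, minimal stand-in for ophyd's DeviceStatus."""

    def __init__(self):
        self._event = threading.Event()
        self._exception = None

    def set_finished(self):
        self._event.set()

    def set_exception(self, exc):
        self._exception = exc
        self._event.set()

    def wait(self, timeout=None):
        # Block until the worker reports success or failure.
        if not self._event.wait(timeout):
            raise TimeoutError("status did not complete in time")
        if self._exception is not None:
            raise self._exception


def trigger(work):
    """Run `work` on a background thread and promptly return a status."""
    status = SimpleStatus()

    def run():
        try:
            work()
        except Exception as exc:
            status.set_exception(exc)
        else:
            status.set_finished()

    threading.Thread(target=run).start()
    return status


def fail():
    raise RuntimeError("acquisition failed")


results = []
ok = trigger(lambda: results.append("image saved"))
ok.wait(timeout=5)  # returns normally once the work has finished

failed = trigger(fail)
try:
    failed.wait(timeout=5)  # re-raises the worker's exception
except RuntimeError as exc:
    error_message = str(exc)
```

The caller never blocks inside trigger itself; it blocks only when it chooses to call wait, which is what lets the RunEngine coordinate several devices at once.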
Finally, we will need a File Handler to allow us to load data from the file. Handlers are explained in more detail in the event model documentation.
A simple one might look like this:
class MyHandler:
    def __init__(self, resource_path):
        # resource_path is really a template string with a %d in it
        self._template = resource_path

    def __call__(self, index):
        import numpy
        # Import the Image submodule explicitly; a bare `import PIL` does not
        # guarantee that PIL.Image is loaded.
        from PIL import Image
        filepath = str(self._template) % index
        return numpy.asarray(Image.open(filepath))
And, of course, we will want an instance of our Camera device to work with:
camera = Camera(root_path="external_data", name="camera")
camera
As before, we'll manually walk through the individual steps such as staging and reading from the device. Typically this would be done as part of a plan executed by the RunEngine.
camera.stage()
status = camera.trigger()
status
status
camera.describe()
camera.read()
documents = list(camera.collect_asset_docs())
documents
camera.unstage()
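One detail of collect_asset_docs is worth a closer look: because it is a generator, the cache is cleared only after every cached document has been yielded. The sketch below mimics that pattern with a hypothetical DocCache class and hand-written placeholder documents; it is not ophyd code.

```python
class DocCache:
    """Hypothetical holder that mimics the caching pattern in collect_asset_docs."""

    def __init__(self):
        self._cache = []

    def add(self, name, doc):
        self._cache.append((name, doc))

    def collect(self):
        # Yield every cached (name, document) pair, then empty the cache.
        # The clear() call runs only once the generator has been exhausted.
        yield from self._cache
        self._cache.clear()


cache = DocCache()
cache.add("resource", {"uid": "r1"})
cache.add("datum", {"datum_id": "r1/0"})

first = list(cache.collect())   # consumes the generator fully, so the cache is cleared
second = list(cache.collect())  # nothing is left to yield
```

Wrapping the call in list(), as we did with the camera, is what guarantees the generator runs to completion and the cache is reset.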
Let's take a closer look at what is going on inside the documents:
documents
We can pull out the interesting structures, and finally put our Handler to use:
_, resource_document = documents[0]
_, datum_document = documents[1]
handler = MyHandler(
    Path(resource_document["root"], resource_document["resource_path"]),
    **resource_document["resource_kwargs"]
)
When we invoke the handler and pass in the datum_kwargs with the index, we should get back an array with our data:
handler(**datum_document["datum_kwargs"])
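It can help to see how the two documents combine to locate a single file, independent of any image loading. The sketch below assumes hand-written documents that contain only the keys used in this tutorial; the uuid-like fragment in the path and the helper name resolve_filepath are made up for illustration, and real documents carry additional fields.

```python
import os


def resolve_filepath(resource, datum):
    """Build the path of one image from a Resource and a Datum document."""
    # The Resource document supplies the root directory and a path template.
    template = os.path.join(resource["root"], resource["resource_path"])
    # The Datum document supplies the index that fills in the template's %d.
    return template % datum["datum_kwargs"]["index"]


# Illustrative documents, not output copied from a real run.
resource = {
    "root": "external_data",
    "resource_path": "images/abc123_%d.jpg",
    "resource_kwargs": {},
}
datum = {"datum_kwargs": {"index": 0}}

path = resolve_filepath(resource, datum)
```

This is the same substitution that MyHandler performs internally before it opens the file.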
from bluesky import RunEngine
from databroker.v2 import temp
RE = RunEngine()
db = temp()
RE.subscribe(db.v1.insert)
db.register_handler("MY_FORMAT_SPEC", MyHandler)
from bluesky.plans import count
RE(count([camera]))
run = db[-1] # Access the most recent run.
dataset = run.primary.read() # Access the dataset of its 'primary' stream.
dataset
dataset["camera_image"]
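Registering the handler under "MY_FORMAT_SPEC" is what connects the Resource documents emitted by our camera to MyHandler. As a purely conceptual sketch (this is not how databroker is implemented, and PathOnlyHandler, registry, and load are hypothetical names), the lookup can be pictured as a dictionary keyed on the spec string:

```python
import os


class PathOnlyHandler:
    """Hypothetical handler that returns the resolved file path instead of loading data."""

    def __init__(self, resource_path):
        self._template = resource_path

    def __call__(self, index):
        return str(self._template) % index


# Map each spec string to the class that knows how to read that format.
registry = {"MY_FORMAT_SPEC": PathOnlyHandler}


def load(resource, datum, registry):
    """Pick a handler class by spec, instantiate it, and call it for one datum."""
    spec = resource["spec"]
    if spec not in registry:
        raise KeyError(f"No handler registered for spec {spec!r}")
    handler = registry[spec](
        os.path.join(resource["root"], resource["resource_path"]),
        **resource["resource_kwargs"],
    )
    return handler(**datum["datum_kwargs"])


# Illustrative documents containing only the keys used here.
resource = {
    "spec": "MY_FORMAT_SPEC",
    "root": "external_data",
    "resource_path": "images/abc123_%d.jpg",
    "resource_kwargs": {},
}
loaded = load(resource, {"datum_kwargs": {"index": 3}}, registry)
```

A Resource document whose spec has no registered handler cannot be read back, which is why the registration step comes before we access the data.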