The first part sets up the observations. It actually stores them in an in-memory SQLite database.
import sys
import sqlite3
from datetime import datetime, timedelta
from collections import namedtuple
import pandas as pd
import numpy as np
A simple type to handle our simulated observations:
Obs = namedtuple('Observation', ['filter', 'exptime', 'target', 'repeats'])
Our telescope and instrument configuration. There is a lot of identicalness here, on purpose: easy to replace, easy to expand, easy to process:
READOUTTIME = 12
CONFIG = {
'telescopes': {
'GOTO1': {
'cameras': {
'UT1': {
'instruments': ['CCD1'],
'filters': ['L', 'B', 'V', 'R', 'I']
},
'UT2': {
'instruments': ['CCD2'],
'filters': ['L', 'B', 'V', 'R', 'I']
},
'UT3': {
'instruments': ['CCD3'],
'filters': ['L', 'B', 'V', 'R', 'I']
},
'UT4': {
'instruments': ['CCD4'],
'filters': ['L', 'B', 'V', 'R', 'I']
},
},
},
'GOTO2': {
'cameras': {
'UT1': {
'instruments': ['CCD1'],
'filters': ['L', 'G', 'R', 'I']
},
'UT2': {
'instruments': ['CCD2'],
'filters': ['L', 'G', 'R', 'I']
},
'UT4': {
'instruments': ['CCD4'],
'filters': ['L', 'G', 'R', 'I']
}
}
}
}
}
We start observing at just half past seven in the evening. Clouds roll in around quarter past nine.
startdate = datetime(2018, 9, 9, 19, 35, 30)
clouddate = datetime(startdate.year, startdate.month, startdate.day, 21, 13, 0)
We start observing a GRB with a standard 3xL, B, V, R sequence. We then move to a series of short exposures on a Cepheid variable, then do some longer exposures on an older Ligo transient.
The L-filter observations are repeated, to account for e.g. cosmic rays, so that defects can be filtered out. These filters are the widest, and are meant to detect the fainter objects.
We'll want to (median) stack the repeated GRB and LIGO observations in our reduction process (and similar for the generic field observations), but not the short Cepheid observations: the latter exhibits rapid variability that we don't miss by averaging over.
schedule = [
Obs(filter='L', exptime=180, target='GRB', repeats=3),
Obs(filter='B', exptime=180, target='GRB', repeats=1),
Obs(filter='V', exptime=180, target='GRB', repeats=1),
Obs(filter='G', exptime=180, target='GRB', repeats=1),
Obs(filter='R', exptime=180, target='GRB', repeats=1),
Obs(filter='R', exptime=15, target='Ceph', repeats=20),
Obs(filter='L', exptime=120, target='GW123456', repeats=6),
Obs(filter='B', exptime=120, target='GW123456', repeats=2),
Obs(filter='G', exptime=120, target='GW123456', repeats=2),
Obs(filter='V', exptime=120, target='GW123456', repeats=2),
Obs(filter='R', exptime=120, target='GW123456', repeats=2),
]
Observe some standard fields with a standard sequence:
for field in [23, 44, 56, 79]:
name = "Field{}".format(field)
schedule.extend([
Obs(filter='L', exptime=120, target=name, repeats=3),
Obs(filter='B', exptime=120, target=name, repeats=1),
Obs(filter='V', exptime=120, target=name, repeats=1),
Obs(filter='G', exptime=120, target=name, repeats=1),
Obs(filter='R', exptime=120, target=name, repeats=1),
])
Some specific sequence for a special target:
schedule.extend([
Obs(filter='L', exptime=80, target='And123', repeats=12),
])
Add some more L-filter observations, for other targets. This results in 18 frames in L-filter, which are for three different sets.
schedule.extend([
Obs(filter='L', exptime=120, target='Peg54', repeats=3),
Obs(filter='L', exptime=80, target='Cas54', repeats=4),
])
Back to default fields and a standard sequence:
for field in [88, 123, 135, 77]:
name = "Field{}".format(field)
# Use three sequences of just two L filter observations;
# combining these sequences should *not* cross the border of
# the individual sequences, that is, the stacks should be 2, 2
# and 2 images, not 3 and 3.
schedule.extend([
Obs(filter='L', exptime=120, target=name, repeats=2),
Obs(filter='L', exptime=120, target=name, repeats=2),
Obs(filter='L', exptime=120, target=name, repeats=2),
Obs(filter='B', exptime=120, target=name, repeats=1),
Obs(filter='V', exptime=120, target=name, repeats=1),
Obs(filter='G', exptime=120, target=name, repeats=1),
Obs(filter='R', exptime=120, target=name, repeats=1),
])
We set up our observation database first: we'll store the results of the simulated observations in here.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE observations (
id INT PRIMARY KEY,
telescope TEXT,
camera TEXT,
instrument TEXT,
filter TEXT,
imagetype TEXT,
target TEXT,
exptime REAL,
obsdate TEXT,
iobs INT,
nobs INT,
stage INT DEFAULT 0,
status TEXT DEFAULT "unknown",
"set" INT DEFAULT 0
)""")
conn.commit()
We've set up the schedule. Now, we let our telescopes observe. Somewhere during our simulated observations, clouds appear: it takes 1.5 hours to clear, and the current sequence from that moment is aborted, only to be redone once the clouds have cleared.
imagetype = "SCIENCE"
for telescope in ['GOTO1', 'GOTO2']:
for camname, camconfig in CONFIG['telescopes'][telescope]['cameras'].items():
filters = camconfig['filters']
obsdate = startdate
for instrument in camconfig['instruments']:
for obs in schedule:
if obs.filter not in filters:
continue
for iobs in range(obs.repeats):
cursor.execute("""INSERT INTO observations
(telescope, camera, instrument, filter, imagetype, target, exptime, obsdate, iobs, nobs) VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (telescope, camname, instrument, obs.filter, imagetype, obs.target, obs.exptime, obsdate,
iobs+1, obs.repeats))
obsdate += timedelta(0, obs.exptime+READOUTTIME, 0)
if clouddate and obsdate > clouddate:
# Abort current sequence, and wait until it clears
obsdate += timedelta(0, 5400, 0)
clouddate = False
# Now repeat the failed sequence
for iobs in range(obs.repeats):
cursor.execute("""INSERT INTO observations
(telescope, camera, instrument, filter, imagetype, target, exptime, obsdate, iobs, nobs) VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (telescope, camname, instrument, obs.filter, imagetype, obs.target, obs.exptime, obsdate,
iobs+1, obs.repeats))
obsdate += timedelta(0, obs.exptime+READOUTTIME, 0)
break
conn.commit()
Before we retrieve the data from the database, we will put the database in a stage halfway done processing. So it looks like we're halfway done. Note that we focus on just one camera on one telescope: GOTO1 and UT1.
The stages are steps in the data reduction. What they are exactly is not important (e.g., think overscan correction, bias/dark/flat correction etc). What is important, is that stage 4 is the process where images are combined with each other. All the steps before that can be run completely separately for each image (and as such, are very easy to run in parallel). For stage 4, however, there is a dependency between subsets of the images, as detailed further down.
cursor.execute("""\
UPDATE observations SET stage = 1, status = 'processing'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate < '2018-09-09 23:45:00'
""")
cursor.execute("""\
UPDATE observations SET stage = 1, status = 'completed'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate < '2018-09-09 23:30:00'
""")
cursor.execute("""\
UPDATE observations SET stage = 2, status = 'processing'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate < '2018-09-09 23:15:00'
""")
cursor.execute("""\
UPDATE observations SET stage = 2, status = 'completed'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate < '2018-09-09 23:05:00'
""")
cursor.execute("""\
UPDATE observations SET stage = 3, status = 'processing'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate BETWEEN '2018-09-09 22:56:00' AND '2018-09-09 23:02:00'
""")
cursor.execute("""\
UPDATE observations SET stage = 3, status = 'completed'
WHERE telescope = 'GOTO1' AND camera = 'UT1' AND instrument = 'CCD1' AND
obsdate < '2018-09-09 22:50:00'
""")
conn.commit()
Let's see what's in the database. Just for one telescope, instrument and camera.
cursor.execute("""SELECT imagetype, target, exptime, filter, obsdate, stage, status
FROM observations
WHERE telescope = 'GOTO1' AND instrument = 'CCD1' AND camera = 'UT1'
ORDER BY obsdate""")
cursor.fetchall()
We use Pandas to handle the table: the most general and Python-esque interface to a database table.
df = pd.read_sql("select * from observations order by obsdate, telescope, camera, instrument",
conn)
df['obsdate'] = pd.to_datetime(df['obsdate'])
Show the table.
Remove .head()
to show the full table, or for example .head(20)
to show just a bit more.
df.head()
Most columns speak for themselves. nobs
and iobs
are inserted by the schedule executor: they are the total number of observations in a sequence, and the current index in a sequence (1-based). A sequence is complete when iobs == nobs
, for a specific combination of telescope, camera and instrument.
Let's choose a nicer index first for our table:
df = df.set_index(['telescope', 'camera', 'instrument'])
df.head()
Let's just select one telescope, camera and instrument; all other ones are identical in observations anyway (except for the rare case one of them breaks down; not in this example). Besides, we didn't set the processing stages for the other telescope and instruments.
Select the index combination of interest, as well as the columns of interest (which avoids the telescope, camera and instrument information). Then, change to a new index.
# sort first to increase performance, and avoid the PerformanceWarning
df.sort_index(inplace=True)
dfsel = df.loc[('GOTO1', 'UT1', 'CCD1'),
['imagetype', 'target', 'obsdate', 'filter', 'exptime', 'iobs', 'nobs', 'stage', 'status']]
dfsel = dfsel.set_index(['imagetype', 'target', 'filter'])
Let's see what the table looks like now. Notice that fhe first observations have already completed processing stage 3, and are ready to be combined (where applicable) in stage 4.
dfsel.head(15)
At 21:13, clouds came in, and we stopped observing for 1.5 hours.
Note how the sequence of 3xL for field 79 is aborted, and we're left with just two observations. Here, nobs
still equals 3, but iobs
will never reach that. It's only 1.5 hours later that there are the proper 3xL frames.
dfsel.iloc[50:65]
Between rows 60 and 100, however, we can see that some data have completed stage 3, others have completed stage 2 or only stage 1, some are still being processed (any of stage 1, 2 or 3), and some data haven't even started stage 1 yet.
dfsel.iloc[60:100]
The following rules apply to combine images:
iobs == nobs
, even ff none of imagetype, target, filter or exposure time changes. If iobs < nobs
, but something else changes, this overrides the iobs == nobs
condition and there'll be a new subset created. Note that this can't be relied upon, since the last frame in a sequence (where iobs == nobs
) may have been discarded early on due to low quality (that is, discarded before it is put in the processing table).NSTACK
). This overrides the iobs === nobs
rule. So a set of 12 identical observations will be split into 4 or 3 (depending on NSTACK
) subsets. There will not be a subset (and resulting combined image) of 12 frames.MAXSEQ
, no subsetting at all for that sequence is done. Usually, MAXSEQ
is 12. So 20 observations of a target are left alone, and not combined. This prevents needlessly combining images that are meant for short-term variability observations.MAXTDELTA
) is exceeded, a new subset will be created. This is practical in case of aborted sequences: the processing should not wait endlessly for consecutive frames, but it can handle short (5-10 minutes) interruptions.With the above rules, and stage 4 as the frame combination stage, and the below constants, we can have an exploratory look what needs to be combined, and how.
Note that, generally, frames are processed in sequential (date) order, since this is how the frames arrive on the processing machine. This is also how they'll be read into the algorithm: sorted by date.
NSTACK = 4
MAXSEQ = 12
MAXTDELTA = timedelta(0, 1800, 0)
For the GRB, we have no problem: the three L-filter frames are combined into a single 360 seconds frame, while all the other filters pass through without having anything done (a no-op).
For the Cepheid variable, we don't do anything: there are 20 frames in the sequence, which is larger than MAXSEQ
. This is no-op.
For the GW target (dfsel.iloc[26:38]
), the BVR filters pose no problem, and are combined by two, into 240 second frames.
The L-filter frames are trickier: ideally, you'd want either one 720 second frame, or two 360 second frames. But with the above set of rules, you'll get one 480 second frame and one 240 second frame, since NSTACK
equals 4.
For field 79, we only combine two L-filter frames. The processing algorithm will wait for half an hour (MAXTDELTA
) before it decides to combine these two frames.
Finally, for the And123 target and later targets, nothing will happen yet, since there are no completed stage 3 data yet.
There is, however, an interesting point to observe for the processing of the And123 target: iobs 5, 6, 7 and 8 are being processed through stage 3 right now, and if they complete, they'll be combined even before the other frames in that sequence are processed. This is because nearly all of the above rules can (and are) applied before, or without regard for, the processing stage. Thus, 5, 6, 7 and 8 form the second subset of that sequence, and are ready to be combined once stage 3 is completed. Of course, if one of those frames fails to be properly processed, we only have 3 frames, and we'd need the fourth frame from the next subset (with iobs 9).
Note that this last step is disputable, and may change. In particular, if a single frame from the [1, 2, 3, 4] subset fails, but the next subset has already been processed, there are no replacement frames to add (or perhaps from the [9, 10, 11, 12] set), and we're left with just 3 frames in the first subset to combine, simply because we processed things in non-sequential order.
Let's go back to our original dataframe, if only to be complete.
We loop over the unique combination of indices (telescope, instrument, camera). To verify our results, we also keep track of the subsets that are not ready for processing.
# Silence a Pandas warning. This occurs at the dfsel[...].iloc[i] stage,
# and should perhaps be replaced by something nicer.
pd.options.mode.chained_assignment = None
cols = ['imagetype', 'target', 'obsdate', 'filter', 'exptime', 'iobs', 'nobs', 'stage', 'status', 'set']
notready = [] # just for information
# A set of 0 means not having been assigned yet
for index in np.unique(df.index):
dfsel = df.loc[index, cols]
# Ensure it's sorted by date
dfsel.sort_values('obsdate', inplace=True)
dfshift = dfsel.shift(1)
# We compare exptime, a float. If, however, the float was set explicitly,
# and didn't come about from a calculation, the comparison will be exact,
# and we're good. Otherwise, things become quite a bit more complicated.
selcols = ['imagetype', 'target', 'filter', 'exptime']
# Compare the shifted frame; this results in a four column frame with booleans.
# Summing these booleans across the columns creates a non-zero value
# for every change across these columns.
# We combine that with other requirements, such as iobs == nobs and
# the maximum allowed time interval.
# Note that 'set' is only unique within a single set of a multi-index
dfsel['set'] = (((dfsel[selcols] != dfshift[selcols]).sum(axis=1) > 0) |
(dfshift['iobs'] == dfshift['nobs']) |
((dfsel['obsdate'] - dfshift['obsdate']) > MAXTDELTA)).cumsum()
# Ignore the last set: it may be incomplete, and data may still be incoming
for name, group in list(dfsel.groupby(by='set'))[:-1]:
selindex = (dfsel['set'] == name)
if len(group) == 1 or len(group) > MAXSEQ:
# Single frames or large sequences are passed through with a no-op
dfsel.loc[selindex, 'stage'] = 4
dfsel.loc[selindex, 'status'] = 'notprocessed'
continue
# Iterate in chunks of NSTACK
for pos in range(0, len(group), NSTACK):
subset = group[pos:pos+NSTACK]
ready = (np.all(subset['stage'] == 3) &
np.all(subset['status'].isin(['completed', 'notprocessed'])))
if ready:
# Get the actual indices in dfsel for this subset
i = np.where(selindex)[0][pos:pos+NSTACK]
dfsel['stage'].iloc[i] = 4
dfsel['status'].iloc[i] = 'starting'
# Make the 'set' id unique for this subset
dfsel['set'].iloc[i] = dfsel['set'].max() + 1
else: # for information purposes
notready.append(subset)
# Assign back to the original dataframe
df.loc[index, cols] = dfsel
The first fifteen frames. Notice that
df.head(15)
The 6 GW L-filter frames are grouped into two sets, of 4 and 2 frames each, with set ids 55 and 56. The set numbering is non-sequential here (following a 5), because this subdivision happens later. By that time, an initial sequential set numbering has already been assigned, and the numbering continues from the largest unique number available. While this makes it a tad awkward to read as human, it remains simply a unique label assigned to a set.
df.iloc[25:35]
Below shows a subset of our processing table around the point where clouds came in. Notice that
df.loc[('GOTO1', 'UT1', 'CCD1')].iloc[50:65]
The list of notready
groups shows that the the frames left out from processing in stage 4, are indeed the ones that were not completed at stage 3 yet.
Notice how the groups have set ids assigned, but these are not subsetted to groups of size NSTACK
yet (the 12xL sequence).
pd.concat(notready).loc[('GOTO1', 'UT1', 'CCD1')]
The problem with the above approach is that it does everything in memory, having read the full table from the database. Once the database gets large, this approach doesn't become feasible anymore.
It would be nice if this could be done inside the database itself, using e.g. SQL. With operations like GROUP BY
, ORDER BY
and PARTITION BY
(window functions), one may get a long way, but ultimately, I don't think it's feasible.
A bonus of doing this inside the database, is that the set
id can be set to a unique sequence, which is quite common in databases. In fact, it could be a foreign key to the primary key of another table that just keeps track of the individual sets that are processed.
The alternative approach would be to preselect the data from the database table when creating the dataframe. Doing this for each combination of telescope, camera and instrument would be a first step. One could also try to only retrieve frames which have a stage 3 complete, but in a sequence of e.g. 3xL, where the middle frame is still processing, this would end up with just processing the first and third frame, and missing the second frame (the algorithm does not check for this, since the second frame may genuinely be missing: it could have been due to low quality).
Since the idea is that the processing happens nearly live, a time limit could also be introduced when retrieving files from the database: data older than a few days should be ignored. This means that any data that didn't get processed by that time, will have to processed later by hand (or at least, the processing started manually with a different valid time range).
If the dataframes still end up being too large, they could be read in chunks (e.g., of 1000 rows). Here, each last, possibly incomplete, subset should be ignored, as it may be incomplete and part of it resides in the next chunk. The chunks should therefore overlap, with the overlap size the maximum size of a sequence (which may be a lot more than MAXSEQ
). The reason for that is that we still set the status and stage of a sequence that otherwise doesn't get processed.
For processing in chunks or within a date range, the set id still has to be unique across the whole table (or at least, unique within a unique group of telescope, camera and instrument). The easiest way to do this is to grab the maximum set id from the data just read, and make sure all new set ids are larger than that. How quickly this runs beyond the number of available integers remains to be seen, but that is unlikely to be a problem.
Of course, the database table is just one step in a pipeline process. In the end, a manager tool will regularly scan the database and perform the actions above for frames that haven't reached the last processing stage. The manager will also fire off the process for a certain stage. This process could be a Python function, a Python class (that has e.g. a run()
method), a module or even a package (if these contain a __main__
block or module; cf. the -m
command line option). It could also be an external program (provided it meets certain requirements, like a proper exit code). Finally, the manager tool would also need to update the database once a process has completed (which may be with an error, in which case the status would become 'aborted').
What process needs to be run for what pipeline stage, is stored in a configuration file. This might be a (strict)YAML file or a Python file; the latter would allow very flexible rules. Processes may not just be dependent on the current stage, but also on characteristics like the image type (flatfields versus biases versus science images) or other details in the processing table (for example, binning factor could be added).
The last example indicates that the processing table should be flexible, to be extended with more necessary information. This might be done with two tables: a processing table with barely the essential information, and a user defined table that contains all the extra characteristics needed.
Further configuration details would be subsetting of sequences: in the above example, a sequence of 12 is subsetted into 3 sets of 4 frames, and a sequence of 6 is subsetted into a set of 4 and a set of 2 frames. For the latter, 2 sets of 3 frames may make more sense, or a single set of 6 frames.
There is also the matching on exposure time: for flatfields, this should actually be ignored (twilight flatfields will have changing exposure times to handle the changing sky brightness). Plus, it compares floating point numbers. Generally, this will work fine, but for some characteristics, a valid range may be used instead. For example, the CCD temperature can vary a bit around -20 C, but if the cooler is off (and the temperature is e.g. 5 C), these frames should be grouped into a different subset. Such options make both the configuration syntax and the algorithm implementation quite a bit harder.
The final result is a database driven pipeline: new or updated entries in the database effectively start a pipeline. This is in contrast to where a pipeline is started, grabs all available data from the database (the Extract part in an ETL pipeline), processes the data (Transform) and stores the data somewhere (Load). This means missing out on all the niceties of existing pipeline frameworks (for Python, Airflow and Luigi are the better known ones), including all the DAG parts, but it allows a different workflow. It could be possible to create this kind of pipeline within Airflow or Luigi, but it would require some working around the regular scheme used in these frameworks.