In this recipe, we're going to develop a small Workflow
to process sequence data sourced from RNAcentral. The focus of this recipe is to highlight how the Workflow
object can decompose complex logic into smaller blocks that are easier to verify. The structure of the Notebook is to first develop functions that allow us to query RNAcentral and NCBI, where the former is our primary source of data and the latter is for obtaining lineage details.
First, we're going to import the ElementTree
to facilitate XML parsing in one of the functions. Second, we're going to import the requests
library which makes querying remote RESTful interfaces very easy. More detail on requests
can be found here. Third, our example will query for RNA sequence, so we'll grab the appropriate scikit-bio object. And last, we're going to import the Workflow
object, two decorators and a helper object that defines a notion of not None
.
import xml.etree.ElementTree as ET
import requests
from skbio import RNA
from skbio.workflow import Workflow, method, requires, not_none
Next, let's put together a function that allows us to query RNAcentral. Details about its RESTful interface can be found here, and is an excellent example of a well thought out web interface. This function will allow us to perform arbitrary queries, and it'll return a fair bit of detail about each sequence found. In the future, we may add a standard mechanism for querying RNAcentral, but for right now, we'll just roll our own.
def query_rnacentral(query, max_records=100):
"""Query RNACentral and yield parsed records
Parameters
----------
query : str
The query to use against RNACentral
Returns
-------
generator
Yields `dict` of the individual records.
See Also
--------
http://rnacentral.org/api
"""
def _request_to_json(url, params=None, headers=None):
# Perform the request, and if valid, fetch the JSON data. We're defining this as a
# function as we'll be performing this type of operation a few times.
r = requests.get(url, params=params, headers=headers)
if r.status_code != 200:
raise IOError("We received the following status code: %d" % r.status_code)
return r.json()
url = 'http://rnacentral.org/api/v1/rna'
# Tell RNAcentral that we'd like JSON data back
headers = {'Accept': 'application/json'}
# These are the specific parameters for the query. For example, if our query was foo,
# the resulting URL would be (feel free to copy/paste into your browser too):
# http://rnacentral.org/api/v1/rna/?query=foo&page_size=25&flat=true
# More information about the specific parameters and their values can be found on the
# API description page: http://rnacentral.org/api
params = {'query': query, # The specific query
'page_size': 25, # The number of items to return per page.
'flat': 'True'} # This expands out the detail about each record
data = _request_to_json(url, params, headers)
# The key `next` contains the URL of the next "page" of data
next_page = data['next']
# The key `count` contains the total number of items that match our query
count_in_payload = data['count']
# And, let's setup a counter to track now many results have been yielded
count_yielded = 0
while count_in_payload > 0 and count_yielded < max_records:
for record in data['results']:
# Let's only care about records that appear to have sequence and length data
sequence = record.get('sequence')
length = record.get('length')
if sequence is None or length is None:
continue
# The "flat" form from RNAcentral has a list of external references associated with each sequence.
# This list might contain more than a single entry if the sequence has been deposited multiple times,
# and for the purposes of our program, we're going to consider each one of these as independent.
sequence = {'sequence': sequence, 'length': length}
for xref in record['xrefs']['results']:
# Now, let's build our notion of a record and yield it so a consumer can operate on it
to_yield = sequence.copy()
to_yield.update(xref)
yield to_yield
count_yielded += 1
# Finally, if there is another page of data, let's grab it, otherwise we'll terminate the loop
if next_page:
data = _request_to_json(next_page)
else:
break
So what do these records that we're getting from RNAcentral look like? Let's explore! For our test, let's search for "tRNA", and to be polite, we'll only request a single record as that's all we need right now. We're also going to use a helper function, pprint
, to format the resulting dict
better for human consumption.
from pprint import pprint
test_query = query_rnacentral('tRNA', max_records=1)
pprint(next(test_query))
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/citations', 'description': 'uncultured bacterium partial 16S ribosomal ' 'RNA', 'expert_db_url': '', 'external_id': '', 'gene': '', 'id': 'DQ668905.1:1..363:rRNA', 'optional_id': '', 'organelle': '', 'product': '16S ribosomal RNA', 'rna_type': 'rRNA', 'source_url': 'http://www.ebi.ac.uk/ena/data/view/Non-coding:DQ668905.1:1..363:rRNA', 'species': 'environmental samples uncultured bacterium', 'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/info'}, 'database': 'ENA', 'first_seen': '2014-05-29', 'is_active': True, 'is_expert_db': False, 'last_seen': '2016-03-15', 'length': 363, 'sequence': 'GUAUGCAACUUACCUUUUACUAGAGAAUAGCCAAGAGAAAUUUUGAUUAAUGCUCUAUGUUCUUAUUUACUCGCAUGAGUAAAUAAGCAAAGCUCCGGCGGUAAAAGAUGGGCAUGCGUCCUAUUAGCUUGUAGGUGAGGUAAUGGCUCACCUAAGCUCCGAUAGGUAGGGGUCCUGAGAGGGAGAUCCCCCACACUGGUACUGAGACACGGACCAGACUUCUACGGAAGGCAGCAGUAAGGAAUAUUGGACAAUGGAGGCAACUCUGAUCCAGCCAUGCCGCGUGAAGGAAGACGGCCUUAUGGGUUGUAAACUUCUUUUAUACAGGAAGAAACCUUUCCACGUGUGGAAAGCUGACGGUAC', 'taxid': 77133}
Well, that's interesting. Even though we queried for "tRNA", we ended up getting back "rRNA"! Unfortunately, while the search interface of RNAcentral is quite powerful (and we're intentionally not fully leveraging it here), it is not perfect. So, we'll need to do some data scrubbing on our end if we want to only get specific records. More on this in a little.
Next, it'd be great to get the lineage associated with the records. Luckily, RNAcentral provides a taxid
, which corresponds to the NCBI taxonomy. Unfortunately, they don't provide the full lineage. But, we can query NCBI to gather these details. NCBI provides EUtils that let users programmatically query their resources similarly to RNAcentral.
def query_ncbi_lineage(taxon_id):
"""Obtain the NCBI lineage for a taxon ID
Parameters
----------
taxon_id : int
The taxon ID of interest
Returns
-------
list or None
Each taxon name or None if unable to retreive the taxon details
"""
url = "http://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
# Define our parameters to use in our query
params = {'db': 'taxonomy', # We want to query the taxonomy database
'id': taxon_id} # We're requesting detail on the taxon ID specifically
# Make the request
r = requests.get(url, params=params)
# Bail if we received a bad status
if r.status_code != 200:
return None
# NCBI returns XML, so we need to parse the "content" of our request into a usable structure
tree = ET.fromstring(r.content)
# We are only interested in the Lineage key within the tree. There is a fair bit of other data
# present within the structure, however.
lineage = next(tree.iter('Lineage'))
# The lineage, if we did get it, is delimited by "; ", so let's go ahead and parse that into
# a friendlier structure
if lineage is not None:
return [v.strip() for v in lineage.text.split(';')]
else:
return None
Great! Now we can query NCBI, let's do a quick test to see what lineage we get back that would be associated with the previous RNAcentral record.
print(query_ncbi_lineage(57045))
['cellular organisms', 'Bacteria', 'Proteobacteria', 'Gammaproteobacteria', 'Enterobacteriales', 'Enterobacteriaceae', 'Salmonella', 'Salmonella enterica', 'Salmonella enterica subsp. enterica']
Now that the querying functions are out of the way, we're going to construct a method that parses and filters the records based on some runtime critera. Specifically, this function will:
Most of these things are going to be optional, as is typical in a sequence processor where under some conditions you may want to filter, and may want to specify the specific criteria to filter by.
In addition, we also want to track our "failed" records, which is also common as they can be used to debug issues in the pipeline or get dumped into some other processing stream.
And last, we want to track processing statistics so we know how many items were filtered based on what critera.
from collections import Counter
failures = []
def failed(item):
"""Store a failed record
This function is decomposed in case we want to add other logic for dealing
with failures
"""
global failures
failures.append(item)
def record_parser(items, options):
"""Parse items based on our runtime options
Parameters
----------
items : Iterable
Iterable of `dict` that contains the record information
options : dict
The options that describe filtering criteria, etc
Returns
-------
generator
Yields `dict`s of the parsed records
"""
stats = Counter()
for item in items:
# We're going to be updating item inplace, so let's make a copy to be safe
item = item.copy()
# The id is delimited by a ':', where the first component is the INSDC accession
id_ = item['accession']['id'].split(':', 1)[0]
# If the options indicate to, let's filter by "environmental" criteria. For instance,
# if the description contains the word "environmental" then we want to ignore the record.
# This is common when parsing out named isolates.
if options['filter-description']:
acc = item['accession']
tokens = {t.strip().lower() for t in acc['description'].split()}
if options['filter-description'].intersection(tokens):
failed(item)
stats['filter-description'] += 1
continue
# Force the record to use an RNA alphabet
item['sequence'] = item['sequence'].replace('T', 'U')
# Filter out records that don't meet a minimum length requirement
if options['filter-length'] and item['length'] < options['minimum-length']:
failed(item)
stats['minimum-length'] += 1
continue
# Filter out records that don't meet a maximum length requirement
if options['filter-length'] and item['length'] > options['maximum-length']:
failed(item)
stats['maximum-length'] += 1
continue
# Compute GC if necessary
if options['compute-gc']:
gc = float(item['sequence'].count('G') + item['sequence'].count('C'))
gc /= item['length']
else:
gc = None
# Filter by a minimum GC content
if options['compute-gc'] and options['minimum-gc'] and gc is not None:
if gc < options['minimum-gc']:
failed(item)
stats['minimum-gc'] += 1
continue
# Filter by a maximum GC content
if options['compute-gc'] and options['maximum-gc'] and gc is not None:
if gc < options['maximum-gc']:
failed(item)
stats['maximum-gc'] += 1
continue
# If we have a taxonomy ID, then let's grab the lineage
if item.get('taxid', None) is not None:
lineage = query_ncbi_lineage(item['taxid'])
else:
lineage = None
# Determine the group based on the lineage
group = lineage[-1] if lineage is not None else 'unassigned'
# Finally, let's save a little bit more state and yield for a subsequent consumer
item['gc'] = gc
item['group'] = group
item['lineage'] = lineage
item['id'] = id_
item['sequence'] = RNA(item['sequence'], metadata={'id': item['id']}, validate=True)
yield item
So that's a pretty complex function. There are a lot of conditionals and short circuiting the loop with continues. Though this particular example probably is not extremely difficult to verify its correctness, it is not difficult to imagine a more complex process (such as QC for amplicon data) that would be much more difficult to verify. Additionally, we can't retreive the stats
defined without making the return more complex, or the use of global variables. Let's quickly play with the function though so we can see how it behaves. First, lets not define any options and examine the result.
options = {}
items = query_rnacentral('tRNA')
records = record_parser(items, options)
try:
pprint(next(records))
except Exception as e:
print("We raised an exception!")
print("%s: %s" % (type(e), e.args))
We raised an exception! <class 'KeyError'>: ('filter-description',)
Okay, so that's a bit annoying. We could update the function to issue dict.get
calls, and increase the complexity of our conditional logic, but for how, lets just define usable options.
options = {'filter-description': set(),
'filter-length': False,
'minimum-length': None,
'maximum-length': None,
'compute-gc': True,
'minimum-gc': None,
'maximum-gc': None}
items = query_rnacentral('tRNA')
records = record_parser(items, options)
pprint(next(records))
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/citations', 'description': 'uncultured bacterium partial 16S ribosomal ' 'RNA', 'expert_db_url': '', 'external_id': '', 'gene': '', 'id': 'DQ668905.1:1..363:rRNA', 'optional_id': '', 'organelle': '', 'product': '16S ribosomal RNA', 'rna_type': 'rRNA', 'source_url': 'http://www.ebi.ac.uk/ena/data/view/Non-coding:DQ668905.1:1..363:rRNA', 'species': 'environmental samples uncultured bacterium', 'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/info'}, 'database': 'ENA', 'first_seen': '2014-05-29', 'gc': 0.465564738292011, 'group': 'environmental samples', 'id': 'DQ668905.1', 'is_active': True, 'is_expert_db': False, 'last_seen': '2016-03-15', 'length': 363, 'lineage': ['cellular organisms', 'Bacteria', 'environmental samples'], 'sequence': RNA --------------------------------------------------------------------- Metadata: 'id': 'DQ668905.1' Stats: length: 363 has gaps: False has degenerates: False has definites: True GC-content: 46.56% --------------------------------------------------------------------- 0 GUAUGCAACU UACCUUUUAC UAGAGAAUAG CCAAGAGAAA UUUUGAUUAA UGCUCUAUGU 60 UCUUAUUUAC UCGCAUGAGU AAAUAAGCAA AGCUCCGGCG GUAAAAGAUG GGCAUGCGUC ... 300 UAUGGGUUGU AAACUUCUUU UAUACAGGAA GAAACCUUUC CACGUGUGGA AAGCUGACGG 360 UAC, 'taxid': 77133}
That worked, but can we do better? One large issue with the above function is that there are so many different paths through the function, that it is difficult to validate the individual functions. Additionally, the boilerplate logic for checking options and state is replicated numerous times. In a real world situation, additional features will likely get added in as well further increasing the complexity of the method, and the likelihood of bugs creeping in.
The goal of the Workflow
is to centralize the boilerplate logic, and allow the relatively small blocks of logic that rely on shared state (such as the GC filtering) to be decomposed. To do this, the Workflow
relies on function decorators: method
and requires
. The structure of the object is such that each individual block of code (such as the minimum length filter) can be explicitly unit-tested easily.
method
tells the Workflow
that this is a method to be executed for every item being operated on. It takes a priority
parameter that indicates its relative order of execution in the workflow, where a higher value means it will be executed earlier. If a method sets failed
, the item is considered to have "failed" and, by default, none of the other methods are executed (e.g., shortcircuiting like the continue
above).
The second decorator, requires
, performs the option checking to verify that specific runtime options are set, have particular values (e.g., execute method foo if option X has value Y), and to check on state requirements. If a requirement is not met, the function is not evaluated.
Last, the Workflow
allows the developer to specify a callback to be executed on a successful processing, and one that can be executed on a failed processing.
Let's take a look at what this looks like. First, we're going to define two helper functions that simply test for the presence of some piece of state.
def has_gc(item):
"""Test if state has gc computed"""
return item.get('gc') is not None
def has_taxid(item):
"""Test if state has a valid taxid"""
return item.get('taxid') is not None
Next, we're going to define the actual workflow itself. The first method we'll define is a special one that is executed for every record called initialize_state
. This method is responsible for resetting the notion of state
, which should be independent for every item. Following this, we'll define the actual methods that'll do the work and decorate on their various requirements.
class RecordParser(Workflow):
def initialize_state(self, item):
"""Initialize state based on the item being processed"""
self.state = item.copy()
self.state['gc'] = None
self.state['group'] = None
self.state['lineage'] = None
self.state['id'] = None
@method(priority=99)
@requires(option='filter-description', values=not_none) # not_none means any option value except None is valid
def filter_description(self):
"""Filter records depending on keywords present in the description"""
acc = self.state['accession']
tokens = {t.strip().lower() for t in acc['description'].split()}
if self.options['filter-description'].intersection(tokens):
self.failed = True
self.stats['filter-description'] += 1
@method(priority=95) # This method has no requirements
def force_to_rna(self):
"""Convert any thymine to uracil"""
self.state['sequence'] = self.state['sequence'].replace('T', 'U')
@method(priority=89)
@requires(option='filter-length', values=True) # Require 'filter-length' to be True
@requires(option='minimum-length', values=not_none) # Require that the 'minimum-length' is any value other than None
def minimum_length(self):
if self.state['length'] < self.options['minimum-length']:
self.failed = True
self.stats['minimum-length'] += 1
@method(priority=88)
@requires(option='filter-length', values=True)
@requires(option='maximum-length', values=not_none) # Require that the 'maximum-length' is any value other than None
def maximum_length(self):
if self.state['length'] > self.options['maximum-length']:
self.failed = True
self.stats['maximum-length'] += 1
@method(priority=80)
@requires(option='compute-gc', values=True)
def compute_gc(self):
gc = self.state['sequence'].count('G') + self.state['sequence'].count('C')
self.state['gc'] = float(gc) / self.state['length']
@method(priority=79)
@requires(option='minimum-gc', values=not_none)
@requires(state=has_gc) # Require that has_gc(self.state) evaluates to True
def minimum_gc(self):
if self.state['gc'] < self.options['minimum-gc']:
self.failed = True
self.stats['minimum-gc'] += 1
@method(priority=78)
@requires(option='maximum-gc', values=not_none)
@requires(state=has_gc) # Require that has_gc(self.state) evaluates to True
def maximum_gc(self):
if self.state['gc'] > self.options['maximum-gc']:
self.failed = True
self.stats['maximum-gc'] += 1
@method(priority=3)
@requires(state=has_taxid) # Require that has_taxid(self.state) evaluates to True
def fetch_lineage(self):
self.state['lineage'] = query_ncbi_lineage(self.state['taxid'])
@method(priority=2)
def assign_to_group(self):
self.state['group'] = self.state['lineage'][-1] if self.state['lineage'] is not None else 'unassigned'
@method(priority=1)
def cast_to_rnasequence(self):
self.state['sequence'] = RNA(self.state['sequence'], metadata={'id': self.state['id']}, validate=True)
While the workflow itself is complex, each individual block of logic is small making it much easier to test out the components. Integration testing is of course still a very good idea.
Now, let's take the object out for a spin. First, we'll instantiate the workflow. We can predefine state
on instantiation which is useful if it is possible to preallocate memory (instaed of always creating an object in initialize_state
), but this isn't necessary but a point of optimization. Second, we'll specify the same options we defined in our singular function above. And third, we're going to define a stats
object to track information about failures (note, state
is required, options
is optional, and anything in kwargs
gets set as a member variable).
To get a record out, we pass in our items
iterator (created from the query_rna_central
function defined earlier), and simply get the next item. On __call__
, we can also pass in callbacks which we'll show shortly.
records = RecordParser(state=None, options=options, stats=Counter())
pprint(next(records(items)))
pprint(records.stats)
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rfam/citations', 'description': 'uncultured bacterium Bacterial small subunit ' 'ribosomal RNA', 'expert_db_url': 'http://rfam.xfam.org/family/RF00177', 'external_id': 'RF00177', 'gene': '', 'id': 'DQ668905.1:1..363:rfam', 'optional_id': 'SSU_rRNA_bacteria', 'organelle': '', 'product': 'Bacterial small subunit ribosomal RNA', 'rna_type': 'rRNA', 'source_url': '', 'species': 'uncultured bacterium', 'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rfam/info'}, 'database': 'Rfam', 'first_seen': '2016-03-15', 'gc': 0.465564738292011, 'group': 'environmental samples', 'id': None, 'is_active': True, 'is_expert_db': False, 'last_seen': '2016-03-15', 'length': 363, 'lineage': ['cellular organisms', 'Bacteria', 'environmental samples'], 'sequence': RNA --------------------------------------------------------------------- Metadata: 'id': None Stats: length: 363 has gaps: False has degenerates: False has definites: True GC-content: 46.56% --------------------------------------------------------------------- 0 GUAUGCAACU UACCUUUUAC UAGAGAAUAG CCAAGAGAAA UUUUGAUUAA UGCUCUAUGU 60 UCUUAUUUAC UCGCAUGAGU AAAUAAGCAA AGCUCCGGCG GUAAAAGAUG GGCAUGCGUC ... 300 UAUGGGUUGU AAACUUCUUU UAUACAGGAA GAAACCUUUC CACGUGUGGA AAGCUGACGG 360 UAC, 'taxid': 77133} Counter()
Great, things appear to be working as we'd expect. To finish this notebook off, lets do three more things. First, we'll construct some example callback functions. Second, we're going to change the options to trigger some failures. And last, we're going to enable debug mode to gather additional context.
By default, no failure callback is defined as it is assumed that failures are ignored. In addition, there actually is always a success callback but it is by default just a passthrough. Each callback is provided self
, or the entire workflow object. By default, success simply just returns self.state
. These callbacks can be handy if you want to automatically stream results to a database, file, or other consumer.
def fail_callback(wf):
"""Print out the ID associated with the failure"""
print("We had a failure with %s" % wf.state['accession']['id'])
def success_callback(wf):
"""Print out the ID associated with the success"""
print("We successfully processed %s" % wf.state['accession']['id'])
# Let's filter out those pesky ribosomal RNAs
options['filter-description'] = {'ribosomal', 'rrna'}
records = RecordParser(state=None, options=options, stats=Counter())
next(records(items, success_callback=success_callback, fail_callback=fail_callback))
pprint(records.stats)
We had a failure with DQ668905.1:1..363:silva Counter({'filter-description': 1})
It would be cumbersome to interrogate stats
everytime something failed in order to figure out why it failed. The workflow assumes that you typically don't care about why something failed specifically (as it also adds a fair bit of overhead), but it is very useful to know when debugging. We can enable this tracking it by setting debug=True
. To make this more interesting, let's trigger a failure deep in the workflow as the description filter happens first.
options['filter-description'] = None
options['compute-gc'] = True
options['minimum-gc'] = 1.0
records = RecordParser(state=None, options=options, stats=Counter(), debug=True)
next(records(items, success_callback=success_callback, fail_callback=fail_callback))
We had a failure with KC615826.1:1..558:rRNA
Okay, so we understandably had a failure as the minumum GC was set at 100%. What type of debug detail do we have? First is the trace, which contains tuple
s of ("method_name", order_of_execution)
. A low value for the order_of_execution
means it was executed earlier relative to a higher number. The value is assigned based on the priority, so in the subsequent example, there isn't a method associated with 0
as the highest priority method (filter_description
) was not executed due to the options we set.
What we can see is that the highest number in the execution order is associated with minimum_gc
indicating that that method failed.
pprint(records.debug_trace)
{('compute_gc', 4), ('minimum_gc', 5), ('force_to_rna', 1)}
We can also get runtime information (in seconds), where the key in the runtime dict
corresponds to a tuple
in the debug_trace
.
pprint(records.debug_runtime)
{('compute_gc', 4): 1.0728836059570312e-05, ('force_to_rna', 1): 3.0994415283203125e-06, ('minimum_gc', 5): 8.821487426757812e-06}
And finally, we have pre- and post-state tracking which lets us examine state
prior to and following each method execution. These are also keyed by the tuple
s in the debug_trace
. For the purposes of example, let's take a look at the compute_gc
method as that one modifies state, whereas the minimum_gc
method, though triggering the failure, does not modify the state
object. And, to minimize cognitive load, let's just look at what happened to the gc
key.
print("GC prior to the execution of `compute_gc`")
print(records.debug_pre_state[('compute_gc', 4)]['gc'])
print("\nGC following the execution of `compute_gc`")
print(records.debug_post_state[('compute_gc', 4)]['gc'])
GC prior to the execution of `compute_gc` None GC following the execution of `compute_gc` 0.2616487455197133