#!/usr/bin/env python
# coding: utf-8

# ### Introduction

# In this recipe, we're going to develop a small [`Workflow`](http://scikit-bio.org/docs/0.5.0/generated/skbio.workflow.Workflow.html#skbio.workflow.Workflow) to process sequence data sourced from [RNAcentral](http://rnacentral.org/). The focus of this recipe is to highlight how the `Workflow` object can decompose complex logic into smaller blocks that are easier to verify. The structure of the notebook is to first develop functions that allow us to query RNAcentral and NCBI, where the former is our primary source of data and the latter is for obtaining lineage details.

# First, we're going to import `ElementTree` to facilitate XML parsing in one of the functions. Second, we're going to import the `requests` library, which makes querying remote RESTful interfaces very easy. More detail on `requests` can be found [here](http://docs.python-requests.org/en/latest/). Third, our example will query for RNA sequences, so we'll grab the appropriate scikit-bio object. And last, we're going to import the `Workflow` object, two decorators, and a helper object that defines a notion of `not None`.

# In[1]:

import xml.etree.ElementTree as ET

import requests

from skbio import RNA
from skbio.workflow import Workflow, method, requires, not_none


# ### Defining functions for querying remote databases

# Next, let's put together a function that allows us to query RNAcentral. Details about its RESTful interface can be found [here](http://rnacentral.org/api/); the API is an excellent example of a well-thought-out web interface. This function will allow us to perform arbitrary queries, and it'll return a fair bit of detail about each sequence found. In the future, we may add a standard mechanism for querying RNAcentral, but for right now, we'll just roll our own.

# In[2]:

def query_rnacentral(query, max_records=100):
    """Query RNAcentral and yield parsed records

    Parameters
    ----------
    query : str
        The query to use against RNAcentral
    max_records : int, optional
        The maximum number of records to yield

    Returns
    -------
    generator
        Yields `dict` of the individual records.

    See Also
    --------
    http://rnacentral.org/api

    """
    def _request_to_json(url, params=None, headers=None):
        # Perform the request, and if valid, fetch the JSON data. We're defining this as a
        # function as we'll be performing this type of operation a few times.
        r = requests.get(url, params=params, headers=headers)

        if r.status_code != 200:
            raise IOError("We received the following status code: %d" % r.status_code)

        return r.json()

    url = 'http://rnacentral.org/api/v1/rna'

    # Tell RNAcentral that we'd like JSON data back
    headers = {'Accept': 'application/json'}

    # These are the specific parameters for the query. For example, if our query was foo,
    # the resulting URL would be (feel free to copy/paste into your browser too):
    # http://rnacentral.org/api/v1/rna/?query=foo&page_size=25&flat=true
    # More information about the specific parameters and their values can be found on the
    # API description page: http://rnacentral.org/api
    params = {'query': query,      # The specific query
              'page_size': 25,     # The number of items to return per page
              'flat': 'True'}      # This expands out the detail about each record

    data = _request_to_json(url, params, headers)

    # The key `next` contains the URL of the next "page" of data
    next_page = data['next']

    # The key `count` contains the total number of items that match our query
    count_in_payload = data['count']

    # And, let's set up a counter to track how many results have been yielded
    count_yielded = 0

    while count_in_payload > 0 and count_yielded < max_records:
        for record in data['results']:
            # Let's only care about records that appear to have sequence and length data
            sequence = record.get('sequence')
            length = record.get('length')

            if sequence is None or length is None:
                continue

            # The "flat" form from RNAcentral has a list of external references associated with
            # each sequence. This list might contain more than a single entry if the sequence has
            # been deposited multiple times, and for the purposes of our program, we're going to
            # consider each one of these as independent.
            sequence = {'sequence': sequence, 'length': length}

            for xref in record['xrefs']['results']:
                # Now, let's build our notion of a record and yield it so a consumer can operate on it
                to_yield = sequence.copy()
                to_yield.update(xref)

                yield to_yield
                count_yielded += 1

        # Finally, if there is another page of data, let's grab it (and its own "next" link),
        # otherwise we'll terminate the loop
        if next_page:
            data = _request_to_json(next_page)
            next_page = data['next']
        else:
            break


# So what do these records that we're getting from RNAcentral look like? Let's explore! For our test, let's search for "tRNA", and to be polite, we'll only request a single record as that's all we need right now. We're also going to use a helper function, `pprint`, to format the resulting `dict` better for human consumption.

# In[3]:

from pprint import pprint

test_query = query_rnacentral('tRNA', max_records=1)
pprint(next(test_query))


# Well, that's interesting. Even though we queried for "tRNA", we ended up getting back "rRNA"! Unfortunately, while the search interface of RNAcentral is quite powerful (and we're intentionally not fully leveraging it here), it is not perfect. So, we'll need to do some data scrubbing on our end if we want to only get specific records. More on this shortly.

# Next, it'd be great to get the lineage associated with the records. Luckily, RNAcentral provides a `taxid`, which corresponds to the NCBI taxonomy. Unfortunately, they don't provide the full lineage. But, we can query NCBI to gather these details. NCBI provides [EUtils](http://www.ncbi.nlm.nih.gov/books/NBK25501/), which lets users programmatically query its resources similarly to RNAcentral.

# In[4]:

def query_ncbi_lineage(taxon_id):
    """Obtain the NCBI lineage for a taxon ID

    Parameters
    ----------
    taxon_id : int
        The taxon ID of interest

    Returns
    -------
    list or None
        Each taxon name, or None if unable to retrieve the taxon details

    """
    url = "http://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

    # Define our parameters to use in our query
    params = {'db': 'taxonomy',  # We want to query the taxonomy database
              'id': taxon_id}    # We're requesting detail on the taxon ID specifically

    # Make the request
    r = requests.get(url, params=params)

    # Bail if we received a bad status
    if r.status_code != 200:
        return None

    # NCBI returns XML, so we need to parse the "content" of our request into a usable structure
    tree = ET.fromstring(r.content)

    # We are only interested in the Lineage key within the tree. There is a fair bit of other data
    # present within the structure, however.
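    # (For reference, the efetch taxonomy payload typically looks roughly like
    #  <TaxaSet><Taxon> ... <Lineage>cellular organisms; Bacteria; ...</Lineage> ... </Taxon></TaxaSet>,
    #  which is why grabbing the first Lineage element below is sufficient. The exact shape is an
    #  assumption; consult the EUtils documentation for the authoritative description.)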
    lineage = next(tree.iter('Lineage'), None)

    # The lineage, if we did get it, is delimited by "; ", so let's go ahead and parse that into
    # a friendlier structure
    if lineage is not None:
        return [v.strip() for v in lineage.text.split(';')]
    else:
        return None


# Great! Now that we can query NCBI, let's do a quick test to see what lineage we get back that would be associated with the previous RNAcentral record.

# In[5]:

print(query_ncbi_lineage(57045))


# ### Building a database record processor without the Workflow object

# Now that the querying functions are out of the way, we're going to construct a function that parses and filters the records based on some runtime criteria. Specifically, this function will:
#
# - Obtain the record accession
# - Filter any records that appear to be environmental
# - Force the sequence to use the RNA alphabet
# - Filter any sequences that are too short or too long
# - Filter based on GC content
# - Obtain the taxonomy
# - Determine the "group" the sequence is in based on the taxonomy
#
# Most of these things are going to be optional, as is typical in a sequence processor where under some conditions you may want to filter, and may want to specify the specific criteria to filter by.
#
# In addition, we also want to track our "failed" records, which is also common as they can be used to debug issues in the pipeline or get dumped into some other processing stream.
#
# And last, we want to track processing statistics so we know how many items were filtered based on what criteria.

# In[6]:

from collections import Counter

failures = []

def failed(item):
    """Store a failed record

    This function is decomposed in case we want to add other logic for dealing with failures
    """
    global failures
    failures.append(item)


def record_parser(items, options):
    """Parse items based on our runtime options

    Parameters
    ----------
    items : Iterable
        Iterable of `dict` that contains the record information
    options : dict
        The options that describe filtering criteria, etc.

    Returns
    -------
    generator
        Yields `dict`s of the parsed records

    """
    stats = Counter()
    for item in items:
        # We're going to be updating item in place, so let's make a copy to be safe
        item = item.copy()

        # The id is delimited by a ':', where the first component is the INSDC accession
        id_ = item['accession']['id'].split(':', 1)[0]

        # If the options indicate to, let's filter by "environmental" criteria. For instance,
        # if the description contains the word "environmental" then we want to ignore the record.
        # This is common when parsing out named isolates.
        if options['filter-description']:
            acc = item['accession']
            tokens = {t.strip().lower() for t in acc['description'].split()}
            if options['filter-description'].intersection(tokens):
                failed(item)
                stats['filter-description'] += 1
                continue

        # Force the record to use an RNA alphabet
        item['sequence'] = item['sequence'].replace('T', 'U')

        # Filter out records that don't meet a minimum length requirement
        if options['filter-length'] and item['length'] < options['minimum-length']:
            failed(item)
            stats['minimum-length'] += 1
            continue

        # Filter out records that don't meet a maximum length requirement
        if options['filter-length'] and item['length'] > options['maximum-length']:
            failed(item)
            stats['maximum-length'] += 1
            continue

        # Compute GC if necessary
        if options['compute-gc']:
            gc = float(item['sequence'].count('G') + item['sequence'].count('C'))
            gc /= item['length']
        else:
            gc = None

        # Filter by a minimum GC content
        if options['compute-gc'] and options['minimum-gc'] and gc is not None:
            if gc < options['minimum-gc']:
                failed(item)
                stats['minimum-gc'] += 1
                continue

        # Filter by a maximum GC content
        if options['compute-gc'] and options['maximum-gc'] and gc is not None:
            if gc > options['maximum-gc']:
                failed(item)
                stats['maximum-gc'] += 1
                continue

        # If we have a taxonomy ID, then let's grab the lineage
        if item.get('taxid', None) is not None:
            lineage = query_ncbi_lineage(item['taxid'])
        else:
            lineage = None

        # Determine the group based on the lineage
        group = lineage[-1] if lineage is not None else 'unassigned'

        # Finally, let's save a little bit more state and yield for a subsequent consumer
        item['gc'] = gc
        item['group'] = group
        item['lineage'] = lineage
        item['id'] = id_
        item['sequence'] = RNA(item['sequence'], metadata={'id': item['id']}, validate=True)

        yield item


# So that's a pretty complex function. There are a lot of conditionals, and the loop is short-circuited with `continue`s. Though verifying the correctness of this particular example probably isn't extremely difficult, it is not difficult to imagine a more complex process (such as QC for amplicon data) that would be much harder to verify. Additionally, we can't retrieve the `stats` defined without making the return more complex or resorting to global variables. Let's quickly play with the function so we can see how it behaves. First, let's not define any options and examine the result.

# In[7]:

options = {}
items = query_rnacentral('tRNA')
records = record_parser(items, options)

try:
    pprint(next(records))
except Exception as e:
    print("We raised an exception!")
    print("%s: %s" % (type(e), e.args))


# Okay, so that's a bit annoying. We could update the function to issue `dict.get` calls and increase the complexity of our conditional logic, but for now, let's just define usable options.

# In[8]:

options = {'filter-description': set(),
           'filter-length': False,
           'minimum-length': None,
           'maximum-length': None,
           'compute-gc': True,
           'minimum-gc': None,
           'maximum-gc': None}

items = query_rnacentral('tRNA')
records = record_parser(items, options)

pprint(next(records))


# ### Building a better database record processor with the Workflow object

# That worked, but can we do better? One large issue with the above function is that there are so many different paths through it that it is difficult to validate the individual pieces of logic. Additionally, the boilerplate logic for checking options and state is replicated numerous times.
# In a real-world situation, additional features will likely be added as well, further increasing the complexity of the method and the likelihood of bugs creeping in.
#
# The goal of the `Workflow` is to centralize the boilerplate logic, and to allow the relatively small blocks of logic that rely on shared state (such as the GC filtering) to be decomposed. To do this, the `Workflow` relies on two function decorators: `method` and `requires`. The structure of the object is such that each individual block of code (such as the minimum length filter) can easily be unit tested on its own.
#
# `method` tells the `Workflow` that this is a method to be executed for every item being operated on. It takes a `priority` parameter that indicates its relative order of execution in the workflow, where a higher value means it will be executed earlier. If a method sets `failed`, the item is considered to have "failed" and, by default, none of the other methods are executed (i.e., short-circuiting like the `continue` statements above).
#
# The second decorator, `requires`, performs the option checking: it verifies that specific runtime options are set, that they have particular values (e.g., execute method foo only if option X has value Y), and it can also check state requirements. If a requirement is not met, the method is not evaluated.
#
# Last, the `Workflow` allows the developer to specify a callback to be executed on a successful processing, and one that can be executed on a failed processing.
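
# Before applying this to our records, here is a tiny, throwaway sketch (not part of the recipe's pipeline; the option name 'add-one' is made up) showing how `method` priorities and a `requires` gate interact on trivially simple items.

class ToyWorkflow(Workflow):
    def initialize_state(self, item):
        # Called for every item; state should be independent between items
        self.state = item

    @method(priority=100)  # higher priority, so this executes first
    def square(self):
        self.state = self.state ** 2

    @method(priority=50)   # lower priority, so this executes second
    @requires(option='add-one', values=True)  # only evaluated when options['add-one'] is True
    def add_one(self):
        self.state += 1


toy = ToyWorkflow(state=None, options={'add-one': True})
print(list(toy([1, 2, 3])))  # each item is squared and then incremented: [2, 5, 10]
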
# Let's take a look at what this looks like for our record parser. First, we're going to define two helper functions that simply test for the presence of some piece of state.

# In[9]:

def has_gc(item):
    """Test if state has gc computed"""
    return item.get('gc') is not None


def has_taxid(item):
    """Test if state has a valid taxid"""
    return item.get('taxid') is not None


# Next, we're going to define the actual workflow itself. The first method we'll define is a special one that is executed for every record, called `initialize_state`. This method is responsible for resetting the notion of `state`, which _should_ be independent for every item. Following this, we'll define the actual methods that'll do the work and decorate them with their various requirements.

# In[10]:

class RecordParser(Workflow):
    def initialize_state(self, item):
        """Initialize state based on the item being processed"""
        self.state = item.copy()
        self.state['gc'] = None
        self.state['group'] = None
        self.state['lineage'] = None
        self.state['id'] = None

    @method(priority=99)
    @requires(option='filter-description', values=not_none)  # not_none means any option value except None is valid
    def filter_description(self):
        """Filter records depending on keywords present in the description"""
        acc = self.state['accession']
        tokens = {t.strip().lower() for t in acc['description'].split()}
        if self.options['filter-description'].intersection(tokens):
            self.failed = True
            self.stats['filter-description'] += 1

    @method(priority=95)  # This method has no requirements
    def force_to_rna(self):
        """Convert any thymine to uracil"""
        self.state['sequence'] = self.state['sequence'].replace('T', 'U')

    @method(priority=89)
    @requires(option='filter-length', values=True)       # Require 'filter-length' to be True
    @requires(option='minimum-length', values=not_none)  # Require 'minimum-length' to be any value other than None
    def minimum_length(self):
        if self.state['length'] < self.options['minimum-length']:
            self.failed = True
            self.stats['minimum-length'] += 1

    @method(priority=88)
    @requires(option='filter-length', values=True)
    @requires(option='maximum-length', values=not_none)  # Require 'maximum-length' to be any value other than None
    def maximum_length(self):
        if self.state['length'] > self.options['maximum-length']:
            self.failed = True
            self.stats['maximum-length'] += 1

    @method(priority=80)
    @requires(option='compute-gc', values=True)
    def compute_gc(self):
        gc = self.state['sequence'].count('G') + self.state['sequence'].count('C')
        self.state['gc'] = float(gc) / self.state['length']

    @method(priority=79)
    @requires(option='minimum-gc', values=not_none)
    @requires(state=has_gc)  # Require that has_gc(self.state) evaluates to True
    def minimum_gc(self):
        if self.state['gc'] < self.options['minimum-gc']:
            self.failed = True
            self.stats['minimum-gc'] += 1

    @method(priority=78)
    @requires(option='maximum-gc', values=not_none)
    @requires(state=has_gc)  # Require that has_gc(self.state) evaluates to True
    def maximum_gc(self):
        if self.state['gc'] > self.options['maximum-gc']:
            self.failed = True
            self.stats['maximum-gc'] += 1

    @method(priority=3)
    @requires(state=has_taxid)  # Require that has_taxid(self.state) evaluates to True
    def fetch_lineage(self):
        self.state['lineage'] = query_ncbi_lineage(self.state['taxid'])

    @method(priority=2)
    def assign_to_group(self):
        lineage = self.state['lineage']
        self.state['group'] = lineage[-1] if lineage is not None else 'unassigned'

    @method(priority=1)
    def cast_to_rnasequence(self):
        self.state['sequence'] = RNA(self.state['sequence'],
                                     metadata={'id': self.state['id']},
                                     validate=True)


# While the workflow itself is complex, each individual block of logic is small, making it much easier to test the components; a sketch of what such a focused check might look like follows. Integration testing is of course still a very good idea.
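
# As an illustration (a sketch only, using a hand-made record and hypothetical values rather than a real unit-testing framework), we can drive a single crafted item through the workflow with just the option under test enabled and then inspect `stats`.

test_options = {'filter-description': None,
                'filter-length': True,
                'minimum-length': 50,
                'maximum-length': None,
                'compute-gc': False,
                'minimum-gc': None,
                'maximum-gc': None}

# A made-up record that is deliberately shorter than our minimum length of 50
short_record = {'sequence': 'ACGU' * 10,
                'length': 40,
                'taxid': None,
                'accession': {'id': 'TEST:0001', 'description': 'a made up record'}}

test_wf = RecordParser(state=None, options=test_options, stats=Counter())
list(test_wf([short_record]))  # consume the generator; the record should fail minimum_length
print(test_wf.stats)           # we'd expect to see a count of 1 for 'minimum-length'
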
# Now, let's take the object out for a spin. First, we'll instantiate the workflow. We can predefine `state` on instantiation, which is useful if it is possible to preallocate memory (instead of always creating an object in `initialize_state`); this isn't necessary, though, just a point of optimization. Second, we'll specify the same options we defined in our singular function above. And third, we're going to define a `stats` object to track information about failures (note, `state` is required, `options` is optional, and anything in `kwargs` gets set as a member variable).
#
# To get a record out, we pass in our `items` iterator (created from the `query_rnacentral` function defined earlier), and simply get the next item. On `__call__`, we can also pass in callbacks, which we'll show shortly.

# In[11]:

records = RecordParser(state=None, options=options, stats=Counter())

pprint(next(records(items)))
pprint(records.stats)


# Great, things appear to be working as we'd expect. To finish this notebook off, let's do three more things. First, we'll construct some example callback functions. Second, we're going to change the options to trigger some failures. And last, we're going to enable debug mode to gather additional context.
#
# By default, no failure callback is defined, so failures are simply ignored. There is always a success callback, though; by default it is just a pass-through that returns `self.state`. Each callback is passed `self`, i.e., the entire workflow object. These callbacks can be handy if you want to automatically stream results to a database, a file, or some other consumer.

# In[12]:

def fail_callback(wf):
    """Print out the ID associated with the failure"""
    print("We had a failure with %s" % wf.state['accession']['id'])


def success_callback(wf):
    """Print out the ID associated with the success"""
    print("We successfully processed %s" % wf.state['accession']['id'])


# Let's filter out those pesky ribosomal RNAs
options['filter-description'] = {'ribosomal', 'rrna'}

records = RecordParser(state=None, options=options, stats=Counter())

next(records(items, success_callback=success_callback, fail_callback=fail_callback))
pprint(records.stats)


# It would be cumbersome to interrogate `stats` every time something failed in order to figure out why it failed. The workflow assumes that you typically don't care about why a specific item failed (tracking that also adds a fair bit of overhead), but it is very useful to know when debugging. We can enable this tracking by setting `debug=True`. To make this more interesting, let's trigger a failure deep in the workflow, as the description filter happens first.

# In[13]:

options['filter-description'] = None
options['compute-gc'] = True
options['minimum-gc'] = 1.0

records = RecordParser(state=None, options=options, stats=Counter(), debug=True)

next(records(items, success_callback=success_callback, fail_callback=fail_callback))


# Okay, so we understandably had a failure, as the minimum GC was set at 100%. What type of debug detail do we have? First is the trace, which contains `tuple`s of `("method_name", order_of_execution)`. A low value for the `order_of_execution` means it was executed earlier relative to a higher number. The value is assigned based on the priority, so in the subsequent example, there isn't a method associated with `0`, as the highest-priority method (`filter_description`) was not executed due to the options we set.
#
# What we can see is that the highest number in the execution order is associated with `minimum_gc`, indicating that that method triggered the failure.

# In[14]:

pprint(records.debug_trace)
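
# As a small illustration (a sketch, not part of the original recipe), the trace can also be interrogated programmatically: for a failed record, the entry with the highest execution order is the method that triggered the failure.

last_method, last_order = max(records.debug_trace, key=lambda x: x[1])
print("Execution stopped in: %s (order %d)" % (last_method, last_order))
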
# We can also get runtime information (in seconds), where the key in the runtime `dict` corresponds to a `tuple` in the `debug_trace`.

# In[15]:

pprint(records.debug_runtime)


# And finally, we have pre- and post-state tracking, which lets us examine `state` prior to and following each method execution. These are also keyed by the `tuple`s in the `debug_trace`. For the purposes of example, let's take a look at the `compute_gc` method, as that one modifies state, whereas the `minimum_gc` method, though triggering the failure, does not modify the `state` object. And, to minimize cognitive load, let's just look at what happened to the `gc` key.

# In[16]:

print("GC prior to the execution of `compute_gc`")
print(records.debug_pre_state[('compute_gc', 4)]['gc'])

print("\nGC following the execution of `compute_gc`")
print(records.debug_post_state[('compute_gc', 4)]['gc'])
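
# To wrap up, here is one more sketch (not part of the original recipe; the query and option values are illustrative only) showing how the callbacks might be used to collect results in memory instead of printing them.

options['minimum-gc'] = None  # relax the GC filter we used above to force failures

processed, collected_failures = [], []

records = RecordParser(state=None, options=options, stats=Counter())

# A success callback gathers the processed state objects, while the fail callback stashes
# anything that was filtered out; we simply exhaust the generator to drive the workflow.
for _ in records(query_rnacentral('tRNA', max_records=5),
                 success_callback=lambda wf: processed.append(wf.state),
                 fail_callback=lambda wf: collected_failures.append(wf.state)):
    pass

print("Processed %d records, %d failures" % (len(processed), len(collected_failures)))
pprint(records.stats)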