Introduction

In this recipe, we're going to develop a small Workflow to process sequence data sourced from RNAcentral. The focus of this recipe is to highlight how the Workflow object can decompose complex logic into smaller blocks that are easier to verify. The Notebook first develops functions for querying RNAcentral and NCBI, where the former is our primary source of data and the latter provides lineage details.

First, we're going to import the ElementTree module to facilitate XML parsing in one of the functions. Second, we're going to import the requests library, which makes querying remote RESTful interfaces very easy; more detail can be found in the requests documentation. Third, our example will query for RNA sequences, so we'll grab the appropriate scikit-bio object. And last, we're going to import the Workflow object, two decorators, and a helper object that defines a notion of not None.

In [1]:
import xml.etree.ElementTree as ET

import requests

from skbio import RNA
from skbio.workflow import Workflow, method, requires, not_none

Defining functions for querying remote databases

Next, let's put together a function that allows us to query RNAcentral. Details about its RESTful interface can be found in the RNAcentral API documentation (http://rnacentral.org/api); it is an excellent example of a well-thought-out web interface. This function will allow us to perform arbitrary queries, and it'll return a fair bit of detail about each sequence found. In the future, we may add a standard mechanism for querying RNAcentral, but for right now, we'll just roll our own.

In [2]:
def query_rnacentral(query, max_records=100):
    """Query RNACentral and yield parsed records
    
    Parameters
    ----------
    query : str
        The query to use against RNAcentral
    max_records : int, optional
        The maximum number of records to yield
        
    Returns
    -------
    generator
        Yields `dict` of the individual records.
    
    See Also
    --------
    http://rnacentral.org/api
    """
    def _request_to_json(url, params=None, headers=None):
        # Perform the request, and if valid, fetch the JSON data. We're defining this as a
        # function as we'll be performing this type of operation a few times.
        r = requests.get(url, params=params, headers=headers)
        if r.status_code != 200:
            raise IOError("We received the following status code: %d" % r.status_code)

        return r.json()

    url = 'http://rnacentral.org/api/v1/rna'
    
    # Tell RNAcentral that we'd like JSON data back
    headers = {'Accept': 'application/json'}
    
    # These are the specific parameters for the query. For example, if our query was foo,
    # the resulting URL would be (feel free to copy/paste into your browser too):
    # http://rnacentral.org/api/v1/rna/?query=foo&page_size=25&flat=true
    
    # More information about the specific parameters and their values can be found on the
    # API description page: http://rnacentral.org/api
    params = {'query': query,   # The specific query
              'page_size': 25,  # The number of items to return per page. 
              'flat': 'True'}   # This expands out the detail about each record
    
    data = _request_to_json(url, params, headers)
    
    # The key `next` contains the URL of the next "page" of data
    next_page = data['next']
    
    # The key `count` contains the total number of items that match our query
    count_in_payload = data['count']
    
    # And, let's set up a counter to track how many results have been yielded
    count_yielded = 0
    
    while count_in_payload > 0 and count_yielded < max_records:
        for record in data['results']:
            # Let's only care about records that appear to have sequence and length data
            sequence = record.get('sequence')
            length = record.get('length')
            
            if sequence is None or length is None:
                continue
                
            # The "flat" form from RNAcentral has a list of external references associated with each sequence.
            # This list might contain more than a single entry if the sequence has been deposited multiple times,
            # and for the purposes of our program, we're going to consider each one of these as independent.
            sequence = {'sequence': sequence, 'length': length}    
            for xref in record['xrefs']['results']:
                # Now, let's build our notion of a record and yield it so a consumer can operate on it
                to_yield = sequence.copy()
                to_yield.update(xref)
                yield to_yield
                count_yielded += 1

                # Stop as soon as we've yielded the requested number of records
                if count_yielded >= max_records:
                    return
        
        # Finally, if there is another page of data, let's grab it, otherwise we'll terminate the loop
        if next_page:
            data = _request_to_json(next_page)
        else:
            break

So what do these records that we're getting from RNAcentral look like? Let's explore! For our test, let's search for "tRNA", and to be polite, we'll only request a single record as that's all we need right now. We're also going to use a helper function, pprint, to format the resulting dict for human consumption.

In [3]:
from pprint import pprint
test_query = query_rnacentral('tRNA', max_records=1)
pprint(next(test_query))
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/citations',
               'description': 'uncultured bacterium partial 16S ribosomal '
                              'RNA',
               'expert_db_url': '',
               'external_id': '',
               'gene': '',
               'id': 'DQ668905.1:1..363:rRNA',
               'optional_id': '',
               'organelle': '',
               'product': '16S ribosomal RNA',
               'rna_type': 'rRNA',
               'source_url': 'http://www.ebi.ac.uk/ena/data/view/Non-coding:DQ668905.1:1..363:rRNA',
               'species': 'environmental samples uncultured bacterium',
               'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/info'},
 'database': 'ENA',
 'first_seen': '2014-05-29',
 'is_active': True,
 'is_expert_db': False,
 'last_seen': '2016-03-15',
 'length': 363,
 'sequence': 'GUAUGCAACUUACCUUUUACUAGAGAAUAGCCAAGAGAAAUUUUGAUUAAUGCUCUAUGUUCUUAUUUACUCGCAUGAGUAAAUAAGCAAAGCUCCGGCGGUAAAAGAUGGGCAUGCGUCCUAUUAGCUUGUAGGUGAGGUAAUGGCUCACCUAAGCUCCGAUAGGUAGGGGUCCUGAGAGGGAGAUCCCCCACACUGGUACUGAGACACGGACCAGACUUCUACGGAAGGCAGCAGUAAGGAAUAUUGGACAAUGGAGGCAACUCUGAUCCAGCCAUGCCGCGUGAAGGAAGACGGCCUUAUGGGUUGUAAACUUCUUUUAUACAGGAAGAAACCUUUCCACGUGUGGAAAGCUGACGGUAC',
 'taxid': 77133}

Well, that's interesting. Even though we queried for "tRNA", we ended up getting back "rRNA"! Unfortunately, while the search interface of RNAcentral is quite powerful (and we're intentionally not fully leveraging it here), it is not perfect. So, we'll need to do some data scrubbing on our end if we only want specific records. More on this in a bit.
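For instance, a consumer could do a crude first pass of scrubbing itself. Here's a hypothetical one-liner keying off the rna_type field nested under accession in the record above (not what we'll actually do below, where we'll filter on the description instead):

# Hypothetical client-side scrub: keep only records whose xref is typed as tRNA
trnas = (r for r in query_rnacentral('tRNA')
         if r['accession']['rna_type'] == 'tRNA')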

Next, it'd be great to get the lineage associated with the records. Luckily, RNAcentral provides a taxid, which corresponds to the NCBI taxonomy. Unfortunately, it doesn't provide the full lineage, but we can query NCBI to gather these details. NCBI provides EUtils, which lets users programmatically query its resources much like RNAcentral's API does.

In [4]:
def query_ncbi_lineage(taxon_id):
    """Obtain the NCBI lineage for a taxon ID
    
    Parameters
    ----------
    taxon_id : int
        The taxon ID of interest
    
    Returns
    -------
    list or None
        Each taxon name, or None if unable to retrieve the taxon details
    """
    url = "http://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
    
    # Define our parameters to use in our query
    params = {'db': 'taxonomy',  # We want to query the taxonomy database
              'id': taxon_id}    # We're requesting detail on the taxon ID specifically
    
    # Make the request
    r = requests.get(url, params=params)
    
    # Bail if we received a bad status
    if r.status_code != 200:
        return None
    
    # NCBI returns XML, so we need to parse the "content" of our request into a usable structure
    tree = ET.fromstring(r.content)
    
    # We are only interested in the Lineage key within the tree. There is a fair bit of other data
    # present within the structure, however.
    # Use a default of None so we don't raise StopIteration if Lineage is absent
    lineage = next(tree.iter('Lineage'), None)
    
    # The lineage, if we did get it, is delimited by "; ", so let's go ahead and parse that into 
    # a friendlier structure
    if lineage is not None:
        return [v.strip() for v in lineage.text.split(';')]
    else:
        return None

Great! Now that we can query NCBI, let's do a quick test to see what lineage comes back for an example taxon ID.

In [5]:
print(query_ncbi_lineage(57045))
['cellular organisms', 'Bacteria', 'Proteobacteria', 'Gammaproteobacteria', 'Enterobacteriales', 'Enterobacteriaceae', 'Salmonella', 'Salmonella enterica', 'Salmonella enterica subsp. enterica']

Building a database record processor without the Workflow object

Now that the querying functions are out of the way, we're going to construct a function that parses and filters the records based on some runtime criteria. Specifically, this function will:

  • Obtain the record accession
  • Filter any records that appear to be environmental
  • Force the sequence to use the RNA alphabet
  • Filter any sequences that are too short or too long
  • Filter based on GC content
  • Obtain the taxonomy
  • Determine the "group" the sequence is in based on the taxonomy

Most of these steps are going to be optional, as is typical in a sequence processor: under some conditions you may want to filter, and you may want to control the specific criteria to filter by.

In addition, we want to track our "failed" records, which is also common, as they can be used to debug issues in the pipeline or be dumped into some other processing stream.

And last, we want to track processing statistics so we know how many items were filtered based on what criteria.

In [6]:
from collections import Counter

failures = []
def failed(item):
    """Store a failed record
    
    This function is decomposed in case we want to add other logic for dealing
    with failures
    """
    global failures
    failures.append(item)
    
def record_parser(items, options):
    """Parse items based on our runtime options
    
    Parameters
    ----------
    items : Iterable
        Iterable of `dict` that contains the record information
    options : dict
        The options that describe filtering criteria, etc
    
    Returns
    -------
    generator
        Yields `dict`s of the parsed records
    """
    stats = Counter()
    for item in items:
        # We're going to be updating item inplace, so let's make a copy to be safe
        item = item.copy()
        
        # The id is delimited by a ':', where the first component is the INSDC accession
        id_ = item['accession']['id'].split(':', 1)[0]
        
        # If the options indicate to, let's filter by "environmental" criteria. For instance,
        # if the description contains the word "environmental" then we want to ignore the record.
        # This is common when parsing out named isolates.
        if options['filter-description']:
            acc = item['accession']
            tokens = {t.strip().lower() for t in acc['description'].split()}
            
            if options['filter-description'].intersection(tokens):
                failed(item)
                stats['filter-description'] += 1
                continue
        
        # Force the record to use an RNA alphabet
        item['sequence'] = item['sequence'].replace('T', 'U')
        
        # Filter out records that don't meet a minimum length requirement
        if options['filter-length'] and item['length'] < options['minimum-length']:
            failed(item)
            stats['minimum-length'] += 1
            continue
            
        # Filter out records that don't meet a maximum length requirement
        if options['filter-length'] and item['length'] > options['maximum-length']:
            failed(item)
            stats['maximum-length'] += 1
            continue
        
        # Compute GC if necessary
        if options['compute-gc']:
            gc = float(item['sequence'].count('G') + item['sequence'].count('C'))
            gc /= item['length']
        else:
            gc = None
           
        # Filter by a minimum GC content
        if options['compute-gc'] and options['minimum-gc'] and gc is not None:
            if gc < options['minimum-gc']:
                failed(item)
                stats['minimum-gc'] += 1
                continue
        
        # Filter by a maximum GC content
        if options['compute-gc'] and options['maximum-gc'] and gc is not None:
            if gc > options['maximum-gc']:
                failed(item)
                stats['maximum-gc'] += 1
                continue
        
        # If we have a taxonomy ID, then let's grab the lineage
        if item.get('taxid', None) is not None:
            lineage = query_ncbi_lineage(item['taxid'])
        else:
            lineage = None
            
        # Determine the group based on the lineage
        group = lineage[-1] if lineage is not None else 'unassigned'
        
        # Finally, let's save a little bit more state and yield for a subsequent consumer
        item['gc'] = gc
        item['group'] = group
        item['lineage'] = lineage
        item['id'] = id_
        item['sequence'] = RNA(item['sequence'], metadata={'id': item['id']}, validate=True)

        yield item

So that's a pretty complex function. There are a lot of conditionals, and the loop is short-circuited with continues. While verifying the correctness of this particular example probably isn't extremely difficult, it is not hard to imagine a more complex process (such as QC for amplicon data) that would be much more difficult to verify. Additionally, we can't retrieve the stats defined without making the return more complex or resorting to global variables. Let's quickly play with the function, though, so we can see how it behaves. First, let's not define any options and examine the result.

In [7]:
options = {}
items = query_rnacentral('tRNA')
records = record_parser(items, options)

try:
    pprint(next(records))
except Exception as e:
    print("We raised an exception!")
    print("%s: %s" % (type(e), e.args))
We raised an exception!
<class 'KeyError'>: ('filter-description',)

Okay, so that's a bit annoying. We could update the function to issue dict.get calls, and increase the complexity of our conditional logic, but for now, let's just define usable options.
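(As a hypothetical aside, the dict.get alternative would look something like the sketch below; missing options no longer raise a KeyError, but the default values start to scatter through the function body.)

options = {}           # possibly missing keys
item = {'length': 42}  # a stand-in record

if options.get('filter-description'):
    print('would filter by description')

if options.get('filter-length') and item['length'] < options.get('minimum-length', 0):
    print('would filter by minimum length')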

In [8]:
options = {'filter-description': set(),
           'filter-length': False,
           'minimum-length': None,
           'maximum-length': None,
           'compute-gc': True,
           'minimum-gc': None,
           'maximum-gc': None}
           
items = query_rnacentral('tRNA')
records = record_parser(items, options)
pprint(next(records))
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/citations',
               'description': 'uncultured bacterium partial 16S ribosomal '
                              'RNA',
               'expert_db_url': '',
               'external_id': '',
               'gene': '',
               'id': 'DQ668905.1:1..363:rRNA',
               'optional_id': '',
               'organelle': '',
               'product': '16S ribosomal RNA',
               'rna_type': 'rRNA',
               'source_url': 'http://www.ebi.ac.uk/ena/data/view/Non-coding:DQ668905.1:1..363:rRNA',
               'species': 'environmental samples uncultured bacterium',
               'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rRNA/info'},
 'database': 'ENA',
 'first_seen': '2014-05-29',
 'gc': 0.465564738292011,
 'group': 'environmental samples',
 'id': 'DQ668905.1',
 'is_active': True,
 'is_expert_db': False,
 'last_seen': '2016-03-15',
 'length': 363,
 'lineage': ['cellular organisms', 'Bacteria', 'environmental samples'],
 'sequence': RNA
---------------------------------------------------------------------
Metadata:
    'id': 'DQ668905.1'
Stats:
    length: 363
    has gaps: False
    has degenerates: False
    has definites: True
    GC-content: 46.56%
---------------------------------------------------------------------
0   GUAUGCAACU UACCUUUUAC UAGAGAAUAG CCAAGAGAAA UUUUGAUUAA UGCUCUAUGU
60  UCUUAUUUAC UCGCAUGAGU AAAUAAGCAA AGCUCCGGCG GUAAAAGAUG GGCAUGCGUC
...
300 UAUGGGUUGU AAACUUCUUU UAUACAGGAA GAAACCUUUC CACGUGUGGA AAGCUGACGG
360 UAC,
 'taxid': 77133}

Building a better database record processor with the Workflow object

That worked, but can we do better? One large issue with the above function is that there are so many different paths through it that it is difficult to validate the individual pieces. Additionally, the boilerplate logic for checking options and state is replicated numerous times. In a real-world situation, additional features will likely get added in as well, further increasing the complexity of the function and the likelihood of bugs creeping in.

The goal of the Workflow is to centralize the boilerplate logic and to decompose the relatively small blocks of logic that rely on shared state (such as the GC filtering). To do this, the Workflow relies on two function decorators: method and requires. The structure of the object is such that each individual block of code (such as the minimum length filter) can easily be unit-tested on its own.

method tells the Workflow that this is a method to be executed for every item being operated on. It takes a priority parameter that indicates its relative order of execution in the workflow, where a higher value means it will be executed earlier. If a method sets failed, the item is considered to have "failed" and, by default, none of the other methods are executed (short-circuiting, much like the continue statements above).

The second decorator, requires, performs the option checking: it verifies that specific runtime options are set, that they have particular values (e.g., execute method foo only if option X has value Y), and that state requirements are met. If a requirement is not met, the function is not evaluated.

Last, the Workflow allows the developer to specify a callback to be executed on a successful processing, and another to be executed on a failed processing.
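To make these mechanics concrete before we build the real parser, here's a minimal toy sketch of our own (not part of the original recipe) that relies only on the behaviors just described: a higher-priority method that can fail an item, and a lower-priority method that is skipped for failed items. With no failure callback and the default success callback, failed items are silently dropped and successes yield state.

class ToyWorkflow(Workflow):
    def initialize_state(self, item):
        self.state = item

    @method(priority=10)  # higher priority, so this executes first
    @requires(option='reject-odd', values=True)
    def reject_odd(self):
        if self.state % 2:
            self.failed = True

    @method(priority=5)  # executes second; skipped for items that failed
    def double(self):
        self.state *= 2

toy = ToyWorkflow(state=None, options={'reject-odd': True})
print(list(toy(range(4))))  # expected: [0, 4]; the odd items failed and were dropped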

Now let's take a look at what this looks like for our record parser. First, we're going to define two helper functions that simply test for the presence of some piece of state.

In [9]:
def has_gc(item):
    """Test if state has gc computed"""
    return item.get('gc') is not None

def has_taxid(item):
    """Test if state has a valid taxid"""
    return item.get('taxid') is not None

Next, we're going to define the workflow itself. The first method we'll define is a special one called initialize_state, which is executed for every record. This method is responsible for resetting the notion of state, which should be independent for every item. Following this, we'll define the actual methods that'll do the work and decorate them with their various requirements.

In [10]:
class RecordParser(Workflow):
    def initialize_state(self, item):
        """Initialize state based on the item being processed"""
        self.state = item.copy()
        self.state['gc'] = None
        self.state['group'] = None
        self.state['lineage'] = None
        self.state['id'] = None
         
    @method(priority=99)
    @requires(option='filter-description', values=not_none)  # not_none means any option value except None is valid
    def filter_description(self):
        """Filter records depending on keywords present in the description"""
        acc = self.state['accession']
        tokens = {t.strip().lower() for t in acc['description'].split()}
       
        if self.options['filter-description'].intersection(tokens):
            self.failed = True
            self.stats['filter-description'] += 1
    
    @method(priority=95)  # This method has no requirements
    def force_to_rna(self):
        """Convert any thymine to uracil""" 
        self.state['sequence'] = self.state['sequence'].replace('T', 'U')
    
    @method(priority=89)
    @requires(option='filter-length', values=True)  # Require 'filter-length' to be True
    @requires(option='minimum-length', values=not_none)  # Require that the 'minimum-length' is any value other than None
    def minimum_length(self):
        if self.state['length'] < self.options['minimum-length']:
            self.failed = True
            self.stats['minimum-length'] += 1

    @method(priority=88)
    @requires(option='filter-length', values=True)
    @requires(option='maximum-length', values=not_none)  # Require that the 'maximum-length' is any value other than None
    def maximum_length(self):
        if self.state['length'] > self.options['maximum-length']:
            self.failed = True
            self.stats['maximum-length'] += 1
    
    @method(priority=80)
    @requires(option='compute-gc', values=True)
    def compute_gc(self):
        gc = self.state['sequence'].count('G') + self.state['sequence'].count('C')
        self.state['gc'] = float(gc) / self.state['length']
    
    @method(priority=79)
    @requires(option='minimum-gc', values=not_none)
    @requires(state=has_gc)  # Require that has_gc(self.state) evaluates to True 
    def minimum_gc(self):
        if self.state['gc'] < self.options['minimum-gc']:
            self.failed = True
            self.stats['minimum-gc'] += 1
    
    @method(priority=78)
    @requires(option='maximum-gc', values=not_none)
    @requires(state=has_gc)  # Require that has_gc(self.state) evaluates to True
    def maximum_gc(self):
        if self.state['gc'] > self.options['maximum-gc']:
            self.failed = True
            self.stats['maximum-gc'] += 1
    
    @method(priority=3)
    @requires(state=has_taxid)  # Require that has_taxid(self.state) evaluates to True
    def fetch_lineage(self):
        self.state['lineage'] = query_ncbi_lineage(self.state['taxid'])    
    
    @method(priority=2)
    def assign_to_group(self):
        self.state['group'] = self.state['lineage'][-1] if self.state['lineage'] is not None else 'unassigned'
        
    @method(priority=1)
    def cast_to_rnasequence(self):
        self.state['sequence'] = RNA(self.state['sequence'], metadata={'id': self.state['id']}, validate=True)

While the workflow itself is complex, each individual block of logic is small, making it much easier to test the components. Integration testing is, of course, still a very good idea.
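For instance, a unit test for just the minimum_length block might look something like the following sketch (an assumption on our part: we invoke the decorated method directly, reset failed by hand since the workflow normally manages it during __call__, and supply only the options and state that block requires):

wf = RecordParser(state=None,
                  options={'filter-length': True, 'minimum-length': 100},
                  stats=Counter())
wf.initialize_state({'sequence': 'ACGU', 'length': 4})
wf.failed = False  # normally reset by the workflow for each item
wf.minimum_length()

assert wf.failed
assert wf.stats['minimum-length'] == 1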

Now, let's take the object out for a spin. First, we'll instantiate the workflow. We can predefine state on instantiation, which is useful if it is possible to preallocate memory (instead of always creating an object in initialize_state), but this isn't necessary, just a point of optimization. Second, we'll specify the same options we defined for our singular function above. And third, we're going to define a stats object to track information about failures (note: state is required, options is optional, and anything in kwargs gets set as a member variable).

To get a record out, we pass in our items iterator (created from the query_rnacentral function defined earlier), and simply get the next item. On __call__, we can also pass in callbacks, which we'll show shortly.

In [11]:
records = RecordParser(state=None, options=options, stats=Counter())
pprint(next(records(items)))
pprint(records.stats)
{'accession': {'citations': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rfam/citations',
               'description': 'uncultured bacterium Bacterial small subunit '
                              'ribosomal RNA',
               'expert_db_url': 'http://rfam.xfam.org/family/RF00177',
               'external_id': 'RF00177',
               'gene': '',
               'id': 'DQ668905.1:1..363:rfam',
               'optional_id': 'SSU_rRNA_bacteria',
               'organelle': '',
               'product': 'Bacterial small subunit ribosomal RNA',
               'rna_type': 'rRNA',
               'source_url': '',
               'species': 'uncultured bacterium',
               'url': 'http://rnacentral.org/api/v1/accession/DQ668905.1:1..363:rfam/info'},
 'database': 'Rfam',
 'first_seen': '2016-03-15',
 'gc': 0.465564738292011,
 'group': 'environmental samples',
 'id': None,
 'is_active': True,
 'is_expert_db': False,
 'last_seen': '2016-03-15',
 'length': 363,
 'lineage': ['cellular organisms', 'Bacteria', 'environmental samples'],
 'sequence': RNA
---------------------------------------------------------------------
Metadata:
    'id': None
Stats:
    length: 363
    has gaps: False
    has degenerates: False
    has definites: True
    GC-content: 46.56%
---------------------------------------------------------------------
0   GUAUGCAACU UACCUUUUAC UAGAGAAUAG CCAAGAGAAA UUUUGAUUAA UGCUCUAUGU
60  UCUUAUUUAC UCGCAUGAGU AAAUAAGCAA AGCUCCGGCG GUAAAAGAUG GGCAUGCGUC
...
300 UAUGGGUUGU AAACUUCUUU UAUACAGGAA GAAACCUUUC CACGUGUGGA AAGCUGACGG
360 UAC,
 'taxid': 77133}
Counter()

Great, things appear to be working as we'd expect. To finish this notebook off, let's do three more things. First, we'll construct some example callback functions. Second, we're going to change the options to trigger some failures. And last, we're going to enable debug mode to gather additional context.

By default, no failure callback is defined, as it is assumed that failures are ignored. There is, in fact, always a success callback, but by default it is just a passthrough that returns self.state. Each callback is provided self, i.e., the entire workflow object. These callbacks can be handy if you want to automatically stream results to a database, file, or other consumer.

In [12]:
def fail_callback(wf):
    """Print out the ID associated with the failure"""
    print("We had a failure with %s" % wf.state['accession']['id'])
    
def success_callback(wf):
    """Print out the ID associated with the success"""
    print("We successfully processed %s" % wf.state['accession']['id'])

# Let's filter out those pesky ribosomal RNAs
options['filter-description'] = {'ribosomal', 'rrna'}

records = RecordParser(state=None, options=options, stats=Counter())
next(records(items, success_callback=success_callback, fail_callback=fail_callback))
pprint(records.stats)
We had a failure with DQ668905.1:1..363:silva
Counter({'filter-description': 1})

It would be cumbersome to interrogate stats every time something failed in order to figure out why it failed. The workflow assumes that you typically don't care about why something failed specifically (as tracking it also adds a fair bit of overhead), but it is very useful to know when debugging. We can enable this tracking by setting debug=True. To make this more interesting, let's trigger a failure deep in the workflow, as the description filter happens first.

In [13]:
options['filter-description'] = None
options['compute-gc'] = True
options['minimum-gc'] = 1.0

records = RecordParser(state=None, options=options, stats=Counter(), debug=True)
next(records(items, success_callback=success_callback, fail_callback=fail_callback))
We had a failure with KC615826.1:1..558:rRNA

Okay, so we understandably had a failure, as the minimum GC was set at 100%. What type of debug detail do we have? First is the trace, which contains tuples of ("method_name", order_of_execution). A low value for order_of_execution means it was executed earlier relative to a higher number. The value is assigned based on the priority, so in the subsequent example, there isn't a method associated with 0 because the highest priority method (filter_description) was not executed due to the options we set.

What we can see is that the highest number in the execution order is associated with minimum_gc, indicating that it was the method that failed.

In [14]:
pprint(records.debug_trace)
{('compute_gc', 4), ('minimum_gc', 5), ('force_to_rna', 1)}

We can also get runtime information (in seconds), where the key in the runtime dict corresponds to a tuple in the debug_trace.

In [15]:
pprint(records.debug_runtime)
{('compute_gc', 4): 1.0728836059570312e-05,
 ('force_to_rna', 1): 3.0994415283203125e-06,
 ('minimum_gc', 5): 8.821487426757812e-06}

And finally, we have pre- and post-state tracking, which lets us examine state prior to and following each method's execution. These are also keyed by the tuples in the debug_trace. For the purposes of example, let's take a look at the compute_gc method, as it modifies state, whereas the minimum_gc method, though it triggered the failure, does not modify the state object. And, to minimize cognitive load, let's just look at what happened to the gc key.

In [16]:
print("GC prior to the execution of `compute_gc`")
print(records.debug_pre_state[('compute_gc', 4)]['gc'])

print("\nGC following the execution of `compute_gc`")
print(records.debug_post_state[('compute_gc', 4)]['gc'])
GC prior to the execution of `compute_gc`
None

GC following the execution of `compute_gc`
0.2616487455197133