Simple De Bruijn Graph implementation with Eulerian walk-finder

In [1]:
class DeBruijnGraph:
    ''' De Bruijn directed multigraph built from a collection of
        strings. User supplies strings and k-mer length k.  Nodes
        are k-1-mers.  An Edge corresponds to the k-mer that joins
        a left k-1-mer to a right k-1-mer. '''
 
    @staticmethod
    def chop(st, k):
        ''' Chop string into k-mers of given length '''
        for i in range(len(st)-(k-1)):
            yield (st[i:i+k], st[i:i+k-1], st[i+1:i+k])
    
    class Node:
        ''' Node representing a k-1 mer.  Keep track of # of
            incoming/outgoing edges so it's easy to check for
            balanced, semi-balanced. '''
        
        def __init__(self, km1mer):
            self.km1mer = km1mer
            self.nin = 0
            self.nout = 0
        
        def isSemiBalanced(self):
            return abs(self.nin - self.nout) == 1
        
        def isBalanced(self):
            return self.nin == self.nout
        
        def __hash__(self):
            return hash(self.km1mer)
        
        def __str__(self):
            return self.km1mer
    
    def __init__(self, strIter, k, circularize=False):
        ''' Build de Bruijn multigraph given string iterator and k-mer
            length k '''
        self.G = {}     # multimap from nodes to neighbors
        self.nodes = {} # maps k-1-mers to Node objects
        for st in strIter:
            if circularize:
                st += st[:k-1]
            for kmer, km1L, km1R in self.chop(st, k):
                nodeL, nodeR = None, None
                if km1L in self.nodes:
                    nodeL = self.nodes[km1L]
                else:
                    nodeL = self.nodes[km1L] = self.Node(km1L)
                if km1R in self.nodes:
                    nodeR = self.nodes[km1R]
                else:
                    nodeR = self.nodes[km1R] = self.Node(km1R)
                nodeL.nout += 1
                nodeR.nin += 1
                self.G.setdefault(nodeL, []).append(nodeR)
        # Iterate over nodes; tally # balanced, semi-balanced, neither
        self.nsemi, self.nbal, self.nneither = 0, 0, 0
        # Keep track of head and tail nodes in the case of a graph with
        # Eularian walk (not cycle)
        self.head, self.tail = None, None
        for node in iter(self.nodes.values()):
            if node.isBalanced():
                self.nbal += 1
            elif node.isSemiBalanced():
                if node.nin == node.nout + 1:
                    self.tail = node
                if node.nin == node.nout - 1:
                    self.head = node
                self.nsemi += 1
            else:
                self.nneither += 1
    
    def nnodes(self):
        ''' Return # nodes '''
        return len(self.nodes)
    
    def nedges(self):
        ''' Return # edges '''
        return len(self.G)
    
    def hasEulerianWalk(self):
        ''' Return true iff graph has Eulerian walk. '''
        return self.nneither == 0 and self.nsemi == 2
    
    def hasEulerianCycle(self):
        ''' Return true iff graph has Eulerian cycle. '''
        return self.nneither == 0 and self.nsemi == 0
    
    def isEulerian(self):
        ''' Return true iff graph has Eulerian walk or cycle '''
        # technically, if it has an Eulerian walk
        return self.hasEulerianWalk() or self.hasEulerianCycle()
    
    def eulerianWalkOrCycle(self):
        ''' Find and return sequence of nodes (represented by
            their k-1-mer labels) corresponding to Eulerian walk
            or cycle '''
        assert self.isEulerian()
        g = self.G
        if self.hasEulerianWalk():
            g = g.copy()
            g.setdefault(self.tail, []).append(self.head)
        # graph g has an Eulerian cycle
        tour = []
        src = next(iter(g.keys())) # pick arbitrary starting node
        
        def __visit(n):
            while len(g[n]) > 0:
                dst = g[n].pop()
                __visit(dst)
            tour.append(n)
        
        __visit(src)
        tour = tour[::-1][:-1] # reverse and then take all but last node
            
        if self.hasEulerianWalk():
            # Adjust node list so that it starts at head and ends at tail
            sti = tour.index(self.head)
            tour = tour[sti:] + tour[:sti]
        
        # Return node list
        return list(map(str, tour))
In [2]:
g = DeBruijnGraph(['AAABBBA'], k=3)
In [3]:
g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()
Out[3]:
(True, True, False)
In [4]:
# the figure we drew in class had 4 nodes
g.nnodes()
Out[4]:
4
In [5]:
g.eulerianWalkOrCycle()
Out[5]:
['AA', 'AB', 'BB', 'BB', 'BA', 'AA']
In [6]:
g = DeBruijnGraph(['AAABBBBA'], k=3) # Add 1 more B to the run of Bs
In [7]:
g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()
Out[7]:
(True, True, False)
In [8]:
# the figure we drew in class again had 4 nodes
g.nnodes()
Out[8]:
4
In [9]:
g.eulerianWalkOrCycle()
Out[9]:
['AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']
In [10]:
# circularize makes DeBruijnGraph treat string as circular,
# returning to left-hand side at extreme right end
g = DeBruijnGraph(['AAABBBBA'], k=3, circularize=True)
In [11]:
g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()
Out[11]:
(True, False, True)
In [12]:
g.eulerianWalkOrCycle()
Out[12]:
['AA', 'AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']
In [13]:
DeBruijnGraph(["a_long_long_long_time"], 5).eulerianWalkOrCycle()
Out[13]:
['a_lo',
 '_lon',
 'long',
 'ong_',
 'ng_l',
 'g_lo',
 '_lon',
 'long',
 'ong_',
 'ng_l',
 'g_lo',
 '_lon',
 'long',
 'ong_',
 'ng_t',
 'g_ti',
 '_tim',
 'time']
In [14]:
DeBruijnGraph(['a_long_long_long_time'], 5).eulerianWalkOrCycle().count('long')
Out[14]:
3
In [15]:
# see if we can get correct reconstruction at k=4
walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 4).eulerianWalkOrCycle()
walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))
Out[15]:
'to_every_thing_turn_turn_turn_there_is_a_season'
In [16]:
# that worked, but k=3 doesn't!  unresolvable repeat at k=3
walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 3).eulerianWalkOrCycle()
walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))
Out[16]:
'to_every_turn_turn_thing_turn_there_is_a_season'

Visualizing the graph

We define a new Python class extending DeBruijnGraph, but adding a to_dot member function. to_dot returns a Digraph object - a graph with directed edges. See the graphviz package user guide for more details on Digraph. Jupyter is kind enough to render Digraphs into pretty pictures.

In [17]:
import graphviz
In [18]:
class DeBruijnGraph2(DeBruijnGraph):
    def to_dot(self, weights=False):
        """ Return string with graphviz representation.  If 'weights'
            is true, label edges corresponding to distinct k-1-mers
            with weights, instead of drawing separate edges for
            k-1-mer copies. """
        g = graphviz.Digraph(comment='DeBruijn graph')
        for node in iter(self.G.keys()):
            g.node(node.km1mer, node.km1mer)
        for src, dsts in iter(self.G.items()):
            if weights:
                weightmap = {}
                if weights:
                    for dst in dsts:
                        weightmap[dst] = weightmap.get(dst, 0) + 1
                for dst, v in weightmap.iteritems():
                    g.edge(src.km1mer, dst.km1mer, label=str(v))
            else:
                for dst in dsts:
                    g.edge(src.km1mer, dst.km1mer)
        return g
In [19]:
# sing along
DeBruijnGraph2(['to_every_thing_turn_turn_turn_there_is_a_season_turn_turn_turn'], 4).to_dot()
Out[19]:
%3 to_ to_ o_e o_e to_->o_e n_t n_t _tu _tu n_t->_tu n_t->_tu n_t->_tu n_t->_tu n_t->_tu _th _th n_t->_th g_t g_t g_t->_tu ery ery ry_ ry_ ery->ry_ _ev _ev o_e->_ev y_t y_t ry_->y_t sea sea eas eas sea->eas a_s a_s _se _se a_s->_se ng_ ng_ ng_->g_t re_ re_ e_i e_i re_->e_i _se->sea ing ing ing->ng_ ver ver ver->ery her her ere ere her->ere y_t->_th s_a s_a _a_ _a_ s_a->_a_ _is _is e_i->_is on_ on_ on_->n_t is_ is_ is_->s_a tur tur urn urn tur->urn tur->urn tur->urn tur->urn tur->urn tur->urn _tu->tur _tu->tur _tu->tur _tu->tur _tu->tur _tu->tur thi thi _th->thi the the _th->the aso aso eas->aso son son son->on_ aso->son eve eve _ev->eve hin hin hin->ing _a_->a_s rn_ rn_ urn->rn_ urn->rn_ urn->rn_ urn->rn_ urn->rn_ _is->is_ eve->ver ere->re_ rn_->n_t rn_->n_t rn_->n_t rn_->n_t rn_->n_t thi->hin the->her
In [20]:
# now simplified with edge weights
DeBruijnGraph2(['to_every_thing_turn_turn_turn_there_is_a_season_turn_turn_turn'], 4).to_dot(True)
Out[20]:
%3 to_ to_ o_e o_e to_->o_e 1 n_t n_t _tu _tu n_t->_tu 5 _th _th n_t->_th 1 g_t g_t g_t->_tu 1 ery ery ry_ ry_ ery->ry_ 1 _ev _ev o_e->_ev 1 y_t y_t ry_->y_t 1 sea sea eas eas sea->eas 1 a_s a_s _se _se a_s->_se 1 ng_ ng_ ng_->g_t 1 re_ re_ e_i e_i re_->e_i 1 _se->sea 1 ing ing ing->ng_ 1 ver ver ver->ery 1 her her ere ere her->ere 1 y_t->_th 1 s_a s_a _a_ _a_ s_a->_a_ 1 _is _is e_i->_is 1 on_ on_ on_->n_t 1 is_ is_ is_->s_a 1 tur tur urn urn tur->urn 6 _tu->tur 6 thi thi _th->thi 1 the the _th->the 1 aso aso eas->aso 1 son son son->on_ 1 aso->son 1 eve eve _ev->eve 1 hin hin hin->ing 1 _a_->a_s 1 rn_ rn_ urn->rn_ 5 _is->is_ 1 eve->ver 1 ere->re_ 1 rn_->n_t 5 thi->hin 1 the->her 1