In [1]:
import math
import numpy

class HMM(object):
    ''' Simple Hidden Markov Model implementation.  User provides
        transition, emission and initial probabilities in dictionaries
        mapping 2-character codes onto floating-point probabilities
        for those table entries.  States and emissions are represented
        with single characters.  Emission symbols comes from a finite.  '''
    
    def __init__(self, A, E, I):
        ''' Initialize the HMM given transition, emission and initial
            probability tables. '''
        
        # put state labels to the set self.Q
        self.Q, self.S = set(), set() # states and symbols
        for a, prob in A.items():
            asrc, adst = a[0], a[1]
            self.Q.add(asrc)
            self.Q.add(adst)
        # add all the symbols to the set self.S
        for e, prob in E.items():
            eq, es = e[0], e[1]
            self.Q.add(eq)
            self.S.add(es)
        
        self.Q = sorted(list(self.Q))
        self.S = sorted(list(self.S))
        
        # create maps from state labels / emission symbols to integers
        # that function as unique IDs
        qmap, smap = {}, {}
        for i in range(len(self.Q)): qmap[self.Q[i]] = i
        for i in range(len(self.S)): smap[self.S[i]] = i
        lenq = len(self.Q)
        
        # create and populate transition probability matrix
        self.A = numpy.zeros(shape=(lenq, lenq), dtype=float)
        for a, prob in A.items():
            asrc, adst = a[0], a[1]
            self.A[qmap[asrc], qmap[adst]] = prob
        # make A stochastic (i.e. make rows add to 1)
        self.A /= self.A.sum(axis=1)[:, numpy.newaxis]
        
        # create and populate emission probability matrix
        self.E = numpy.zeros(shape=(lenq, len(self.S)), dtype=float)
        for e, prob in E.items():
            eq, es = e[0], e[1]
            self.E[qmap[eq], smap[es]] = prob
        # make E stochastic (i.e. make rows add to 1)
        self.E /= self.E.sum(axis=1)[:, numpy.newaxis]
        
        # initial probabilities
        self.I = [ 0.0 ] * len(self.Q)
        for a, prob in I.items():
            self.I[qmap[a]] = prob
        # make I stochastic (i.e. adds to 1)
        self.I = numpy.divide(self.I, sum(self.I))
        
        self.qmap, self.smap = qmap, smap
        
        # Make log-base-2 versions for log-space functions
        self.Alog = numpy.log2(self.A)
        self.Elog = numpy.log2(self.E)
        self.Ilog = numpy.log2(self.I)
    
    def jointProb(self, p, x):
        ''' Return joint probability of path p and emission string x '''
        p = list(map(self.qmap.get, p)) # turn state characters into ids
        x = list(map(self.smap.get, x)) # turn emission characters into ids
        tot = self.I[p[0]] # start with initial probability
        for i in range(1, len(p)):
            tot *= self.A[p[i-1], p[i]] # transition probability
        for i in range(0, len(p)):
            tot *= self.E[p[i], x[i]] # emission probability
        return tot
    
    def jointProbL(self, p, x):
        ''' Return log2 of joint probability of path p and emission
            string x.  Just like self.jointProb(...) but log2 domain. '''
        p = list(map(self.qmap.get, p)) # turn state characters into ids
        x = list(map(self.smap.get, x)) # turn emission characters into ids
        tot = self.Ilog[p[0]] # start with initial probability
        for i in range(1, len(p)):
            tot += self.Alog[p[i-1], p[i]] # transition probability
        for i in range(0, len(p)):
            tot += self.Elog[p[i], x[i]] # emission probability
        return tot
    
    def viterbi(self, x):
        ''' Given sequence of emissions, return the most probable path
            along with its probability. '''
        x = list(map(self.smap.get, x)) # turn emission characters into ids
        nrow, ncol = len(self.Q), len(x)
        mat   = numpy.zeros(shape=(nrow, ncol), dtype=float) # prob
        matTb = numpy.zeros(shape=(nrow, ncol), dtype=int)   # backtrace
        # Fill in first column
        for i in range(0, nrow):
            mat[i, 0] = self.E[i, x[0]] * self.I[i]
        # Fill in rest of prob and Tb tables
        for j in range(1, ncol):
            for i in range(0, nrow):
                ep = self.E[i, x[j]]
                mx, mxi = mat[0, j-1] * self.A[0, i] * ep, 0
                for i2 in range(1, nrow):
                    pr = mat[i2, j-1] * self.A[i2, i] * ep
                    if pr > mx:
                        mx, mxi = pr, i2
                mat[i, j], matTb[i, j] = mx, mxi
        # Find final state with maximal probability
        omx, omxi = mat[0, ncol-1], 0
        for i in range(1, nrow):
            if mat[i, ncol-1] > omx:
                omx, omxi = mat[i, ncol-1], i
        # Backtrace
        i, p = omxi, [omxi]
        for j in range(ncol-1, 0, -1):
            i = matTb[i, j]
            p.append(i)
        p = ''.join(map(lambda x: self.Q[x], p[::-1]))
        return omx, p # Return probability and path
    
    def viterbiL(self, x):
        ''' Given sequence of emissions, return the most probable path
            along with log2 of its probability.  Just like viterbi(...)
            but in log2 domain. '''
        x = list(map(self.smap.get, x)) # turn emission characters into ids
        nrow, ncol = len(self.Q), len(x)
        mat   = numpy.zeros(shape=(nrow, ncol), dtype=float) # prob
        matTb = numpy.zeros(shape=(nrow, ncol), dtype=int)   # backtrace
        # Fill in first column
        for i in range(0, nrow):
            mat[i, 0] = self.Elog[i, x[0]] + self.Ilog[i]
        # Fill in rest of log prob and Tb tables
        for j in range(1, ncol):
            for i in range(0, nrow):
                ep = self.Elog[i, x[j]]
                mx, mxi = mat[0, j-1] + self.Alog[0, i] + ep, 0
                for i2 in range(1, nrow):
                    pr = mat[i2, j-1] + self.Alog[i2, i] + ep
                    if pr > mx:
                        mx, mxi = pr, i2
                mat[i, j], matTb[i, j] = mx, mxi
        # Find final state with maximal log probability
        omx, omxi = mat[0, ncol-1], 0
        for i in range(1, nrow):
            if mat[i, ncol-1] > omx:
                omx, omxi = mat[i, ncol-1], i
        # Backtrace
        i, p = omxi, [omxi]
        for j in range(ncol-1, 0, -1):
            i = matTb[i, j]
            p.append(i)
        p = ''.join(map(lambda x: self.Q[x], p[::-1]))
        return omx, p # Return log probability and path
In [2]:
# We experiment with joint probabilities first

hmm = HMM({"FF":0.9, "FL":0.1, "LF":0.1, "LL":0.9}, # transition matrix A
          {"FH":0.5, "FT":0.5, "LH":0.75, "LT":0.25}, # emission matrix E
          {"F":0.5, "L":0.5}) # initial probabilities I
jprob1 = hmm.jointProb("FFFLLLFFFFF", "THTHHHTHTTH")
myprob1 = (0.5 ** 9) * (0.75 ** 3) * (0.9 ** 8) * (0.1 ** 2)
jprob1, myprob1
# these should be about equal
Out[2]:
(3.5469405120849628e-06, 3.5469405120849624e-06)
In [3]:
# confirming that log version of jointProb works as expected
jprobL1 = hmm.jointProbL("FFFLLLFFFFF", "THTHHHTHTTH")
math.log(jprob1, 2), jprobL1
Out[3]:
(-18.104993435171657, -18.104993435171654)
In [4]:
# Trying another path
jprob2 = hmm.jointProb("FFFFFFFFFFF", "THTHHHTHTTH")
myprob2 = (0.5 ** 12) * (0.9 ** 10)
jprob2, myprob2
# these should be about equal
Out[4]:
(8.51265722900391e-05, 8.512657229003909e-05)
In [5]:
# Note that jprob2 is greater than jprob1

# Now we experiment with viterbi decoding
jprobOpt, path = hmm.viterbi("THTHHHTHTTH")
path
Out[5]:
'FFFFFFFFFFF'
In [6]:
# maximum likelihood path is same path (all fair) as the second one
# we tried above, so jprobOpt should equal jprob2
jprobOpt, jprob2
Out[6]:
(8.51265722900391e-05, 8.51265722900391e-05)
In [7]:
# confirming that log version of viterbi works as expected
jprobLOpt, _ = hmm.viterbiL("THTHHHTHTTH")
math.log(jprobOpt, 2), jprobLOpt
Out[7]:
(-13.520030934450498, -13.520030934450496)
In [8]:
# Now let's make a new HMM with the same states but where jumps
# between fair (F) and loaded (L) are much more probable
hmm = HMM({"FF":0.6, "FL":0.4, "LF":0.4, "LL":0.6}, # transition matrix A
          {"FH":0.5, "FT":0.5, "LH":0.8, "LT":0.2}, # emission matrix E
          {"F":0.5, "L":0.5}) # initial probabilities I
hmm.viterbi("THTHHHTHTTH")
Out[8]:
(2.8665446400000001e-06, 'FFFLLLFFFFL')
In [9]:
# Here's an example of underflow.  Note that probability returned
# is 0.0 and the state string becomes all Fs after a while.
hmm.viterbi("THTHHHTHTTH" * 100)
Out[9]:
(0.0,
 'FFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF')
In [10]:
# Moving to log2 domain fixes underflow
hmm.viterbiL("THTHHHTHTTH" * 100)
Out[10]:
(-1824.4030071946879,
 'FFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFFFFFLLLFFFFL')
In [11]:
cpgHmm = HMM({'IO':0.20, 'OI':0.20, 'II':0.80, 'OO':0.80},
             {'IA':0.10, 'IC':0.40, 'IG':0.40, 'IT':0.10,
              'OA':0.25, 'OC':0.25, 'OG':0.25, 'OT':0.25},
             {'I' :0.50, 'O' :0.50})
x = 'ATATATACGCGCGCGCGCGCGATATATATATATA'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # finds the CpG island fine
ATATATACGCGCGCGCGCGCGATATATATATATA
OOOOOOOIIIIIIIIIIIIIIOOOOOOOOOOOOO
In [12]:
x = 'ATATCGCGCGCGATATATCGCGCGCGATATATAT'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # finds two CpG islands fine
ATATCGCGCGCGATATATCGCGCGCGATATATAT
OOOOIIIIIIIIOOOOOOIIIIIIIIOOOOOOOO
In [13]:
x = 'ATATATACCCCCCCCCCCCCCATATATATATATA'
logp, path = cpgHmm.viterbiL(x)
print(x)
print(path) # oops! - this is just a bunch of Cs.  What went wrong?
ATATATACCCCCCCCCCCCCCATATATATATATA
OOOOOOOIIIIIIIIIIIIIIOOOOOOOOOOOOO