In [1]:
import math
import numpy as np
from pprint import pprint
import timeit
In [2]:
def simple_beam_search(data, k):
    dd = dict(zip(range(len(data)), data))
    sort = sorted(dd.items(), key=lambda x:x[1], reverse=True)
    res = [w for w,p in sort[:k]]
    return sorted(res)
In [3]:
A = np.array(
    [0.3777,0.0110,0.0009,0.0084,0.0584,0.0090,0.0025,
     0.0008,0.0002,0.7968,0.0005,0.0008,0.1698,0.0041,
     0.0322,0.0005,0.0050,0.0837,0.0615,0.0514,0.2231,
     0.0366,0.0004,0.0001,0.0733,0.4509,0.0036,0.0036,
     0.0096,0.0176,0.0014,0.0086,0.1216,0.0177,0.0068,
     0.0068,0.0102,0.1011,0.1012,0.0120,0.0728,0.0479,
     0.1147,0.0021,0.0002,0.2157,0.4744,0.0102,0.0017]).reshape(7, 7)
B = np.array(
    [0.000032,0,0,0.000048,0,
     0,0.308431,0,0,0,
     0,0.000028,0.000672,0,0.000028,
     0,0,0.000340,0,0,
     0,0.000200,0.000223,0,0.002337,
     0,0,0.010446,0,0,
     0,0,0,0.506099,0]).reshape(7, 5)
# B = np.array(
#     [0, 0.000048, 0, 0, 0.000032,
#      0, 0, 0, 0.308431, 0,
#      0.000028, 0, 0.000672, 0.000028, 0,
#      0, 0, 0.000340, 0, 0,
#      0.002337, 0, 0.000223, 0.000200, 0,
#      0, 0, 0.010446, 0, 0,
#      0, 0.506099, 0, 0, 0]).reshape(7, 5)
pi = np.array([0.2767,0.0006,0.0031,0.0453,0.0449,0.0510,0.2026])
In [4]:
O = "Janet will back the hill".split()
S = "NNP MD VB JJ NN RB DT".split()
lens = len(S)
leno = len(O)
P = np.zeros((lens, leno))
V = np.zeros((lens, leno))
bp = np.zeros(leno, dtype=np.int8)
In [5]:
for i in range(lens):
    V[i, 0] = pi[i] * B[i, 0]
    P[i, 0] = 0
In [6]:
index = range(V.shape[0])
for o in range(1, leno):
    for s in range(lens):
        prob = V[index, o-1] * A[index, s] * B[s, o]
        V[s, o] = max(prob)
        P[s, o] = index[np.argmax(prob)]
    index = simple_beam_search(V[:, o], 2)
bp[-1] = np.argmax(V[:, -1])
for i in reversed(range(1, leno)):
    bp[i-1] = P[bp[i], i]
bp
Out[6]:
array([0, 1, 2, 6, 4], dtype=int8)
In [7]:
def viterbi(y, A, B, pi, beam_size=None):
    """
    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype. sequence index in B
    """
    lens = A.shape[0]
    leno = len(y)
    P = np.zeros((lens, leno))
    V = np.zeros((lens, leno))
    bp = np.zeros(leno, dtype=np.int8)
    # 因为只有 <s> 一个,所以不需要像下面那样取 max 处理
    for i in range(lens):
        V[i, 0] = pi[i] * B[i, y[0]]
        P[i, 0] = 0
    # 初始化选择所有的 states
    index = range(V.shape[0])
    for o in range(1, leno):
        for s in range(lens):
            # 这里的概率是上一层所有的 states 到这一步某一个 state 的概率,array(len(states), )
            prob = V[index, o-1] * A[index, s] * B[s, y[o]]
            # 选择(实际上是上一层)最大的概率
            V[s, o] = max(prob)
            # P 的计算用 V[:, o-1] * A[:, s] 更容易理解,表示上一层的最大概率
            # 选择上一层最大概率对应的 state
            # beam search 时这里需要注意修改(从注释的样子修改为现在的样子)
            P[s, o] = index[np.argmax(prob)] # np.argmax(prob)
        # 这一步可以使用 beam search 对 S 进行过滤
        if beam_size:
            index = simple_beam_search(V[:, o], beam_size)
    # 因为是往前计算的,所以最后得到的是最后一层的概率,之前的需要回溯
    bp[-1] = np.argmax(V[:, -1])
    for i in reversed(range(1, leno)):
        bp[i-1] = P[bp[i], i]
    return V, P, bp
In [8]:
V, P, bp = viterbi(np.array(range(5)), A, B, pi, beam_size=10)
bp
Out[8]:
array([0, 1, 2, 6, 4], dtype=int8)
In [9]:
%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=None)
775 µs ± 65.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [10]:
# 结果很直观
%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=2)
569 µs ± 26.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [9]:
# 将 B 反序后,4 3 2 1 0 表示 O 中的每一个 token 在 B 中对应的 index
V, P, bp = viterbi(np.array([4, 3, 2, 1, 0]), A, B, pi)
bp
Out[9]:
array([0, 1, 2, 6, 4], dtype=int8)
In [11]:
[O[o]+"/"+S[s] for o,s in enumerate(bp)]
Out[11]:
['Janet/NNP', 'will/MD', 'back/VB', 'the/DT', 'hill/NN']
In [14]:
V
Out[14]:
array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17,
        0.00000000e+00],
       [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00,
        0.00000000e+00],
       [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00,
        1.01707158e-20],
       [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00,
        0.00000000e+00],
       [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00,
        2.01357071e-15],
       [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00,
        0.00000000e+00],
       [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12,
        0.00000000e+00]])
In [15]:
P
Out[15]:
array([[0., 0., 0., 2., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 6.],
       [0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 6.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 2., 0.]])
In [13]:
viterbi2(np.array(range(len(O))), A, B, pi)
Out[13]:
(array([0, 1, 2, 6, 4], dtype=uint8),
 array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17,
         0.00000000e+00],
        [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00,
         0.00000000e+00],
        [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00,
         1.01707158e-20],
        [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00,
         0.00000000e+00],
        [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00,
         2.01357071e-15],
        [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00,
         0.00000000e+00],
        [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12,
         0.00000000e+00]]),
 array([[0, 0, 1, 2, 6],
        [0, 0, 1, 5, 6],
        [0, 0, 1, 5, 6],
        [0, 0, 1, 5, 6],
        [0, 0, 1, 2, 6],
        [0, 0, 1, 5, 6],
        [0, 0, 1, 2, 6]], dtype=uint8))
In [12]:
# from https://stackoverflow.com/questions/9729968/python-implementation-of-viterbi-algorithm
def viterbi2(y, A, B, Pi=None):
    """
    Return the MAP estimate of state trajectory of Hidden Markov Model.

    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype.
    A : array (K, K)
        State transition matrix. See HiddenMarkovModel.state_transition  for
        details.
    B : array (K, M)
        Emission matrix. See HiddenMarkovModel.emission for details.
    Pi: optional, (K,)
        Initial state probabilities: Pi[i] is the probability x[0] == i. If
        None, uniform initial distribution is assumed (Pi[:] == 1/K).

    Returns
    -------
    x : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory,
        conditioned on observation sequence y under the model parameters A, B,
        Pi.
    T1: array (K, T)
        the probability of the most likely path so far
    T2: array (K, T)
        the x_j-1 of the most likely path so far
    """
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initilaize the tracking tables from first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate throught the observations updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2
In [ ]:
 
In [ ]:
 
In [ ]: