import math
import numpy as np
from pprint import pprint
import timeit
def simple_beam_search(data, k):
dd = dict(zip(range(len(data)), data))
sort = sorted(dd.items(), key=lambda x:x[1], reverse=True)
res = [w for w,p in sort[:k]]
return sorted(res)
A = np.array(
[0.3777,0.0110,0.0009,0.0084,0.0584,0.0090,0.0025,
0.0008,0.0002,0.7968,0.0005,0.0008,0.1698,0.0041,
0.0322,0.0005,0.0050,0.0837,0.0615,0.0514,0.2231,
0.0366,0.0004,0.0001,0.0733,0.4509,0.0036,0.0036,
0.0096,0.0176,0.0014,0.0086,0.1216,0.0177,0.0068,
0.0068,0.0102,0.1011,0.1012,0.0120,0.0728,0.0479,
0.1147,0.0021,0.0002,0.2157,0.4744,0.0102,0.0017]).reshape(7, 7)
B = np.array(
[0.000032,0,0,0.000048,0,
0,0.308431,0,0,0,
0,0.000028,0.000672,0,0.000028,
0,0,0.000340,0,0,
0,0.000200,0.000223,0,0.002337,
0,0,0.010446,0,0,
0,0,0,0.506099,0]).reshape(7, 5)
# B = np.array(
# [0, 0.000048, 0, 0, 0.000032,
# 0, 0, 0, 0.308431, 0,
# 0.000028, 0, 0.000672, 0.000028, 0,
# 0, 0, 0.000340, 0, 0,
# 0.002337, 0, 0.000223, 0.000200, 0,
# 0, 0, 0.010446, 0, 0,
# 0, 0.506099, 0, 0, 0]).reshape(7, 5)
pi = np.array([0.2767,0.0006,0.0031,0.0453,0.0449,0.0510,0.2026])
O = "Janet will back the hill".split()
S = "NNP MD VB JJ NN RB DT".split()
lens = len(S)
leno = len(O)
P = np.zeros((lens, leno))
V = np.zeros((lens, leno))
bp = np.zeros(leno, dtype=np.int8)
for i in range(lens):
V[i, 0] = pi[i] * B[i, 0]
P[i, 0] = 0
index = range(V.shape[0])
for o in range(1, leno):
for s in range(lens):
prob = V[index, o-1] * A[index, s] * B[s, o]
V[s, o] = max(prob)
P[s, o] = index[np.argmax(prob)]
index = simple_beam_search(V[:, o], 2)
bp[-1] = np.argmax(V[:, -1])
for i in reversed(range(1, leno)):
bp[i-1] = P[bp[i], i]
bp
array([0, 1, 2, 6, 4], dtype=int8)
def viterbi(y, A, B, pi, beam_size=None):
"""
Parameters
----------
y : array (T,)
Observation state sequence. int dtype. sequence index in B
"""
lens = A.shape[0]
leno = len(y)
P = np.zeros((lens, leno))
V = np.zeros((lens, leno))
bp = np.zeros(leno, dtype=np.int8)
# 因为只有 <s> 一个,所以不需要像下面那样取 max 处理
for i in range(lens):
V[i, 0] = pi[i] * B[i, y[0]]
P[i, 0] = 0
# 初始化选择所有的 states
index = range(V.shape[0])
for o in range(1, leno):
for s in range(lens):
# 这里的概率是上一层所有的 states 到这一步某一个 state 的概率,array(len(states), )
prob = V[index, o-1] * A[index, s] * B[s, y[o]]
# 选择(实际上是上一层)最大的概率
V[s, o] = max(prob)
# P 的计算用 V[:, o-1] * A[:, s] 更容易理解,表示上一层的最大概率
# 选择上一层最大概率对应的 state
# beam search 时这里需要注意修改(从注释的样子修改为现在的样子)
P[s, o] = index[np.argmax(prob)] # np.argmax(prob)
# 这一步可以使用 beam search 对 S 进行过滤
if beam_size:
index = simple_beam_search(V[:, o], beam_size)
# 因为是往前计算的,所以最后得到的是最后一层的概率,之前的需要回溯
bp[-1] = np.argmax(V[:, -1])
for i in reversed(range(1, leno)):
bp[i-1] = P[bp[i], i]
return V, P, bp
V, P, bp = viterbi(np.array(range(5)), A, B, pi, beam_size=10)
bp
array([0, 1, 2, 6, 4], dtype=int8)
%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=None)
775 µs ± 65.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
# 结果很直观
%timeit viterbi(np.array(range(5)), A, B, pi, beam_size=2)
569 µs ± 26.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
# 将 B 反序后(见 B 定义处),4 3 2 1 0 表示 O 中的每一个 token 在 B 中对应的 index
V, P, bp = viterbi(np.array([4, 3, 2, 1, 0]), A, B, pi)
bp
array([0, 1, 2, 6, 4], dtype=int8)
[O[o]+"/"+S[s] for o,s in enumerate(bp)]
['Janet/NNP', 'will/MD', 'back/VB', 'the/DT', 'hill/NN']
V
array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17, 0.00000000e+00], [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00, 1.01707158e-20], [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00, 2.01357071e-15], [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12, 0.00000000e+00]])
P
array([[0., 0., 0., 2., 0.], [0., 0., 0., 0., 0.], [0., 0., 1., 0., 6.], [0., 0., 1., 0., 0.], [0., 0., 1., 0., 6.], [0., 0., 1., 0., 0.], [0., 0., 0., 2., 0.]])
viterbi2(np.array(range(len(O))), A, B, pi)
(array([0, 1, 2, 6, 4], dtype=uint8), array([[8.85440000e-06, 0.00000000e+00, 0.00000000e+00, 2.48613983e-17, 0.00000000e+00], [0.00000000e+00, 3.00406859e-08, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 2.23130880e-13, 1.60852733e-11, 0.00000000e+00, 1.01707158e-20], [0.00000000e+00, 0.00000000e+00, 5.10691660e-15, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 1.03419392e-10, 5.35925837e-15, 0.00000000e+00, 2.01357071e-15], [0.00000000e+00, 0.00000000e+00, 5.32840899e-11, 0.00000000e+00, 0.00000000e+00], [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.81619925e-12, 0.00000000e+00]]), array([[0, 0, 1, 2, 6], [0, 0, 1, 5, 6], [0, 0, 1, 5, 6], [0, 0, 1, 5, 6], [0, 0, 1, 2, 6], [0, 0, 1, 5, 6], [0, 0, 1, 2, 6]], dtype=uint8))
# from https://stackoverflow.com/questions/9729968/python-implementation-of-viterbi-algorithm
def viterbi2(y, A, B, Pi=None):
"""
Return the MAP estimate of state trajectory of Hidden Markov Model.
Parameters
----------
y : array (T,)
Observation state sequence. int dtype.
A : array (K, K)
State transition matrix. See HiddenMarkovModel.state_transition for
details.
B : array (K, M)
Emission matrix. See HiddenMarkovModel.emission for details.
Pi: optional, (K,)
Initial state probabilities: Pi[i] is the probability x[0] == i. If
None, uniform initial distribution is assumed (Pi[:] == 1/K).
Returns
-------
x : array (T,)
Maximum a posteriori probability estimate of hidden state trajectory,
conditioned on observation sequence y under the model parameters A, B,
Pi.
T1: array (K, T)
the probability of the most likely path so far
T2: array (K, T)
the x_j-1 of the most likely path so far
"""
# Cardinality of the state space
K = A.shape[0]
# Initialize the priors with default (uniform dist) if not given by caller
Pi = Pi if Pi is not None else np.full(K, 1 / K)
T = len(y)
T1 = np.empty((K, T), 'd')
T2 = np.empty((K, T), 'B')
# Initilaize the tracking tables from first observation
T1[:, 0] = Pi * B[:, y[0]]
T2[:, 0] = 0
# Iterate throught the observations updating the tracking tables
for i in range(1, T):
T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)
# Build the output, optimal model trajectory
x = np.empty(T, 'B')
x[-1] = np.argmax(T1[:, T - 1])
for i in reversed(range(1, T)):
x[i - 1] = T2[x[i], i]
return x, T1, T2