#!/usr/bin/env python
# coding: utf-8
# For blog: [Part-of-Speech-Tagging](https://yam.gift/2019/06/11/SLP/2019-06-11-Part-of-Speech-Tagging/)
#
# Cython version:
#
# [seqlearn/viterbi.pyx at master · larsmans/seqlearn](https://github.com/larsmans/seqlearn/blob/master/seqlearn/_decode/viterbi.pyx)
# ![](http://qnimg.lovevivian.cn/slp-ch8-3.jpeg)
#
# ![](http://qnimg.lovevivian.cn/slp-ch8-4.jpeg)
#
# In[1]:
import math
import numpy as np
from pprint import pprint
import timeit
# In[2]:
def simple_beam_search(data, k):
dd = dict(zip(range(len(data)), data))
sort = sorted(dd.items(), key=lambda x:x[1], reverse=True)
res = [w for w,p in sort[:k]]
return sorted(res)
# In[3]:
A = np.array(
[0.3777,0.0110,0.0009,0.0084,0.0584,0.0090,0.0025,
0.0008,0.0002,0.7968,0.0005,0.0008,0.1698,0.0041,
0.0322,0.0005,0.0050,0.0837,0.0615,0.0514,0.2231,
0.0366,0.0004,0.0001,0.0733,0.4509,0.0036,0.0036,
0.0096,0.0176,0.0014,0.0086,0.1216,0.0177,0.0068,
0.0068,0.0102,0.1011,0.1012,0.0120,0.0728,0.0479,
0.1147,0.0021,0.0002,0.2157,0.4744,0.0102,0.0017]).reshape(7, 7)
B = np.array(
[0.000032,0,0,0.000048,0,
0,0.308431,0,0,0,
0,0.000028,0.000672,0,0.000028,
0,0,0.000340,0,0,
0,0.000200,0.000223,0,0.002337,
0,0,0.010446,0,0,
0,0,0,0.506099,0]).reshape(7, 5)
# B = np.array(
# [0, 0.000048, 0, 0, 0.000032,
# 0, 0, 0, 0.308431, 0,
# 0.000028, 0, 0.000672, 0.000028, 0,
# 0, 0, 0.000340, 0, 0,
# 0.002337, 0, 0.000223, 0.000200, 0,
# 0, 0, 0.010446, 0, 0,
# 0, 0.506099, 0, 0, 0]).reshape(7, 5)
pi = np.array([0.2767,0.0006,0.0031,0.0453,0.0449,0.0510,0.2026])
# In[4]:
O = "Janet will back the hill".split()
S = "NNP MD VB JJ NN RB DT".split()
lens = len(S)
leno = len(O)
P = np.zeros((lens, leno))
V = np.zeros((lens, leno))
bp = np.zeros(leno, dtype=np.int8)
# In[5]:
for i in range(lens):
V[i, 0] = pi[i] * B[i, 0]
P[i, 0] = 0
# In[6]:
index = range(V.shape[0])
for o in range(1, leno):
for s in range(lens):
prob = V[index, o-1] * A[index, s] * B[s, o]
V[s, o] = max(prob)
P[s, o] = index[np.argmax(prob)]
index = simple_beam_search(V[:, o], 2)
bp[-1] = np.argmax(V[:, -1])
for i in reversed(range(1, leno)):
bp[i-1] = P[bp[i], i]
bp
# In[7]:
def viterbi(y, A, B, pi, beam_size=None):
"""
Parameters
----------
y : array (T,)
Observation state sequence. int dtype. sequence index in B
"""
lens = A.shape[0]
leno = len(y)
P = np.zeros((lens, leno))
V = np.zeros((lens, leno))
bp = np.zeros(leno, dtype=np.int8)
# 因为只有 ~~ 一个，所以不需要像下面那样取 max 处理
for i in range(lens):
V[i, 0] = pi[i] * B[i, y[0]]
P[i, 0] = 0
# 初始化选择所有的 states
index = range(V.shape[0])
for o in range(1, leno):
for s in range(lens):
# 这里的概率是上一层所有的 states 到这一步某一个 state 的概率，array(len(states), )
prob = V[index, o-1] * A[index, s] * B[s, y[o]]
# 选择（实际上是上一层）最大的概率
V[s, o] = max(prob)
# P 的计算用 V[:, o-1] * A[:, s] 更容易理解，表示上一层的最大概率
# 选择上一层最大概率对应的 state
# beam search 时这里需要注意修改（从注释的样子修改为现在的样子）
P[s, o] = index[np.argmax(prob)] # np.argmax(prob)
# 这一步可以使用 beam search 对 S 进行过滤
if beam_size:
index = simple_beam_search(V[:, o], beam_size)
# 因为是往前计算的，所以最后得到的是最后一层的概率，之前的需要回溯
bp[-1] = np.argmax(V[:, -1])
for i in reversed(range(1, leno)):
bp[i-1] = P[bp[i], i]
return V, P, bp
# In[8]:
V, P, bp = viterbi(np.array(range(5)), A, B, pi, beam_size=10)
bp
# In[9]:
get_ipython().run_line_magic('timeit', 'viterbi(np.array(range(5)), A, B, pi, beam_size=None)')
# In[10]:
# 结果很直观
get_ipython().run_line_magic('timeit', 'viterbi(np.array(range(5)), A, B, pi, beam_size=2)')
# In[9]:
# 将 B 反序后，4 3 2 1 0 表示 O 中的每一个 token 在 B 中对应的 index
V, P, bp = viterbi(np.array([4, 3, 2, 1, 0]), A, B, pi)
bp
# In[11]:
[O[o]+"/"+S[s] for o,s in enumerate(bp)]
# In[14]:
V
# In[15]:
P
# In[13]:
viterbi2(np.array(range(len(O))), A, B, pi)
# In[12]:
# from https://stackoverflow.com/questions/9729968/python-implementation-of-viterbi-algorithm
def viterbi2(y, A, B, Pi=None):
"""
Return the MAP estimate of state trajectory of Hidden Markov Model.
Parameters
----------
y : array (T,)
Observation state sequence. int dtype.
A : array (K, K)
State transition matrix. See HiddenMarkovModel.state_transition for
details.
B : array (K, M)
Emission matrix. See HiddenMarkovModel.emission for details.
Pi: optional, (K,)
Initial state probabilities: Pi[i] is the probability x[0] == i. If
None, uniform initial distribution is assumed (Pi[:] == 1/K).
Returns
-------
x : array (T,)
Maximum a posteriori probability estimate of hidden state trajectory,
conditioned on observation sequence y under the model parameters A, B,
Pi.
T1: array (K, T)
the probability of the most likely path so far
T2: array (K, T)
the x_j-1 of the most likely path so far
"""
# Cardinality of the state space
K = A.shape[0]
# Initialize the priors with default (uniform dist) if not given by caller
Pi = Pi if Pi is not None else np.full(K, 1 / K)
T = len(y)
T1 = np.empty((K, T), 'd')
T2 = np.empty((K, T), 'B')
# Initilaize the tracking tables from first observation
T1[:, 0] = Pi * B[:, y[0]]
T2[:, 0] = 0
# Iterate throught the observations updating the tracking tables
for i in range(1, T):
T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)
# Build the output, optimal model trajectory
x = np.empty(T, 'B')
x[-1] = np.argmax(T1[:, T - 1])
for i in reversed(range(1, T)):
x[i - 1] = T2[x[i], i]
return x, T1, T2
# In[ ]:
# In[ ]:
# In[ ]:
~~