一.维特比算法

隐状态预测是指在$P(O\mid\lambda)$给定的情况下,求一个隐状态序列$I$,使得$P(O,I\mid\lambda)$最大,比如下图,有三种隐状态,求在$O=(o_1,o_2,o_3,o_4)$的情况下,选择一条最优路径使得$P(O,I\mid\lambda)$最大,假设为图中的红色路径,那么最优路径为$i_1=q_1,i_2=q_2,i_3=q_1,i_4=q_3$ avatar

那如何求最优路径呢?下面这句话画个重点:

如果求得$i_1\rightarrow i_4$的最优路径为$i_1^*\rightarrow i_4^*$,那么$i_3^*\rightarrow i_4^*$必然为$i_3\rightarrow i_4$的最优路径

将时刻$t$状态为$i$的所有路径中的概率最大值表示为$\delta_t(i)$,那么在$\delta_3(1),\delta_3(2),\delta_3(3)$已知的情况下,要求$\delta_4(\cdot)$,我们只需在所有$i_3\rightarrow i_4$的9条路径中找出最优的一条即可,即这时可以将前面的看做黑箱即可 avatar

在$i_3\rightarrow i_4$这一步,我们是假设$\delta_3(\cdot)$已知了,那如何求解$\delta_3(\cdot)$呢?我们继续往前在$i_2\rightarrow i_3$的9条路径中选择即可,即:

avatar

所以,递推关系就出来了:

$$ \delta_t(i)=\max_{1\leq j\leq N}[\delta_{t-1}(j)a_{ji}]b_i(o_t),i=1,2,..,N $$

由于我们需要求得各节点的状态序列,所以还需要去保存我们的路径状态,从上面的递推过程也可以看出,我们在第$i_4$步的时候就可以确定使其最概率最大的第$i_3$步的取值了,所以我们做一个定义,在时刻$t$状态为$i$的所有单个路径$(i_1,i_2,...,i_{t-1},i_t)$中概率最大的路径的第$t-1$个节点为:

$$ \psi_t(i)=arg\max_{1\leq j\leq N}[\delta_{t-1}(j)a_{ji}],i=1,2,...,N $$

接下来叙述完整的流程

算法流程

已知模型参数$\lambda=(A,B,\pi)$和观测数据$O=(o_1,o_2,...,o_T)$:

(1)初始化

$$ \delta_1(i)=\pi_ib_i(o_1),i=1,2,...,N\\ \psi_t(i)=0,i=1,2,...,N $$

(2)递推,对$t=2,3,...,T$

$$ \delta_t(i)=\max_{1\leq j\leq N}[\delta_{t-1}(j)a_{ji}]b_i(o_t),i=1,2,..,N\\ \psi_t(i)=arg\max_{1\leq j\leq N}[\delta_{t-1}(j)a_{ji}],i=1,2,...,N $$

(3)终止

$$ P^*=\max_{1\leq i\leq N}\delta_T(i)\\ i_T^*=arg\max_{1\leq i\leq N}[\delta_T(i)] $$

(4)回溯最优路径,对$t=T-1,T-2,...,1$

$$ i_t^*=\psi_{t+1}(i_{t+1}^*) $$

返回最优路径$I^*=(i_1^*,i_2^*,...,i_T^*)$

二.代码实现

继续在上一节的代码上添加...

In [1]:
import numpy as np

"""
隐马尔科夫模型实现,封装到ml_models.pgm
"""

class HMM(object):
    def __init__(self, hidden_status_num=None, visible_status_num=None):
        """
        :param hidden_status_num: 隐状态数
        :param visible_status_num: 观测状态数
        """
        self.hidden_status_num = hidden_status_num
        self.visible_status_num = visible_status_num
        # 定义HMM的参数
        self.pi = None  # 初始隐状态概率分布 shape:[hidden_status_num,1]
        self.A = None  # 隐状态转移概率矩阵 shape:[hidden_status_num,hidden_status_num]
        self.B = None  # 观测状态概率矩阵 shape:[hidden_status_num,visible_status_num]

    def predict_joint_visible_prob(self, visible_list=None, forward_type="forward"):
        """
        前向/后向算法计算观测序列出现的概率值
        :param visible_list:
        :param forward_type:forward前向,backward后向
        :return:
        """
        if forward_type == "forward":
            # 计算初始值
            alpha = self.pi * self.B[:, [visible_list[0]]]
            # 递推
            for step in range(1, len(visible_list)):
                alpha = self.A.T.dot(alpha) * self.B[:, [visible_list[step]]]
            # 求和
            return np.sum(alpha)
        else:
            # 计算初始值
            beta = np.ones_like(self.pi)
            # 递推
            for step in range(len(visible_list) - 2, -1, -1):
                beta = self.A.dot(self.B[:, [visible_list[step + 1]]] * beta)
            # 最后一步
            return np.sum(self.pi * self.B[:, [visible_list[0]]] * beta)

    def fit_with_hidden_status(self, visible_list, hidden_list):
        """
        包含隐状态的参数估计
        :param visible_list: [[],[],...,[]]
        :param hidden_list: [[],[],...,[]]
        :return:
        """
        self.pi = np.zeros(shape=(self.hidden_status_num, 1))
        self.A = np.zeros(shape=(self.hidden_status_num, self.hidden_status_num))
        self.B = np.zeros(shape=(self.hidden_status_num, self.visible_status_num))
        for i in range(0, len(visible_list)):
            visible_status = visible_list[i]
            hidden_status = hidden_list[i]
            self.pi[hidden_status[0]] += 1
            for j in range(0, len(hidden_status) - 1):
                self.A[hidden_status[j], hidden_status[j + 1]] += 1
                self.B[hidden_status[j], visible_status[j]] += 1
            self.B[hidden_status[j + 1], visible_status[j + 1]] += 1
        # 归一化
        self.pi = self.pi / np.sum(self.pi)
        self.A = self.A / np.sum(self.A, axis=0)
        self.B = self.B / np.sum(self.B, axis=0)

    def fit_without_hidden_status(self, visible_list=None, tol=1e-5, n_iter=10):
        """
        不包含隐状态的参数估计:Baum-Welch算法
        :param visible_list: [...]
        :param tol:当pi,A,B的增益值变化小于tol时终止
        :param n_iter:迭代次数
        :return:
        """
        # 初始化参数
        if self.pi is None:
            self.pi = np.ones(shape=(self.hidden_status_num, 1)) + np.random.random(size=(self.hidden_status_num, 1))
            self.pi = self.pi / np.sum(self.pi)
        if self.A is None:
            self.A = np.ones(shape=(self.hidden_status_num, self.hidden_status_num)) + np.random.random(
                size=(self.hidden_status_num, self.hidden_status_num))
            self.A = self.A / np.sum(self.A, axis=0)
        if self.B is None:
            self.B = np.ones(shape=(self.hidden_status_num, self.visible_status_num)) + np.random.random(
                size=(self.hidden_status_num, self.visible_status_num))
            self.B = self.B / np.sum(self.B, axis=0)
        for _ in range(0, n_iter):
            # 计算前向概率
            alphas = []
            alpha = self.pi * self.B[:, [visible_list[0]]]
            alphas.append(alpha)
            for step in range(1, len(visible_list)):
                alpha = self.A.T.dot(alpha) * self.B[:, [visible_list[step]]]
                alphas.append(alpha)
            # 计算后向概率
            betas = []
            beta = np.ones_like(self.pi)
            betas.append(beta)
            for step in range(len(visible_list) - 2, -1, -1):
                beta = self.A.dot(self.B[:, [visible_list[step + 1]]] * beta)
                betas.append(beta)
            betas.reverse()
            # 计算gamma值
            gammas = []
            for i in range(0, len(alphas)):
                gammas.append((alphas[i] * betas[i])[:, 0])
            gammas = np.asarray(gammas)
            # 计算xi值
            xi = np.zeros_like(self.A)
            for i in range(0, self.hidden_status_num):
                for j in range(0, self.hidden_status_num):
                    xi_i_j = 0.0
                    for t in range(0, len(visible_list) - 1):
                        xi_i_j += alphas[t][i][0] * self.A[i, j] * self.B[j, visible_list[t + 1]] * \
                                  betas[t + 1][j][0]
                    xi[i, j] = xi_i_j
            loss = 0.0  # 统计累计误差
            # 更新参数
            # 初始概率
            for i in range(0, self.hidden_status_num):
                new_pi_i = gammas[0][i]
                loss += np.abs(new_pi_i - self.pi[i][0])
                self.pi[i] = new_pi_i
            # 隐状态转移概率
            for i in range(0, self.hidden_status_num):
                for j in range(0, self.hidden_status_num):
                    new_a_i_j = xi[i, j] / np.sum(gammas[:, i][:-1])
                    loss += np.abs(new_a_i_j - self.A[i, j])
                    self.A[i, j] = new_a_i_j
            # 观测概率矩阵
            for j in range(0, self.hidden_status_num):
                for k in range(0, self.visible_status_num):
                    new_b_j_k = np.sum(gammas[:, j] * (np.asarray(visible_list) == k)) / np.sum(gammas[:, j])
                    loss += np.abs(new_b_j_k - self.B[j, k])
                    self.B[j, k] = new_b_j_k
            # 归一化
            self.pi = self.pi / np.sum(self.pi)
            self.A = self.A / np.sum(self.A, axis=0)
            self.B = self.B / np.sum(self.B, axis=0)
            if loss < tol:
                break

    def predict_hidden_status(self, visible_list):
        """
        维特比算法解码概率最大的隐状态
        :param visible_list:
        :return:
        """
        # 初始化
        delta = self.pi * self.B[:, [visible_list[0]]]
        psi = [[0] * self.hidden_status_num]
        # 递推
        for visible_index in range(1, len(visible_list)):
            new_delta = np.zeros_like(delta)
            new_psi = []
            # 当前节点
            for i in range(0, self.hidden_status_num):
                best_pre_index_i = -1
                best_pre_index_value_i = 0
                delta_i = 0
                # 上一轮节点
                for j in range(0, self.hidden_status_num):
                    delta_i_j = delta[j][0] * self.A[j, i] * self.B[i, visible_list[visible_index]]
                    if delta_i_j > delta_i:
                        delta_i = delta_i_j
                    best_pre_index_value_i_j = delta[j][0] * self.A[j, i]
                    if best_pre_index_value_i_j > best_pre_index_value_i:
                        best_pre_index_value_i = best_pre_index_value_i_j
                        best_pre_index_i = j
                new_delta[i, 0] = delta_i
                new_psi.append(best_pre_index_i)
            delta = new_delta
            psi.append(new_psi)
        # 回溯
        best_hidden_status = [np.argmax(delta)]
        for psi_index in range(len(visible_list) - 1, 0, -1):
            next_status = psi[psi_index][best_hidden_status[-1]]
            best_hidden_status.append(next_status)
        best_hidden_status.reverse()
        return best_hidden_status
In [2]:
#用例10.3做测试
pi = np.asarray([[0.2], [0.4], [0.4]])
A = np.asarray([[0.5, 0.2, 0.3],
                [0.3, 0.5, 0.2],
                [0.2, 0.3, 0.5]])
B = np.asarray([[0.5, 0.5],
                [0.4, 0.6],
                [0.7, 0.3]])

hmm = HMM(hidden_status_num=3, visible_status_num=2)
hmm.pi = pi
hmm.A = A
hmm.B = B
print(hmm.predict_hidden_status([0, 1, 0]))
[2, 2, 2]
In [ ]: