一. The Viterbi Algorithm for CRF

The prediction problem for a conditional random field is: given a CRF $P(Y\mid X)$ and an input (observation) sequence $x$, find the output (label) sequence $y^*$ that maximizes the conditional probability, i.e.:

$$ \begin{aligned} y^*&=\arg\max_yP_w(y\mid x)\\ &=\arg\max_y\frac{\exp(w^TF(y,x))}{Z_w(x)}\\ &=\arg\max_y\exp(w^TF(y,x))\\ &=\arg\max_yw^TF(y,x) \end{aligned} $$

Note that $Z_w(x)$ drops out: it does not depend on $y$, and $\exp(\cdot)$ is monotonically increasing, so neither changes the $\arg\max$. This saves a large amount of computation. In the expressions above:

$$ w=(w_1,w_2,...,w_K)^T\\ F(y,x)=(f_1(y,x),f_2(y,x),...,f_K(y,x))^T\\ f_k(y,x)=\sum_{i=1}^nf_k(y_{i-1},y_i,x,i),k=1,2,...,K $$
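For intuition, each $f_k$ is typically a binary indicator defined on a local pattern. As an illustrative example only (the fixed label pair $q_1,q_2$ and observation $o$ are not used later):

$$ f_k(y_{i-1},y_i,x,i)=\begin{cases}1, & y_{i-1}=q_1,\ y_i=q_2,\ x_i=o\\ 0, & \text{otherwise}\end{cases} $$

so the global feature $f_k(y,x)$ simply counts how many positions in the sequence fire this pattern.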

Next, rewrite $w^TF(y,x)$ as a sum over positions:

$$ y^*=\arg\max_y\sum_{i=1}^nw^TF_i(y_{i-1},y_i,x) $$

where:

$$ F_i(y_{i-1},y_i,x)=(f_1(y_{i-1},y_i,x,i),f_2(y_{i-1},y_i,x,i),...,f_K(y_{i-1},y_i,x,i))^T $$
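This rewrite follows from the definition of $f_k(y,x)$ above: the global feature vector decomposes into a sum of local feature vectors,

$$ F(y,x)=\sum_{i=1}^nF_i(y_{i-1},y_i,x) $$

so that $w^TF(y,x)=\sum_{i=1}^nw^TF_i(y_{i-1},y_i,x)$, i.e. the objective is a sum of per-position scores that Viterbi can maximize.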

Solving this is essentially the same as the earlier HMM decoding problem (link >>>): it is again an optimal-path problem on a DAG. As before, let $\delta_i(l)$ denote the best score among all paths that end at time $i$ with label state $l$, and let $\psi_i(l)$ record the label state chosen at step $i-1$ on that best path. The complete algorithm is laid out below.

Algorithm flow

Input: the model feature vector $F(y,x)$, the weight vector $w$, and the observation sequence $x=(x_1,x_2,...,x_n)$;
Output: the optimal path $y^*=(y_1^*,y_2^*,...,y_n^*)$

(1) Initialization
$$ \delta_1(j)=w^TF_1(y_0=\text{start},y_1=j,x),\quad j=1,2,...,m $$

(2) Recursion: for $i=2,3,...,n$
$$ \begin{aligned} \delta_i(l)&=\max_{1\leq j\leq m}\big(\delta_{i-1}(j)+w^TF_i(y_{i-1}=j,y_i=l,x)\big),\quad l=1,2,...,m\\ \psi_i(l)&=\arg\max_{1\leq j\leq m}\big(\delta_{i-1}(j)+w^TF_i(y_{i-1}=j,y_i=l,x)\big),\quad l=1,2,...,m \end{aligned} $$
(3) Termination

$$ y_n^*=\arg\max_{1\leq j \leq m}\delta_n(j) $$

(4) Path backtracking
$$ y_i^*=\psi_{i+1}(y_{i+1}^*),\quad i=n-1,n-2,...,1 $$

This yields the optimal path $y^*=(y_1^*,y_2^*,...,y_n^*)$.
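Before wiring this into the CRF class below, here is a minimal standalone sketch of steps (1)-(4), assuming the per-position scores $w^TF_i(y_{i-1}=j,y_i=l,x)$ have already been collected into an array. The names `viterbi_decode`, `init_score` and `score` are only for this illustration and are not part of the implementation that follows.

```python
import numpy as np

def viterbi_decode(init_score, score):
    """
    init_score: shape (m,), init_score[l] = w^T F_1(y_0=start, y_1=l, x)
    score:      shape (n-1, m, m), score[i-2, j, l] = w^T F_i(y_{i-1}=j, y_i=l, x) for i=2..n
    returns the optimal label sequence y* as a list of state indices
    """
    delta = init_score.copy()                 # (1) initialization
    psi = []
    for local in score:                       # (2) recursion over positions i=2..n
        cand = delta[:, None] + local         # cand[j, l] = delta_{i-1}(j) + w^T F_i(j, l, x)
        psi.append(cand.argmax(axis=0))       # best predecessor j for each current state l
        delta = cand.max(axis=0)              # delta_i(l)
    y = [int(np.argmax(delta))]               # (3) termination
    for back in reversed(psi):                # (4) backtracking
        y.append(int(back[y[-1]]))
    y.reverse()
    return y

# toy example with m=2 label states and n=3 positions
print(viterbi_decode(np.array([0.5, 0.1]),
                     np.array([[[0.2, 1.0], [0.3, 0.1]],
                               [[0.6, 0.2], [0.1, 0.9]]])))
```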

二. Code Implementation

Building on the code from the previous section...

In [1]:
import os
os.chdir('../')
from ml_models.pgm import CRFFeatureFunction
import numpy as np

"""
线性链CRF的实现,封装到ml_models.pgm
"""

class CRF(object):
    def __init__(self, epochs=10, lr=1e-3, tol=1e-5, output_status_num=None, input_status_num=None, unigram_rulers=None,
                 bigram_rulers=None):
        """
        :param epochs: 迭代次数
        :param lr: 学习率
        :param tol:梯度更新的阈值
        :param output_status_num:标签状态数
        :param input_status_num:输入状态数
        :param unigram_rulers: 状态特征规则
        :param bigram_rulers: 状态转移规则
        """
        self.epochs = epochs
        self.lr = lr
        self.tol = tol
        # add head/tail ids for the input sequence and the label state sequence
        self.output_status_num = output_status_num + 2
        self.input_status_num = input_status_num + 2
        self.input_status_head_tail = [input_status_num, input_status_num + 1]
        self.output_status_head_tail = [output_status_num, output_status_num + 1]
        # feature functions
        self.FF = CRFFeatureFunction(unigram_rulers, bigram_rulers)
        # model parameters
        self.w = None

    def fit(self, x, y):
        """
        :param x: [[...],[...],...,[...]]
        :param y: [[...],[...],...,[...]]
        :return
        """
        # add head/tail tokens to x and y
        x = [[self.input_status_head_tail[0]] + xi + [self.input_status_head_tail[1]] for xi in x]
        y = [[self.output_status_head_tail[0]] + yi + [self.output_status_head_tail[1]] for yi in y]
        self.FF.fit(x, y)
        self.w = np.ones(len(self.FF.feature_funcs)) * 1e-5
        for _ in range(0, self.epochs):
            # keep it simple: plain stochastic gradient descent over single samples
            for i in range(0, len(x)):
                xi = x[i]
                yi = y[i]
                """
                1.求F(yi \mid xi)以及P_w(yi \mid xi)
                """
                F_y_x = []
                Z_x = np.ones(shape=(self.output_status_num, 1)).T
                for j in range(1, len(xi)):
                    F_y_x.append(self.FF.map(yi[j - 1], yi[j], xi, j))
                    # build the transition score matrix M at position j
                    M = np.zeros(shape=(self.output_status_num, self.output_status_num))
                    for k in range(0, self.output_status_num):
                        for t in range(0, self.output_status_num):
                            M[k, t] = np.exp(np.dot(self.w, self.FF.map(k, t, xi, j)))
                    # forward pass to accumulate Z(x)
                    Z_x = Z_x.dot(M)
                F_y_x = np.sum(F_y_x, axis=0)
                Z_x = np.sum(Z_x)
                # compute P_w(yi | xi)
                P_w = np.exp(np.dot(self.w, F_y_x)) / Z_x
                """
                2.求梯度,并更新
                """
                dw = (P_w - 1) * F_y_x
                self.w = self.w - self.lr * dw
                if (np.sqrt(np.dot(dw, dw) / len(dw))) < self.tol:
                    break

    def predict(self, x):
        """
        维特比求解最优的y
        :param x:[...]
        :return:
        """
        # add head/tail to x
        x = [self.input_status_head_tail[0]] + x + [self.input_status_head_tail[1]]
        # initialization
        delta = np.asarray([np.dot(self.w, self.FF.map(self.output_status_head_tail[0], j, x, 1)) for j in
                            range(0, self.output_status_num)])
        psi = [[0] * self.output_status_num]
        # recursion
        for visible_index in range(2, len(x) - 1):
            new_delta = np.zeros_like(delta)
            new_psi = []
            # current state at this position
            for i in range(0, self.output_status_num):
                best_pre_index_i = -1
                best_pre_index_value_i = -np.inf
                # previous state: track the best predecessor and its score
                for j in range(0, self.output_status_num):
                    delta_i_j = delta[j] + np.dot(self.w, self.FF.map(j, i, x, visible_index))
                    if delta_i_j > best_pre_index_value_i:
                        best_pre_index_value_i = delta_i_j
                        best_pre_index_i = j
                new_delta[i] = best_pre_index_value_i
                new_psi.append(best_pre_index_i)
            delta = new_delta
            psi.append(new_psi)
        # backtracking
        best_hidden_status = [np.argmax(delta)]
        for psi_index in range(len(x) - 3, 0, -1):
            next_status = psi[psi_index][best_hidden_status[-1]]
            best_hidden_status.append(next_status)
        best_hidden_status.reverse()
        return best_hidden_status
In [2]:
# quick test: the labels are set equal to the inputs, so the model should learn an identity mapping
x = [
    [1, 2, 3, 0, 1, 3, 4],
    [1, 2, 3],
    [0, 2, 4, 2],
    [4, 3, 2, 1],
    [3, 1, 1, 1, 1],
    [2, 1, 3, 2, 1, 3, 4]
]
y = x

crf = CRF(output_status_num=5, input_status_num=5)
crf.fit(x, y)
crf.predict(x[0])
Out[2]:
[1, 2, 3, 0, 1, 3, 4]

Since the labels were set equal to the inputs, the prediction matching the input sequence indicates the model is working as expected...
