上一节推导出了$P_w(y\mid x)$求解的矩阵形式,这将会为后续的相关计算提供很大的便捷,但在此之前我们还有一根刺需要拔掉,那就是特征函数,根据上一节的推导知道,特征函数应该长这个样子:

$$ f_k(y_{i-1},y_i,x,i) $$

它是关于$(y_{i-1},y_i,x,i)$的函数,那它通常如何定义呢?下面我就结合CRF++的模板定义以及知乎上的例子进行说明

一.模板定义

crf++中的模板定义语句为%x[row,col],其中row表示选择的观测特征对当前位置的偏移量,col表示所选观测特征的所在列,下面用一个例子做说明,比如我们有如下用作分词训练的语料,其中第0列和第1列是可观测数据($x$),第2列即是我们的标签数据($y$):

词性 分词标记
N B
N E
V B
V M
E E

假设,我们目前的位置在“欢”所在行,即$i=3$,那么按照模板定义,我们可以获取相应的特征取值:

模板 取值
%x[0,0]
%x[0,1] V
%x[-1,0]
%x[-2,1] N
%x[2,1] E
%%x[0,0]/%x[0,1] 欢/V

二.unigram模板

unigram模板对应我们上一节的状态特征函数$s_l(y_i,x,i)$,对应模板格式为U+id+模板定义,比如U01:%x[0,0],每一行模板会生成一组状态特征函数,数量为$L\times N$,$L$是标签状态数,$N$是所在列特征集的数量,比如模板U01:%x[0,0],就会有$3\times 5=15$个特征函数,其中标签状态有{B,E,M},所在列特征集有{北,京,欢,迎,你},那么构成的15个特征函数如下:

func1 = if (output = B and feature=U01:"北") return 1 else return 0   
func2 = if (output = M and feature=U01:"北") return 1 else return 0   
func3 = if (output = E and feature=U01:"北") return 1 else return 0   
func4 = if (output = B and feature=U01:"京") return 1 else return 0   
...   
func13 = if (output = B and feature=U01:"你") return 1 else return 0   
func14 = if (output = M and feature=U01:"你") return 1 else return 0   
func15 = if (output = E and feature=U01:"你") return 1 else return 0

三.bigram模板

bigram模板对应我们上一节的状态特征函数$t_k(y_{i-1},y_i,x,i)$,它会比unigram模板多考虑一个上一标签状态,所以对每一行模板它会产生$L\times L\times N$个特征函数,比如对于B01:%x[0,0],对应的特征函数有:

func1 = if (prev_output = B and output = B and feature=B01:"北") return 1 else return 0   
func2 = if (prev_output = B and output = E and feature=B01:"北") return 1 else return 0  
func3 = if (prev_output = B and output = M and feature=B01:"北") return 1 else return 0  
func4 = if (prev_output = M and output = E and feature=B01:"北") return 1 else return 0  
...   
func44 = if (prev_output = M and output = E and feature=B01:"你") return 1 else return 0  
func45 = if (prev_output = E and output = E and feature=B01:"你") return 1 else return 0

四. CoNLL 2000例子

我们看一下 CoNLL 2000模板的例子:

# Unigram
U00:%x[-2,0]
U01:%x[-1,0]
U02:%x[0,0]
U03:%x[1,0]
U04:%x[2,0]
U05:%x[-1,0]/%x[0,0]
U06:%x[0,0]/%x[1,0]

U10:%x[-2,1]
U11:%x[-1,1]
U12:%x[0,1]
U13:%x[1,1]
U14:%x[2,1]
U15:%x[-2,1]/%x[-1,1]
U16:%x[-1,1]/%x[0,1]
U17:%x[0,1]/%x[1,1]
U18:%x[1,1]/%x[2,1]

U20:%x[-2,1]/%x[-1,1]/%x[0,1]
U21:%x[-1,1]/%x[0,1]/%x[1,1]
U22:%x[0,1]/%x[1,1]/%x[2,1]

# Bigram
B

这里B表示只考虑了前一个标签状态和当前标签状态,而不考虑当前特征取值的情况,它的特征函数形如:

func = if (prev_output = M and output = E) return 1 else return 0

五.代码实现

好哒,让我们自己来实现一把特征函数的生成吧,这里为了方便,就只考虑只有一种特征的情况,对比上面的例子就是用“字”去预测是否“切分”,而不考虑“词性”,即标准CRF中$P(Y\mid X)$的情况,而CRF++可以支持扩展到$P(Y\mid X_1,X_2,...,X_n)$的情况(这里$X_i$是与$Y$一一对应的队列)

In [1]:
import numpy as np

"""
线性链条件随机场的实现,封装到ml_models.pgm
"""

"""
1.实现特征函数的功能
"""


class CRFFeatureFunction(object):
    def __init__(self, unigram_rulers=None, bigram_rulers=None):
        """
        默认输入特征就一种类型
        :param unigram_rulers: 状态特征规则
        :param bigram_rulers: 状态转移规则
        """
        if unigram_rulers is None:
            self.unigram_rulers = [
                [0],  # 当前特征->标签
                [1],  # 后一个特征->标签
                [-1],  # 前一个特征->标签
                [0, 1],  # 当前特征和后一个特征->标签
                [-1, 0]  # 前一个特征和当前特征->标签
            ]
        else:
            self.unigram_rulers = unigram_rulers
        if bigram_rulers is None:
            self.bigram_rulers = [
                None,  # 不考虑当前特征,只考虑前一个标签和当前标签
                [0]  # 当前特征->前一个标签和当前标签
            ]
        else:
            self.bigram_rulers = bigram_rulers
        # 特征函数
        self.feature_funcs = []

    def fit(self, x, y):
        """
        构建特征函数,为了节省空间,训练集x,y中没有出现的特征和标签组合就不考虑了
        :param x: [[...],[...],...,[...]]
        :param y: [[...],[...],...,[...]]
        :return:
        """
        uni_cache = {}
        bi_cache = {}
        for i in range(0, len(x)):
            xi = x[i]
            yi = y[i]
            # 处理unigram_ruler
            for k, unigram_ruler in enumerate(self.unigram_rulers):
                if uni_cache.get(k) is None:
                    uni_cache[k] = []
                for j in range(max(0, 0 - np.min(unigram_ruler)), min(len(xi), len(xi) - np.max(unigram_ruler))):
                    key = "".join(str(item) for item in [xi[pos + j] for pos in unigram_ruler] + [yi[j]])
                    if key in uni_cache[k]:
                        continue
                    else:
                        self.feature_funcs.append([
                            'u',
                            unigram_ruler,
                            [xi[j + pos] for pos in unigram_ruler],
                            yi[j]
                        ])
                        uni_cache[k].append(key)
            # 处理 bigram_ruler
            for k, bigram_ruler in enumerate(self.bigram_rulers):
                if bi_cache.get(k) is None:
                    bi_cache[k] = []
                # B的情况
                if bigram_ruler is None:
                    for j in range(1, len(xi)):
                        key = "B" + "".join([str(yi[j - 1]), str(yi[j])])
                        if key in bi_cache[k]:
                            continue
                        else:
                            self.feature_funcs.append([
                                'B',
                                None,
                                None,
                                [yi[j - 1], yi[j]]
                            ])
                            bi_cache[k].append(key)
                    continue
                # 非B的情况
                for j in range(max(1, 0 - np.min(bigram_ruler)), min(len(xi), len(xi) - np.max(bigram_ruler))):
                    key = "".join(str(item) for item in [xi[pos + j] for pos in bigram_ruler] + [yi[j - 1], yi[j]])
                    if key in bi_cache[k]:
                        continue
                    else:
                        self.feature_funcs.append([
                            'b',
                            bigram_ruler,
                            [xi[j + pos] for pos in bigram_ruler],
                            [yi[j - 1], yi[j]]
                        ])
                        bi_cache[k].append(key)
        del uni_cache
        del bi_cache

    def map(self, y_pre, y_cur, x_tol, i_cur):
        """
        返回是否match特征函数的list
        :param y_pre:
        :param y_cur:
        :param x_tol:
        :param i_cur:
        :return:
        """

        def map_func_(func):
            try:
                gram_type, ruler, xi, yi = func
                if gram_type == "u" and [x_tol[i + i_cur] for i in ruler] == xi and yi == y_cur:
                    return 1
                elif gram_type == "b" and [x_tol[i + i_cur] for i in ruler] == xi and yi == [y_pre, y_cur]:
                    return 1
                elif gram_type == "B" and yi == [y_pre, y_cur]:
                    return 1
                else:
                    return 0
            except:
                # 越界的情况,默认不匹配
                return 0

        return np.asarray(list(map(map_func_, self.feature_funcs)))
In [2]:
# 测试
x = [["我", "爱", "我", "的", "祖", "国"], ["我", "爱"]]
y = [["B", "E", "B", "E", "B", "E"], ["B", "E"]]
ff = CRFFeatureFunction()
ff.fit(x, y)
print(len(ff.feature_funcs))
print(ff.map("B", "E", ["我", "爱", "我", "的"], 3))
31
[0 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 1 0 0]

feature_funcs表示特征函数的列表,map函数计算当前数据与特征函数集的匹配情况,1表示匹配上,0表示未匹配上

In [ ]: