In this section we again test on Chinese word segmentation. The segmentation principle and the dataset are the same as in 《12_11_PGM_HMM_隐马模型实战:中文分词》, so they are not repeated here. The focus is code optimization: running the code from the previous sections as-is is painfully slow. The optimization has two parts, one for the feature functions and one for the gradient update, introduced in turn below.
The earlier sections implemented both the construction of the feature functions and the map step (evaluating $f_k(y_{i-1},y_i,x,i)$) with straightforward if...else... branching, which is very inefficient in practice. My idea is to turn $f_k(y_{i-1},y_i,x,i)$ into a hash mapping, expressed as the inner product of two vectors:
$$ f_k(y_{i-1},y_i,x,i)=[x_{i-1},x_i,x_{i+1},y_{i-1},y_i,1]\cdot[v_k(x_{i-1}),v_k(x_i),v_k(x_{i+1}),v_k(y_{i-1}),v_k(y_i),b_k]^T $$
Here each feature function is assumed to look at most one position to either side of the current position $i$; $v_k(x_{i-1})$ is the weight $f_k$ assigns to $x_{i-1}$ (likewise for the other terms), and $b_k$ is the bias of $f_k$. So how should $v_k(\cdot)$ and $b_k$ be chosen?
Here is a simple example. Suppose our unigram rules are [0] and [-1,0], i.e. they model the relations $x_i\rightarrow y_i$ and $x_{i-1},x_i\rightarrow y_i$, and there is one bigram rule [0], modeling $x_i\rightarrow y_{i-1},y_i$. Suppose each $x_i,i=1,2,...,n$ can take $L$ possible values, already mapped to the integer set $[0,L-1]$, and each $y_i,i=1,2,...,n$ can take $M$ possible values, already mapped to $[0,M-1]$. Then:
(1) First, for the unigram rule [0], which we define as the first feature function $f_1$, we can set:
$$ [v_k(x_{i-1}),v_k(x_i),v_k(x_{i+1}),v_k(y_{i-1}),v_k(y_i),b_k]=[0,L,0,0,1,0] $$
i.e. $v(x_i)=L$, $v(y_i)=1$, and everything else 0 (the pattern should be apparent by now). The inner product is then $f_1(y_{i-1},y_i,x,i)=x_i\times L+y_i$, which can be read as an index into the vector enumerating all $L\times M$ possible $(x_i,y_i)$ pairs;
(2) Next, for the unigram rule [-1,0], defined as the second feature function $f_2$, we can set:
$$ [v_k(x_{i-1}),v_k(x_i),v_k(x_{i+1}),v_k(y_{i-1}),v_k(y_i),b_k]=[L^2,L,0,0,1,L\times M] $$
so $f_2(y_{i-1},y_i,x,i)=L^2\times x_{i-1}+L\times x_i+y_i+L\times M$. The bias $b_2=L\times M$ is there to keep these hash values from clashing with the earlier ones (strictly speaking the separation is only approximate when $M\neq L$, but since this is a hash, an occasional collision simply means two feature patterns share a weight);
(3) Similarly, for the bigram rule [0], defined as the third feature function $f_3$, we can set:
$$ [v_k(x_{i-1}),v_k(x_i),v_k(x_{i+1}),v_k(y_{i-1}),v_k(y_i),b_k]=[0,L\times M,0,M,1,L\times M+L^2\times M] $$
so $f_3(y_{i-1},y_i,x,i)=L\times M\times x_i+M\times y_{i-1}+y_i+L\times M+L^2\times M$.
All the feature functions $f_1,f_2,f_3$ can then be represented by a single matrix:
$$ B=\begin{bmatrix} 0 & L & 0 & 0 & 1 & 0\\ L^2 & L & 0 & 0 & 1 & L\times M\\ 0 &L\times M & 0 & M & 1 & L\times M+L^2\times M \end{bmatrix} $$
and the training data can likewise be represented by a matrix:
$$ A=[X_{-1},X,X_{+1},Y_{-1},Y,1] $$
Here $X_{-1}$ is the vector $X$ shifted back by one position: its first entry $X_{-1}[0]$ is padded with $b_{K+1}$ and the last element of $X$ is dropped. For example, $X=[1,2,3]^T$ gives $X_{-1}=[b_{K+1},1,2]^T$, where $b_{K+1}$ is $b_K$ plus the range of $f_K$ (i.e. one past the largest valid hash value). Then:
$$ AB^T $$
The set of entries of $AB^T$ that remain after discarding everything greater than $b_{K+1}$ (those entries come from the padding) is exactly the set of hash values of the feature functions present in the training set. Stretching this set into a vector, for example:
$$ F=[12,90,100,345,...,1213,...] $$
the size of $F$ is then the number of feature functions. So the construction phase has just two goals: learn a matrix $B$ that encodes the feature functions, and a vector $F$ of their valid hash values. The map phase is then very simple...
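As a toy illustration, the construction phase can be sketched as below. All numbers are made up: $L=5$, $M=3$, short hypothetical sequences, and a loose cutoff standing in for $b_{K+1}$ whose only job here is to discard the padded boundary positions.

```python
import numpy as np

# Toy sizes (hypothetical): L possible x values, M possible y values.
L, M = 5, 3

# Rows of B, one per feature rule from the text; columns are
# [v(x_{i-1}), v(x_i), v(x_{i+1}), v(y_{i-1}), v(y_i), b].
B = np.array([
    [0,     L,     0, 0, 1, 0],                  # f1: unigram [0]
    [L * L, L,     0, 0, 1, L * M],              # f2: unigram [-1, 0]
    [0,     L * M, 0, M, 1, L * M + L * L * M],  # f3: bigram [0]
])

x = [1, 4, 2, 0]  # toy observations, values in [0, L-1]
y = [0, 2, 1, 1]  # toy labels, values in [0, M-1]

PAD = 10 ** 6     # out-of-range sentinel so boundary hashes get filtered out
A = np.array([
    [PAD] + x[:-1],   # X_{-1}
    x,                # X
    x[1:] + [PAD],    # X_{+1}
    [PAD] + y[:-1],   # Y_{-1}
    y,                # Y
    [1] * len(x),     # bias column
]).T

CUTOFF = 10 ** 5  # loose stand-in for b_{K+1}: drops every padded-position hash
hashes = A.dot(B.T).reshape(-1)
F = sorted(set(int(h) for h in hashes if 0 <= h < CUTOFF))
print(F)  # each value is the hash of one feature function seen in the toy data
```

Each kept entry of $AB^T$ corresponds to one (rule, feature values, label) combination observed in the data, so `len(F)` is the number of feature functions.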
The map phase mirrors the construction phase: first build a matrix $\hat{A}$ from the prediction data, analogous to $A$, then count how many times each entry of $\hat{A}B^T$ hits each element of $F$; these counts form the corresponding $F(y,x)$. For example, if part of $\hat{A}B^T$ looks like:
$$ \begin{bmatrix} 12 & ... & 345 \\ ... & ... & 12\\ ... & ... & 90 \end{bmatrix} $$
then the corresponding $F(y,x)$ might be $[2,1,0,1,...]$.
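The hit counting can be sketched as below; the hash values in `F` and the entries of `hat_hashes` are hypothetical stand-ins for the build-phase output and for $\hat{A}B^T$.

```python
import numpy as np

# Hypothetical output of the construction phase: the vector F of feature hash
# values, turned into a value -> index lookup table.
F = [12, 90, 100, 345]
index_of = {v: i for i, v in enumerate(F)}

# Hypothetical flattened entries of A_hat @ B.T for some prediction sequence.
hat_hashes = [12, 7, 345, 12, 90, 999]

counts = np.zeros(len(F))
for h in hat_hashes:
    i = index_of.get(h)
    if i is not None:  # hashes not present in F are simply ignored
        counts[i] += 1
print(counts)  # -> [2. 1. 0. 1.]
```

The resulting vector of counts is exactly the $F(y,x)$ of the text.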
The code follows. The map_ and fit_ functions preserve the original, unoptimized operations for reference; map and fit are the optimized versions and compute exactly the same thing as map_ and fit_.
import numpy as np
from tqdm import tqdm
"""
Feature-function implementation
"""
class CRFFeatureFunction(object):
def __init__(self, unigram_rulers=None, bigram_rulers=None, output_status_num=None, input_status_num=None):
        """
        Assumes a single type of input feature.
        :param unigram_rulers: state-feature rules
        :param bigram_rulers: transition-feature rules
        :param output_status_num: number of output states
        :param input_status_num: number of input states
        """
self.output_status_num = output_status_num
self.input_status_num = input_status_num
if unigram_rulers is None:
            self.unigram_rulers = [
                [0],     # current feature -> label
                [1],     # next feature -> label
                [-1],    # previous feature -> label
                [0, 1],  # current and next feature -> label
                [-1, 0]  # previous and current feature -> label
            ]
else:
self.unigram_rulers = unigram_rulers
if bigram_rulers is None:
            self.bigram_rulers = [
                None,  # no input feature; only previous label -> current label
                [0]    # current feature -> (previous label, current label)
            ]
else:
self.bigram_rulers = bigram_rulers
        # feature functions
self.feature_funcs = None
self.x_tol_bias = None
self.tol_hash_weights = None
self.max_range = None
    def fit_(self, x, y):
        """
        Deprecated; use fit instead.
        Builds the feature functions; to save space, feature/label combinations
        that never appear in the training data x, y are skipped.
        :param x: [[...],[...],...,[...]]
        :param y: [[...],[...],...,[...]]
        :return:
        """
        self.feature_funcs = []  # bug fix: must be initialized before appending below
        uni_cache = {}
        bi_cache = {}
for i in range(0, len(x)):
xi = x[i]
yi = y[i]
            # handle the unigram rulers
for k, unigram_ruler in enumerate(self.unigram_rulers):
if uni_cache.get(k) is None:
uni_cache[k] = []
for j in range(max(0, 0 - np.min(unigram_ruler)), min(len(xi), len(xi) - np.max(unigram_ruler))):
key = "".join(str(item) for item in [xi[pos + j] for pos in unigram_ruler] + [yi[j]])
if key in uni_cache[k]:
continue
else:
self.feature_funcs.append([
'u',
unigram_ruler,
[xi[j + pos] for pos in unigram_ruler],
yi[j]
])
uni_cache[k].append(key)
            # handle the bigram rulers
for k, bigram_ruler in enumerate(self.bigram_rulers):
if bi_cache.get(k) is None:
bi_cache[k] = []
                # the B case: only a label transition, no input features
if bigram_ruler is None:
for j in range(1, len(xi)):
key = "B" + "".join([str(yi[j - 1]), str(yi[j])])
if key in bi_cache[k]:
continue
else:
self.feature_funcs.append([
'B',
None,
None,
[yi[j - 1], yi[j]]
])
bi_cache[k].append(key)
continue
                # the non-B case
for j in range(max(1, 0 - np.min(bigram_ruler)), min(len(xi), len(xi) - np.max(bigram_ruler))):
key = "".join(str(item) for item in [xi[pos + j] for pos in bigram_ruler] + [yi[j - 1], yi[j]])
if key in bi_cache[k]:
continue
else:
self.feature_funcs.append([
'b',
bigram_ruler,
[xi[j + pos] for pos in bigram_ruler],
[yi[j - 1], yi[j]]
])
bi_cache[k].append(key)
del uni_cache
del bi_cache
    def map_(self, y_pre, y_cur, x_tol, i_cur):
        """
        Deprecated; use map instead.
        Returns a 0/1 array indicating which feature functions match.
        :param y_pre:
        :param y_cur:
        :param x_tol:
        :param i_cur:
        :return:
        """
def map_func_(func):
try:
gram_type, ruler, xi, yi = func
if gram_type == "u" and [x_tol[i + i_cur] for i in ruler] == xi and yi == y_cur:
return 1
elif gram_type == "b" and [x_tol[i + i_cur] for i in ruler] == xi and yi == [y_pre, y_cur]:
return 1
elif gram_type == "B" and yi == [y_pre, y_cur]:
return 1
else:
return 0
            except IndexError:
                # out-of-range access: treated as no match
                return 0
return np.asarray(list(map(map_func_, self.feature_funcs)))
def fit(self, x, y):
        """
        :param x: [[...],[...],...,[...]]
        :param y: [[...],[...],...,[...]]
        :return:
        """
        # collect every x offset used by the rules
x_tol_bias = set()
for ruler in self.unigram_rulers:
x_tol_bias |= set(ruler)
for ruler in self.bigram_rulers:
if ruler is not None:
x_tol_bias |= set(ruler)
x_tol_bias = sorted(list(x_tol_bias))
self.x_tol_bias = x_tol_bias
x_bias_map = {}
for index, value in enumerate(x_tol_bias):
x_bias_map[value] = index
        """
        1. Build the hash-map weights
        """
self.tol_hash_weights = []
bias = 0
for unigram_ruler in self.unigram_rulers:
bias_append = 1
weights = [0] * len(x_tol_bias)
for index in range(0, len(unigram_ruler)):
bias_append *= self.input_status_num
weights[x_bias_map[unigram_ruler[index]]] = np.power(self.input_status_num, len(unigram_ruler) - index)
weights.extend([1, 0])
bias_append *= self.output_status_num
weights.append(bias)
            # the first n-1 entries are the inner-product weights; the last is the bias
self.tol_hash_weights.append(weights)
bias += bias_append
for bigram_ruler in self.bigram_rulers:
bias_append = 1
weights = [0] * len(x_tol_bias)
if bigram_ruler is None:
weights = weights + [self.output_status_num, 1, bias]
self.tol_hash_weights.append(weights)
bias += self.output_status_num * self.output_status_num
else:
for index in range(0, len(bigram_ruler)):
bias_append *= self.input_status_num
weights[x_bias_map[bigram_ruler[index]]] = np.power(self.input_status_num, len(
bigram_ruler) - index) * self.output_status_num
weights.extend([self.output_status_num, 1, bias])
self.tol_hash_weights.append(weights)
bias_append = bias_append * self.output_status_num * self.output_status_num
bias += bias_append
self.tol_hash_weights = np.asarray(self.tol_hash_weights)
self.max_range = bias + 1
        """
        2. Build feature_funcs
        """
        print("Building feature functions...")
self.feature_funcs = set()
for i in tqdm(range(0, len(x))):
xi = x[i]
yi = y[i]
tol_data = []
            # x columns (shifted copies of x, padded where out of range)
for pos in self.x_tol_bias:
if pos < 0:
data = [self.max_range] * abs(pos) + xi[0:len(xi) + pos]
elif pos > 0:
data = xi[0 + pos:len(xi)] + [self.max_range] * abs(pos)
else:
data = xi
tol_data.append(data)
            # y columns
tol_data.append(yi)
tol_data.append([self.max_range] + yi[0:-1])
            # bias column
tol_data.append([1] * len(tol_data[-1]))
tol_data = np.asarray(tol_data)
self.feature_funcs |= set(
[item for item in tol_data.T.dot(self.tol_hash_weights.T).reshape(-1) if
item >= 0 and item < self.max_range])
new_feature_func = {}
for index, item in enumerate(self.feature_funcs):
new_feature_func[item] = index
self.feature_funcs = new_feature_func
        print("Number of feature functions:", len(self.feature_funcs))
def map(self, y_pre, y_cur, x_tol, i_cur):
        """
        Computes f(y_{i-1}, y_i, x, i)
        :param y_pre:
        :param y_cur:
        :param x_tol:
        :param i_cur:
        :return:
        """
vec = []
rst = np.zeros(len(self.feature_funcs))
for pos in self.x_tol_bias:
            # out of range: pad with the sentinel
if pos + i_cur < 0 or pos + i_cur >= len(x_tol):
vec.append(self.max_range)
else:
vec.append(x_tol[i_cur + pos])
vec.extend([y_cur, y_pre, 1])
vec = np.asarray([vec])
tol_features = vec.dot(self.tol_hash_weights.T).reshape(-1)
        for feature in tol_features:
            feature_index = self.feature_funcs.get(feature)
            if feature_index is not None:  # bug fix: index 0 is falsy, so compare against None
                rst[feature_index] += 1
return rst
def map_sequence(self, y, x):
        """
        Computes F(y, x)
        :param y:
        :param x:
        :return:
        """
tol_data = []
        # x columns (shifted copies of x, padded where out of range)
for pos in self.x_tol_bias:
if pos < 0:
data = [self.max_range] * abs(pos) + x[0:len(x) + pos]
elif pos > 0:
data = x[0 + pos:len(x)] + [self.max_range] * abs(pos)
else:
data = x
tol_data.append(data)
        # y columns
tol_data.append(y)
tol_data.append([self.max_range] + y[0:-1])
        # bias column
tol_data.append([1] * len(tol_data[-1]))
tol_data = np.asarray(tol_data)
        # inner product with the hash-map weights
tol_features = tol_data.T.dot(self.tol_hash_weights.T).reshape(-1)
        # count the hits
rst = np.zeros(len(self.feature_funcs))
        for feature in tol_features:
            feature_index = self.feature_funcs.get(feature)
            if feature_index is not None:  # bug fix: index 0 is falsy, so compare against None
                rst[feature_index] += 1
return rst
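The weight-construction loop in fit can be condensed as below, with toy sizes ($L=5$ input states, $M=3$ output states are made up). Note that the code's column order is [x-offset weights..., $v(y_i)$, $v(y_{i-1})$, $b$], which differs slightly from the 6-column layout used in the text's example.

```python
import numpy as np

# Toy sizes (hypothetical), with the rules unigram [0], unigram [-1, 0], bigram [0].
L, M = 5, 3
unigram_rulers = [[0], [-1, 0]]
bigram_rulers = [[0]]

x_tol_bias = sorted({p for r in unigram_rulers + bigram_rulers for p in r})
pos_of = {p: i for i, p in enumerate(x_tol_bias)}  # offset -> column index

rows, bias = [], 0
for ruler in unigram_rulers:
    w = [0] * len(x_tol_bias)
    for k, p in enumerate(ruler):
        w[pos_of[p]] = L ** (len(ruler) - k)
    rows.append(w + [1, 0, bias])      # v(y_i)=1, v(y_{i-1})=0
    bias += (L ** len(ruler)) * M      # range claimed by this rule
for ruler in bigram_rulers:
    w = [0] * len(x_tol_bias)
    for k, p in enumerate(ruler):
        w[pos_of[p]] = (L ** (len(ruler) - k)) * M
    rows.append(w + [M, 1, bias])      # v(y_i)=M, v(y_{i-1})=1
    bias += (L ** len(ruler)) * M * M

B = np.array(rows)
print(B)          # one hash-weight row per feature rule
print(bias + 1)   # max_range
```

Each rule's bias offset is the running sum of the ranges claimed by the rules before it, exactly as in the $b_2$, $b_3$ derivation above.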
This part is handled rather crudely... Recall the gradient function from earlier:
$$ g(w^t)=\sum_{j=1}^N\left(P_{w^t}(y_j\mid x_j)-1\right)F(y_j,x_j) $$
$P_{w^t}(y_j\mid x_j)$ mainly serves to modulate the step size, yet it accounts for most of the computation, so a simple optimization is to drop it entirely, ha! The fit method gains an if_drop_p parameter that decides whether to compute the $P_{w^t}(y_j\mid x_j)$ term; by default it is skipped. The modified code follows.
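Before the full listing, here is a toy comparison of the two update rules; `F_y_x` and `P_w` below are hypothetical stand-ins for $F(y_j,x_j)$ and $P_{w^t}(y_j\mid x_j)$.

```python
import numpy as np

lr = 1e-3
F_y_x = np.array([1.0, 0.0, 2.0])  # hypothetical F(y_j, x_j)
w = np.zeros(3)

# Full gradient step: requires the expensive P_w(y_j | x_j) computation.
P_w = 0.4                          # hypothetical probability
w_full = w - lr * (P_w - 1) * F_y_x

# Simplified step with the P_w term dropped (equivalent to setting P_w = 0):
# the per-sample gradient (P_w - 1) * F(y, x) collapses to -F(y, x).
w_drop = w + lr * F_y_x

print(w_full, w_drop)  # both move w along F_y_x; only the step size differs
```

Since $0\le P_{w^t}\le 1$, the dropped term only scales the step, never flips its direction, which is why the crude version still trains.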
"""
Linear-chain CRF implementation
"""
class CRF(object):
def __init__(self, epochs=10, lr=1e-3, output_status_num=None, input_status_num=None, unigram_rulers=None,
bigram_rulers=None):
        """
        :param epochs: number of iterations
        :param lr: learning rate
        :param output_status_num: number of label states
        :param input_status_num: number of input states
        :param unigram_rulers: state-feature rules
        :param bigram_rulers: transition-feature rules
        """
self.epochs = epochs
self.lr = lr
        # add head/tail ids to the input and label state sets
self.output_status_num = output_status_num + 2
self.input_status_num = input_status_num + 2
self.input_status_head_tail = [input_status_num, input_status_num + 1]
self.output_status_head_tail = [output_status_num, output_status_num + 1]
        # feature functions
self.FF = CRFFeatureFunction(unigram_rulers, bigram_rulers, input_status_num=self.input_status_num,
output_status_num=self.output_status_num)
        # model parameters
self.w = None
def fit(self, x, y, if_drop_p=True):
        """
        :param x: [[...],[...],...,[...]]
        :param y: [[...],[...],...,[...]]
        :param if_drop_p: whether to drop the P_w(y|x) term
        :return
        """
        # add head and tail to x and y
x = [[self.input_status_head_tail[0]] + xi + [self.input_status_head_tail[1]] for xi in x]
y = [[self.output_status_head_tail[0]] + yi + [self.output_status_head_tail[1]] for yi in y]
self.FF.fit(x, y)
self.w = np.ones(len(self.FF.feature_funcs)) * 1e-5
if if_drop_p:
            print("Training progress...")
for i in tqdm(range(0, len(x))):
xi = x[i]
yi = y[i]
self.w = self.w + self.epochs * self.lr * self.FF.map_sequence(yi, xi)
else:
for epoch in range(0, self.epochs):
                # taking a shortcut: plain stochastic gradient descent
                print("\n Model training, epoch: " + str(epoch + 1) + "/" + str(self.epochs))
for i in tqdm(range(0, len(x))):
xi = x[i]
yi = y[i]
                    """
                    1. Compute F(yi, xi) and P_w(yi | xi)
                    """
F_y_x = self.FF.map_sequence(yi, xi)
Z_x = np.ones(shape=(self.output_status_num, 1)).T
for j in range(1, len(xi)):
                        # build the transition matrix M
M = np.zeros(shape=(self.output_status_num, self.output_status_num))
for k in range(0, self.output_status_num):
for t in range(0, self.output_status_num):
M[k, t] = np.exp(np.dot(self.w, self.FF.map(k, t, xi, j)))
                        # forward recursion for Z(x)
Z_x = Z_x.dot(M)
Z_x = np.sum(Z_x)
P_w = np.exp(np.dot(self.w, F_y_x)) / Z_x
                    """
                    2. Compute the gradient and update the weights
                    """
dw = (P_w - 1) * F_y_x
self.w = self.w - self.lr * dw
def predict(self, x):
        """
        Viterbi decoding of the best y
        :param x: [...]
        :return:
        """
        # add head and tail to x
        x = [self.input_status_head_tail[0]] + x + [self.input_status_head_tail[1]]
        # initialization
delta = np.asarray([np.dot(self.w, self.FF.map(self.output_status_head_tail[0], j, x, 1)) for j in
range(0, self.output_status_num)])
psi = [[0] * self.output_status_num]
        # recursion
for visible_index in range(2, len(x) - 1):
new_delta = np.zeros_like(delta)
new_psi = []
            # current tag
            for i in range(0, self.output_status_num):
                best_pre_index_i = -1
                delta_i = -np.inf  # bug fix: scores can be negative, so do not start from 0
                # previous tag
                for j in range(0, self.output_status_num):
                    delta_i_j = delta[j] + np.dot(self.w, self.FF.map(j, i, x, visible_index))
                    if delta_i_j > delta_i:
                        delta_i = delta_i_j
                        best_pre_index_i = j
                new_delta[i] = delta_i
                new_psi.append(best_pre_index_i)
delta = new_delta
psi.append(new_psi)
        # backtrack
best_hidden_status = [np.argmax(delta)]
for psi_index in range(len(x) - 3, 0, -1):
next_status = psi[psi_index][best_hidden_status[-1]]
best_hidden_status.append(next_status)
best_hidden_status.reverse()
return best_hidden_status
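The decoding loop in predict is the standard Viterbi recursion. Stripped of the CRF plumbing, it can be sketched generically as below; the `score` function is a hypothetical stand-in for $w\cdot f(y_{i-1},y_i,x,i)$.

```python
import numpy as np

def viterbi(n_pos, n_tags, score):
    """Generic Viterbi: score(prev_tag, tag, pos) -> float; tag 0 is the start tag."""
    delta = np.array([score(0, t, 0) for t in range(n_tags)])
    psi = []  # backpointers, one array per position after the first
    for pos in range(1, n_pos):
        # trans[t, p] = best score ending in tag t at pos, coming from tag p
        trans = np.array([[delta[p] + score(p, t, pos) for p in range(n_tags)]
                          for t in range(n_tags)])
        psi.append(trans.argmax(axis=1))
        delta = trans.max(axis=1)
    path = [int(delta.argmax())]
    for back in reversed(psi):
        path.append(int(back[path[-1]]))
    return path[::-1]

# Toy score that rewards alternating tags and slightly prefers tag 1.
def score(prev_tag, tag, pos):
    return (1.0 if tag != prev_tag else 0.0) + (0.5 if tag == 1 else 0.0)

print(viterbi(3, 2, score))  # -> [1, 0, 1]
```

The vectorized max/argmax replaces the nested j-loop in predict but computes the same recursion.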
visible_seqs=[]
hidden_seqs=[]
char2idx={}
idx2hidden={0:"B",1:"M",2:"E",3:"S"}
count=0
for line in open("./data/people_daily_mini.txt",encoding="utf8"):
visible_seq=[]
hidden_seq=[]
arrs=line.strip().split(" ")
for item in arrs:
if len(item)==1:
hidden_seq.append(3)
elif len(item)==2:
hidden_seq.extend([0,2])
else:
hidden_seq.extend([0]+[1]*(len(item)-2)+[2])
for c in item:
if c in char2idx:
visible_seq.append(char2idx[c])
else:
char2idx[c]=count
visible_seq.append(count)
count+=1
visible_seqs.append(visible_seq)
hidden_seqs.append(hidden_seq)
len(visible_seqs),len(hidden_seqs),len(char2idx)
(1087083, 1087083, 4656)
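The tagging rule inside the data-preparation loop above (0=B, 1=M, 2=E, 3=S, matching idx2hidden) can be restated as a small function:

```python
def word_to_tags(word):
    """Map a segmented word to its BMES tags: 0=B, 1=M, 2=E, 3=S."""
    if len(word) == 1:
        return [3]                                # single character: S
    return [0] + [1] * (len(word) - 2) + [2]      # B, then M..., then E

print(word_to_tags("人"), word_to_tags("人民"), word_to_tags("共和国"))
# -> [3] [0, 2] [0, 1, 2]
```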
Train the model; be patient, this takes a while...
crf=CRF(output_status_num=4,input_status_num=len(char2idx),epochs=1,unigram_rulers=[[0],[-1,0]],bigram_rulers=[None])
crf.fit(visible_seqs,hidden_seqs)
Building feature functions...
100%|██████████████████████████████████████████████████████████████████████| 1087083/1087083 [05:41<00:00, 3179.33it/s]
Number of feature functions: 21520 Training progress...
100%|██████████████████████████████████████████████████████████████████████| 1087083/1087083 [07:15<00:00, 2494.64it/s]
Let's look at the segmentation results
def seg(vis,hid):
rst=[]
for i in range(0,len(hid)):
if hid[i] in [2,3]:
rst.append(vis[i])
rst.append(" ")
else:
rst.append(vis[i])
return "".join(rst)
seq="我和我的祖国,一刻也不能分离"
seg(seq,crf.predict([char2idx[c] for c in seq]))
'我和 我的 祖国 ,一 刻也 不能 分离 '
seq="小龙女说,我也想过过过过过过过的生活"
seg(seq,crf.predict([char2idx[c] for c in seq]))
'小龙 女说 , 我也 想过 过过 过过 过过 的 生活 '
seq="我爱北京天安门"
seg(seq,crf.predict([char2idx[c] for c in seq]))
'我爱 北京 天 安门 '
The results don't really make sense. My guesses at the causes:
(1) the feature space is too small: the feature functions are rather simple, and some more complex ones could be added;
(2) for certain patterns, penalizing feature functions (returning negative values) could be added; for example, ",一" above should never be grouped into one word.
To be optimized later, ha!