这一节主要聊一下马尔可夫链在NLP(自然语言处理)上的一个应用:语言模型,以及如何利用马尔可夫链去生成新的数据

一.语言模型

语言模型的目的其实很简单,就是去判断一句话是不是“人话”,那如何判断呢?可以通过概率来判断,我们可以将一段话看作关于词的序列问题比如“我喜欢吃苹果”可以看作一个序列['我','喜欢','吃','苹果'],于是判断一句话的概率变成了判断一个序列的概率:

$$ p(我喜欢吃苹果)=p(我,喜欢,吃,苹果) $$

那如何计算这些概率呢?我们可以去网上搜集大量的文本数据,比如新闻、论坛...然后进行统计即可,我们再接着分析一下,汉语常用词语量在2000左右,如果判断的句子长度为2,那么我们需要$2000^2=4000000$个单词量的文本才能有效的统计,那如果句子长度为3呢,那么就需要$2000^3=8\cdot10^9$,句子长度为4,就需要$2000^4=16\cdot 10^{12}$...所以,如果随便给一个句子,那么这个句子的概率极有可能为很低、甚至为0!而我们目的是看一句话是否为“人话”,其实是一个相对概率,比如只要我们的语言模型能做到如下的判断,我们就可以认为我们的模型是可以接受的:

$$ p(我,喜欢,吃,苹果)>p(苹果,喜欢,吃,我) $$

那这样我们就可以将对一个句子的全局判断缩小到对其进行一些局部判断,比如“苹果”在“吃”之后的概率应该要大于它在其之前的概率,即:

$$ p(苹果\mid 吃)>p(吃\mid 苹果) $$

而仅考虑局部的关系,我们自然可以引入马尔可夫链咯,即判断一个句子的概率,我们可以简单看作求马尔可夫链的联合概率:

$$ p(x_1,x_2,...,x_n)=p(x_1)\cdot p(x_2\mid x_1)\cdot p(x_3\mid x_2)\cdots p(x_n\mid x_{n-1}) $$

这里$x_i,i=1,2,...,n$分别对应一个词,这里每个词只与它的前一个词有关系,这时的模型称为bi-gram模型,我们可以扩展到与前两个词相关,这时便是对应的二阶马尔可夫链,相应的语言模型称为tri-gram模型,接下来实践看看

二.利用今日头条文本数据训练语言模型

我在这里>>>下载了一些今日头条的文本分类数据并作了预处理后截取了前10000句,每句话间用"\n"分割,每个词之间用空格分割

In [1]:
text=[]
for line in open("./data/toutiao_mini.txt",encoding="utf8"):
    text.append(line.strip().split(" "))
In [2]:
#看看前两行
text[:2]
Out[2]:
[['京城', '最', '值得', '你', '来场', '文化', '之旅', '的', '博物馆'],
 ['发酵', '床', '的', '垫料', '种类', '有', '哪些', '?', '哪', '种', '更好', '?']]
In [3]:
#接下来构建词典,将汉字映射为整数方便模型训练
word2idx={}
idx2word={}
idx=0
for line in text:
    for word in line:
        if word not in word2idx:
            word2idx[word]=idx
            idx2word[idx]=word
            idx+=1
In [4]:
#接下来将汉字转换为整数
train_data=[]
for line in text:
    train_data.append([word2idx[word] for word in line])
In [5]:
#查看前两行
train_data[:2]
Out[5]:
[[0, 1, 2, 3, 4, 5, 6, 7, 8], [9, 10, 7, 11, 12, 13, 14, 15, 16, 17, 18, 15]]
In [6]:
#好了,可训练语言模型了
import os
os.chdir('../')
from ml_models.pgm import SimpleMarkovModel
In [7]:
smm=SimpleMarkovModel(status_num=len(word2idx))
smm.fit(train_data)
del train_data
del text
D:\OneDriver\OneDrive - email.swu.edu.cn\self\learning\self_project\ML_Notes\ml_models\pgm\simple_markov_model.py:33: RuntimeWarning: invalid value encountered in true_divide
  self.P = self.P / np.sum(self.P, axis=0)

模型训练完了,让我们看看概率为0的占比,可以发现绝大部分都很稀疏

In [8]:
import numpy as np
np.sum(smm.P==0)/(smm.P.shape[0]*smm.P.shape[1])
Out[8]:
0.9620576756873548

接下来为概率为0的语句添加一个默认概率

In [9]:
smm.P=np.where(smm.P==0,1.0/smm.P.shape[0],smm.P)

随机说句话看看概率,嗯,情理之中的结果

In [10]:
print(smm.predict_log_joint_prob([word2idx[word] for word in ["我","爱","马云","爸爸"]]))
print(smm.predict_log_joint_prob([word2idx[word] for word in ["马云","爸爸","爱","我"]]))
-29.792768141732772
-36.43292810682986

三.利用马尔可夫链生成数据

既然马尔可夫链是生成模型,那么我们可以利用其来生成数据,这一节介绍两种方式:greedy search和beam search

greedy search即贪婪搜索,每一次都选择使当前时刻概率最大的状态,即

$$ S_{next}^*=arg\max_{S_{next}}p(X_{t}=S_{next}\mid X_{t-1}=S_{current}) $$

其中$S_{current}$已知,所以只需要搜索状态转移概率矩阵就可以得到结果

greedy search最大的缺点就是会隐藏掉低概率状态后面的高概率状态,而beam search的处理方案就要聪明些,它会同时保留目前top K大的结果,最后从这K个结果里面再选择最优的结果

接下来,我们在代码中新增加这部分功能:

def generate_status(self, step_times=10, stop_status=None, set_start_status=None, search_type="greedy", beam_num=5):
        """
        生成状态序列,包括greedy search和beam search两种方式
        :param step_times: 步长不超过 step_times
        :param stop_status: 中止状态列表
        :param set_start_status: 人为设置初始状态
        :param search_type: 搜索策略,包括greedy和beam
        :param beam_num: 只有在search_type="beam"时生效,保留前top个结果
        :return:
        """
        if stop_status is None:
            stop_status = []
        # 初始状态
        start_status = np.random.choice(len(self.pi.reshape(-1)),
                                        p=self.pi.reshape(-1)) if set_start_status is None else set_start_status
        if search_type == "greedy":
            # 贪婪搜索
            rst = [start_status]
            for _ in range(0, step_times):
                next_status = self.predict_next_step_status(current_status=start_status)
                rst.append(next_status)
                if next_status in stop_status:
                    break
        else:
            # beam search
            rst = [start_status]
            top_k_rst = [[start_status]]
            top_k_prob = [0.0]
            for _ in range(0, step_times):
                new_top_k_rst = []
                new_top_k_prob = []
                for k_index, k_rst in enumerate(top_k_rst):
                    k_rst_last_status = k_rst[-1]
                    # 获取前k大的idx
                    top_k_idx = self.P[:, k_rst_last_status].argsort()[::-1][0:beam_num]
                    for top_k_status in top_k_idx:
                        new_top_k_rst.append(k_rst + [top_k_status])
                        new_top_k_prob.append(top_k_prob[k_index] + np.log(1e-12+self.P[top_k_status, k_rst_last_status]))
                # 对所有的beam_num*beam_num个结果排序取前beam_num个结果
                top_rst_idx = np.asarray(new_top_k_prob).argsort()[::-1][0:beam_num]
                rst = new_top_k_rst[top_rst_idx[0]]
                # 更新
                top_k_rst = []
                top_k_prob = []
                for top_idx in top_rst_idx[:beam_num]:
                    if new_top_k_rst[top_idx][-1] in stop_status:
                        rst = new_top_k_rst[top_idx]
                        break
                    else:
                        top_k_rst.append(new_top_k_rst[top_idx])
                        top_k_prob.append(new_top_k_prob[top_idx])
        return rst
In [17]:
[idx2word[idx] for idx in smm.generate_status(search_type="beam",stop_status=[word2idx[word] for word in ["?","!"]])]
Out[17]:
['国产', '航母', '战斗群', '浮出', '水面', ',', '你', '怎么', '对', '下联', '?']
In [ ]: