一.马尔可夫链简介

这一节聊一下另一个非常实用的模型:马尔科夫链,它的概率图模型如下:
avatar

自然,它的联合概率分布公式可以写作如下:

$$ p(X_1,X_2,...,X_n)=p(X_1)\prod_{i=2}^np(X_i\mid X_{i-1}) $$

我们通常处理的马尔科夫链是齐次、离散、有限状态的情况,它假设模型的状态来源于某一有限集合:$S={S_1,S_2,...,S_M}$,且状态转移概率与时间无关,即对任意时刻$t_1,t_2$,任意状态$S_k,S_l\in S$,都有:$p(X_{t_1}=S_m\mid X_{t_1-1}=S_l)=p(X_{t_2}=S_m\mid X_{t_2-1}=S_l)$,所以模型参数可以由两部分构成$\lambda=\{\pi_0,P\}$:

$\pi_0$表示初始时刻状态的概率分布:

$$ \pi_0=[\pi_0^1,\pi_0^2,...,\pi_0^M]^T\\ s.t. \pi_0^i\geq 0,i=1,2,...,M \\ \sum_{i=1}^M\pi_0^i=1 $$


这里$\pi_o^i$表示初始时刻$t=0$时,$S_i$的概率

P表示状态转移概率:

$$ P=\begin{bmatrix} P_{1,1} & P_{1,2} & \cdots &P_{1,M} \\ P_{2,1} &P_{2,2} & \cdots & P_{2,M}\\ \vdots &\vdots & P_{i,j} & \vdots \\ P_{M,1} & P_{M,2} & \cdots & P_{M,M} \end{bmatrix}\\ s.t. \sum_{i=1}^MP_{i,j}=1,j=1,2,...,M $$

其中,$P_{i,j}=p(X_t=S_i\mid X_{t-1}=S_j),t=1,2,...$

所以,我们可以非常方便的得到任意时刻的状态分布:

$$ \pi_1=P\pi_0\\ \pi_2=P\pi_1=P^2\pi_0\\ \cdots\\ \pi_t=P^t\pi_0 $$

二.马尔可夫链的应用

上面介绍了最简单的马尔可夫链的定义,那么一个自然的问题就是马尔可夫链有什么用?

(1)一般来说,我们可以通过马尔可夫链来判断某个状态序列$P(X_0,X_1,...,X_M)$出现的概率,这一点在NLP中应用较多,比如语言模型(下一节会撕),它本质就是一个马尔可夫链;

(2)另外,我们可以用马尔可夫链来作预测,由于马尔科夫假设,未来的状态仅仅与现在的状态有关,而与过去的状态无关,所以预测下一个时刻状态只需要当前时刻的状态信息;

(3)求马尔可夫链的稳定态,这个初听会有些抽象,其实在某些情况下,马尔可夫链会收敛到某一个稳定的状态分布,即$t\rightarrow \infty时,P^t\pi_0\rightarrow稳定的分布$,这个性质非常有用,它可被应用于马尔科夫蒙特卡洛抽样(后面撕);

(4)由于马尔可夫链是一个生成模型,所以我们也可以用它来生成一些随机状态,比如在训练好的语言模型基础上生成一段文本(下一节会撕)

接下来,我们先对前两点应用作介绍和实现

联合概率计算

联合概率计算时,直观感觉需要一步一步的计算,其实不必,由于齐次假设,状态转移概率矩阵不会随着时间变化,所以联合概率完全可以并行计算,比如下面的马尔可夫链,可以拆开为A,B,C三部分同时计算,最后再合并相乘即可(后续HMM的前向后向算法也是一样的道理)

avatar

关于预测没啥好说的,直接通过概率转移矩阵求解下一个最有可能的状态即可

三.代码实现

In [1]:
"""
齐次时间、离散、有限状态、一阶马尔可夫链的实现,封装到ml_models.pgm
"""
import numpy as np


class SimpleMarkovModel(object):
    def __init__(self, status_num=None):
        # 初始状态向量
        self.pi = np.zeros(shape=(status_num, 1))
        # 状态转移概率矩阵
        self.P = np.zeros(shape=(status_num, status_num))

    def fit(self, x):
        """
        根据训练数据,统计计算初始状态向量以及状态转移概率矩阵
        :param x: x可以是单列表或者是列表的列表,比如[s1,s2,...,sn]或者[[s11,s12,...,s1m],[s21,s22,...,s2n],...],
                 计算初始状态向量的方式会有差异,单列表会统计所有所有状态作为初始状态分布,列表的列表会统计所有子列表开头
                 状态的分布
        :return:
        """
        if type(x[0]) == list:
            for clist in x:
                self.pi[clist[0]] += 1
                for cindex in range(0, len(clist) - 1):
                    self.P[clist[cindex + 1], clist[cindex]] += 1
        else:
            for index in range(0, len(x) - 1):
                self.pi[x[index]] += 1
                self.P[x[index + 1], x[index]] += 1
        # 归一化
        self.pi = self.pi / np.sum(self.pi)
        self.P = self.P / np.sum(self.P, axis=0)

    def predict_log_joint_prob(self, status_list):
        """
        计算联合概率的对数
        :param status_list:
        :return:
        """
        # 这里偷懒就不并行计算了...
        log_prob = np.log(self.pi[status_list[0], 0])
        for index in range(0, len(status_list) - 1):
            log_prob += np.log(self.P[status_list[index + 1], status_list[index]])
        return log_prob

    def predict_prob_distribution(self, time_steps=None, set_init_prob=None, set_prob_trans_matrix=None):
        """
        计算time_steps后的概率分布,允许通过set_init_prob和set_prob_trans_matrix设置初始概率分布和概率转移矩阵
        :param time_steps:
        :param set_init_prob:
        :param set_prob_trans_matrix:
        :return:
        """
        prob = self.pi if set_init_prob is None else set_init_prob
        trans_matrix = self.P if set_prob_trans_matrix is None else set_prob_trans_matrix
        for _ in range(0, time_steps):
            prob = trans_matrix.dot(prob)
        return prob

    def predict_next_step_prob_distribution(self, current_status=None):
        """
        预测下一时刻的状态分布
        :param current_status:
        :return:
        """
        return self.P[:, [current_status]]

    def predict_next_step_status(self, current_status=None):
        """
        预测下一个时刻最有可能的状态
        :param current_status:
        :return:
        """
        return np.argmax(self.predict_next_step_prob_distribution(current_status))

案例

这里收集了深圳宝安一个月(4月22日-5月21日)的天气情况,如下图,将天气分为三类,一种是晴天(0),一种是阴天(1),一种是雨天(2),所以状态空间$S=\{0,1,2\}$ avatar

In [2]:
#我们用其训练一个马尔科夫链
train_data=[2,1,2,1,0,0,0,0,0,0,0,1,1,2,2,1,1,1,0,0,0,0,1,0,1,1,1,1,1,1]
smm=SimpleMarkovModel(status_num=3)
smm.fit(train_data)

我们看看状态转移概率矩阵的情况,可以发现晴天转晴天,雨天转阴天,阴天转阴天的概率非常高,而雨天转晴天或者晴天转雨天的情况不会发生,这基本符合我们的认知

In [3]:
smm.P
Out[3]:
array([[0.75      , 0.23076923, 0.        ],
       [0.25      , 0.61538462, 0.75      ],
       [0.        , 0.15384615, 0.25      ]])

四.马尔科夫链的平稳态

接下来,我们看看以当前的初始状态再过3、5、7、10、20天后的概率分布情况

In [4]:
import pandas as pd
pd.DataFrame({"第3天":smm.predict_prob_distribution(3).reshape(-1).tolist(),
              "第5天":smm.predict_prob_distribution(5).reshape(-1).tolist(),
              "第7天":smm.predict_prob_distribution(7).reshape(-1).tolist(),
              "第10天":smm.predict_prob_distribution(10).reshape(-1).tolist(),
              "第20天":smm.predict_prob_distribution(20).reshape(-1).tolist()})
Out[4]:
第10天 第20天 第3天 第5天 第7天
0 0.433556 0.433734 0.426648 0.431260 0.432870
1 0.470002 0.469880 0.474763 0.471585 0.470475
2 0.096441 0.096386 0.098590 0.097155 0.096654

可以发现概率分布逐渐逼近了一个平稳态,这或许与我们的初始状态有关,让我们换不一样的初始状态看看

In [5]:
pd.DataFrame({"晴天":smm.predict_prob_distribution(20,set_init_prob=np.asarray([[1],[0],[0]])).reshape(-1).tolist(),
              "阴天":smm.predict_prob_distribution(20,set_init_prob=np.asarray([[0],[1],[0]])).reshape(-1).tolist(),
              "雨天":smm.predict_prob_distribution(20,set_init_prob=np.asarray([[0],[0],[1]])).reshape(-1).tolist(),
              "阴雨天":smm.predict_prob_distribution(20,set_init_prob=np.asarray([[0],[0.5],[0.5]])).reshape(-1).tolist(),
              "任意天气":smm.predict_prob_distribution(20,set_init_prob=np.asarray([[0.05],[0.2],[0.75]])).reshape(-1).tolist()})
Out[5]:
任意天气 晴天 阴天 阴雨天 雨天
0 0.433719 0.433749 0.433726 0.43372 0.433715
1 0.469891 0.469870 0.469886 0.46989 0.469893
2 0.096391 0.096381 0.096388 0.09639 0.096392

概率分布基本全都一样,也就是说无论当前天气情况如何,对以后一段时间后的天气状况没多大影响,则说明马尔科夫链具有无记忆性的特点,下面对马尔科夫链收敛到平稳态做一个说明,看看它是怎么来的:

(1)首先,我们可以对$P$做特征分解,分解后其对应的特征值为$\lambda_1,\lambda_2,...,\lambda_M$,它对应的特征向量为$\beta_1,\beta_2,...,\beta_M$,所以有如下关系:

$$ P\beta_i=\lambda_i\beta_i\\ P^2\beta_i=P\lambda_i\beta_i=\lambda_iP\beta_i=\lambda_i^2\beta_i\\ ......\\ P^n\beta_i=\lambda_i^n\beta_i\\ i=1,2,...,M $$

(2)若$\beta_1,\beta_2,...\beta_M$线性无关(绝大部分情况也是如此),则任意一个初始分布$\pi_0$均可以由$\beta_1,\beta_2,...,\beta_M$唯一线性表示:

$$ \pi_0=c_1\beta_1+c_2\beta_2+\cdots+c_M\beta_M $$

(3)对于任意步长$n$有:

$$ P^n\pi_0=P^n(c_1\beta_1+c_2\beta_2+\cdots+c_M\beta_M)\\ =c_1\lambda_1^n\beta_1+c_2\lambda_2^n\beta_2+\cdots+c_M\lambda_M^n\beta_M $$

(4)由于$abs(\lambda_i)\leq 1$(证明后面补充),所以当$n\rightarrow \infty$时,只有特征值为1的那一项才会保留,假设解为单根,为$c_i\beta_i$,则:

(4.1)这里$\beta_i$仅与$P$相关,与$\pi_0$无关

(4.2)因为$P^n\pi_0=c_i\beta_i$,所以必然有$c_i\beta_i$各分量之和为1($P^n\pi_0始终满足该条件$),所以对于任意$\pi_0$都有$c_i=\frac{1}{\mid\beta_i\mid}$

PS:随机矩阵是指对每行或每列求和为1的非负矩阵;另外最大特征值为1的存在性很好证明,因为$P-I$线性相关,所以1必然为$P$的一个特征值

$abs(\lambda_i)\leq 1$的证明

这个其实反证一下就好了(特别感谢小慧老师指导),(1)首先,对于$n\rightarrow \infty$,$P^n\pi_0$向量的各分量之和一定为1;(2)$P^n\pi_0=c_1\lambda_1^n\beta_1+c_2\lambda_2^n\beta_2+\cdots+c_M\lambda_M^n\beta_M$中若有$abs(\lambda_i)>1$,则必有$abs(P^n\pi_0)\rightarrow \infty$,这与(1)矛盾了

In [ ]: