In [1]:
import numpy as np
import os
os.chdir('../')
import matplotlib.pyplot as plt
%matplotlib inline


### 一.最大熵原理

$$H(p)=-\sum_{i}p_i log p_i$$

In [2]:
# Probability grid for plotting the binary entropy; the open interval
# (0.1, 0.9) keeps us away from the undefined endpoints p = 0 and p = 1.
p = np.linspace(0.1, 0.9, 90)

In [3]:
def entropy(p):
    """Binary (Bernoulli) entropy in nats: H(p) = -p*ln(p) - (1-p)*ln(1-p).

    Generalized to accept p = 0 and p = 1, where H(p) = 0 by the usual
    convention 0*log(0) = 0 (the original returned nan with a runtime
    warning at the endpoints).

    :param p: scalar or ndarray of probabilities in [0, 1]
    :return: entropy value(s), same shape as p
    """
    p = np.asarray(p, dtype=float)
    q = 1.0 - p
    # np.where evaluates both branches, so silence the log(0) warnings;
    # the nan/-inf branch values are discarded by the selection.
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(p > 0, -p * np.log(p), 0.0) + np.where(q > 0, -q * np.log(q), 0.0)

In [4]:
# Plot H(p) over the grid: concave, maximized at p = 0.5
plt.plot(p,entropy(p))

Out[4]:
[<matplotlib.lines.Line2D at 0x245a3d6d278>]

$$\max_{p} -\sum_{i=1}^4 p_ilogp_i \\ s.t. p_1+p_2=0.4\\ p_i\geq 0,i=1,2,3,4\\ \sum_i p_i=1$$

### 二.最大熵模型

$$H(P)=-\sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x)$$

$$\sum_{x,y}\tilde{P}(x)P(y|x)f(x,y)=\sum_{x,y}\tilde{P}(x,y)f(x,y)$$

$$f(x,y)=\left\{\begin{matrix} 1 & x与y满足某一事实\\ 0 & 否则 \end{matrix}\right.$$

$$\max_P -\sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x) \\ s.t. E_P(f_i)=E_{\tilde{P}}(f_i),i=1,2,...,n\\ \sum_y P(y|x)=1$$

$$\min_P \sum_{x,y}\tilde{P}(x)P(y|x)logP(y|x) \\ s.t. E_P(f_i)-E_{\tilde{P}}(f_i)=0,i=1,2,...,n\\ \sum_y P(y|x)-1=0$$

$$L(P,w)=-H(P)+w_0\left(1-\sum_yP(y|x)\right)+\sum_{i=1}^nw_i\left(E_{\tilde{P}}(f_i)-E_P(f_i)\right)$$

$$\min_P\max_w L(P,w)$$

$$\max_w\min_P L(P,w)$$

$$\frac{\partial L(P,w)}{\partial P(y|x)}=\sum_{x,y}\tilde{P}(x)(logP(y|x)+1)-\sum_yw_0+\sum_{i=1}^n\sum_{x,y}\tilde{P}(x)f_i(x,y)w_i\\ =\sum_{x,y}\tilde{P}(x)(logP(y|x)+1-w_0-\sum_{i=1}^nw_if_i(x,y))\\ =0$$

$$P_w(y|x)=exp(\sum_{i=1}^nw_if_i(x,y)+w_0-1)\\ =\frac{exp(\sum_{i=1}^nw_if_i(x,y))}{exp(1-w_0)}\\ =\frac{exp(\sum_{i=1}^nw_if_i(x,y))}{\sum_y exp(\sum_{i=1}^nw_if_i(x,y))}$$

$$w^*=arg\max_w L(P_w,w)$$

$$L(P_w,w)=\sum_{x,y}\tilde{P}(x)P_w(y|x)logP_w(y|x)+\sum_{i=1}^nw_i\sum_{x,y}(\tilde{P}(x,y)f_i(x,y)-\tilde{P}(x)P_w(y|x)f_i(x,y))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)+\sum_{x,y}\tilde{P}(x)P_w(y|x)(logP_w(y|x)-\sum_{i=1}^nw_if_i(x,y))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)-\sum_{x,y}\tilde{P}(x)P_w(y|x)log(\sum_{y^{'}}exp(\sum_{i=1}^nw_if_i(x,y^{'})))\\ =\sum_{x,y}\tilde{P}(x,y)\sum_{i=1}^nw_if_i(x,y)-\sum_{x}\tilde{P}(x)log(\sum_{y^{'}}exp(\sum_{i=1}^nw_if_i(x,y^{'})))\\ =\sum_{x,y}\tilde{P}(x,y)w^Tf(x,y)-\sum_{x}\tilde{P}(x)log(\sum_{y^{'}}exp(w^Tf(x,y^{'})))$$

$$\frac{\partial L(P_w,w)}{\partial w}=\sum_{x,y}\tilde{P}(x,y)f(x,y)-\sum_{x,y}\tilde{P}(x)\frac{exp(w^Tf(x,y))f(x,y)}{\sum_{y^{'}}exp(w^Tf(x,y^{'}))}$$

$$w=w+\eta\frac{\partial L(P_w,w)}{\partial w}$$

### 三.对特征函数的进一步理解

$x_1:$一打火柴 $y_1:$量词
$x_2:$三打啤酒 $y_2:$量词
$x_3:$打电话 $y_3:$ 动词
$x_4:$打篮球 $y_4:$ 动词

$$f_1(x,y)=\left\{\begin{matrix} 1 & "打"前是数字\\ 0 & 否则 \end{matrix}\right.$$

$$f_2(x,y)=\left\{\begin{matrix} 1 & "打"后是名词，且前面无数字\\ 0 & 否则 \end{matrix}\right.$$

$$f(x,y)=\left\{\begin{matrix} 1 & "打"前是"一","打"后是"火柴"\\ 0 & 否则 \end{matrix}\right.$$

$$f(x,y)=\left\{\begin{matrix} 1 & "打"前是"三","打"后是"啤酒"\\ 0 & 否则 \end{matrix}\right.$$

#### 一种简单的特征函数设计

（1）离散型：
$$f(x,y)=\left\{\begin{matrix} 1 & x_i=某值,y=某类\\ 0 & 否则 \end{matrix}\right.$$

（2）连续型：
$$f(x,y)=\left\{\begin{matrix} 1 & 某值1\leq x_i< 某值2,y=某类\\ 0 & 否则 \end{matrix}\right.$$

### 四.代码实现

In [5]:
# Load the iris dataset and hold out 20% of the samples for evaluation.
from sklearn import datasets
from sklearn import model_selection
from sklearn.metrics import f1_score

# BUG FIX: `iris` was used below but never defined (NameError on a fresh kernel).
iris = datasets.load_iris()
data = iris['data']
target = iris['target']
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.2, random_state=0)


In [6]:
class DataBinWrapper(object):
    """Percentile-based discretizer: maps each continuous feature to an
    integer bin index in [0, max_bins - 1]."""

    def __init__(self, max_bins=10):
        # number of bins per feature
        self.max_bins = max_bins
        # XrangeMap[j] holds the interior percentile cut points of feature j
        self.XrangeMap = None

    def fit(self, x):
        """Learn per-feature cut points from x of shape (n_sample, n_feature)."""
        _, n_feature = x.shape
        # interior percentiles floor(100*k/max_bins) for k = 1..max_bins-1
        percentiles = [(1.0 * k / self.max_bins) * 100.0 // 1 for k in range(1, self.max_bins)]
        self.XrangeMap = [
            [np.percentile(x[:, j], q) for q in percentiles]
            for j in range(n_feature)
        ]

    def transform(self, x):
        """Map raw values to bin indices.

        :param x: a single sample (1-D) or a sample matrix (2-D)
        :return: integer bin indices, same leading shape as x
        """
        if x.ndim == 1:
            binned = [np.digitize(x[j], self.XrangeMap[j]) for j in range(x.size)]
            return np.asarray(binned)
        binned = [np.digitize(x[:, j], self.XrangeMap[j]) for j in range(x.shape[1])]
        return np.asarray(binned).T

In [7]:
# Fit the percentile bin edges on the training split only, then
# discretize both splits with the same edges (avoids test-set leakage).
data_bin_wrapper=DataBinWrapper(max_bins=10)
data_bin_wrapper.fit(X_train)
X_train=data_bin_wrapper.transform(X_train)
X_test=data_bin_wrapper.transform(X_test)

In [8]:
# Peek at the first 5 binned training rows (integer bin indices)
X_train[:5,:]

Out[8]:
array([[7, 6, 8, 7],
[3, 5, 5, 6],
[2, 8, 2, 2],
[6, 5, 6, 7],
[7, 2, 8, 8]], dtype=int64)
In [9]:
# Peek at the first 5 binned test rows
X_test[:5,:]

Out[9]:
array([[5, 2, 7, 9],
[5, 0, 4, 3],
[3, 9, 1, 2],
[9, 3, 9, 7],
[1, 8, 2, 2]], dtype=int64)

In [10]:
class SimpleFeatureFunction(object):
    """Indicator feature functions of the form
    f(x, y) = 1 if x[x_index] == x_value and y == y_index else 0,
    stored as (x_index, x_value, y_index) tuples."""

    def __init__(self):
        # set of (x_index, x_value, y_index) tuples
        self.feature_funcs = set()

    def build_feature_funcs(self, X, y):
        """Create one feature function per (feature index, feature value, label)
        triple observed in the training data.

        BUG FIX: the inner loop body was missing, so no feature was ever
        registered; restored from the tuple layout documented in __init__
        and the unpacking in match_feature_funcs_indices.
        """
        n_sample, _ = X.shape
        for index in range(0, n_sample):
            x = X[index, :].tolist()
            for feature_index in range(0, len(x)):
                self.feature_funcs.add(tuple([feature_index, x[feature_index], y[index]]))

    def get_feature_funcs_num(self):
        """Total number of registered feature functions."""
        return len(self.feature_funcs)

    def match_feature_funcs_indices(self, x, y):
        """Return the indices of the feature functions fired by (x, y).

        Iteration order over the set is fixed for the object's lifetime,
        so indices are consistent across calls.
        """
        match_indices = []
        index = 0
        for feature_index, feature_value, feature_y in self.feature_funcs:
            if feature_y == y and x[feature_index] == feature_value:
                match_indices.append(index)
            index += 1
        return match_indices


In [11]:
def softmax(x):
    """Numerically stable softmax.

    1-D input: softmax over the whole vector; 2-D input: softmax per row.
    Subtracting the max before exponentiating prevents overflow for large
    scores (the original overflowed to inf/nan); the result is
    mathematically unchanged.

    :param x: ndarray of scores, 1-D or 2-D
    :return: probabilities of the same shape, each distribution summing to 1
    """
    if x.ndim == 1:
        z = np.exp(x - x.max())
        return z / z.sum()
    z = np.exp(x - x.max(axis=1, keepdims=True))
    return z / z.sum(axis=1, keepdims=True)


In [12]:
from ml_models import utils
class MaxEnt(object):
    """Maximum-entropy classifier trained by stochastic gradient ascent on
    the dual objective, with P_w(y|x) proportional to exp(sum_i w_i f_i(x, y)).
    """

    def __init__(self, feature_func, epochs=5, eta=0.01):
        """
        :param feature_func: object exposing get_feature_funcs_num() and
                             match_feature_funcs_indices(x, y)
        :param epochs: number of passes over the training data
        :param eta: learning rate (fit raises it to 1/sqrt(n_sample) if larger)
        """
        self.feature_func = feature_func
        self.epochs = epochs
        self.eta = eta

        self.class_num = None
        # empirical joint distribution: {(x_0, x_1, ..., x_p, y_index): prob}
        self.Pxy = {}
        # empirical marginal distribution: {(x_0, x_1, ..., x_p): prob}
        self.Px = {}
        # w[i] is the weight of feature_func[i]
        self.w = None

    def init_params(self, X, y):
        """Estimate the empirical distributions from (X, y) and zero-init w."""
        n_sample, n_feature = X.shape
        self.class_num = np.max(y) + 1

        for index in range(0, n_sample):
            range_indices = X[index, :].tolist()

            if self.Px.get(tuple(range_indices)) is None:
                self.Px[tuple(range_indices)] = 1
            else:
                self.Px[tuple(range_indices)] += 1

            if self.Pxy.get(tuple(range_indices + [y[index]])) is None:
                self.Pxy[tuple(range_indices + [y[index]])] = 1
            else:
                # BUG FIX: the original reset the count to 1 here instead of
                # incrementing, so repeated (x, y) pairs were undercounted.
                self.Pxy[tuple(range_indices + [y[index]])] += 1

        # normalize counts into probabilities
        for key in self.Pxy:
            self.Pxy[key] = 1.0 * self.Pxy[key] / n_sample
        for key in self.Px:
            self.Px[key] = 1.0 * self.Px[key] / n_sample

        self.w = np.zeros(self.feature_func.get_feature_funcs_num())

    def _sum_exp_w_on_all_y(self, x):
        """sum over y of exp(sum of w_i over features fired by (x, y))."""
        sum_w = 0
        for y in range(0, self.class_num):
            sum_w += np.exp(self._sum_exp_w_on_y(x, y))
        return sum_w

    def _sum_exp_w_on_y(self, x, y):
        """Sum of the weights of the feature functions fired by (x, y)."""
        tmp_w = 0
        for match_feature_func_index in self.feature_func.match_feature_funcs_indices(x, y):
            tmp_w += self.w[match_feature_func_index]
        return tmp_w

    def fit(self, X, y):
        """Train the weights with per-sample (stochastic) gradient ascent."""
        # ensure a reasonable step size for small datasets
        self.eta = max(1.0 / np.sqrt(X.shape[0]), self.eta)
        self.init_params(X, y)
        x_y = np.c_[X, y]
        # BUG FIX: guard against n_sample < 4 (original divided by zero below)
        log_step = max(1, X.shape[0] // 4)
        for epoch in range(self.epochs):
            count = 0
            np.random.shuffle(x_y)
            for index in range(x_y.shape[0]):
                count += 1
                x_point = x_y[index, :-1]
                y_point = x_y[index, -1:][0]
                # empirical joint probability of this (x, y)
                p_xy = self.Pxy.get(tuple(x_point.tolist() + [y_point]))
                # empirical marginal probability of this x
                p_x = self.Px.get(tuple(x_point))
                # gradient for this sample: E_empirical[f] - E_model[f],
                # restricted to the feature functions this sample fires
                dw = np.zeros(shape=self.w.shape)
                match_feature_func_indices = self.feature_func.match_feature_funcs_indices(x_point, y_point)
                if len(match_feature_func_indices) == 0:
                    continue
                if p_xy is not None:
                    for match_feature_func_index in match_feature_func_indices:
                        dw[match_feature_func_index] = p_xy
                if p_x is not None:
                    sum_w = self._sum_exp_w_on_all_y(x_point)
                    for match_feature_func_index in match_feature_func_indices:
                        dw[match_feature_func_index] -= p_x * np.exp(self._sum_exp_w_on_y(x_point, y_point)) / (
                                1e-7 + sum_w)
                # ascent step
                self.w += self.eta * dw
                # progress logging, 4 times per epoch
                if count % log_step == 0:
                    print("processing:\tepoch:" + str(epoch + 1) + "/" + str(self.epochs) + ",percent:" + str(
                        count) + "/" + str(X.shape[0]))

    def predict_proba(self, x):
        """Return the class probability distribution for every sample in x."""
        scores = []
        for x_point in x:
            point_scores = []
            for y_index in range(0, self.class_num):
                match_feature_func_indices = self.feature_func.match_feature_funcs_indices(x_point, y_index)
                tmp = 0
                for match_feature_func_index in match_feature_func_indices:
                    tmp += self.w[match_feature_func_index]
                point_scores.append(tmp)
            scores.append(point_scores)
        scores = np.asarray(scores)
        # numerically stable row-wise softmax (replaces the external
        # ml_models.utils.softmax so the class is self-contained)
        z = np.exp(scores - scores.max(axis=1, keepdims=True))
        return z / z.sum(axis=1, keepdims=True)

    def predict(self, x):
        """Return the most probable class index for every sample in x."""
        return np.argmax(self.predict_proba(x), axis=1)

In [13]:
# Build the feature-function object, train the max-ent model on the
# binned training split, and score macro-F1 on the held-out split.
feature_func=SimpleFeatureFunction()
feature_func.build_feature_funcs(X_train,y_train)

maxEnt = MaxEnt(feature_func=feature_func)
maxEnt.fit(X_train, y_train)
y = maxEnt.predict(X_test)

print('f1:', f1_score(y_test, y, average='macro'))

processing:	epoch:1/5,percent:30/120
processing:	epoch:1/5,percent:60/120
processing:	epoch:1/5,percent:90/120
processing:	epoch:1/5,percent:120/120
processing:	epoch:2/5,percent:30/120
processing:	epoch:2/5,percent:60/120
processing:	epoch:2/5,percent:90/120
processing:	epoch:2/5,percent:120/120
processing:	epoch:3/5,percent:30/120
processing:	epoch:3/5,percent:60/120
processing:	epoch:3/5,percent:90/120
processing:	epoch:3/5,percent:120/120
processing:	epoch:4/5,percent:30/120
processing:	epoch:4/5,percent:60/120
processing:	epoch:4/5,percent:90/120
processing:	epoch:4/5,percent:120/120
processing:	epoch:5/5,percent:30/120
processing:	epoch:5/5,percent:60/120
processing:	epoch:5/5,percent:90/120
processing:	epoch:5/5,percent:120/120
f1: 0.9295631904327557


$$f(x,y)=\left\{\begin{matrix} 1 & x_i=某值,x_j=某值,y=某类\\ 0 & 否则 \end{matrix}\right.$$

In [14]:
class UserDefineFeatureFunction(object):
    """Richer feature functions over single features and feature pairs:
    3-tuples (x_index, x_value, y_index) and
    5-tuples (x_index1, x_value1, x_index2, x_value2, y_index)."""

    def __init__(self):
        # set of 3-tuples and 5-tuples as described above
        self.feature_funcs = set()

    def build_feature_funcs(self, X, y):
        """Create single-feature and pairwise feature functions from the
        training data.

        BUG FIX: the innermost loop body was missing, so no feature was
        ever registered; reconstructed from match_feature_funcs_indices,
        which accepts both 3-tuple and 5-tuple items.
        """
        n_sample, _ = X.shape
        for index in range(0, n_sample):
            x = X[index, :].tolist()
            for feature_index in range(0, len(x)):
                # single-feature indicator (3-tuple)
                self.feature_funcs.add(tuple([feature_index, x[feature_index], y[index]]))
                for new_feature_index in range(0, len(x)):
                    if feature_index != new_feature_index:
                        # pairwise indicator (5-tuple)
                        self.feature_funcs.add(tuple([feature_index, x[feature_index],
                                                      new_feature_index, x[new_feature_index], y[index]]))

    def get_feature_funcs_num(self):
        """Total number of registered feature functions."""
        return len(self.feature_funcs)

    def match_feature_funcs_indices(self, x, y):
        """Return the indices of the feature functions fired by (x, y)."""
        match_indices = []
        index = 0
        for item in self.feature_funcs:
            if len(item) == 5:
                feature_index1, feature_value1, feature_index2, feature_value2, feature_y = item
                if feature_y == y and x[feature_index1] == feature_value1 and x[feature_index2] == feature_value2:
                    match_indices.append(index)
            else:
                feature_index1, feature_value1, feature_y = item
                if feature_y == y and x[feature_index1] == feature_value1:
                    match_indices.append(index)
            index += 1
        return match_indices

In [15]:
# Re-run training with the richer (single + pairwise) feature functions
# and compare macro-F1 against the simple-feature baseline above.
feature_func=UserDefineFeatureFunction()
feature_func.build_feature_funcs(X_train,y_train)

maxEnt = MaxEnt(feature_func=feature_func)
maxEnt.fit(X_train, y_train)
y = maxEnt.predict(X_test)

print('f1:', f1_score(y_test, y, average='macro'))

processing:	epoch:1/5,percent:30/120
processing:	epoch:1/5,percent:60/120
processing:	epoch:1/5,percent:90/120
processing:	epoch:1/5,percent:120/120
processing:	epoch:2/5,percent:30/120
processing:	epoch:2/5,percent:60/120
processing:	epoch:2/5,percent:90/120
processing:	epoch:2/5,percent:120/120
processing:	epoch:3/5,percent:30/120
processing:	epoch:3/5,percent:60/120
processing:	epoch:3/5,percent:90/120
processing:	epoch:3/5,percent:120/120
processing:	epoch:4/5,percent:30/120
processing:	epoch:4/5,percent:60/120
processing:	epoch:4/5,percent:90/120
processing:	epoch:4/5,percent:120/120
processing:	epoch:5/5,percent:30/120
processing:	epoch:5/5,percent:60/120
processing:	epoch:5/5,percent:90/120
processing:	epoch:5/5,percent:120/120
f1: 0.957351290684624


In [ ]: