In [1]:
import numpy as np
import os
os.chdir('../')
from ml_models import utils
import matplotlib.pyplot as plt
%matplotlib inline


一.简介¶

$$\delta(t)=\frac{1}{1+e^{-t}}$$ 它的函数形状如下：

In [2]:
t=np.arange(-8,8,0.5)
d_t=1/(1+np.exp(-t))
plt.plot(t,d_t)

Out[2]:
[<matplotlib.lines.Line2D at 0x233d3c47a58>]

$$f(x)=\delta(w^Tx^*)=\frac{1}{1+e^{-(w^Tx^*)}}$$

$Sigmoid$函数决定了模型的输出在$(0,1)$区间，所以逻辑回归模型可以用作区间在$(0,1)$的回归任务，也可以用作$\{0,1\}$的二分类任务；同样，由于模型的输出在$(0,1)$区间，所以逻辑回归模型的输出也可以看作这样的“概率”模型：
$$P(y=1\mid x)=f(x)\\ P(y=0\mid x)=1-f(x)$$

$$L(w)=-\sum_{j=1}^n y_j log(f(x_j))+(1-y_j)log(1-f(x_j))$$

$$\frac{\partial L}{\partial w}=-\sum_{i=1}^n (y_i-f(x_i))x_i^*$$

$$w:=w-\eta \frac{\partial L}{\partial w}$$

二.代码实现¶

In [3]:
class LogisticRegression(object):
def __init__(self, fit_intercept=True, solver='sgd', if_standard=True, l1_ratio=None, l2_ratio=None, epochs=10,
eta=None, batch_size=16):

self.w = None
self.fit_intercept = fit_intercept
self.solver = solver
self.if_standard = if_standard
if if_standard:
self.feature_mean = None
self.feature_std = None
self.epochs = epochs
self.eta = eta
self.batch_size = batch_size
self.l1_ratio = l1_ratio
self.l2_ratio = l2_ratio
# 注册sign函数
self.sign_func = np.vectorize(utils.sign)
# 记录losses
self.losses = []

def init_params(self, n_features):
"""
初始化参数
:return:
"""
self.w = np.random.random(size=(n_features, 1))

def _fit_closed_form_solution(self, x, y):
"""
直接求闭式解
:param x:
:param y:
:return:
"""
self._fit_sgd(x, y)

def _fit_sgd(self, x, y):
"""
随机梯度下降求解
:param x:
:param y:
:return:
"""
x_y = np.c_[x, y]
count = 0
for _ in range(self.epochs):
np.random.shuffle(x_y)
for index in range(x_y.shape[0] // self.batch_size):
count += 1
batch_x_y = x_y[self.batch_size * index:self.batch_size * (index + 1)]
batch_x = batch_x_y[:, :-1]
batch_y = batch_x_y[:, -1:]

dw = -1 * (batch_y - utils.sigmoid(batch_x.dot(self.w))).T.dot(batch_x) / self.batch_size
dw = dw.T

# 添加l1和l2的部分
dw_reg = np.zeros(shape=(x.shape[1] - 1, 1))
if self.l1_ratio is not None:
dw_reg += self.l1_ratio * self.sign_func(self.w[:-1]) / self.batch_size
if self.l2_ratio is not None:
dw_reg += 2 * self.l2_ratio * self.w[:-1] / self.batch_size
dw_reg = np.concatenate([dw_reg, np.asarray([[0]])], axis=0)

dw += dw_reg
self.w = self.w - self.eta * dw

# 计算losses
cost = -1 * np.sum(
np.multiply(y, np.log(utils.sigmoid(x.dot(self.w)))) + np.multiply(1 - y, np.log(
1 - utils.sigmoid(x.dot(self.w)))))
self.losses.append(cost)

def fit(self, x, y):
"""
:param x: ndarray格式数据: m x n
:param y: ndarray格式数据: m x 1
:return:
"""
y = y.reshape(x.shape[0], 1)
# 是否归一化feature
if self.if_standard:
self.feature_mean = np.mean(x, axis=0)
self.feature_std = np.std(x, axis=0) + 1e-8
x = (x - self.feature_mean) / self.feature_std
# 是否训练bias
if self.fit_intercept:
x = np.c_[x, np.ones_like(y)]
# 初始化参数
self.init_params(x.shape[1])
# 更新eta
if self.eta is None:
self.eta = self.batch_size / np.sqrt(x.shape[0])

if self.solver == 'closed_form':
self._fit_closed_form_solution(x, y)
elif self.solver == 'sgd':
self._fit_sgd(x, y)

def get_params(self):
"""
输出原始的系数
:return: w,b
"""
if self.fit_intercept:
w = self.w[:-1]
b = self.w[-1]
else:
w = self.w
b = 0
if self.if_standard:
w = w / self.feature_std.reshape(-1, 1)
b = b - w.T.dot(self.feature_mean.reshape(-1, 1))
return w.reshape(-1), b

def predict_proba(self, x):
"""
预测为y=1的概率
:param x:ndarray格式数据: m x n
:return: m x 1
"""
if self.if_standard:
x = (x - self.feature_mean) / self.feature_std
if self.fit_intercept:
x = np.c_[x, np.ones(x.shape[0])]
return utils.sigmoid(x.dot(self.w))

def predict(self, x):
"""
预测类别，默认大于0.5的为1，小于0.5的为0
:param x:
:return:
"""
proba = self.predict_proba(x)
return (proba > 0.5).astype(int)

def plot_decision_boundary(self, x, y):
"""
绘制前两个维度的决策边界
:param x:
:param y:
:return:
"""
y = y.reshape(-1)
weights, bias = self.get_params()
w1 = weights[0]
w2 = weights[1]
bias = bias[0][0]
x1 = np.arange(np.min(x), np.max(x), 0.1)
x2 = -w1 / w2 * x1 - bias / w2
plt.scatter(x[:, 0], x[:, 1], c=y, s=50)
plt.plot(x1, x2, 'r')
plt.show()

def plot_losses(self):
plt.plot(range(0, len(self.losses)), self.losses)
plt.show()


三.校验¶

In [4]:
from sklearn.datasets import make_classification
data,target=make_classification(n_samples=100, n_features=2,n_classes=2,n_informative=1,n_redundant=0,n_repeated=0,n_clusters_per_class=1)

In [5]:
data.shape,target.shape

Out[5]:
((100, 2), (100,))
In [6]:
plt.scatter(data[:, 0], data[:, 1], c=target,s=50)

Out[6]:
<matplotlib.collections.PathCollection at 0x233d4c86748>

训练模型¶

In [7]:
lr = LogisticRegression(l1_ratio=0.01,l2_ratio=0.01)
lr.fit(data, target)


查看loss值变化¶

In [8]:
lr.plot_losses()


绘制决策边界：¶

In [9]:
lr.plot_decision_boundary(data,target)

In [10]:
#计算F1
from sklearn.metrics import f1_score
f1_score(target,lr.predict(data))

Out[10]:
0.96

与sklearn对比¶

In [11]:
from sklearn.linear_model import LogisticRegression

In [12]:
lr = LogisticRegression()
lr.fit(data, target)

D:\app\Anaconda3\lib\site-packages\sklearn\linear_model\logistic.py:432: FutureWarning: Default solver will be changed to 'lbfgs' in 0.22. Specify a solver to silence this warning.
FutureWarning)

Out[12]:
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
intercept_scaling=1, l1_ratio=None, max_iter=100,
multi_class='warn', n_jobs=None, penalty='l2',
random_state=None, solver='warn', tol=0.0001, verbose=0,
warm_start=False)
In [13]:
w1=lr.coef_[0][0]
w2=lr.coef_[0][1]
bias=lr.intercept_[0]
w1,w2,bias

Out[13]:
(3.119650945418208, 0.38515595805512637, -0.478776183999758)
In [14]:
x1=np.arange(np.min(data),np.max(data),0.1)
x2=-w1/w2*x1-bias/w2

In [15]:
plt.scatter(data[:, 0], data[:, 1], c=target,s=50)
plt.plot(x1,x2,'r')

Out[15]:
[<matplotlib.lines.Line2D at 0x233d5f84cf8>]
In [16]:
#计算F1
f1_score(target,lr.predict(data))

Out[16]:
0.96

四.问题讨论：损失函数为何不用mse?¶

$$L(w)=\frac{1}{2}\sum {i=1}^n(y^i-f(x^i))^2$$ 则有： $$\frac{\partial L}{\partial w}=\sum_{i=1}^n(f(x^i)-y^i)f(x^i)(1-f(x^i))x^i$$ 我们简单看两个极端的情况： （1）$y^i=0,f(x^i)=1$时，$\frac{\partial L}{\partial w}=0$； （2）$y^i=1,f(x^i)=0$时，$\frac{\partial L}{\partial w}=0$ 接下来，我们绘图对比一下两者梯度变化的情况，假设在$y=1,x\in(-10,10),w=1,b=0$的情况下

In [17]:
y=1
x0=np.arange(-10,10,0.5)
#交叉熵
x1=np.multiply(utils.sigmoid(x0)-y,x0)

In [18]:
#mse
x2=np.multiply(utils.sigmoid(x0)-y,utils.sigmoid(x0))
x2=np.multiply(x2,1-utils.sigmoid(x0))
x2=np.multiply(x2,x0)

In [19]:
plt.plot(x0,x1)
plt.plot(x0,x2)

Out[19]:
[<matplotlib.lines.Line2D at 0x233d6046048>]

In [ ]: