import numpy as np
from scipy.special import expit
from scipy.optimize import minimize
from copy import deepcopy
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.datasets import load_iris
from sklearn.ensemble import BaggingClassifier as skBaggingClassifier
#from sklearn.linear_model import LogisticRegression
def accuracy_score(y_true, y_pred):
return np.mean(y_true == y_pred)
class LogisticRegression(BaseEstimator, ClassifierMixin):
def __init__(self, C=1.0):
self.C = C
def _encode(self, y):
classes = np.unique(y)
y_train = np.full((y.shape[0], len(classes)), -1)
for i, c in enumerate(classes):
y_train[y == c, i] = 1
if len(classes) == 2:
y_train = y_train[:, 1].reshape(-1, 1)
return classes, y_train
@staticmethod
def _cost_grad(w, X, y, alpha):
def _log_logistic(x):
if x > 0:
return -np.log(1 + np.exp(-x))
else:
return x - np.log(1 + np.exp(x))
yz = y * (np.dot(X, w[:-1]) + w[-1])
cost = -np.sum(np.vectorize(_log_logistic)(yz)) + 0.5 * alpha * np.dot(w[:-1], w[:-1])
grad = np.zeros(len(w))
t = (expit(yz) - 1) * y
grad[:-1] = np.dot(X.T, t) + alpha * w[:-1]
grad[-1] = np.sum(t)
return cost, grad
def _solve_lbfgs(self, X, y):
result = np.zeros((y.shape[1], X.shape[1] + 1))
for i in range(y.shape[1]):
cur_y = y[:, i]
w0 = np.zeros(X.shape[1] + 1)
res = minimize(fun=self._cost_grad, jac=True, x0=w0,
args=(X, cur_y, 1 / self.C), method='L-BFGS-B')
result[i] = res.x
return result[:, :-1], result[:, -1]
def fit(self, X, y):
self.classes_, y_train = self._encode(y)
self.coef_, self.intercept_ = self._solve_lbfgs(X, y_train)
return self
def decision_function(self, X):
scores = np.dot(X, self.coef_.T) + self.intercept_
if scores.shape[1] == 1:
return scores.ravel()
else:
return scores
def predict(self, X):
scores = self.decision_function(X)
if len(scores.shape) == 1:
indices = (scores > 0).astype(int)
else:
indices = np.argmax(scores, axis=1)
return self.classes_[indices]
def predict_proba(self, X):
scores = self.decision_function(X)
prob = expit(scores)
if len(scores.shape) == 1:
prob = np.vstack((1 - prob, prob)).T
else:
prob /= np.sum(prob, axis=1)[:, np.newaxis]
return prob
class BaggingClassifier():
def __init__(self, base_estimator, n_estimators, oob_score, random_state):
self.base_estimator = base_estimator
self.n_estimators = n_estimators
self.oob_score = oob_score
self.random_state = random_state
def fit(self, X, y):
self.classes_, y_train = np.unique(y, return_inverse=True)
self.n_classes_ = len(self.classes_)
MAX_INT = np.iinfo(np.int32).max
rng = np.random.RandomState(self.random_state)
self._seeds = rng.randint(MAX_INT, size=self.n_estimators)
self.estimators_ = []
self.estimators_samples_ = []
for i in range(self.n_estimators):
est = deepcopy(self.base_estimator)
rng = np.random.RandomState(self._seeds[i])
sample_indices = rng.randint(0, X.shape[0], X.shape[0])
self.estimators_samples_.append(sample_indices)
est.fit(X[sample_indices], y_train[sample_indices])
self.estimators_.append(est)
if self.oob_score:
self._set_oob_score(X, y_train)
return self
def _set_oob_score(self, X, y):
predictions = np.zeros((X.shape[0], self.n_classes_))
for i in range(self.n_estimators):
mask = np.ones(X.shape[0], dtype=bool)
mask[self.estimators_samples_[i]] = False
predictions[mask] += self.estimators_[i].predict_proba(X[mask])
self.oob_decision_function_ = predictions / np.sum(predictions, axis=1)[:, np.newaxis]
self.oob_score_ = accuracy_score(y, np.argmax(predictions, axis=1))
def predict_proba(self, X):
proba = np.zeros((X.shape[0], self.n_classes_))
for i in range(self.n_estimators):
proba += self.estimators_[i].predict_proba(X)
proba /= self.n_estimators
return proba
def predict(self, X):
proba = self.predict_proba(X)
return self.classes_[np.argmax(proba, axis=1)]
X, y = load_iris(return_X_y=True)
clf1 = BaggingClassifier(base_estimator=LogisticRegression(),
n_estimators=100, oob_score=True, random_state=0).fit(X, y)
clf2 = skBaggingClassifier(base_estimator=LogisticRegression(),
n_estimators=100, oob_score=True, random_state=0).fit(X, y)
assert np.allclose(clf1._seeds, clf2._seeds)
assert np.array_equal(clf1.estimators_samples_, clf2.estimators_samples_)
for i in range(clf1.n_estimators):
assert np.allclose(clf1.estimators_[i].coef_, clf2.estimators_[i].coef_)
assert np.allclose(clf1.estimators_[i].intercept_, clf2.estimators_[i].intercept_)
assert np.allclose(clf1.oob_decision_function_, clf2.oob_decision_function_)
assert np.allclose(clf1.oob_score_, clf2.oob_score_)
prob1 = clf1.predict_proba(X)
prob2 = clf2.predict_proba(X)
assert np.allclose(prob1, prob2)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.array_equal(pred1, pred2)