import warnings
from copy import deepcopy

import numpy as np
from scipy.linalg import lstsq
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.datasets import load_boston
from sklearn.ensemble import BaggingRegressor as skBaggingRegressor
def r2_score(y_true, y_pred):
    """Return the coefficient of determination (R^2) of a prediction.

    The score is ``1 - SS_res / SS_tot``, where ``SS_res`` is the sum of
    squared differences between ``y_true`` and ``y_pred`` and ``SS_tot`` is
    the sum of squared deviations of ``y_true`` from its own mean.

    Parameters
    ----------
    y_true : array-like
        Observed target values.
    y_pred : array-like
        Predicted target values; must broadcast against ``y_true``.

    Returns
    -------
    float
        The R^2 score. When ``y_true`` is constant ``SS_tot`` is zero, so the
        result is ``nan`` (perfect prediction) or ``-inf`` (any error), as
        produced by NumPy's floating-point division.
    """
    # Coerce to float arrays so plain Python sequences are accepted and
    # integer (in particular unsigned) inputs cannot wrap around on subtraction.
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    residual_ss = np.sum((y_true - y_pred) ** 2)
    total_ss = np.sum((y_true - np.mean(y_true)) ** 2)
    return 1 - residual_ss / total_ss
class LinearRegression(BaseEstimator, RegressorMixin):
    """Least-squares linear model with an explicit intercept term.

    After ``fit`` the instance exposes ``intercept_`` (the first entry of the
    least-squares solution) and ``coef_`` (the remaining entries, one per
    column of ``X``).
    """

    def fit(self, X, y):
        """Solve the least-squares problem for ``X`` and ``y``; return self."""
        # Prepend a column of ones so the intercept is estimated together
        # with the feature weights.
        bias_column = np.ones((X.shape[0], 1))
        design_matrix = np.hstack([bias_column, X])
        # lstsq returns a 4-tuple; only the solution (first item) is used.
        solution = lstsq(design_matrix, y)[0]
        self.intercept_ = solution[0]
        self.coef_ = solution[1:]
        return self

    def predict(self, X):
        """Return ``X . coef_ + intercept_`` for the rows of ``X``."""
        return np.dot(X, self.coef_) + self.intercept_
class BaggingRegressor:
    """Bootstrap-aggregating ensemble of regressors.

    Each ensemble member is a deep copy of ``base_estimator`` fitted on a
    bootstrap sample (rows drawn with replacement, same size as the training
    set). Predictions are the plain average of the members' predictions.

    Parameters
    ----------
    base_estimator : object
        Prototype estimator providing ``fit(X, y)`` and ``predict(X)``.
    n_estimators : int, default=10
        Number of ensemble members to fit.
    oob_score : bool, default=False
        When true, ``fit`` also computes out-of-bag predictions and their
        R^2 score.
    random_state : int or None, default=None
        Seed handed to ``numpy.random.RandomState`` to derive one seed per
        ensemble member.

    Attributes
    ----------
    estimators_ : list
        The fitted copies of ``base_estimator``.
    estimators_samples_ : list of ndarray
        Row indices of the bootstrap sample drawn for each member.
    oob_prediction_ : ndarray
        Out-of-bag prediction per training row (only when ``oob_score``).
    oob_score_ : float
        R^2 of ``oob_prediction_`` against ``y`` (only when ``oob_score``).
    """

    def __init__(self, base_estimator, n_estimators=10, oob_score=False,
                 random_state=None):
        self.base_estimator = base_estimator
        self.n_estimators = n_estimators
        self.oob_score = oob_score
        self.random_state = random_state

    def fit(self, X, y):
        """Fit ``n_estimators`` copies of the base estimator; return self.

        ``X`` and ``y`` must support NumPy integer-array indexing and ``X``
        must expose ``shape``.
        """
        n_samples = X.shape[0]
        max_int = np.iinfo(np.int32).max
        # One master generator produces an independent seed per member, so
        # each member's bootstrap sample is reproducible on its own.
        seed_rng = np.random.RandomState(self.random_state)
        self._seeds = seed_rng.randint(max_int, size=self.n_estimators)
        self.estimators_ = []
        self.estimators_samples_ = []
        for seed in self._seeds:
            est = deepcopy(self.base_estimator)
            member_rng = np.random.RandomState(seed)
            # Bootstrap: draw n_samples row indices with replacement.
            sample_indices = member_rng.randint(0, n_samples, n_samples)
            self.estimators_samples_.append(sample_indices)
            est.fit(X[sample_indices], y[sample_indices])
            self.estimators_.append(est)
        if self.oob_score:
            self._set_oob_score(X, y)
        return self

    def _set_oob_score(self, X, y):
        """Compute ``oob_prediction_`` and ``oob_score_`` from fitted members.

        Each training row is predicted only by the members whose bootstrap
        sample did not contain it; those predictions are averaged.
        """
        n_samples = X.shape[0]
        predictions = np.zeros(n_samples)
        n_predictions = np.zeros(n_samples)
        for est, sample_indices in zip(self.estimators_,
                                       self.estimators_samples_):
            oob_mask = np.ones(n_samples, dtype=bool)
            oob_mask[sample_indices] = False
            predictions[oob_mask] += est.predict(X[oob_mask])
            n_predictions[oob_mask] += 1
        # Rows that were in every bootstrap sample have no OOB prediction.
        # Dividing by zero would put NaN into oob_prediction_ and make
        # oob_score_ NaN, so clamp the count to 1 (leaving a 0.0 prediction)
        # and tell the caller the estimate is unreliable.
        never_oob = n_predictions == 0
        if never_oob.any():
            warnings.warn(
                "Some inputs do not have OOB scores. This probably means "
                "too few estimators were used to compute any reliable OOB "
                "estimates."
            )
            n_predictions[never_oob] = 1
        predictions /= n_predictions
        self.oob_prediction_ = predictions
        self.oob_score_ = r2_score(y, predictions)

    def predict(self, X):
        """Return the average of the fitted members' predictions for ``X``."""
        pred = np.zeros(X.shape[0])
        for est in self.estimators_:
            pred += est.predict(X)
        # Average over the members actually fitted; the n_estimators
        # hyper-parameter may have been reassigned since fit().
        pred /= len(self.estimators_)
        return pred
# Cross-check the hand-written BaggingRegressor against scikit-learn's
# implementation, using the same data, hyper-parameters and seed for both.
X, y = load_boston(return_X_y=True)
clf1 = BaggingRegressor(
    base_estimator=LinearRegression(),
    n_estimators=100,
    oob_score=True,
    random_state=0,
).fit(X, y)
clf2 = skBaggingRegressor(
    base_estimator=LinearRegression(),
    n_estimators=100,
    oob_score=True,
    random_state=0,
).fit(X, y)

# Per-member seeds and bootstrap samples must match.
assert np.allclose(clf1._seeds, clf2._seeds)
assert np.array_equal(clf1.estimators_samples_, clf2.estimators_samples_)

# Every fitted member must have the same parameters in both ensembles.
for i in range(clf1.n_estimators):
    ours, theirs = clf1.estimators_[i], clf2.estimators_[i]
    assert np.allclose(ours.coef_, theirs.coef_)
    assert np.allclose(ours.intercept_, theirs.intercept_)

# Out-of-bag results must agree.
assert np.allclose(clf1.oob_prediction_, clf2.oob_prediction_)
assert np.allclose(clf1.oob_score_, clf2.oob_score_)

# Ensemble predictions on the training data must agree.
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.allclose(pred1, pred2)