import numpy as np
# NOTE: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2,
# so this script needs scikit-learn < 1.2 to run as-is.
from sklearn.datasets import load_boston
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import SGDRegressor as skSGDRegressor

def _loss(x, y, coef, intercept):
    # squared error of a single sample: 0.5 * (prediction - target)^2
    p = np.dot(x, coef) + intercept
    return 0.5 * (p - y) * (p - y)


def _grad(x, y, coef, intercept):
    p = np.dot(x, coef) + intercept
    # clip the loss derivative (consistent with scikit-learn)
    dloss = np.clip(p - y, -1e12, 1e12)
    coef_grad = dloss * x
    intercept_grad = dloss
    return coef_grad, intercept_grad
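

# Sanity check (not in the original): compare the analytic gradient of the
# squared loss against a central finite-difference approximation. The helper
# name _check_grad and the random test point are mine.
def _check_grad(seed=0, eps=1e-6):
    rng = np.random.RandomState(seed)
    x, y = rng.rand(5), rng.rand()
    coef, intercept = rng.rand(5), rng.rand()
    coef_grad, intercept_grad = _grad(x, y, coef, intercept)
    num_grad = np.zeros_like(coef)
    for j in range(coef.shape[0]):
        e = np.zeros_like(coef)
        e[j] = eps
        num_grad[j] = (_loss(x, y, coef + e, intercept)
                       - _loss(x, y, coef - e, intercept)) / (2 * eps)
    num_intercept = (_loss(x, y, coef, intercept + eps)
                     - _loss(x, y, coef, intercept - eps)) / (2 * eps)
    assert np.allclose(coef_grad, num_grad)
    assert np.allclose(intercept_grad, num_intercept)


_check_grad()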


class SGDRegressor():
    # defaults mirror scikit-learn's SGDRegressor with its default
    # 'invscaling' learning rate schedule
    def __init__(self, penalty="l2", alpha=0.0001, max_iter=1000, tol=1e-3,
                 shuffle=True, random_state=0,
                 eta0=0.01, power_t=0.25, n_iter_no_change=5):
        self.penalty = penalty
        self.alpha = alpha
        self.max_iter = max_iter
        self.tol = tol
        self.shuffle = shuffle
        self.random_state = random_state
        self.eta0 = eta0
        self.power_t = power_t
        self.n_iter_no_change = n_iter_no_change
    def fit(self, X, y):
        coef = np.zeros(X.shape[1])
        intercept = 0
        best_loss = np.inf
        no_improvement_count = 0
        t = 1
        rng = np.random.RandomState(self.random_state)
        for epoch in range(self.max_iter):
            # different from how data is shuffled in scikit-learn
            if self.shuffle:
                ind = rng.permutation(X.shape[0])
                X, y = X[ind], y[ind]
            sumloss = 0
            for i in range(X.shape[0]):
                # accumulate the loss before the parameters are updated
                sumloss += _loss(X[i], y[i], coef, intercept)
                # 'invscaling' schedule: eta = eta0 / t^power_t
                eta = self.eta0 / np.power(t, self.power_t)
                coef_grad, intercept_grad = _grad(X[i], y[i], coef, intercept)
                # the L2 penalty acts as multiplicative shrinkage of the weights
                if self.penalty == "l2":
                    coef *= 1 - eta * self.alpha
                coef -= eta * coef_grad
                intercept -= eta * intercept_grad
                t += 1
            # stop once the epoch loss has failed to improve by at least
            # tol per sample for n_iter_no_change consecutive epochs
            if sumloss > best_loss - self.tol * X.shape[0]:
                no_improvement_count += 1
            else:
                no_improvement_count = 0
            if no_improvement_count == self.n_iter_no_change:
                break
            if sumloss < best_loss:
                best_loss = sumloss
        self.coef_ = coef
        self.intercept_ = np.array([intercept])
        self.n_iter_ = epoch + 1
        return self
    def predict(self, X):
        y_pred = np.dot(X, self.coef_) + self.intercept_
        return y_pred

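
# Quick standalone sketch (not in the original): recover a known slope and
# intercept from synthetic data, just to see the estimator in action.
# All the _demo* names below are mine.
_demo_rng = np.random.RandomState(0)
_demo_X = _demo_rng.randn(200, 1)
_demo_y = 3.0 * _demo_X[:, 0] + 2.0 + 0.01 * _demo_rng.randn(200)
_demo = SGDRegressor(shuffle=False, penalty="none").fit(_demo_X, _demo_y)
# _demo.coef_ should land near 3 and _demo.intercept_ near 2
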
# shuffle=False penalty="none"
X, y = load_boston(return_X_y=True)
X = StandardScaler().fit_transform(X)
clf1 = SGDRegressor(shuffle=False, penalty="none").fit(X, y)
clf2 = skSGDRegressor(shuffle=False, penalty="none").fit(X, y)
assert np.allclose(clf1.coef_, clf2.coef_)
assert np.allclose(clf1.intercept_, clf2.intercept_)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.allclose(pred1, pred2)
# shuffle=False penalty="l2"
for alpha in [0.1, 1, 10]:
    X, y = load_boston(return_X_y=True)
    X = StandardScaler().fit_transform(X)
    clf1 = SGDRegressor(shuffle=False, alpha=alpha).fit(X, y)
    clf2 = skSGDRegressor(shuffle=False, alpha=alpha).fit(X, y)
    assert np.allclose(clf1.coef_, clf2.coef_)
    assert np.allclose(clf1.intercept_, clf2.intercept_)
    pred1 = clf1.predict(X)
    pred2 = clf2.predict(X)
    assert np.allclose(pred1, pred2)
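
# ---- Huber loss variant ----
# Same estimator as above, but with the Huber loss: quadratic for residuals
# within epsilon, linear beyond it, which makes the fit robust to outliers.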
def _loss(x, y, coef, intercept, epsilon):
    p = np.dot(x, coef) + intercept
    r = p - y
    # quadratic inside the epsilon band, linear outside
    if np.abs(r) <= epsilon:
        return 0.5 * r * r
    else:
        return epsilon * (np.abs(r) - 0.5 * epsilon)


def _grad(x, y, coef, intercept, epsilon):
    p = np.dot(x, coef) + intercept
    r = p - y
    # the loss derivative saturates at +/- epsilon outside the band
    if np.abs(r) <= epsilon:
        dloss = r
    elif r > epsilon:
        dloss = epsilon
    else:
        dloss = -epsilon
    dloss = np.clip(dloss, -1e12, 1e12)
    coef_grad = dloss * x
    intercept_grad = dloss
    return coef_grad, intercept_grad
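
# Illustration (not in the original; the _out* names are mine): for a large
# residual the Huber derivative saturates at epsilon, whereas the squared-loss
# derivative would grow with the residual (here it would be 30).
_out_x = np.ones(3)
_out_grad, _ = _grad(_out_x, 0.0, np.full(3, 10.0), 0.0, 0.1)  # residual = 30
assert np.allclose(_out_grad, 0.1)  # epsilon * x_j, independent of residual size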


class SGDRegressor():
    def __init__(self, penalty="l2", alpha=0.0001, max_iter=1000, tol=1e-3,
                 shuffle=True, epsilon=0.1, random_state=0,
                 eta0=0.01, power_t=0.25, n_iter_no_change=5):
        # epsilon is the Huber threshold; 0.1 matches scikit-learn's default
        self.penalty = penalty
        self.alpha = alpha
        self.max_iter = max_iter
        self.tol = tol
        self.shuffle = shuffle
        self.epsilon = epsilon
        self.random_state = random_state
        self.eta0 = eta0
        self.power_t = power_t
        self.n_iter_no_change = n_iter_no_change
    def fit(self, X, y):
        coef = np.zeros(X.shape[1])
        intercept = 0
        best_loss = np.inf
        no_improvement_count = 0
        t = 1
        rng = np.random.RandomState(self.random_state)
        for epoch in range(self.max_iter):
            # different from how data is shuffled in scikit-learn
            if self.shuffle:
                ind = rng.permutation(X.shape[0])
                X, y = X[ind], y[ind]
            sumloss = 0
            for i in range(X.shape[0]):
                sumloss += _loss(X[i], y[i], coef, intercept, self.epsilon)
                eta = self.eta0 / np.power(t, self.power_t)
                coef_grad, intercept_grad = _grad(X[i], y[i], coef, intercept,
                                                  self.epsilon)
                if self.penalty == "l2":
                    coef *= 1 - eta * self.alpha
                coef -= eta * coef_grad
                intercept -= eta * intercept_grad
                t += 1
            if sumloss > best_loss - self.tol * X.shape[0]:
                no_improvement_count += 1
            else:
                no_improvement_count = 0
            if no_improvement_count == self.n_iter_no_change:
                break
            if sumloss < best_loss:
                best_loss = sumloss
        self.coef_ = coef
        self.intercept_ = np.array([intercept])
        self.n_iter_ = epoch + 1
        return self
    def predict(self, X):
        y_pred = np.dot(X, self.coef_) + self.intercept_
        return y_pred

# shuffle=False penalty="none"
X, y = load_boston(return_X_y=True)
X = StandardScaler().fit_transform(X)
clf1 = SGDRegressor(shuffle=False, penalty="none").fit(X, y)
clf2 = skSGDRegressor(loss="huber", shuffle=False, penalty="none").fit(X, y)
assert np.allclose(clf1.coef_, clf2.coef_)
assert np.allclose(clf1.intercept_, clf2.intercept_)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.allclose(pred1, pred2)
# shuffle=False penalty="l2"
for alpha in [0.1, 1, 10]:
    X, y = load_boston(return_X_y=True)
    X = StandardScaler().fit_transform(X)
    clf1 = SGDRegressor(shuffle=False, alpha=alpha).fit(X, y)
    clf2 = skSGDRegressor(loss="huber", shuffle=False, alpha=alpha).fit(X, y)
    assert np.allclose(clf1.coef_, clf2.coef_)
    assert np.allclose(clf1.intercept_, clf2.intercept_)
    pred1 = clf1.predict(X)
    pred2 = clf2.predict(X)
    assert np.allclose(pred1, pred2)
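
# A further check one might add (not in the original): sweep the Huber
# threshold epsilon and compare against scikit-learn in the same way.
for epsilon in [0.05, 0.5, 5]:
    X, y = load_boston(return_X_y=True)
    X = StandardScaler().fit_transform(X)
    clf1 = SGDRegressor(shuffle=False, penalty="none", epsilon=epsilon).fit(X, y)
    clf2 = skSGDRegressor(loss="huber", shuffle=False, penalty="none",
                          epsilon=epsilon).fit(X, y)
    assert np.allclose(clf1.coef_, clf2.coef_)
    assert np.allclose(clf1.intercept_, clf2.intercept_)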