import numpy as np
# note: load_boston was removed in scikit-learn 1.2,
# so this script needs an older scikit-learn to run
from sklearn.datasets import load_boston
from sklearn.tree import DecisionTreeRegressor as skDecisionTreeRegressor
class TreeNode():
    def __init__(self):
        # children are indices into the flat tree_ list; -1 means "no child",
        # so a node with left_child == -1 is a leaf
        self.left_child = -1
        self.right_child = -1
        self.feature = None
        self.threshold = None
        self.impurity = None
        self.n_node = None
        self.value = None
class DecisionTreeRegressor():
    def __init__(self, max_depth=2):
        self.max_depth = max_depth

    def _build_tree(self, X, y, cur_depth, parent, is_left):
        if cur_depth == self.max_depth:
            # depth limit reached: emit a leaf node
            cur_node = TreeNode()
            # node impurity is the MSE: mean(y^2) - mean(y)^2 == Var(y)
            cur_node.impurity = np.mean(np.square(y)) - np.square(np.mean(y))
            cur_node.n_node = X.shape[0]
            cur_node.value = np.mean(y)
            cur_id = len(self.tree_)
            self.tree_.append(cur_node)
            if parent is not None:
                if is_left:
                    self.tree_[parent].left_child = cur_id
                else:
                    self.tree_[parent].right_child = cur_id
            return
        best_improvement = -np.inf
        best_feature = None
        best_threshold = None
        best_left_ind = None
        best_right_ind = None
        sum_total = np.sum(y)
        for i in range(X.shape[1]):
            # sweep candidate splits of feature i in sorted order,
            # maintaining running left/right sums and counts
            sum_left = 0
            sum_right = sum_total
            n_left = 0
            n_right = X.shape[0]
            ind = np.argsort(X[:, i])
            for j in range(ind.shape[0] - 1):
                sum_left += y[ind[j]]
                sum_right -= y[ind[j]]
                n_left += 1
                n_right -= 1
                # cannot split between (near-)identical feature values
                if j + 1 < ind.shape[0] - 1 and np.isclose(X[ind[j], i], X[ind[j + 1], i]):
                    continue
                # proxy for the variance reduction: sum(y^2) and sum(y)^2 / n of
                # the parent are constant over candidate splits, so maximizing
                # this expression picks the same split as minimizing weighted MSE
                cur_improvement = sum_left * sum_left / n_left + sum_right * sum_right / n_right
                if cur_improvement > best_improvement:
                    best_improvement = cur_improvement
                    best_feature = i
                    best_threshold = X[ind[j], i]
                    best_left_ind = ind[:j + 1]
                    best_right_ind = ind[j + 1:]
        cur_node = TreeNode()
        cur_node.feature = best_feature
        cur_node.threshold = best_threshold
        cur_node.impurity = np.mean(np.square(y)) - np.square(np.mean(y))
        cur_node.n_node = X.shape[0]
        cur_node.value = np.mean(y)
        cur_id = len(self.tree_)
        self.tree_.append(cur_node)
        if parent is not None:
            if is_left:
                self.tree_[parent].left_child = cur_id
            else:
                self.tree_[parent].right_child = cur_id
        if cur_depth < self.max_depth:
            self._build_tree(X[best_left_ind], y[best_left_ind], cur_depth + 1, cur_id, True)
            self._build_tree(X[best_right_ind], y[best_right_ind], cur_depth + 1, cur_id, False)

    def fit(self, X, y):
        self.n_features = X.shape[1]
        self.tree_ = []
        self._build_tree(X, y, 0, None, None)
        return self

    def apply(self, X):
        # route every sample to a leaf and return the leaf's node index
        pred = np.zeros(X.shape[0], dtype=int)
        for i in range(X.shape[0]):
            cur_node = 0
            while self.tree_[cur_node].left_child != -1:
                if X[i][self.tree_[cur_node].feature] <= self.tree_[cur_node].threshold:
                    cur_node = self.tree_[cur_node].left_child
                else:
                    cur_node = self.tree_[cur_node].right_child
            pred[i] = cur_node
        return pred

    def predict(self, X):
        pred = self.apply(X)
        return np.array([self.tree_[p].value for p in pred])

    @property
    def feature_importances_(self):
        # mean decrease in impurity, accumulated over the internal nodes
        importances = np.zeros(self.n_features)
        for node in self.tree_:
            if node.left_child != -1:
                left_child = self.tree_[node.left_child]
                right_child = self.tree_[node.right_child]
                importances[node.feature] += (node.n_node * node.impurity
                                              - left_child.n_node * left_child.impurity
                                              - right_child.n_node * right_child.impurity)
        # feature importance used in GBDT
        # importances /= self.tree_[0].n_node
        return importances / np.sum(importances)
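# Aside (not in the original script): the impurity formula used above,
# mean(y^2) - mean(y)^2, is exactly the biased variance of y, i.e. the node's
# MSE when it predicts its own mean. A quick numeric check:
_y_check = np.array([1.0, 2.0, 6.0])
assert np.isclose(np.mean(np.square(_y_check)) - np.square(np.mean(_y_check)),
                  np.var(_y_check))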
X, y = load_boston(return_X_y=True)
clf1 = DecisionTreeRegressor(max_depth=2).fit(X, y)
clf2 = skDecisionTreeRegressor(max_depth=2, random_state=0).fit(X, y)
assert np.array_equal([node.left_child for node in clf1.tree_],
                      clf2.tree_.children_left)
assert np.array_equal([node.right_child for node in clf1.tree_],
                      clf2.tree_.children_right)
# mask of internal (non-leaf) nodes: leaves have left_child == -1
clf1_internal = [node.left_child != -1 for node in clf1.tree_]
clf2_internal = clf2.tree_.children_left != -1
assert np.array_equal(clf1_internal, clf2_internal)
assert np.array_equal(np.array([node.feature for node in clf1.tree_])[clf1_internal],
                      clf2.tree_.feature[clf2_internal])
# thresholds differ slightly: scikit-learn uses the midpoint between the
# current point and the next point, while this implementation uses the
# current point itself
assert np.allclose([node.impurity for node in clf1.tree_],
                   clf2.tree_.impurity)
assert np.array_equal([node.n_node for node in clf1.tree_],
                      clf2.tree_.n_node_samples)
assert np.allclose([node.value for node in clf1.tree_],
                   clf2.tree_.value.ravel())
assert np.allclose(clf1.feature_importances_, clf2.feature_importances_)
pred1 = clf1.apply(X)
pred2 = clf2.apply(X)
assert np.array_equal(pred1, pred2)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.allclose(pred1, pred2)
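# Aside (an illustration, not in the original): the split score used in the
# first version, sum_left^2/n_left + sum_right^2/n_right, differs from the
# weighted variance reduction only by terms that are constant across candidate
# splits of a node (sum(y^2) and sum(y)^2 / n), so both rank splits identically:
_rng = np.random.RandomState(0)
_yd = _rng.rand(10)
_yl, _yr = _yd[:4], _yd[4:]
_proxy = np.sum(_yl) ** 2 / len(_yl) + np.sum(_yr) ** 2 / len(_yr)
_reduction = (len(_yd) * np.var(_yd)
              - len(_yl) * np.var(_yl) - len(_yr) * np.var(_yr))
assert np.isclose(_proxy - np.sum(_yd) ** 2 / len(_yd), _reduction)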
class DecisionTreeRegressor():
    def __init__(self, max_depth=2):
        self.max_depth = max_depth

    def _build_tree(self, X, y, cur_depth, parent, is_left):
        if cur_depth == self.max_depth:
            # depth limit reached: emit a leaf node
            cur_node = TreeNode()
            # node impurity is the MSE: mean(y^2) - mean(y)^2 == Var(y)
            cur_node.impurity = np.mean(np.square(y)) - np.square(np.mean(y))
            cur_node.n_node = X.shape[0]
            cur_node.value = np.mean(y)
            cur_id = len(self.tree_)
            self.tree_.append(cur_node)
            if parent is not None:
                if is_left:
                    self.tree_[parent].left_child = cur_id
                else:
                    self.tree_[parent].right_child = cur_id
            return
        best_improvement = -np.inf
        best_feature = None
        best_threshold = None
        best_left_ind = None
        best_right_ind = None
        sum_total = np.sum(y)
        for i in range(X.shape[1]):
            # sweep candidate splits of feature i in sorted order,
            # maintaining running left/right sums and counts
            sum_left = 0
            sum_right = sum_total
            n_left = 0
            n_right = X.shape[0]
            ind = np.argsort(X[:, i])
            for j in range(ind.shape[0] - 1):
                sum_left += y[ind[j]]
                sum_right -= y[ind[j]]
                n_left += 1
                n_right -= 1
                # cannot split between (near-)identical feature values
                if j + 1 < ind.shape[0] - 1 and np.isclose(X[ind[j], i], X[ind[j + 1], i]):
                    continue
                # Friedman's improvement: n_left * n_right * (mean_left - mean_right)^2,
                # omitting the constant 1/n factor, which does not change the
                # argmax within a node; this matches criterion="friedman_mse"
                cur_improvement = n_left * n_right * np.square(sum_left / n_left - sum_right / n_right)
                if cur_improvement > best_improvement:
                    best_improvement = cur_improvement
                    best_feature = i
                    best_threshold = X[ind[j], i]
                    best_left_ind = ind[:j + 1]
                    best_right_ind = ind[j + 1:]
        cur_node = TreeNode()
        cur_node.feature = best_feature
        cur_node.threshold = best_threshold
        cur_node.impurity = np.mean(np.square(y)) - np.square(np.mean(y))
        cur_node.n_node = X.shape[0]
        cur_node.value = np.mean(y)
        cur_id = len(self.tree_)
        self.tree_.append(cur_node)
        if parent is not None:
            if is_left:
                self.tree_[parent].left_child = cur_id
            else:
                self.tree_[parent].right_child = cur_id
        if cur_depth < self.max_depth:
            self._build_tree(X[best_left_ind], y[best_left_ind], cur_depth + 1, cur_id, True)
            self._build_tree(X[best_right_ind], y[best_right_ind], cur_depth + 1, cur_id, False)

    def fit(self, X, y):
        self.n_features = X.shape[1]
        self.tree_ = []
        self._build_tree(X, y, 0, None, None)
        return self

    def apply(self, X):
        # route every sample to a leaf and return the leaf's node index
        pred = np.zeros(X.shape[0], dtype=int)
        for i in range(X.shape[0]):
            cur_node = 0
            while self.tree_[cur_node].left_child != -1:
                if X[i][self.tree_[cur_node].feature] <= self.tree_[cur_node].threshold:
                    cur_node = self.tree_[cur_node].left_child
                else:
                    cur_node = self.tree_[cur_node].right_child
            pred[i] = cur_node
        return pred

    def predict(self, X):
        pred = self.apply(X)
        return np.array([self.tree_[p].value for p in pred])

    @property
    def feature_importances_(self):
        # mean decrease in impurity, accumulated over the internal nodes
        importances = np.zeros(self.n_features)
        for node in self.tree_:
            if node.left_child != -1:
                left_child = self.tree_[node.left_child]
                right_child = self.tree_[node.right_child]
                importances[node.feature] += (node.n_node * node.impurity
                                              - left_child.n_node * left_child.impurity
                                              - right_child.n_node * right_child.impurity)
        # feature importance used in GBDT
        # importances /= self.tree_[0].n_node
        return importances / np.sum(importances)
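# Aside (not in the original): Friedman's improvement is
# (n_l * n_r / n) * (mean_l - mean_r)^2; the version above omits the constant
# 1/n factor, which cannot change which split of a node scores highest. Quick
# check that dropping the factor preserves the ranking on a toy vector:
_yf = np.array([3.0, 1.0, 4.0, 1.0, 5.0])
_scores = []
for _k in range(1, len(_yf)):
    _l, _r = _yf[:_k], _yf[_k:]
    _scores.append(len(_l) * len(_r) * (np.mean(_l) - np.mean(_r)) ** 2)
assert np.argmax(_scores) == np.argmax(np.array(_scores) / len(_yf))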
X, y = load_boston(return_X_y=True)
clf1 = DecisionTreeRegressor(max_depth=2).fit(X, y)
clf2 = skDecisionTreeRegressor(max_depth=2, criterion="friedman_mse", random_state=0).fit(X, y)
assert np.array_equal([node.left_child for node in clf1.tree_],
                      clf2.tree_.children_left)
assert np.array_equal([node.right_child for node in clf1.tree_],
                      clf2.tree_.children_right)
# mask of internal (non-leaf) nodes: leaves have left_child == -1
clf1_internal = [node.left_child != -1 for node in clf1.tree_]
clf2_internal = clf2.tree_.children_left != -1
assert np.array_equal(clf1_internal, clf2_internal)
assert np.array_equal(np.array([node.feature for node in clf1.tree_])[clf1_internal],
                      clf2.tree_.feature[clf2_internal])
# thresholds differ slightly: scikit-learn uses the midpoint between the
# current point and the next point, while this implementation uses the
# current point itself
assert np.allclose([node.impurity for node in clf1.tree_],
                   clf2.tree_.impurity)
assert np.array_equal([node.n_node for node in clf1.tree_],
                      clf2.tree_.n_node_samples)
assert np.allclose([node.value for node in clf1.tree_],
                   clf2.tree_.value.ravel())
assert np.allclose(clf1.feature_importances_, clf2.feature_importances_)
pred1 = clf1.apply(X)
pred2 = clf2.apply(X)
assert np.array_equal(pred1, pred2)
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.allclose(pred1, pred2)
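# Optional helper (an addition, not part of the original script): print the
# learned tree depth-first so the structure found above can be inspected by eye.
def print_tree(tree, node=0, depth=0):
    cur = tree[node]
    indent = "  " * depth
    if cur.left_child == -1:
        print(f"{indent}leaf: value={cur.value:.3f}, n={cur.n_node}")
    else:
        print(f"{indent}X[{cur.feature}] <= {cur.threshold:.3f} "
              f"(impurity={cur.impurity:.3f}, n={cur.n_node})")
        print_tree(tree, cur.left_child, depth + 1)
        print_tree(tree, cur.right_child, depth + 1)

print_tree(clf1.tree_)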