import numpy as np
from sklearn.datasets import load_iris
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.cluster import KMeans as skKmeans
class KMeans():
    """Lloyd's k-means with random-sample initialisation.

    The initial centers are ``n_clusters`` distinct rows of the training data
    chosen through a seeded permutation. Assignment and mean-update steps then
    alternate until the total squared center shift drops to the tolerance or
    ``max_iter`` iterations have run.

    Parameters
    ----------
    n_clusters : int, default=8
        Number of clusters (and centers) to form.
    max_iter : int, default=300
        Upper bound on the number of assignment/update iterations.
    tol : float, default=1e-4
        Relative tolerance; it is scaled by the mean per-feature variance of
        the training data before being compared with the squared center shift.
    random_state : int, default=0
        Seed handed to ``np.random.RandomState`` for the initial draw.

    Attributes
    ----------
    cluster_centers_ : ndarray of shape (n_clusters, n_features)
    labels_ : ndarray of shape (n_samples,), integer dtype
    inertia_ : float
        Sum of squared distances of the samples to their assigned center.
    n_iter_ : int
        Number of iterations that were executed.
    """

    def __init__(self, n_clusters=8,
                 max_iter=300, tol=1e-4, random_state=0):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state

    def _labels_inertia(self, X, centers):
        """Assign every sample to its nearest center.

        Returns
        -------
        labels : ndarray of shape (n_samples,), integer dtype
            Index of the closest center; on ties the lowest index wins.
        inertia : float
            Sum of the squared distances to the chosen centers.
        """
        X = np.asarray(X)
        centers = np.asarray(centers)
        # Broadcast to (n_samples, n_centers, n_features) and reduce over the
        # feature axis to get every sample/center squared distance at once.
        diff = X[:, np.newaxis, :] - centers[np.newaxis, :, :]
        dist_sq = np.sum(np.square(diff), axis=2)
        # argmin returns the first minimum, matching a strict "<" scan.
        labels = np.argmin(dist_sq, axis=1)
        inertia = dist_sq[np.arange(X.shape[0]), labels].sum()
        return labels, inertia

    def fit(self, X):
        """Cluster ``X`` and store the result on the estimator.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``max_iter`` is smaller than 1 or ``n_clusters`` is not in
            ``[1, n_samples]``.
        """
        X = np.asarray(X)
        # Work in floating point: with an integer array the centers would take
        # the integer dtype and every mean update would be truncated.
        if not np.issubdtype(X.dtype, np.floating):
            X = X.astype(np.float64)
        if self.max_iter < 1:
            raise ValueError(
                "max_iter should be >= 1, got %r" % (self.max_iter,))
        if not 1 <= self.n_clusters <= X.shape[0]:
            raise ValueError(
                "n_clusters=%r must be between 1 and n_samples=%d"
                % (self.n_clusters, X.shape[0]))

        rng = np.random.RandomState(self.random_state)
        # consistent with scikit-learn
        tol = np.mean(np.var(X, axis=0)) * self.tol
        # Fancy indexing copies, so updating the centers never touches X.
        centers = X[rng.permutation(X.shape[0])[:self.n_clusters]]

        for i in range(self.max_iter):
            centers_old = centers.copy()
            labels, inertia = self._labels_inertia(X, centers)
            for center_idx in range(self.n_clusters):
                members = X[labels == center_idx]
                # An empty cluster keeps its previous center; averaging zero
                # rows would turn the center into NaN.
                if members.shape[0] > 0:
                    centers[center_idx] = members.mean(axis=0)
            center_shift_total = np.sum(np.square(centers_old - centers))
            if center_shift_total <= tol:
                break

        # The labels above were computed before the last center update; when
        # the centers still moved, recompute so labels/inertia match them.
        if center_shift_total > 0:
            labels, inertia = self._labels_inertia(X, centers)

        self.cluster_centers_ = centers
        self.labels_ = labels
        self.inertia_ = inertia
        self.n_iter_ = i + 1
        return self

    def predict(self, X):
        """Return the index of the nearest fitted center for each row of ``X``."""
        return self._labels_inertia(X, self.cluster_centers_)[0]
# Sanity check: the hand-written estimator above must reproduce scikit-learn
# on the iris features when both start from the same random initialisation.
X, _ = load_iris(return_X_y=True)

clf1 = KMeans(n_clusters=3, random_state=0)
clf1.fit(X)

# NOTE(review): newer scikit-learn releases spell algorithm="full" as "lloyd";
# confirm against the installed version before changing it.
clf2 = skKmeans(
    n_clusters=3,
    init="random",
    n_init=1,
    algorithm="full",
    random_state=0,
)
clf2.fit(X)

# Fitted state has to agree: centers, training labels and inertia.
assert np.allclose(clf1.cluster_centers_, clf2.cluster_centers_)
assert np.array_equal(clf1.labels_, clf2.labels_)
assert np.allclose(clf1.inertia_, clf2.inertia_)

# Predictions on the training data have to agree as well.
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.array_equal(pred1, pred2)
class KMeans():
    """Lloyd's k-means with k-means++ seeding.

    The first center is a uniformly drawn training sample; every further
    center is the best of a few candidates sampled with probability
    proportional to the squared distance to the closest center chosen so far.
    Assignment and mean-update steps then alternate until the total squared
    center shift drops to the tolerance or ``max_iter`` iterations have run.

    Parameters
    ----------
    n_clusters : int, default=8
        Number of clusters (and centers) to form.
    max_iter : int, default=300
        Upper bound on the number of assignment/update iterations.
    tol : float, default=1e-4
        Relative tolerance; it is scaled by the mean per-feature variance of
        the training data before being compared with the squared center shift.
    random_state : int, default=0
        Seed handed to ``np.random.RandomState`` for the seeding draws.

    Attributes
    ----------
    cluster_centers_ : ndarray of shape (n_clusters, n_features)
    labels_ : ndarray of shape (n_samples,), integer dtype
    inertia_ : float
        Sum of squared distances of the samples to their assigned center.
    n_iter_ : int
        Number of iterations that were executed.
    """

    def __init__(self, n_clusters=8,
                 max_iter=300, tol=1e-4, random_state=0):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state

    def _labels_inertia(self, X, centers):
        """Assign every sample to its nearest center.

        Returns
        -------
        labels : ndarray of shape (n_samples,), integer dtype
            Index of the closest center; on ties the lowest index wins.
        inertia : float
            Sum of the squared distances to the chosen centers.
        """
        X = np.asarray(X)
        centers = np.asarray(centers)
        # Broadcast to (n_samples, n_centers, n_features) and reduce over the
        # feature axis to get every sample/center squared distance at once.
        diff = X[:, np.newaxis, :] - centers[np.newaxis, :, :]
        dist_sq = np.sum(np.square(diff), axis=2)
        # argmin returns the first minimum, matching a strict "<" scan.
        labels = np.argmin(dist_sq, axis=1)
        inertia = dist_sq[np.arange(X.shape[0]), labels].sum()
        return labels, inertia

    def _init_centers(self, X, rng):
        """Pick ``n_clusters`` initial centers from the rows of ``X`` (k-means++)."""
        n_samples = X.shape[0]
        centers = np.empty((self.n_clusters, X.shape[1]), dtype=X.dtype)
        centers[0] = X[rng.randint(n_samples)]
        closest_dist_sq = euclidean_distances(
            centers[0, np.newaxis], X, squared=True)
        n_local_trials = 2 + int(np.log(self.n_clusters))
        for center_idx in range(1, self.n_clusters):
            # Sample candidates proportionally to the squared distance to the
            # closest center picked so far.
            rand_vals = (rng.random_sample(n_local_trials)
                         * np.sum(closest_dist_sq))
            candidate_ids = np.searchsorted(
                np.cumsum(closest_dist_sq), rand_vals)
            # Rounding in the cumulative sum can yield an index of n_samples.
            candidate_ids = np.minimum(candidate_ids, n_samples - 1)
            # Must be squared: the values are compared with and accumulated
            # into closest_dist_sq, which holds squared distances.
            distance_to_candidates = euclidean_distances(
                X[candidate_ids], X, squared=True)
            distance_to_candidates = np.minimum(
                closest_dist_sq, distance_to_candidates)
            # Keep the candidate that leaves the smallest total potential.
            candidates_pot = distance_to_candidates.sum(axis=1)
            best_candidate = np.argmin(candidates_pot)
            closest_dist_sq = distance_to_candidates[best_candidate]
            centers[center_idx] = X[candidate_ids[best_candidate]]
        return centers

    def fit(self, X):
        """Cluster ``X`` and store the result on the estimator.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``max_iter`` is smaller than 1 or ``n_clusters`` is not in
            ``[1, n_samples]``.
        """
        X = np.asarray(X)
        # Work in floating point: with an integer array the centers would take
        # the integer dtype and every mean update would be truncated.
        if not np.issubdtype(X.dtype, np.floating):
            X = X.astype(np.float64)
        if self.max_iter < 1:
            raise ValueError(
                "max_iter should be >= 1, got %r" % (self.max_iter,))
        if not 1 <= self.n_clusters <= X.shape[0]:
            raise ValueError(
                "n_clusters=%r must be between 1 and n_samples=%d"
                % (self.n_clusters, X.shape[0]))

        # consistent with scikit-learn
        tol = np.mean(np.var(X, axis=0)) * self.tol
        rng = np.random.RandomState(self.random_state)
        centers = self._init_centers(X, rng)

        for i in range(self.max_iter):
            centers_old = centers.copy()
            labels, inertia = self._labels_inertia(X, centers)
            for center_idx in range(self.n_clusters):
                members = X[labels == center_idx]
                # An empty cluster keeps its previous center; averaging zero
                # rows would turn the center into NaN.
                if members.shape[0] > 0:
                    centers[center_idx] = members.mean(axis=0)
            center_shift_total = np.sum(np.square(centers_old - centers))
            if center_shift_total <= tol:
                break

        # The labels above were computed before the last center update; when
        # the centers still moved, recompute so labels/inertia match them.
        if center_shift_total > 0:
            labels, inertia = self._labels_inertia(X, centers)

        self.cluster_centers_ = centers
        self.labels_ = labels
        self.inertia_ = inertia
        self.n_iter_ = i + 1
        return self

    def predict(self, X):
        """Return the index of the nearest fitted center for each row of ``X``."""
        return self._labels_inertia(X, self.cluster_centers_)[0]
# Sanity check: the k-means++ estimator above must reproduce scikit-learn's
# default initialisation on the iris features for the same seed.
X, _ = load_iris(return_X_y=True)

clf1 = KMeans(n_clusters=3, random_state=0)
clf1.fit(X)

# NOTE(review): newer scikit-learn releases spell algorithm="full" as "lloyd";
# confirm against the installed version before changing it.
clf2 = skKmeans(
    n_clusters=3,
    n_init=1,
    algorithm="full",
    random_state=0,
)
clf2.fit(X)

# Fitted state has to agree: centers, training labels and inertia.
assert np.allclose(clf1.cluster_centers_, clf2.cluster_centers_)
assert np.array_equal(clf1.labels_, clf2.labels_)
assert np.allclose(clf1.inertia_, clf2.inertia_)

# Predictions on the training data have to agree as well.
pred1 = clf1.predict(X)
pred2 = clf2.predict(X)
assert np.array_equal(pred1, pred2)