Fuzzy K-means and K-medians
# Copyright Mathieu Blondel December 2011
# License: BSD 3 clause

import numpy as np
import matplotlib.pyplot as plt

from sklearn.base import BaseEstimator
from sklearn.utils import check_random_state
from sklearn.cluster import MiniBatchKMeans  # imported for comparison; unused below
from sklearn.cluster import KMeans as KMeansGood  # reference implementation
from sklearn.metrics.pairwise import euclidean_distances, manhattan_distances
from sklearn.datasets import make_blobs

##############################################################################
# Generate sample data
np.random.seed(0)

batch_size = 45
centers = [[1, 1], [-1, -1], [1, -1]]
n_clusters = len(centers)
X, labels_true = make_blobs(n_samples=1200, centers=centers, cluster_std=0.3)


class KMeans(BaseEstimator):

    def __init__(self, k, max_iter=100, random_state=0, tol=1e-4):
        self.k = k
        self.max_iter = max_iter
        self.random_state = random_state
        self.tol = tol

    def _e_step(self, X):
        # Assign each sample to its nearest center (squared Euclidean).
        self.labels_ = euclidean_distances(X, self.cluster_centers_,
                                           squared=True).argmin(axis=1)

    def _average(self, X):
        return X.mean(axis=0)

    def _m_step(self, X):
        X_center = None
        for center_id in range(self.k):
            center_mask = self.labels_ == center_id
            if not np.any(center_mask):
                # The centroid of empty clusters is set to the center of
                # everything.
                if X_center is None:
                    X_center = self._average(X)
                self.cluster_centers_[center_id] = X_center
            else:
                self.cluster_centers_[center_id] = \
                    self._average(X[center_mask])

    def fit(self, X, y=None):
        n_samples = X.shape[0]
        vdata = np.mean(np.var(X, 0))

        random_state = check_random_state(self.random_state)
        # Initialize the centers from k samples drawn at random.
        self.labels_ = random_state.permutation(n_samples)[:self.k]
        self.cluster_centers_ = X[self.labels_]

        for i in range(self.max_iter):
            centers_old = self.cluster_centers_.copy()

            self._e_step(X)
            self._m_step(X)

            # Stop when the centers have (almost) stopped moving.
            if np.sum((centers_old - self.cluster_centers_) ** 2) < self.tol * vdata:
                break

        return self


class KMedians(KMeans):

    def _e_step(self, X):
        # Distances have shape (n_samples, k); argmin over axis=1 picks
        # the nearest center for each sample.
        self.labels_ = manhattan_distances(X, self.cluster_centers_).argmin(axis=1)

    def _average(self, X):
        return np.median(X, axis=0)


class FuzzyKMeans(KMeans):

    def __init__(self, k, m=2, max_iter=100, random_state=0, tol=1e-4):
        """
        m > 1: fuzziness parameter
            The closer m is to 1, the closer to hard k-means.
            The larger m, the fuzzier (all centers converge to the
            global centroid).
        """
        self.k = k
        assert m > 1
        self.m = m
        self.max_iter = max_iter
        self.random_state = random_state
        self.tol = tol

    def _e_step(self, X):
        # Inverse squared distances; guard against division by zero when
        # a sample coincides exactly with a center.
        dist = euclidean_distances(X, self.cluster_centers_, squared=True)
        D = 1.0 / np.fmax(dist, np.finfo(np.float64).eps)
        D **= 1.0 / (self.m - 1)
        D /= np.sum(D, axis=1)[:, np.newaxis]
        # shape: n_samples x k
        self.fuzzy_labels_ = D
        self.labels_ = self.fuzzy_labels_.argmax(axis=1)

    def _m_step(self, X):
        weights = self.fuzzy_labels_ ** self.m
        # shape: n_clusters x n_features
        self.cluster_centers_ = np.dot(X.T, weights).T
        self.cluster_centers_ /= weights.sum(axis=0)[:, np.newaxis]

    def fit(self, X, y=None):
        n_samples, n_features = X.shape
        vdata = np.mean(np.var(X, 0))

        random_state = check_random_state(self.random_state)
        # Initialize with random memberships, normalized to sum to 1
        # over the k clusters for each sample.
        self.fuzzy_labels_ = random_state.rand(n_samples, self.k)
        self.fuzzy_labels_ /= self.fuzzy_labels_.sum(axis=1)[:, np.newaxis]
        self._m_step(X)

        for i in range(self.max_iter):
            centers_old = self.cluster_centers_.copy()

            self._e_step(X)
            self._m_step(X)

            if np.sum((centers_old - self.cluster_centers_) ** 2) < self.tol * vdata:
                break

        return self


kmeans = KMeans(k=3)
kmeans.fit(X)

kmedians = KMedians(k=3)
kmedians.fit(X)

fuzzy_kmeans = FuzzyKMeans(k=3, m=2)
fuzzy_kmeans.fit(X)

fig = plt.figure()
colors = ['#4EACC5', '#FF9C34', '#4E9A06']

objects = (kmeans, kmedians, fuzzy_kmeans)

for i, obj in enumerate(objects):
    ax = fig.add_subplot(1, len(objects), i + 1)

    for k, col in zip(range(obj.k), colors):
        my_members = obj.labels_ == k
        cluster_center = obj.cluster_centers_[k]
        ax.plot(X[my_members, 0], X[my_members, 1], 'w',
                markerfacecolor=col, marker='.')
        ax.plot(cluster_center[0], cluster_center[1], 'o', markerfacecolor=col,
                markeredgecolor='k', markersize=6)
    ax.set_title(obj.__class__.__name__)

plt.show()
FuzzyKMeans only supports .fit(), not .fit_predict()?
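Right, the classes here only define fit. A minimal sketch of what adding prediction could look like, assuming the FuzzyKMeans class above is in scope (the subclass name is made up for illustration):

```python
from sklearn.metrics.pairwise import euclidean_distances

class FuzzyKMeansWithPredict(FuzzyKMeans):

    def predict(self, X):
        # The hard label is the nearest center, which is also the argmax
        # of the fuzzy memberships, so no normalization is needed here.
        return euclidean_distances(X, self.cluster_centers_,
                                   squared=True).argmin(axis=1)

    def fit_predict(self, X, y=None):
        # Follows the scikit-learn convention: fit, then return labels_.
        return self.fit(X).labels_
```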
Try pairwise_distances_argmin from sklearn
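One way that suggestion could look in the two _e_step methods (an untested sketch; pairwise_distances_argmin computes the argmin without materializing the full distance matrix):

```python
from sklearn.metrics import pairwise_distances_argmin

# In KMeans._e_step:
self.labels_ = pairwise_distances_argmin(X, self.cluster_centers_)

# In KMedians._e_step:
self.labels_ = pairwise_distances_argmin(X, self.cluster_centers_,
                                         metric='manhattan')
```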
It seems like as of scikit-learn v0.24.0, make_blobs() is in the sklearn.datasets module, not sklearn.datasets.samples_generator, so the import needs to be updated.
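That is, the import becomes:

```python
from sklearn.datasets import make_blobs
```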
So sklearn's KMeans (KMeansGood here) initializes the k centroids n_init times and returns the result where inertia_ ("sum of squared distances of samples to their closest cluster center") is smallest. It does not seem like KMedians does this, correct? Would it be possible to implement this functionality? (I realize I could write a wrapper to do it.)
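Correct, each class here runs a single random initialization. A rough sketch of the wrapper idea (the helper name best_kmedians and the L1 inertia definition are my own choices, not part of the gist):

```python
import numpy as np
from sklearn.metrics.pairwise import manhattan_distances

def best_kmedians(X, k, n_init=10, random_state=0):
    """Run KMedians n_init times and keep the fit with the lowest inertia,
    where inertia is the sum of Manhattan distances of samples to their
    assigned centers (the L1 analogue of KMeans.inertia_)."""
    best, best_inertia = None, np.inf
    for seed in range(n_init):
        est = KMedians(k=k, random_state=random_state + seed).fit(X)
        # Distance of each sample to its assigned center.
        dist = manhattan_distances(X, est.cluster_centers_)
        inertia = dist[np.arange(X.shape[0]), est.labels_].sum()
        if inertia < best_inertia:
            best, best_inertia = est, inertia
    return best
```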
I appreciate an implementation of K-medians for Python; I would be even more appreciative if there were more comments. (For example, why axis=1 in manhattan_distances().argmin()?)
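On the axis=1 question: manhattan_distances(X, centers) returns an (n_samples, k) matrix, so argmin(axis=1) scans across the k centers for each sample and picks the nearest one. A tiny illustration with made-up numbers:

```python
import numpy as np
from sklearn.metrics.pairwise import manhattan_distances

X = np.array([[0.0, 0.0], [5.0, 5.0]])        # 2 samples
centers = np.array([[0.0, 1.0], [4.0, 4.0]])  # k = 2 candidate centers

D = manhattan_distances(X, centers)
print(D)                 # [[1. 8.]
                         #  [9. 2.]]  rows = samples, cols = centers
print(D.argmin(axis=1))  # [0 1]: index of the nearest center per sample
```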