examples of resamplers for scikit-learn
from __future__ import print_function, division

import numpy as np

from sklearn.base import BaseEstimator
from sklearn.cluster import MiniBatchKMeans, SpectralClustering
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils.random import sample_without_replacement
from sklearn.svm import OneClassSVM
from sklearn.linear_model import LogisticRegression
from sklearn import datasets

class OutlierDetector(BaseEstimator):
    """Zero out the weight of samples an outlier detector flags as outliers."""

    def __init__(self, estimator, df_threshold=0):
        self.estimator = estimator
        self.df_threshold = df_threshold

    def fit_resample(self, X, y=None, sample_weight=None):
        # Fit the wrapped detector, forwarding y and sample_weight only
        # when given, then reweight the training data.
        if y is not None:
            if sample_weight is None:
                self.estimator.fit(X, y)
            else:
                self.estimator.fit(X, y, sample_weight=sample_weight)
        else:
            if sample_weight is None:
                self.estimator.fit(X)
            else:
                self.estimator.fit(X, sample_weight=sample_weight)
        return self.resample(X, y, sample_weight)

    def resample(self, X, y=None, sample_weight=None):
        df = self.estimator.decision_function(X)
        # TODO: check df has right dimensions
        df = df.ravel()
        if sample_weight is None:
            sample_weight = np.ones(X.shape[0])
        else:
            sample_weight = sample_weight.copy()
        # Samples scoring below the threshold are treated as outliers and
        # given zero weight; the data itself is left intact.
        sample_weight[df < self.df_threshold] = 0
        # TODO: perhaps only return y if not None
        return {'X': X, 'y': y, 'sample_weight': sample_weight}
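
# A minimal sketch of the resampler contract above (illustrative only; the
# data and parameters here are not from the original gist): points far from
# the bulk of the data should come back with zero weight.
_rng = np.random.RandomState(0)
_X_demo = np.concatenate([_rng.normal(0, 1, (95, 2)),
                          _rng.normal(8, 1, (5, 2))])  # 5 far-away points
_out = OutlierDetector(OneClassSVM(nu=0.1)).fit_resample(_X_demo)
print('zero-weighted points:', int((_out['sample_weight'] == 0).sum()))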

class Undersampler(BaseEstimator):
    """Balance classes by keeping an equal-size random sample of each class."""

    def fit_resample(self, X, y):
        # TODO: support sample_weight
        label_set, labels = np.unique(y, return_inverse=True)
        samples_per_class = np.min(np.bincount(labels))
        indices = []
        for label in label_set:
            label_indices = np.flatnonzero(y == label)
            selected = sample_without_replacement(len(label_indices),
                                                  samples_per_class)
            indices.extend(label_indices.take(selected))
        # Selected samples get weight 1; everything else is zeroed out.
        sample_weight = np.zeros(len(y))
        sample_weight[indices] = 1
        return {'X': X, 'y': y, 'sample_weight': sample_weight}
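
# A minimal sketch of Undersampler on an imbalanced toy problem (illustrative
# only): every class should end up with as many nonzero-weight samples as the
# rarest class has.
_y_demo = np.array([0] * 90 + [1] * 10)
_w = Undersampler().fit_resample(np.arange(100).reshape(-1, 1),
                                 _y_demo)['sample_weight']
print(np.bincount(_y_demo[_w == 1]))  # expect [10 10]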

class WeightsAsReplication(BaseEstimator):
    """Turn sample weights into sample replication for downstream estimators."""

    def fit_resample(self, X, y=None, sample_weight=None):
        if sample_weight is None:
            out = {'X': X, 'y': y, 'sample_weight': sample_weight}
        else:
            # Scale by the smallest nonzero weight and round up, so a zero
            # weight drops its sample entirely.
            # find some better way to quantize
            repeats = (sample_weight
                       / np.abs(sample_weight[sample_weight != 0]).min())
            repeats = np.ceil(repeats).astype(int)
            out = {'X': np.repeat(X, repeats, axis=0),
                   'y': np.repeat(y, repeats, axis=0) if y is not None else None}
        if y is None:
            out.pop('y')
        return out
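
# A minimal sketch of the quantisation above (illustrative only): with
# weights [0, 1, 2], the first sample is dropped and the last is doubled.
_rep = WeightsAsReplication().fit_resample(np.array([[1.], [2.], [3.]]),
                                           np.array([0, 1, 1]),
                                           sample_weight=np.array([0., 1., 2.]))
print(_rep['X'].ravel(), _rep['y'])  # expect [2. 3. 3.] [1 1 1]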

class ResampleAsCentroidsMixin(object):
    """Make a clusterer act as a resampler that returns its centroids."""

    def fit_resample(self, X):
        self.fit(X)
        return {'X': self.cluster_centers_}

    # Hide the clusterer's transform methods so that pipelines treat this
    # estimator as a resampler, never as a transformer.
    @property
    def fit_transform(self):
        raise AttributeError()

    @property
    def transform(self):
        raise AttributeError()


class KMeansResampler(ResampleAsCentroidsMixin, MiniBatchKMeans):
    pass

class ClustersAsLabels(object):
    # This is probably a bad use of the model and there are alternatives...
    def __init__(self, estimator):
        self.estimator = estimator

    def fit_resample(self, X):
        # Replace y with the clusterer's predicted cluster memberships.
        return {'X': X, 'y': self.estimator.fit_predict(X)}

class MyPipeline(BaseEstimator):
    """A pipeline variant whose intermediate steps may resample the data."""

    def __init__(self, estimators):
        self.estimators = estimators

    def _fit(self, X, y=None, sample_weight=None):
        args = (X,) if y is None else (X, y)
        kwargs = ({} if sample_weight is None
                  else {'sample_weight': sample_weight})
        for estimator in self.estimators[:-1]:
            if hasattr(estimator, 'fit_resample'):
                if hasattr(estimator, 'fit_transform') or \
                        hasattr(estimator, 'transform'):
                    raise ValueError('Transformer cannot also be Resampler: '
                                     '{!r}'.format(estimator))
                print(estimator, [type(a) for a in args], list(kwargs))
                # A resampler may rewrite X, y and any remaining fit
                # parameters (e.g. sample_weight).
                kwargs = estimator.fit_resample(*args, **kwargs)
                X = kwargs.pop('X', None)
                y = kwargs.pop('y', None)
                args = (X,) if y is None else (X, y)
            else:
                X = estimator.fit_transform(*args, **kwargs)
                args = (X,) + args[1:]
        return args, kwargs

    def fit(self, X, y=None, sample_weight=None):
        args, kwargs = self._fit(X, y, sample_weight)
        print(self.estimators[-1], len(args), list(kwargs))
        self.estimators[-1].fit(*args, **kwargs)
        return self

    def predict(self, X):
        # Resamplers only affect training; at prediction time they are no-ops.
        for estimator in self.estimators[:-1]:
            if not hasattr(estimator, 'fit_resample'):
                X = estimator.transform(X)
        return self.estimators[-1].predict(X)

# Impractical but possible usage: cluster down to 200 centroids, label the
# centroids by spectral clustering, then classify points by nearest centroid.
sub_spectral = MyPipeline([KMeansResampler(n_clusters=200),
                           ClustersAsLabels(SpectralClustering(
                               n_clusters=2, eigen_solver='arpack',
                               affinity='nearest_neighbors')),
                           KNeighborsClassifier(1)])
X, y = datasets.make_moons(1000)
sub_spectral.fit(X).predict(X)

# More common usage: down-weight outliers, or undersample towards class
# balance, and realise the weights as replication before fitting.
inlier_model = MyPipeline([OutlierDetector(OneClassSVM()),
                           WeightsAsReplication(),
                           LogisticRegression()])
inlier_model.fit(X, y).predict(X)

resamp_model = MyPipeline([Undersampler(),
                           WeightsAsReplication(),
                           LogisticRegression()])
resamp_model.fit(X, y).predict(X)
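
# A hedged extra check, not in the original gist: on an artificially
# imbalanced version of the moons data, the undersampling pipeline should
# still learn to predict both classes.
X_imb = np.concatenate([X[y == 0], X[y == 1][:100]])
y_imb = np.concatenate([y[y == 0], y[y == 1][:100]])
print(np.unique(resamp_model.fit(X_imb, y_imb).predict(X)))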