Feature selection based on constructed noise feature
import numpy as np
import sklearn.ensemble
import sklearn.metrics
import sklearn.utils
import sklearn.utils.validation
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin


class CustFsNoiseWinnow(BaseEstimator, TransformerMixin):
    """Remove features with less importance than a constructed noise feature."""
    def fit(self, X, y):
        """Fit the model with X and y.

        This is the workhorse function.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like, shape (n_samples,)
            Target values.

        Returns
        -------
        self : object
            Returns the instance itself. After fitting, ``self.mask_`` is a
            boolean array marking the features to keep.
        """
        n_winnow = 10
        clf_flag = True  # True: classification (ROC AUC); False: regression (correlation)
        n_estimators = 1000

        X_input = X.copy()
        n_sample = np.shape(X_input)[0]
        n_feature = np.shape(X_input)[1]
        # Add 1 to the column dimension to account for always keeping the
        # noise vector (appended as the last column) inside the loop.
        idx_keep = np.arange(n_feature + 1)
        counter = 0
        noise_flag = True
        while noise_flag:
            counter = counter + 1
            # Keep regenerating a noise vector as long as it is noticeably
            # associated with the output: use ROC AUC if classification
            # (accept only 0.4 <= AUC <= 0.6) and correlation if regression
            # (accept only |r| <= 0.05).
            if clf_flag:
                noise_feature = np.random.normal(loc=0, scale=10.0, size=(n_sample, 1))
                noise_score = sklearn.metrics.roc_auc_score(
                    y, np.ravel(noise_feature), average='macro', sample_weight=None)
                while (noise_score > 0.6) or (noise_score < 0.4):
                    noise_feature = np.random.normal(loc=0, scale=10.0, size=(n_sample, 1))
                    noise_score = sklearn.metrics.roc_auc_score(
                        y, np.ravel(noise_feature), average='macro', sample_weight=None)
            else:
                noise_feature = np.random.normal(loc=0, scale=10.0, size=(n_sample, 1))
                while np.abs(np.corrcoef(noise_feature, y[:, np.newaxis], rowvar=0)[0][1]) > 0.05:
                    noise_feature = np.random.normal(loc=0, scale=10.0, size=(n_sample, 1))

            # Append the noise feature as the last column
            X = np.concatenate((X_input, noise_feature), axis=1)
            # Initialize estimator
            if clf_flag:
                clf = sklearn.ensemble.ExtraTreesClassifier(
                    n_estimators=n_estimators, criterion='gini', max_depth=None,
                    min_samples_split=2, min_samples_leaf=1,
                    min_weight_fraction_leaf=0.0, max_features='auto',
                    max_leaf_nodes=None, min_impurity_split=1e-07,
                    bootstrap=False, oob_score=False, n_jobs=1,
                    random_state=None, verbose=0, warm_start=False,
                    class_weight=None)
            else:
                clf = sklearn.ensemble.ExtraTreesRegressor(
                    n_estimators=n_estimators, criterion='mse', max_depth=None,
                    min_samples_split=2, min_samples_leaf=1,
                    min_weight_fraction_leaf=0.0, max_features='auto',
                    max_leaf_nodes=None, min_impurity_split=1e-07,
                    bootstrap=False, oob_score=False, n_jobs=1,
                    random_state=None, verbose=0, warm_start=False)

            clf.fit(X[:, idx_keep], y)
            print('done fitting once')
            importances = clf.feature_importances_

            k = 1
            if np.all(importances[0:-1] > k * importances[-1]):
                print('all good')
                # All features are better than noise.
                # Commented out to force n_winnow rounds of winnowing.
                #noise_flag = False
            elif np.all(k * importances[-1] > importances[0:-1]):
                print('all bad')
                # Noise is better than all features, i.e. no feature beats noise.
                # Left as a separate clause in case this should ever be handled
                # differently from the all-features-better-than-noise case.
                # Commented out to force n_winnow rounds of winnowing.
                #noise_flag = False  # just take everything
            else:
                print('some good')
                # Keep only the features whose importance is >= the noise
                # importance. Using >= means the noise index (n_feature, with
                # 0-based indexing) is always kept as the last entry, so it can
                # simply be dropped when saving the results.
                idx_keep = idx_keep[importances >= (k * importances[-1])]
                importances = importances[importances >= (k * importances[-1])]
                idx_keep = np.ravel(idx_keep)
                print(np.shape(idx_keep))
            # Fail safe: stop after n_winnow rounds
            if counter >= n_winnow:
                noise_flag = False

        self.importances_ = importances[:-1]
        self.importances_snr_ = importances[:-1] / importances[-1]
        self.idx_keep_ = idx_keep[:-1]
        self.mask_ = np.asarray(
            [True if i in idx_keep[:-1] else False for i in range(n_feature)])
        return self
    def fit_transform(self, X, y=None):
        """Fit the model with X and apply the feature selection to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like, shape (n_samples,)
            Target values.

        Returns
        -------
        X_new : array-like, shape (n_samples, n_features_kept)
            X restricted to the selected features.
        """
        self.fit(X, y)
        return X[:, self.mask_]
    def transform(self, X, y=None):
        """Apply the fitted feature mask to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            New data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        X_new : array-like, shape (n_samples, n_features_kept)
            X restricted to the selected features.
        """
        sklearn.utils.validation.check_is_fitted(self, ['mask_'], all_or_any=all)
        X = sklearn.utils.check_array(X)
        return X[:, self.mask_]
# To use this in scikit-learn:
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

clf = Pipeline([('std', StandardScaler()),
                ('feature_selection', CustFsNoiseWinnow()),
                ('et', ExtraTreesClassifier(n_estimators=2000))])
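
# A minimal usage sketch (not part of the original gist, shown only as an
# illustration): fit the pipeline above on synthetic binary-classification
# data and inspect the fitted mask. The names X_demo, y_demo, and fs are
# hypothetical, and the estimator arguments above assume the 2017-era
# scikit-learn API.
from sklearn.datasets import make_classification

X_demo, y_demo = make_classification(n_samples=200, n_features=20,
                                     n_informative=5, random_state=0)
clf.fit(X_demo, y_demo)
fs = clf.named_steps['feature_selection']
print(fs.mask_)             # boolean mask over the 20 input features
print(fs.importances_snr_)  # importance relative to the noise feature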