More functionality in scikit-learn `Scorer`
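The file below (gist created April 2013, so written against Python 2 and the scikit-learn API of that time) sketches a `BaseScorer` interface whose subclasses can return several named metrics per call. Per-fold results are gathered into a NumPy record array with `to_records`, averaged field-wise with `mean`, and reduced to a single scalar via `to_objective` for model selection, with losses negated so that greater is always better. `MultiMetricScorer` composes arbitrary metric functions, `Scorer` mirrors the familiar single-metric interface, and `PRFScorer` reports precision, recall, F-score and support together.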
from __future__ import print_function

from abc import ABCMeta, abstractmethod
from functools import partial

import numpy as np

from sklearn.metrics import precision_recall_fscore_support
from sklearn.base import BaseEstimator

class BaseScorer(object):
    __metaclass__ = ABCMeta
    @abstractmethod
    def __init__(self, fields=['score'], mean_fields=None,
                 objective_field='score', objective_is_loss=False,
                 dtypes=None):
        self.fields = fields
        self.dtypes = dtypes
        self.mean_fields = mean_fields
        self.objective_field = objective_field
        self.objective_is_loss = objective_is_loss

    def as_tuple(self, *args, **kwargs):
        """
        Calculates the score, and if it is not a tuple, wraps it as a 1-tuple
        """
        # XXX perhaps we should just require __call__ to return a tuple
        res = self(*args, **kwargs)
        if type(res) != tuple:
            return (res,)
        return res

    def to_records(self, scores):
        # TODO: if already records, move on
        # TODO: handle non-tuple (?)
        if type(scores) == tuple:
            zero_dim = True
            scores = [scores]
        else:
            zero_dim = False
        scores = np.rec.fromrecords(scores, names=self.fields, formats=self.dtypes)
        if zero_dim:
            return scores[0]
        return scores

    def mean(self, scores, weights=None, axis=-1):
        # TODO: decorate and undecorate non-structured-arrays and perhaps non-structured arrays
        # TODO: handle empty mean_fields (can't construct empty datatype)
        mean_fields = self.mean_fields
        if mean_fields is None:
            mean_fields = self.fields
        if weights is None:
            weights = np.ones(scores.shape)
        denom = weights.sum(axis=axis)
        arrays = []
        for field in mean_fields:
            arrays.append((scores[field] * weights).sum(axis=axis) / denom)
        return np.rec.fromarrays(arrays, names=mean_fields)

    def to_objective(self, scores):
        if type(scores) == tuple:
            res = scores[self.fields.index(self.objective_field)]
        else:
            # assume record array:
            res = scores[self.objective_field]
        if self.objective_is_loss:
            return -res
        return res

class MockSingleScorer(BaseScorer):
    def __init__(self):
        super(MockSingleScorer, self).__init__(objective_is_loss=True)

    def __call__(self, estimator, X, y):
        return 0.5


class MockMultiScorer(BaseScorer):
    def __init__(self):
        super(MockMultiScorer, self).__init__(fields=['P', 'R', 'F'],
                                              mean_fields=['F'],
                                              objective_field='F',
                                              dtypes=['f4', 'f4', 'f4'])

    def __call__(self, estimator, X, y):
        return 0.3, 0.8, 2 * 0.3 * 0.8 / (.3 + .8)

class MultiMetricScorer(BaseScorer):
    """Scores an estimator with several metric functions at once.

    ``predict_funcs`` are applied to ``estimator.predict(X)``;
    ``threshold_funcs`` are applied to decision-function values (or to the
    positive-class probability as a fallback).
    """

    def __init__(self, predict_funcs={}, threshold_funcs={}, mean_fields=None,
                 objective_field='score', objective_is_loss=False):
        fields = []
        if predict_funcs:
            predict_fields, self.predict_func_list = zip(*list(predict_funcs.iteritems()))
            fields.extend(predict_fields)
        else:
            self.predict_func_list = None
        if threshold_funcs:
            threshold_fields, self.threshold_func_list = zip(*list(threshold_funcs.iteritems()))
            fields.extend(threshold_fields)
        else:
            self.threshold_func_list = None
        if objective_field not in fields:
            raise ValueError('objective_field {!r} not matched to function'.format(objective_field))
        super(MultiMetricScorer, self).__init__(fields=fields,
                                                mean_fields=mean_fields,
                                                objective_field=objective_field,
                                                objective_is_loss=objective_is_loss)

    def __call__(self, estimator, X, y=None):
        scores = []
        if self.predict_func_list:
            y_pred = estimator.predict(X)
            args = (y, y_pred) if y is not None else (y_pred,)
            scores.extend(score_func(*args) for score_func in self.predict_func_list)
        if self.threshold_func_list:
            if y is not None and len(np.unique(y)) > 2:
                raise ValueError("This classification score only "
                                 "supports binary classification.")
            try:
                y_pred = estimator.decision_function(X).ravel()
            except (NotImplementedError, AttributeError):
                y_pred = estimator.predict_proba(X)[:, 1]
            args = (y, y_pred) if y is not None else (y_pred,)
            scores.extend(score_func(*args) for score_func in self.threshold_func_list)
        return tuple(scores)

class Scorer(MultiMetricScorer):
    def __init__(self, score_func, greater_is_better=True,
                 needs_threshold=False, **kwargs):
        if kwargs:
            score_func = partial(score_func, **kwargs)
        if needs_threshold:
            kw = 'threshold_funcs'
        else:
            kw = 'predict_funcs'
        super(Scorer, self).__init__(**{kw: {'score': score_func},
                                        'objective_is_loss': not greater_is_better})


class EstimatorScorer(BaseScorer):
    def __init__(self):
        super(EstimatorScorer, self).__init__()

    def __call__(self, estimator, X, y):
        return estimator.score(X, y)

class PRFScorer(BaseScorer):
    def __init__(self, beta=1.):
        super(PRFScorer, self).__init__(fields=['P', 'R', 'F', 'support'],
                                        mean_fields=['P', 'R', 'F'],
                                        objective_field='F',
                                        dtypes=['f4', 'f4', 'f4', 'i'])
        self.beta = beta

    def __call__(self, estimator, X, y):
        # precision_recall_fscore_support expects (y_true, y_pred)
        return precision_recall_fscore_support(y, estimator.predict(X),
                                               beta=self.beta,
                                               average='weighted')

class MockEstimator(BaseEstimator):
    labels_ = [-1, 1]

    def fit(self, X, y=None):
        return self

    def predict_proba(self, X):
        return np.array([[.3, .7], [.7, .3], [.2, .8], [.1, .9]])

    def predict(self, X):
        return np.array([1, -1, 1, 1])

    def score(self, X, y):
        return 0.1

if __name__ == '__main__':
    from sklearn import metrics

    # metrics.auc_score is the ROC AUC helper in scikit-learn of this era
    # (later renamed roc_auc_score).
    make_multi = partial(MultiMetricScorer,
                         predict_funcs={'f1': metrics.f1_score,
                                        'f2': partial(metrics.fbeta_score, beta=2.)},
                         threshold_funcs={'auc': metrics.auc_score},
                         objective_field='f1')
    make_hamming = partial(Scorer, metrics.hamming_loss, greater_is_better=False)
    make_auc = partial(Scorer, metrics.auc_score, needs_threshold=True,
                       greater_is_better=True)

    estimator = MockEstimator()
    X = np.array([[0], [0], [0], [0]])
    y = np.array([1, -1, -1, 1])
    for construct in [MockSingleScorer, MockMultiScorer, EstimatorScorer,
                      PRFScorer, make_multi, make_hamming, make_auc]:
        scorer = construct()
        fold_scores = [scorer.as_tuple(estimator, X, y) for fold in xrange(2)]
        fold_scores = scorer.to_records(fold_scores)
        mean = scorer.mean(fold_scores)
        objective = scorer.to_objective(fold_scores)
        mean_objective = scorer.to_objective(mean)
        print('''{}
fold_scores: {}
objective: {}
mean: {}
mean_objective: {}
'''.format(scorer, repr(fold_scores), repr(objective), repr(mean), repr(mean_objective)))
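Beyond the self-test above, a minimal usage sketch of the same workflow with a real estimator and hand-rolled cross-validation folds might look like the following. Like the gist itself it assumes Python 2 (the classes use dict.iteritems); LogisticRegression, the synthetic data and the contiguous splits are illustrative assumptions, not part of the gist.

# Usage sketch (illustrative only): fold scores -> record array -> mean -> objective,
# as in __main__ above, but with a fitted LogisticRegression and hand-rolled folds.
import numpy as np
from sklearn import metrics
from sklearn.linear_model import LogisticRegression

rng = np.random.RandomState(0)
X = rng.randn(40, 2)
y = (X[:, 0] + 0.1 * rng.randn(40) > 0).astype(int)  # noisy binary labels

scorer = MultiMetricScorer(predict_funcs={'precision': metrics.precision_score,
                                          'recall': metrics.recall_score,
                                          'f1': metrics.f1_score},
                           objective_field='f1')

fold_scores = []
for test_idx in np.array_split(np.arange(len(y)), 4):  # four contiguous folds
    train_mask = np.ones(len(y), dtype=bool)
    train_mask[test_idx] = False
    clf = LogisticRegression().fit(X[train_mask], y[train_mask])
    fold_scores.append(scorer.as_tuple(clf, X[test_idx], y[test_idx]))

records = scorer.to_records(fold_scores)  # one (precision, recall, f1) record per fold
print(scorer.mean(records))               # field-wise mean over the four folds
print(scorer.to_objective(records))       # the 'f1' column, one value per fold

Model selection would then compare scorer.to_objective(scorer.mean(records)) across parameter settings; the objective_is_loss flag keeps that comparison sign-consistent so that greater is always better.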