Hierarchical Classifier
import numpy as np
from collections import defaultdict

from sklearn.base import BaseEstimator, ClassifierMixin, clone


class HClassifier(BaseEstimator, ClassifierMixin):
    # sentinel used as the artificial root of the label hierarchy
    ROOT = object()

    def __init__(self, base_estimator, min_obs=None, max_level=None):
        self.base_estimator = base_estimator
        self.min_obs = min_obs
        self.max_level = max_level
        self.estimators = {}
    def fit(self, X, y, **args):
        # y is a sequence of label paths, e.g. ("animal", "dog")
        self.estimators = {}
        y_with_root = self._add_root(y)
        reverse_index = self._generate_reverse_index(y_with_root)
        self._make_classifiers(X, y_with_root, reverse_index)
        return self
    def predict(self, X):
        y_hat = [[self.ROOT] for _ in range(X.shape[0])]
        while True:
            # stop when none of the current leaves has a fitted classifier
            classes = self._get_last_elems(y_hat)
            classes_present = {k for k in classes if k in self.estimators}
            if len(classes_present) == 0:
                break
            circular = False
            for cl in classes_present:
                ind = np.where(classes == cl)[0]
                X_ = X[ind, :]
                pred = self.estimators[cl].predict(X_)
                for i, p in zip(ind, pred):
                    y_hat[i].append(p)
                    # guard against cycles in the label hierarchy
                    if len(y_hat[i]) > 20:
                        circular = True
            if circular:
                break
        # drop the artificial ROOT from each predicted path
        y_hat_without_root = [y[1:] for y in y_hat]
        return y_hat_without_root
    def _add_root(self, y):
        # prepend the ROOT sentinel to every label path
        return [tuple([self.ROOT] + list(k)) for k in y]

    def _get_last_elems(self, v):
        # returns the last element of each path
        return np.array([e[-1] for e in v])
    def _generate_reverse_index(self, y):
        # maps each internal node to the (observation, depth) pairs where it appears
        reverse_index = defaultdict(list)
        for obs_i, obs_y in enumerate(y):
            for i, y_ in enumerate(obs_y[:-1]):
                # limit the depth only when max_level is set
                if self.max_level is None or i < self.max_level:
                    reverse_index[y_].append((obs_i, i))
        return dict(reverse_index)
    def _make_classifiers(self, X, y, reverse_index, monitor=iter):
        # monitor can wrap the iterable for progress reporting (e.g. tqdm)
        for root, indices in monitor(reverse_index.items()):
            ind = np.array([k[0] for k in indices])
            X_ = X[ind, :]
            # target = the next element of each path after this node
            y_ = np.array([y[i][p + 1] for i, p in indices])
            if self.min_obs and len(y_) < self.min_obs:
                continue
            if len(set(y_)) == 1:
                # a single child class needs no real classifier
                est = ConstantClassifier(y=y_[0])
            else:
                est = clone(self.base_estimator)
                est.fit(X_, y_)
            self.estimators[root] = est
class ConstantClassifier(BaseEstimator, ClassifierMixin):
    # trivial estimator that always predicts the same class
    def __init__(self, y):
        self.y = y

    def fit(self, X, y, **args):
        return self

    def predict(self, X):
        return np.repeat(self.y, X.shape[0])
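
The classifier fits one estimator per internal node of the label hierarchy (starting from an artificial ROOT) and predicts by repeatedly classifying each sample at its current node until it reaches a node with no fitted estimator. Below is a minimal usage sketch; the toy data, the two-level taxonomy, and LogisticRegression as the base estimator are illustrative assumptions, not part of the gist.

# Minimal usage sketch (assumed data and base estimator).
import numpy as np
from sklearn.linear_model import LogisticRegression

# Toy features and two-level label paths (group -> leaf).
X = np.array([[0.0, 0.1], [0.2, 0.0], [0.1, 0.2],
              [0.9, 1.0], [1.0, 0.8], [0.8, 0.9]])
y = [("animal", "dog"), ("animal", "cat"), ("animal", "dog"),
     ("plant", "tree"), ("plant", "flower"), ("plant", "tree")]

# max_level=2 limits training to the two levels of this toy taxonomy.
clf = HClassifier(base_estimator=LogisticRegression(), max_level=2)
clf.fit(X, y)
print(clf.predict(X))  # one predicted path per row, e.g. ['animal', 'dog']

Predicted paths can be shorter than the training paths: if a node was skipped because of min_obs, descent simply stops there.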