Created
April 19, 2020 00:52
-
-
Save alastairparagas/16e9206c68864ed7785d0e571f33dc69 to your computer and use it in GitHub Desktop.
Decision Tree
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
class DTLearner(object):
    """Regression decision tree learner.

    Splits on the feature whose absolute correlation with the target is
    highest, using the feature's median as the split threshold.

    The fitted tree is stored as a flat numpy structured array where each
    row is (Factor, SplitVal, Left, Right):
      * Factor   -- column index of the split feature, or -1 for a leaf
      * SplitVal -- split threshold; for a leaf, the predicted value
      * Left     -- relative row offset of the left subtree (-1 for a leaf)
      * Right    -- relative row offset of the right subtree (-1 for a leaf)
    """

    # Row layout of the flattened tree. Built once instead of on every
    # recursive call; `object` replaces the removed `np.object` alias.
    _RECORD_DTYPE = np.dtype([
        ('Factor', object), ('SplitVal', np.float64),
        ('Left', np.int32), ('Right', np.int32),
    ])

    leaf_size = None   # max samples a leaf may aggregate (None -> 1)
    verbose = None     # when True, print tracing info while building
    tree = None        # structured array set by addEvidence()

    def __init__(self, leaf_size=None, verbose=False):
        self.leaf_size = leaf_size
        self.verbose = verbose

    def get_best_feature_index(self, Xtrain, Ytrain, exclude=None):
        """
        Returns the column index in Xtrain of the best feature
        vector or None if there isn't one.

        exclude lists column indices already rejected (used when the
        current best feature would produce a degenerate split).
        """
        # Avoid the shared-mutable-default pitfall of `exclude=[]`.
        exclude = [] if exclude is None else exclude
        # Discover the best feature to "split" on: corrcoef stacks the
        # transposed features with Ytrain, so the target occupies the
        # last row/column of the correlation matrix.
        feature_set = Xtrain.transpose()
        correlation_matrix = np.absolute(
            np.corrcoef(feature_set, Ytrain)
        )
        feature_vectors_count = feature_set.shape[0]
        target_vector_index = feature_vectors_count
        # If there is only 1 feature, then it's automatically the
        # best feature
        if feature_vectors_count == 1:
            return 0
        correlation_tuplets = sorted(
            [
                (correlation_matrix[i][target_vector_index], i)
                for i in range(feature_vectors_count)
                if i not in exclude
            ],
            key=lambda tuplet: tuplet[0],
            reverse=True
        )
        if len(correlation_tuplets) == 0:
            # Every feature has been excluded.
            return None
        best_feature_score, best_feature_index = correlation_tuplets[0]
        split_val = np.median(Xtrain[:, best_feature_index])
        left_subtree_existence = Xtrain[:, best_feature_index] <= split_val
        # Edge case: all values of the best feature <= split_val would send
        # every row left and recurse forever -- try the next best feature.
        if np.all(left_subtree_existence):
            if feature_vectors_count < 2:
                return None
            # Find next best possible feature vector
            return self.get_best_feature_index(
                Xtrain, Ytrain, exclude=exclude + [best_feature_index]
            )
        return best_feature_index

    def __build_tree(self, Xtrain, Ytrain, leaf_size=None):
        """
        Recursively build a decision tree.

        Returns a structured array (dtype _RECORD_DTYPE) whose first row
        is the subtree root, followed by the flattened left then right
        subtrees.
        """
        # Make sure inputs are valid
        if leaf_size is not None and not isinstance(leaf_size, int):
            raise ValueError("leaf_size parameter is incorrect. Must be None or >= 1")
        if isinstance(leaf_size, int) and leaf_size < 1:
            raise ValueError("leaf_size parameter is incorrect. Must be None or >= 1")
        record_types = self._RECORD_DTYPE
        # Base cases to prevent that call stack explosion
        if Xtrain.shape[0] == 1:
            if self.verbose:
                print("Leaf: 1 training sample")
            return np.array([(-1, Ytrain[0], -1, -1)], dtype=record_types)
        if np.all(Ytrain == Ytrain[0], axis=0):
            if self.verbose:
                print("Leaf: Ytrain has same values for everything")
            return np.array([(-1, Ytrain[0], -1, -1)], dtype=record_types)
        # Explicit None guard: under Python 2 `None >= int` was quietly
        # False; under Python 3 it raises TypeError.
        if leaf_size is not None and leaf_size >= Ytrain.shape[0]:
            if self.verbose:
                print("Leaf: leaf_size forces us to aggregate")
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        if Xtrain.shape[1] == 1:
            if self.verbose:
                print("Leaf: 1 column left")
            # Fix: aggregate over ALL remaining targets (the original
            # np.mean(Ytrain[0]) collapsed to just the first value).
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        # Find best feature vector to use for split
        best_feature_index = self.get_best_feature_index(Xtrain, Ytrain)
        if best_feature_index is None:
            if self.verbose:
                print("Leaf: Cannot find a supreme feature vector")
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        split_val = np.median(Xtrain[:, best_feature_index])
        left_subtree_existence = Xtrain[:, best_feature_index] <= split_val
        if self.verbose:
            print("Splitting on feature {index} with val: {val}".format(
                index=best_feature_index,
                val=split_val
            ))
        left_subtree = self.__build_tree(
            Xtrain[left_subtree_existence],
            Ytrain[left_subtree_existence],
            leaf_size
        )
        right_subtree = self.__build_tree(
            Xtrain[~left_subtree_existence],
            Ytrain[~left_subtree_existence],
            leaf_size
        )
        # Left subtree starts right after the root; right subtree starts
        # right after the left subtree's rows.
        root = np.array(
            [(best_feature_index, split_val, 1, left_subtree.shape[0] + 1)],
            dtype=record_types
        )
        return np.append(
            np.append(root, left_subtree), right_subtree
        )

    def __single_query(self, test_vector, start=0):
        """Walk the flattened tree from row `start` for one sample and
        return the predicted value."""
        feature_index = self.tree[start][0]
        split_val = self.tree[start][1]
        left_findex = self.tree[start][2]
        right_findex = self.tree[start][3]
        # Starting at a leaf - end and return leaf value
        if feature_index == -1:
            return split_val
        # Offsets are relative to the current row.
        if test_vector[feature_index] <= split_val:
            return self.__single_query(
                test_vector, start=start + left_findex
            )
        return self.__single_query(
            test_vector, start=start + right_findex
        )

    def addEvidence(self, Xtrain, Ytrain):
        """Train the learner: Xtrain is (n_samples, n_features), Ytrain is
        (n_samples,). A leaf_size of None behaves as 1."""
        self.tree = self.__build_tree(
            Xtrain, Ytrain, 1 if self.leaf_size is None else self.leaf_size
        )

    def query(self, Xtest):
        """Predict one value per row of Xtest; requires addEvidence first."""
        return np.array([
            self.__single_query(test_vector)
            for test_vector in Xtest
        ])

    def author(self):
        return 'aparagas3'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment