import math
from node import Node
from collections import Counter
#import sys
import timeit
iterations = 0
notNone = lambda x: x is not None
def ID3(data_set, attribute_metadata, numerical_splits_count, depth):
'''
See textbook for the algorithm. Make sure to handle unknown values; some
suggested approaches were given in lecture.
================================================================================
Input: A data_set, attribute_metadata, maximum number of splits to
consider for numerical attributes, maximum depth to search to (depth = 0
indicates that this node should output a label)
===============================================================================
Output: The node representing the decision tree learned over the given data set
===============================================================================
'''
global iterations
iterations = 0
def dip_the_fuck_out(n, data_set):
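# Give up on splitting: turn n into a leaf labeled with the mode of data_set.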
print("dip the fuck out")
n.label = mode(data_set)
return n
def _ID3(data_set, depth):
global iterations
iterations += 1
print(iterations)
n = Node()
n.label = mode(data_set) if depth == 0 else check_homogenous(data_set)
if n.label is not None:
return n
print("Pick Best Attribute")
attr, threshold = pick_best_attribute(data_set,
attribute_metadata,
numerical_splits_count)
print("Chosen: " + str(attr))
n.decision_attribute = attr
if not attr:
print("dip out - terminal node")
return dip_the_fuck_out(n, data_set)
print("And it was good")
n.splitting_value = threshold
metadata = attribute_metadata[attr]
n.is_nominal = metadata["is_nominal"]
n.name = metadata["name"]
print("get da mode")
n.best_guess = mode(data_set)
print("nice guesswork")
print(attribute_metadata[attr])
print(n.is_nominal)
print(n.name)
if n.is_nominal:
print("re-split nom")
splits = split_on_nominal(data_set, attr)
else:
print("re-split num")
splits = split_on_numerical(data_set, attr, threshold)
num_splits = len(splits) - 1
print("num splits: " + str(num_splits))
if num_splits > numerical_splits_count[attr]:
print("dip out - no more splits")
return dip_the_fuck_out(n, data_set)
print("the split is good")
numerical_splits_count[attr] -= 1
print("recur " + str(len(splits)) + " times")
splits = splits.iteritems() if n.is_nominal else enumerate(splits)
n.children = {
str(k) if k is None else k: _ID3(s, depth - 1) for k, s in splits
}
return n
biased = bias_replace_missing_with_avg(data_set, attribute_metadata)
return _ID3(biased, depth)
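# ======== Usage sketch =============================
# A minimal, hypothetical example (not from the assignment data): index 0 is
# the output label, attribute 1 is numeric, and numerical_splits_count caps
# how many times each numeric attribute may be split. Missing values (None)
# are filled in by bias_replace_missing_with_avg before training.
#attribute_metadata = [{'name': 'winner', 'is_nominal': True},
#                      {'name': 'score', 'is_nominal': False}]
#data_set = [[1, 0.2], [0, 0.8], [1, 0.3], [0, 0.9], [1, None]]
#tree = ID3(data_set, attribute_metadata, [0, 5], depth=3)
#print(tree.label if tree.label is not None else tree.name)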
def mean(lst):
'''
Calculate the mean of the input list.
'''
l = len(lst)
return float(sum(lst)) / l if l > 0 else None
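# e.g. mean([1, 2, 6]) == 3.0; mean([]) is None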
def list_mode(lst):
return Counter(lst).most_common(1)[0][0]
def bias_replace_missing_with_avg(data_set, _):
'''
For some reason, this is my longest function.
It's all the partitioning and partition undoing.
Replace 'None' values (missing values) with the average of all existing
values for that attribute.
'''
partitioned_by_attr = [
(mean(filter(notNone, attr_values)), attr_values)
for attr_values in [ getall(data_set, attr) for attr in
range(len(data_set[0])) ]
]
filled_in = [
[ avg if value is None else value for value in values ]
for (avg, values) in partitioned_by_attr
]
return [
[ biased_attr_values[i] for biased_attr_values in filled_in ]
for i in range(len(data_set))
]
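# A small sketch of the fill-in behavior (column 0 has no missing values and
# is passed through; column 1's None becomes the mean of 2.0 and 4.0):
# bias_replace_missing_with_avg([[1, 2.0], [0, None], [1, 4.0]], None)
#   == [[1, 2.0], [0, 3.0], [1, 4.0]]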
def bias_replace_all_missing(data_set, attribute_metadata):
'''
Like bias_replace_missing_with_avg, but attribute-aware: replace 'None'
(missing) values with the mode of existing values for nominal attributes
and with the mean for numeric attributes.
'''
bestGuess = lambda x, nominal: list_mode(x) if nominal else mean(x)
partitioned_by_attr = [
(bestGuess(filter(notNone, attr_values), data["is_nominal"]), attr_values)
for (attr_values, data) in [
(getall(data_set, attr), attribute_metadata[attr])
for attr in range(len(data_set[0]))
]
]
filled_in = [
[ bestGuess if value is None else value for value in values ]
for (bestGuess, values) in partitioned_by_attr
]
return [
[ biased_attr_values[i] for biased_attr_values in filled_in ]
for i in range(len(data_set))
]
def check_homogenous(data_set):
'''
===============================================================================
Input: A data_set
===============================================================================
Job: Checks if the output value (index 0) is the same for all examples
in the data_set; if so, return that output value, otherwise return None.
===============================================================================
Output: Return either the homogenous attribute or None
===============================================================================
'''
check = data_set[0][0]
return check if all([check == datum[0] for datum in data_set]) else None
# ======== Test Cases =============================
#data_set = [[0],[1],[1],[1],[1],[1]]
#print( check_homogenous(data_set) == None )
#data_set = [[0],[1],[None],[0]]
#print( check_homogenous(data_set) == None )
#data_set = [[1],[1],[1],[1],[1],[1]]
#print( check_homogenous(data_set) == 1 )
def pick_best_attribute(data_set, attribute_metadata, numerical_splits_count):
'''
===============================================================================
Input: A data_set, attribute_metadata, splits counts for numeric
===============================================================================
Job: Find the attribute that maximizes the gain ratio. If attribute is
numeric return best split value. If nominal, then split value is False. If
gain ratio of all the attributes is 0, then return False, False.
Only consider numeric splits for which numerical_splits_count is greater than
zero.
===============================================================================
Output: best attribute, split value if numeric
===============================================================================
'''
max_gain = -1
result = (-1, None)
for attribute, metadata in enumerate(attribute_metadata[1:]):
attr = attribute + 1
t = False
if metadata['is_nominal']:
gain = gain_ratio_nominal(data_set, attr)
print("nominal gain ratio")
elif numerical_splits_count[attr] > 0:
gain, t = gain_ratio_numeric(data_set, attr)
print("numeric gain ratio")
else:
continue  # out of numeric splits for this attribute; skip it
if gain > max_gain:
max_gain = gain
result = (attr, t)
print("new local max!")
print((attr, max_gain, t))
return result if max_gain > 0 else (False, False)
def pick_best_attribute_slow(data_set, attribute_metadata, numerical_splits_count):
'''
===============================================================================
Input: A data_set, attribute_metadata, splits counts for numeric
===============================================================================
Job: Find the attribute that maximizes the gain ratio. If attribute is
numeric return best split value. If nominal, then split value is False. If
gain ratio of all the attributes is 0, then return False, False.
Only consider numeric splits for which numerical_splits_count is greater than
zero.
===============================================================================
Output: best attribute, split value if numeric
===============================================================================
'''
values = [
(attr, (gain_ratio_nominal(data_set, attr), False)) if metadata['is_nominal']
else (attr, gain_ratio_numeric(data_set, attr))
if numerical_splits_count[attr] != 0 else (attr, (False, False))
for attr, metadata in enumerate(attribute_metadata)
]
print("\t values calculated")
attribute, (gain_ratio, split) = max(values[1:], key=lambda x:x[1][0])
return (attribute, split) if gain_ratio > 0 else (False, False)
# # ======== Test Cases =============================
# numerical_splits_count = [20,20]
# attribute_metadata = [{'name': "winner",'is_nominal': True},{'name': "opprundifferential",'is_nominal': False}]
# data_set = [[1, 0.27], [0, 0.42], [0, 0.86], [0, 0.68], [0, 0.04], [1, 0.01], [1, 0.33], [1, 0.42], [0, 0.51], [1, 0.4]]
# pick_best_attribute(data_set, attribute_metadata, numerical_splits_count) == (1, 0.51)
# attribute_metadata = [{'name': "winner",'is_nominal': True},{'name': "weather",'is_nominal': True}]
# data_set = [[0, 0], [1, 0], [0, 2], [0, 2], [0, 3], [1, 1], [0, 4], [0, 2], [1, 2], [1, 5]]
# pick_best_attribute(data_set, attribute_metadata, numerical_splits_count) == (1, False)
# Uses gain_ratio_nominal or gain_ratio_numeric to calculate gain ratio.
### Utility functions
def getall(data_set, i):
return map(lambda d: d[i], data_set)
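# e.g. getall([[1, 2], [3, 4]], 1) == [2, 4] (extracts one column)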
def output(data_set):
'''
An O(n) method of extracting output values from a data_set.
As efficient as it can be on its own, though ideally the extraction would
be fused into whatever pass already iterates the list.
'''
return getall(data_set, 0)
def mode(data_set):
'''
===============================================================================
Input: A data_set
===============================================================================
Job: Takes a data_set and finds mode of index 0.
===============================================================================
Output: mode of index 0.
===============================================================================
'''
o = output(data_set)
return list_mode(o)
# ======== Test case =============================
#data_set = [[0],[1],[1],[1],[1],[1]]
#print(mode(data_set) == 1)
#data_set = [[0],[1],[0],[0]]
#print(mode(data_set) == 0)
def uniq(lst):
'''
An efficient method of finding unique values in a list.
'''
s = set()
add = s.add
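# set.add returns None (falsy), so `v in s or add(v)` marks v as seen and
# keeps it only on its first occurrence, preserving order.
# e.g. uniq([1, 2, 1, 3, 2]) == [1, 2, 3]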
return [v for v in lst if not (v in s or add(v))]
def list_entropy(lst):
'''
Calculates the entropy of a list
'''
ct = len(lst)
return -sum([ p * math.log(p, 2)
for p in [float(lst.count(uniq_v)) / ct for uniq_v in uniq(lst)]])
def fast_list_entropy(lst, ct):
'''
A faster list_entropy for callers that already know len(lst).
'''
ent = 0
for uniq_v in set(lst):
p = float(lst.count(uniq_v)) / ct
ent -= p * math.log(p, 2)
return ent
def entropy(data_set):
'''
===============================================================================
Input: A data_set
===============================================================================
Job: Calculates the entropy of the attribute at the 0th index, the value we want to predict.
===============================================================================
Output: Returns entropy. -sum(p*log(p))
===============================================================================
'''
return list_entropy(output(data_set))
# ======== Test case =============================
#data_set = [[0], [1], [1], [1], [0], [1], [1], [1]]
#print(entropy(data_set))
#print(entropy(data_set) == 0.811)
#data_set = [[0], [0], [1], [1], [0], [1], [1], [0]]
#print(entropy(data_set))
#print(entropy(data_set) == 1.0)
#data_set = [[0], [0], [0], [0], [0], [0], [0], [0]]
#print(entropy(data_set))
#print(entropy(data_set) == 0)
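# Worked example for the first case above: p(1) = 6/8, p(0) = 2/8, so
#   H = -(0.75 * log2(0.75) + 0.25 * log2(0.25))
#     = -(0.75 * -0.415 + 0.25 * -2.0) ~= 0.311 + 0.5 ~= 0.811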
def information_gain(splits, H, c):
'''
Calculate information gain for an attribute in a data_set:
IG = H(S) - sum over attribute values v of (|S_v| / |S|) * H(S_v)
'''
return H - sum([ (float(len(x)) / c) * entropy(x) for x in splits ])
def intrinsic_value(splits, H, c):
'''
Calculate intrinsic value for an attribute in a data_set:
IV = -sum over attribute values v of (|S_v| / |S|) * log2(|S_v| / |S|)
'''
return -1 * sum([
a * math.log(a, 2) for a in
[ float(len(x)) / c for x in splits ]
])
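# e.g. 10 examples split 4/6: IV = -(0.4*log2(0.4) + 0.6*log2(0.6)) ~= 0.971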
def ig_iv_old(data_set, splits):
'''
Calculate information gain and intrinsic value, return as tuple.
More efficient than recalculating all the shared data.
Slightly less efficient than calculating them simultaneously, but
clearer.
'''
H = entropy(data_set)
c = len(data_set)
return (information_gain(splits, H, c),
intrinsic_value(splits, H, c))
def ig_iv_num(t, r, H, c):
'''
Calculate information gain and intrinsic value, return as tuple.
More efficient than recalculating all the shared data.
Calculates the two simultaneously, sharing everything it can.
'''
ig = H
iv = 0
t_ct = len(t)
t_pct = float(t_ct) / c
#t_ent = fast_list_entropy(output(t), t_ct)
ig -= t_pct * fast_list_entropy(output(t), t_ct)
iv -= t_pct * math.log(t_pct, 2)
r_ct = len(r)
r_pct = 1 - t_pct
#r_ent = fast_list_entropy(output(r), r_ct)
ig -= r_pct * fast_list_entropy(output(r), r_ct)
iv -= r_pct * math.log(r_pct, 2)
return (ig, iv)
def ig_iv(splits, H, c):
'''
Calculate information gain and intrinsic value, return as tuple.
More efficient than recalculating all the shared data.
Calculates the two simultaneously, sharing everything it can.
'''
ig = 0
iv = 0
for x in splits:
ct = len(x)
pct = float(ct) / c
# Inlining the output extraction avoids an extra function call per split;
# measurably faster than calling output(x).
ent = fast_list_entropy(map(lambda y: y[0], x), ct)
ig -= pct * ent
iv -= pct * math.log(pct, 2)
ig += H
return (ig, iv)
#vals = [ (float(len(x)) / c, entropy(x)) for x in splits ]
#ig = H - sum(pct * ent for pct, ent in vals)
#iv = -sum([ pct * math.log(pct, 2) for pct, _ in vals ])
#return (ig, iv)
def gain_ratio_nominal(data_set, attribute):
'''
===============================================================================
Input: Subset of data_set, index for a nominal attribute
===============================================================================
Job: Finds the gain ratio of the nominal attribute relative to the output
variable we are training on.
===============================================================================
Output: Returns the gain ratio. See
https://en.wikipedia.org/wiki/Information_gain_ratio
===============================================================================
'''
splits = split_on_nominal(data_set, attribute)
ig, iv = ig_iv(splits.itervalues(), entropy(data_set), len(data_set))
return ig / iv if iv != 0 else 0
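# The gain ratio normalizes information gain by the split's intrinsic value:
#   IG(A)  = H(S) - sum_v (|S_v| / |S|) * H(S_v)
#   IV(A)  = -sum_v (|S_v| / |S|) * log2(|S_v| / |S|)
#   IGR(A) = IG(A) / IV(A)
# where S_v is the subset of examples with attribute value v.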
def gain_ratio_nominal_slow(data_set, attribute):
'''
===============================================================================
Input: Subset of data_set, index for a nominal attribute
===============================================================================
Job: Finds the gain ratio of the nominal attribute relative to the output
variable we are training on. Slower reference implementation.
===============================================================================
Output: Returns the gain ratio. See
https://en.wikipedia.org/wiki/Information_gain_ratio
===============================================================================
'''
attr_vals = uniq(getall(data_set, attribute))
splits_by_attr = [
[ x for x in data_set if x[attribute] == av]
for av in attr_vals
]
ig, iv = ig_iv_old(data_set, splits_by_attr)
return ig / iv if iv != 0 else 0
# ======== Test case =============================
#data_set, attr = [[1, 2], [1, 0], [1, 0], [0, 2], [0, 2], [0, 0], [1, 3], [0, 4], [0, 3], [1, 1]], 1
#print(gain_ratio_nominal(data_set, attr))
#print(gain_ratio_nominal(data_set,attr) == 0.11470666361703151)
#data_set, attr = [[1, 2], [1, 2], [0, 4], [0, 0], [0, 1], [0, 3], [0, 0], [0, 0], [0, 4], [0, 2]], 1
#print(gain_ratio_nominal(data_set,attr))
#print(gain_ratio_nominal(data_set,attr) == 0.2056423328155741)
# data_set, attr = [[0, 3], [0, 3], [0, 3], [0, 4], [0, 4], [0, 4], [0, 0], [0, 2], [1, 4], [0, 4]], 1
# gain_ratio_nominal(data_set,attr) == 0.06409559743967516
def gain_ratio_numeric(data_set, attribute, steps=21):
'''
===============================================================================
Input: Subset of data set, the index for a numeric attribute, and a step
size for normalizing the data.
===============================================================================
Job:
Calculate the gain_ratio_numeric and find the best single threshold
value. The threshold will be used to split examples into two sets those with
attribute value GREATER THAN OR EQUAL TO threshold those with attribute
value LESS THAN threshold. Use the equation here:
https://en.wikipedia.org/wiki/Information_gain_ratio
And restrict your search for possible thresholds to examples with array
index mod(step) == 0
===============================================================================
Output: This function returns the gain ratio and threshold value
===============================================================================
'''
threshs = set(data_set[i][attribute] for i in xrange(0, len(data_set), steps))
print("Considering " + str(len(threshs)) + " possible thresholds")
max_gain = -1
max_t = -1
H = entropy(data_set)
c = len(data_set)
igi = 0
startt = timeit.default_timer()
sponnum_tott = 0
ig_iv_tott = 0
for i, t in enumerate(threshs):
if not i % 100 and i > 0:
print("\tThresh " + str(i) + " " + str(t))
endt = timeit.default_timer()
duration = endt - startt
print("\t 100 @ " + str(duration) + "_s"
+ "\n\t avg it time " + str(duration * 10) + "_ms")
print("\t sponnum total time: " + str(sponnum_tott))
print("\t >> sponnum avg time " + str(sponnum_tott/100)
+ "\n\t >> which is " + str(100*sponnum_tott/duration) + "%")
startt = endt
sponnum_tott = 0
sponnum_startt = timeit.default_timer()
(th, r) = split_on_numerical(data_set, attribute, t)
sponnum_tott += timeit.default_timer() - sponnum_startt
if th and r:
if not igi % 100 and igi > 0:
print("\t ig_iv total time: " + str(ig_iv_tott))
print("\t >> ig_iv avg time " + str(ig_iv_tott/100))
print("\t >> which is " + str((ig_iv_tott / duration) * 100) + "%")
ig_iv_tott = 0
igi += 1
ig_iv_startt = timeit.default_timer()
ig, iv = ig_iv_num(th, r, H, c)
ig_iv_tott += timeit.default_timer() - ig_iv_startt
gain = ig / iv if iv != 0 else 0
if gain > max_gain:
max_gain = gain
max_t = t
print("selected t " + str(max_t))
if max_gain == -1: return (0, False)
return max_gain, max_t
def gain_ratio_numeric_slow(data_set, attribute, steps=1):
'''
===============================================================================
Input: Subset of data set, the index for a numeric attribute, and a step
size for normalizing the data.
===============================================================================
Job:
Calculate the gain_ratio_numeric and find the best single threshold
value. The threshold will be used to split examples into two sets those with
attribute value GREATER THAN OR EQUAL TO threshold those with attribute
value LESS THAN threshold. Use the equation here:
https://en.wikipedia.org/wiki/Information_gain_ratio
And restrict your search for possible thresholds to examples with array
index mod(step) == 0
===============================================================================
Output: This function returns the gain ratio and threshold value
===============================================================================
'''
attr_vals = getall(data_set, attribute)
threshs = [ attr_vals[i] for i in range(0, len(attr_vals), steps) ]
print("Considering " + str(len(threshs)) + " possible thresholds")
splits = list()
for t in threshs:
split = split_on_numerical(data_set, attribute, t)
if split[0] and split[1]: splits.append((split, t))
if not splits: return (0, False)
return max([
((ig / iv if iv != 0 else 0), t) for (ig, iv), t in [
(ig_iv_old(data_set, split), t) for split, t in splits
]], key=lambda choice: choice[0])
# ======== Test case =============================
# data_set,attr,step = [[0,0.05], [1,0.17], [1,0.64], [0,0.38], [0,0.19], [1,0.68], [1,0.69], [1,0.17], [1,0.4], [0,0.53]], 1, 2
# gain_ratio_numeric(data_set,attr,step) == (0.31918053332474033, 0.64)
# data_set,attr,step = [[1, 0.35], [1, 0.24], [0, 0.67], [0, 0.36], [1, 0.94], [1, 0.4], [1, 0.15], [0, 0.1], [1, 0.61], [1, 0.17]], 1, 4
# gain_ratio_numeric(data_set,attr,step) == (0.11689800358692547, 0.94)
# data_set,attr,step = [[1, 0.1], [0, 0.29], [1, 0.03], [0, 0.47], [1, 0.25], [1, 0.12], [1, 0.67], [1, 0.73], [1, 0.85], [1, 0.25]], 1, 1
# gain_ratio_numeric(data_set,attr,step) == (0.23645279766002802, 0.29)
def split_on_nominal(data_set, attribute):
'''
===============================================================================
Input: subset of data set, the index for a nominal attribute.
===============================================================================
Job: Creates a dictionary of all values of the attribute.
================================================================================
Output: Dictionary of all values pointing to a list of all the data with that attribute
================================================================================
'''
splits = dict()
for av in sorted(uniq(getall(data_set, attribute))):
splits[av] = []
for datum in data_set:
if datum[attribute] == av:
splits[av].append(datum)
return splits
def split_on_nominal_slow(data_set, attribute):
'''
===============================================================================
Input: subset of data set, the index for a nominal attribute.
===============================================================================
Job: Creates a dictionary of all values of the attribute.
================================================================================
Output: Dictionary of all values pointing to a list of all the data with that attribute
================================================================================
'''
return { i: x for i,x in enumerate([
[ x for x in data_set if x[attribute] == av ]
for av in sorted(uniq(getall(data_set, attribute)))
])}
# ======== Test case =============================
#data_set, attr = [[0, 4], [1, 3], [1, 2], [0, 0], [0, 0], [0, 4], [1, 4], [0, 2], [1, 2], [0, 1]], 1
#print(split_on_nominal(data_set, attr))
#print(split_on_nominal(data_set, attr) == {0: [[0, 0], [0, 0]], 1: [[0, 1]], 2: [[1, 2], [0, 2], [1, 2]], 3: [[1, 3]], 4: [[0, 4], [0, 4], [1, 4]]})
#data_set, attr = [[1, 2], [1, 0], [0, 0], [1, 3], [0, 2], [0, 3], [0, 4], [0, 4], [1, 2], [0, 1]], 1
#print(split_on_nominal(data_set, attr))
#print(split_on_nominal(data_set, attr) == {0: [[1, 0], [0, 0]], 1: [[0, 1]], 2: [[1, 2], [0, 2], [1, 2]], 3: [[1, 3], [0, 3]], 4: [[0, 4], [0, 4]]})
def split_on_numerical_numpy(data_set, attribute, splitting_value):
'''
================================================================================
Input: Subset of data set, the index for a numeric attribute, threshold
(splitting) value
================================================================================
Job: Splits data_set into a tuple of two lists, the first list contains
the examples where the given attribute has value less than the splitting
value, the second list contains the other examples
================================================================================
Output: Tuple of two lists as described above
================================================================================
'''
# NOTE: unlike the list-based variants, this expects data_set to be a numpy
# ndarray (it relies on boolean-mask indexing), e.g. np.array(data_set).
t = data_set[data_set[:, attribute] < splitting_value]
r = data_set[data_set[:, attribute] >= splitting_value]
return t, r
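# ======== Usage sketch =============================
# Assumes numpy is installed; it is not imported at the top of this file, so
# convert the nested lists and import it yourself:
#import numpy as np
#arr = np.array([[1, 0.25], [0, 0.93], [1, 0.19]])
#t, r = split_on_numerical_numpy(arr, 1, 0.5)  # t: rows < 0.5, r: rows >= 0.5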
def split_on_numerical(data_set, attribute, splitting_value):
'''
================================================================================
Input: Subset of data set, the index for a numeric attribute, threshold
(splitting) value
================================================================================
Job: Splits data_set into a tuple of two lists, the first list contains
the examples where the given attribute has value less than the splitting
value, the second list contains the other examples
================================================================================
Output: Tuple of two lists as described above
================================================================================
'''
# Preallocate both result lists to the maximum possible size; this avoids
# repeated list growth and is a bit faster than appending.
size = len(data_set)
t = size * [None]
r = size * [None]
ti = 0
ri = 0
for datum in data_set:
if datum[attribute] < splitting_value:
t[ti] = datum
ti += 1
else:
r[ri] = datum
ri += 1
return t[:ti], r[:ri]  # trim the unused preallocated slots
def split_on_numerical_slow(data_set, attribute, splitting_value):
thresh = [ datum for datum in data_set if datum[attribute] < splitting_value ]
rest = [ datum for datum in data_set if datum[attribute] >= splitting_value ]
return thresh, rest
# ======== Test case =============================
#d_set,a,sval = [[1, 0.25], [1, 0.89], [0, 0.93], [0, 0.48], [1, 0.19],
# [1, 0.49], [0, 0.6], [0, 0.6], [1, 0.34], [1, 0.19]], 1, 0.48
#print(split_on_numerical(d_set, a, sval))
#print(split_on_numerical(d_set,a,sval) == (
# [[1, 0.25], [1, 0.19], [1, 0.34], [1, 0.19]],
# [[1, 0.89], [0, 0.93], [0, 0.48], [1, 0.49], [0, 0.6], [0, 0.6]]
#))
# d_set,a,sval = [[0, 0.91], [0, 0.84], [1, 0.82], [1, 0.07], [0, 0.82],[0, 0.59], [0, 0.87], [0, 0.17], [1, 0.05], [1, 0.76]],1,0.17
# split_on_numerical(d_set,a,sval) == ([[1, 0.07], [1, 0.05]],[[0, 0.91],[0, 0.84], [1, 0.82], [0, 0.82], [0, 0.59], [0, 0.87], [0, 0.17], [1, 0.76]])
# vim: ts=4 sw=4 et tw=80 wrap nolinebreak