Naive Bayes and Fisher Classifier based on PCI book
# -*- coding: utf-8 -*-
import math

"""
Statistical classification classes based on Toby Segaran's Programming
Collective Intelligence. I wanted a version that was a little easier to read
and didn't talk to SQLite. It's meant to be trained and tested on blocks of
text.

This is good for separating things into discrete classes. Spam detection is
used as the example in the book.

The book is excellent. If you haven't read it, you should.
"""


def split_on_whitespace(sentence):
    """ Default feature-generating function. Splits a sentence or document on whitespace. """
    return sentence.split()
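

# Added illustration (not part of the original gist): get_features can be any
# callable that maps a document to a list of features. For example, a slightly
# more forgiving tokenizer might lowercase and strip punctuation first:
def split_on_whitespace_lowercase(sentence):
    """ Example alternative feature extractor: lowercases and replaces
    punctuation with spaces before splitting, so 'Cheap!' and 'cheap'
    count as the same feature. """
    stripped = ''.join(c if c.isalnum() or c.isspace() else ' ' for c in sentence)
    return stripped.lower().split()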


class Classifier(object):

    def __init__(self, get_features=split_on_whitespace):
        """
        Initialize with a function that returns a list of features from an
        object. When you call 'train', get_features is called on the item and
        the classifier is trained on each of the returned features. The default
        splits a sentence on whitespace.
        """
        self.get_features = get_features
        # Counts of feature/category combinations
        self.feature_count = {}
        # Counts of documents in each category
        self.category_count = {}

    def increment_feature(self, f, cat):
        # Start a new feature/category count at zero, then add one for this occurrence
        self.feature_count.setdefault(f, {})
        self.feature_count[f].setdefault(cat, 0.0)
        self.feature_count[f][cat] += 1.0

    def get_feature_count(self, f, cat):
        return self.feature_count.get(f, {}).get(cat, 0.0)

    def increment_category(self, cat):
        # Start a new category count at zero, then add one for this document
        self.category_count.setdefault(cat, 0.0)
        self.category_count[cat] += 1.0

    def get_category_count(self, cat):
        return self.category_count.get(cat, 0.0)

    def categories(self):
        return self.category_count.keys()

    def total_count(self):
        return sum(self.category_count.values())

    def train(self, item, cat):
        """ Train the classifier on a complete example. Takes a document (string)
        and a category (string), extracts the features, and updates the counts. """
        features = self.get_features(item)
        # Increment the count for every feature with this category
        for f in features:
            self.increment_feature(f, cat)
        # Increment the count for this category
        self.increment_category(cat)

    def prob_f_given_c(self, f, cat):
        """ p(feature | category): the number of times this feature appeared in
        this category divided by the number of documents trained in this
        category. Takes a feature and a category as arguments. """
        if self.get_category_count(cat) == 0.0:
            return 0.0
        return self.get_feature_count(f, cat) / self.get_category_count(cat)
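
    # Worked example (illustrative numbers, not from the gist): if 'cheap'
    # appeared in 3 of the 10 documents trained as 'bad' (once per document),
    # then prob_f_given_c('cheap', 'bad') == 3 / 10.0 == 0.3.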

    def weighted_prob(self, f, cat, prob_func, weight=1.0, assumed_prob=0.5):
        """ Allows you to start with an assumed probability that moves toward 1
        or 0 as evidence accumulates. Useful for poorly-represented features and
        features that could be over-represented in one category. """
        # Calculate the current probability
        basicprob = prob_func(f, cat)
        # Count the number of times this feature has appeared in all categories
        totals = sum(self.get_feature_count(f, c) for c in self.categories())
        # Calculate the weighted average of the assumed and observed probabilities
        bp = ((weight * assumed_prob) + (totals * basicprob)) / (weight + totals)
        return bp
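
    # Worked example for weighted_prob (illustrative numbers): with the defaults
    # weight=1.0 and assumed_prob=0.5, a feature seen 3 times in total with a
    # basic probability of 1.0 in this category gives
    #   (1.0 * 0.5 + 3 * 1.0) / (1.0 + 3) = 0.875,
    # while a feature seen only once gives (0.5 + 1.0) / 2 = 0.75, so rare
    # features are pulled toward the neutral assumed_prob of 0.5.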


class NaiveBayes(Classifier):

    def __init__(self, get_features=split_on_whitespace):
        Classifier.__init__(self, get_features)
        self.thresholds = {}

    def document_prob(self, item, cat):
        features = self.get_features(item)
        # Multiply the probabilities of all the features together
        p = 1.0
        for f in features:
            p *= self.weighted_prob(f, cat, self.prob_f_given_c)
        return p

    def prob(self, item, cat):
        # Bayes' rule: p(category | document) is proportional to
        # p(document | category) * p(category). The p(document) denominator is
        # the same for every category, so it can be ignored when comparing.
        catprob = self.get_category_count(cat) / self.total_count()
        dp = self.document_prob(item, cat)
        return dp * catprob

    def set_threshold(self, cat, t):
        self.thresholds[cat] = t

    def get_threshold(self, cat):
        return self.thresholds.get(cat, 1.0)

    def classify(self, item, default=None):
        probs = {}
        # Find the category with the highest probability
        best = None
        max_prob = 0.0
        for cat in self.categories():
            probs[cat] = self.prob(item, cat)
            if probs[cat] > max_prob:
                max_prob = probs[cat]
                best = cat
        # If nothing scored above zero, there is no best category
        if best is None:
            return default
        # Make sure the best probability exceeds threshold * next best
        for cat, prob in probs.iteritems():
            if cat == best:
                continue
            if prob * self.get_threshold(best) > probs[best]:
                return default
        return best
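

# Usage sketch for the threshold (hypothetical strings, added for illustration):
#   nb = NaiveBayes()
#   nb.train('cheap pills online', 'bad')
#   nb.train('lunch meeting at noon', 'good')
#   nb.set_threshold('bad', 3.0)
#   nb.classify('cheap lunch', default='unknown')
# With a threshold of 3.0, 'bad' is only returned if it is at least three times
# as probable as every other category; otherwise classify() returns the default.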


class Fisher(Classifier):

    def __init__(self, get_features=split_on_whitespace):
        Classifier.__init__(self, get_features)
        self.minimums = {}

    def category_prob(self, f, cat):
        """ The probability that an item with this feature belongs to this
        category, assuming an equal number of items in each category: the
        frequency of the feature in this category divided by its frequency
        across all categories. Takes feature and category strings. """
        clf = self.prob_f_given_c(f, cat)
        if clf == 0.0:
            return 0.0
        # The frequency of this feature in all the categories
        freqsum = sum(self.prob_f_given_c(f, c) for c in self.categories())
        # The probability is the frequency in this category divided by
        # the overall frequency
        p = clf / freqsum
        return p
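
    # Worked example (illustrative numbers): if prob_f_given_c('cheap', 'bad')
    # is 0.3 and prob_f_given_c('cheap', 'good') is 0.1, then
    # category_prob('cheap', 'bad') == 0.3 / (0.3 + 0.1) == 0.75, i.e. the
    # category probability given the feature, assuming equal-sized categories.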

    def fisher_prob(self, item, cat):
        # Multiply all the feature probabilities together
        p = 1.0
        features = self.get_features(item)
        for f in features:
            p *= self.weighted_prob(f, cat, self.category_prob)
        # Take the natural log and multiply by -2 (Fisher's method for
        # combining independent probabilities into a chi-squared statistic)
        fscore = -2 * math.log(p)
        # Use the inverse chi-squared function to get a probability, with
        # two degrees of freedom per feature
        return self.invchi2(fscore, len(features) * 2)

    def invchi2(self, chi, df):
        """ Inverse chi-squared function: the probability of seeing a
        chi-squared statistic at least this large by chance, for an even
        number of degrees of freedom. """
        m = chi / 2.0
        total = term = math.exp(-m)
        for i in range(1, df // 2):
            term *= m / i
            total += term
        return min(total, 1.0)
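
    # Worked example: invchi2(2.0, 2) returns exp(-1) ~= 0.368, and
    # invchi2(2.0, 4) returns exp(-1) * (1 + 1) ~= 0.736. Small fisher scores
    # (i.e. feature probabilities close to 1) map to values near 1.0.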

    def set_minimum(self, cat, minimum):
        self.minimums[cat] = minimum

    def get_minimum(self, cat):
        return self.minimums.get(cat, 0.0)

    def classify(self, item, default=None):
        # Loop through the categories looking for the best result
        best = default
        max_prob = 0.0
        for c in self.categories():
            p = self.fisher_prob(item, c)
            # Make sure it exceeds its minimum
            if p > self.get_minimum(c) and p > max_prob:
                best = c
                max_prob = p
        return best


if __name__ == '__main__':
    fisherclass = Fisher()
    fisherclass.train('zzzzz', 'good')
    fisherclass.train('zzzzz', 'good')
    fisherclass.train('zzzzz', 'bad')
    fisherclass.train('pppp', 'bad')
    fisherclass.train('pppp', 'bad')
    fisherclass.train('pppp', 'bad')
    fisherclass.train('pppp', 'bad')
    fisherclass.train('pppp', 'bad')
    fisherclass.train('pppp', 'good')
    fisherclass.train('ooo', 'bad')
    fisherclass.train('ooo', 'bad')
    fisherclass.train('ooo', 'bad')
    fisherclass.train('ooo', 'good')

    fp1 = fisherclass.fisher_prob('zzzzz', 'good')
    print fp1
    fp1 = fisherclass.fisher_prob('pppp', 'good')
    print fp1
    fp1 = fisherclass.fisher_prob('ooo', 'good')
    print fp1
    fp1 = fisherclass.fisher_prob('pppp ooo', 'good')
    print fp1
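
    # Added illustration (not in the original gist): a small toy run through the
    # NaiveBayes classifier for comparison with the Fisher demo above.
    nbclass = NaiveBayes()
    nbclass.train('zzzzz', 'good')
    nbclass.train('zzzzz', 'good')
    nbclass.train('zzzzz', 'bad')
    nbclass.train('pppp', 'bad')
    nbclass.train('pppp', 'good')
    nbclass.train('ooo', 'bad')
    print nbclass.classify('zzzzz', default='unknown')
    print nbclass.classify('pppp ooo', default='unknown')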