This implementation is not fully optimised, however it's meant to be readable enough to understand how naive bayes works without gaussian estimation.
It has some utility functions as well, such as Shuffler for k-fold train-test division and a Metrics class for computing ROC, DET and CMC curves along with helper functions for computing confusion matrices.
You can test it with MNIST/FMNIST dataset.
Created
March 20, 2020 14:18
-
-
Save virresh/df2bf51ccd986d30de1680ca37e22925 to your computer and use it in GitHub Desktop.
Naive Bayes from scratch in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
class Shuffler(object): | |
def random_kfold(self, k, X, Y): | |
fold_size = len(X) // k | |
idxs = np.arange(len(X)) | |
np.random.shuffle(idxs) | |
folds = [] | |
for i in range(k): | |
folds.append((X[idxs[i*fold_size: (i+1)*fold_size]], Y[idxs[i*fold_size: (i+1)*fold_size]])) | |
return folds | |
def stratified_kfold(self, k, X, Y): | |
labels, counts = np.unique(Y, return_counts=True) | |
folds = [] | |
idxs = [ list(*(np.where(Y == lbl))) for lbl in labels ] | |
# print(idxs) | |
for i in range(k): | |
tidx = [] | |
for type_label in range(len(idxs)): | |
fsz = counts[type_label] // k | |
# print(tidx) | |
tidx.extend(idxs[type_label][i*fsz: (i+1)*fsz]) | |
folds.append((X[tidx] , Y[tidx])) | |
return folds | |
class Metrics(object): | |
def detailed_confusion(self, valid_x, valid_y, model, label_dict_rev): | |
# FPR vs TPR values needed | |
thresholds = np.linspace(0, 1, num=100) | |
values = {} | |
for lbl in model.translator: | |
values[label_dict_rev[lbl]] = { | |
'tpos' : np.zeros(thresholds.shape), | |
'tneg' : np.zeros(thresholds.shape), | |
'fpos' : np.zeros(thresholds.shape), | |
'fneg' : np.zeros(thresholds.shape), | |
} | |
for i, thresh in enumerate(thresholds): | |
vals = model.predict(valid_x, thresh=thresh) | |
tru_pos = tru_neg = fal_pos = fal_neg = 0 | |
for truth, pred in zip(valid_y, vals): | |
for lbl in model.translator: | |
if pred[model.translator[lbl]] == 1: | |
# predicted postive for lbl | |
if truth == lbl: | |
values[label_dict_rev[lbl]]['tpos'][i] += 1 | |
else: | |
values[label_dict_rev[lbl]]['fpos'][i] += 1 | |
else: | |
if truth == lbl: | |
values[label_dict_rev[lbl]]['fneg'][i] += 1 | |
else: | |
values[label_dict_rev[lbl]]['tneg'][i] += 1 | |
return values | |
def roc(self, valid_x, valid_y, model, label_dict_rev, ax, det_conf=None, get_EER=False): | |
if det_conf: | |
vals = det_conf | |
else: | |
vals = self.detailed_confusion(valid_x, valid_y, model, label_dict_rev) | |
EER = { | |
x : {} for x in vals | |
} | |
thresholds = np.linspace(0, 1, num=100) | |
# print(vals) | |
for lbl, val in vals.items(): | |
roc_x = val['fpos'] / (val['tneg'] + val['fpos']) # FPR | |
roc_y = val['tpos'] / (val['tpos'] + val['fneg']) # TPR | |
if get_EER == True: | |
fpr = roc_x | |
fnr = 1 - roc_y | |
k = np.nanargmin(np.absolute((fnr - fpr))) | |
EER[lbl]['FPR'] = roc_x[k] | |
EER[lbl]['TPR'] = roc_y[k] | |
EER[lbl]['thresh'] = thresholds[k] | |
EER[lbl]['k'] = k | |
ax.plot(roc_x, roc_y, label=lbl) | |
if get_EER == True: | |
ax.plot(EER[lbl]['FPR'], EER[lbl]['TPR'], 'ro', label='{} EER'.format(lbl)) | |
ax.set_title('ROC curve') | |
ax.set(xlabel='False Positive Rate (FPR = FP/(FP+TN))', ylabel='True Positive Rate (TPR = TP/(TP+FN))') | |
ax.legend() | |
if get_EER == True: | |
return EER | |
return ax | |
def det(self, valid_x, valid_y, model, label_dict_rev, ax, det_conf=None): | |
if det_conf: | |
vals = det_conf | |
else: | |
vals = self.detailed_confusion(valid_x, valid_y, model, label_dict_rev) | |
for lbl, val in vals.items(): | |
roc_x = val['fpos'] / (val['tneg'] + val['fpos']) # FPR | |
roc_y = val['fneg'] / (val['tpos'] + val['fneg']) # FNR | |
ax.plot(roc_x, roc_y, label=lbl) | |
ax.set_title('DET curve') | |
ax.set(xlabel='False Positive Rate (FPR = FP/(FP+TN))', ylabel='False Negative Rate (FNR = FN/(TP+FN))') | |
ax.legend() | |
return ax | |
def cmc(self, valid_x, valid_y, model, ax): | |
preds = model.predict(valid_x, ret_order=True) | |
vals = np.zeros(np.unique(valid_y).shape[0]) | |
for truth, orders in zip(valid_y, preds): | |
corr_rank = vals.shape[0]-1 | |
for i in range(len(orders)-1, -1, -1): | |
vals[corr_rank] += 1 | |
if orders[i][1] == truth: | |
break | |
corr_rank -= 1 | |
vals /= valid_y.shape[0] | |
ax.plot(np.arange(1, vals.shape[0]+1, 1), vals, label='CMC') | |
ax.legend() | |
return ax | |
def precision_recall_acc(self, preds, truth, label_dict_rev): | |
values = pd.DataFrame() | |
unique_labels = np.unique(truth) | |
for lbl in unique_labels: | |
mat = self.confusion_matrix(preds, truth, focus_class=lbl) | |
total = mat.values.sum() | |
accuracy = (mat['actual_positive']['predicted_positive'] + mat['actual_negative']['predicted_negative']) / total | |
precision = (mat['actual_positive']['predicted_positive'])/ (mat['actual_positive']['predicted_positive'] + mat['actual_negative']['predicted_positive']) | |
recall = (mat['actual_positive']['predicted_positive']) / (mat['actual_positive']['predicted_positive'] + mat['actual_positive']['predicted_negative']) | |
f_score = 2 * precision * recall / (precision + recall) | |
values[label_dict_rev[lbl]] = [accuracy, precision, recall, f_score] | |
values.index = ['accuracy', 'precision', 'recall', 'f_score'] | |
return values | |
def confusion_matrix(self, preds, truth, focus_class=None, label_dict_rev=None): | |
tru_pos = tru_neg = fal_pos = fal_neg = 0 | |
if focus_class != None: | |
for y_dash, y in zip(preds, truth): | |
if y_dash == focus_class: | |
# predicted positives | |
if y == y_dash: | |
# actually positive, classified as positive | |
tru_pos += 1 | |
else: | |
# actually negative, classified as positive | |
fal_pos += 1 | |
else: | |
# predicted negatives | |
if y == focus_class: | |
# actually positive, classified as negative | |
fal_neg += 1 | |
else: | |
# actually negative, classified as negative | |
tru_neg += 1 | |
df = pd.DataFrame() | |
df['actual_positive'] = [tru_pos, fal_neg] | |
df['actual_negative'] = [fal_pos, tru_neg] | |
df.index = ['predicted_positive', 'predicted_negative'] | |
else: | |
total_classes = np.unique(list(preds)+list(truth)) | |
labels = [label_dict_rev[x] for x in total_classes] | |
conf_matrix = np.zeros((len(total_classes), len(total_classes))) | |
for y_dash, y in zip(preds, truth): | |
conf_matrix[y_dash][y] += 1 | |
df = pd.DataFrame(conf_matrix, index=labels, columns=labels) | |
return df | |
class NaiveBayes(object): | |
class_priors = {} | |
likelihood_feature = {} | |
translator = {} # label value to label index | |
def train(self, features, labels): | |
label, counts = np.unique(labels, return_counts=True) | |
class_counts = {} | |
self.class_priors = {} | |
self.likelihood_feature = {} | |
self.translator = {} | |
# calculate prior probabilities | |
i = 0 | |
for x, y in zip(label, counts): | |
self.translator[x] = i | |
i+=1 | |
self.class_priors[x] = y/labels.shape[0] | |
class_counts[x] = y | |
self.likelihood_feature[x] = np.ones(features.shape[1]) | |
# Smoothing assumption - Seen this class twice | |
# once with all zeros and once with all ones | |
# calculate likelihood | |
for x, y in zip(features, labels): | |
self.likelihood_feature[y] += x | |
for lbl in self.likelihood_feature: | |
self.likelihood_feature[lbl] = (self.likelihood_feature[lbl])/(class_counts[lbl]+2) | |
def predict(self, features, thresh=None, ret_order=False): | |
out = [] | |
if thresh != None: | |
full_recs = np.zeros((features.shape[0], len(self.class_priors.keys()))) | |
for i_rec, x in enumerate(features): | |
# ans = None | |
# maxll = None | |
records = [] | |
for i, w_i in enumerate(self.likelihood_feature): | |
log_ll = np.log(self.class_priors[w_i]) | |
positive_pr = x * np.log(self.likelihood_feature[w_i]) | |
negative_pr = (1-x) * np.log(1-self.likelihood_feature[w_i]) | |
log_ll += np.sum(positive_pr + negative_pr) | |
records.append((log_ll, w_i)) | |
# print(w_i, log_ll) | |
if thresh != None: | |
full_recs[i_rec][self.translator[w_i]] = log_ll | |
# if (i == 0) or (log_ll > maxll): | |
# maxll = log_ll | |
# ans = w_i | |
records.sort(reverse=True) | |
if ret_order: | |
out.append(records) | |
else: | |
out.append(records[0][1]) | |
if thresh != None: | |
# print(full_recs) | |
full_recs = full_recs + abs(np.min(full_recs, axis = 1).reshape(-1,1)) + 1 | |
full_recs = np.divide(full_recs, np.sum(full_recs, axis=1).reshape(-1, 1)) | |
full_recs[full_recs >= thresh] = 1 | |
full_recs[full_recs < thresh] = 0 | |
return full_recs | |
return out | |
if __name__ == '__main__': | |
trainX = np.array( | |
[[0,0,0], | |
[0,0,1], | |
[1,0,1], | |
[1,0,0], | |
[1,0,0], | |
[1,1,0], | |
[1,0,1], | |
[0,1,1]]) | |
trainY = np.array([1,1,9,0,0,8,9,5]) | |
testX = np.array([[0, 1, 1], [1, 0, 1]]) | |
testY = np.array([5, 9]) | |
obj = NaiveBayes() | |
obj.train(trainX, trainY) | |
# print(obj.class_priors) | |
# print(obj.likelihood_feature) | |
print(obj.predict(np.array([[0, 1, 1]]))) | |
print(obj.predict(testX, thresh=0.255), obj.translator) | |
# m = Metrics() | |
# print(m.roc(np.array([[0, 1, 1], [1, 0, 1]]), | |
# np.array([9,9]), | |
# obj, | |
# {0:0, 1:1, 5:2, 8:3, 9:4} | |
# ) | |
# ) | |
# plt.show() | |
s = Shuffler() | |
folds = s.stratified_kfold(2, trainX, trainY) | |
for fold in folds: | |
for dpoint in fold: | |
print(dpoint) | |
print('\n') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment