Skip to content

Instantly share code, notes, and snippets.

@virresh
Created March 20, 2020 14:18
Show Gist options
  • Save virresh/df2bf51ccd986d30de1680ca37e22925 to your computer and use it in GitHub Desktop.
Save virresh/df2bf51ccd986d30de1680ca37e22925 to your computer and use it in GitHub Desktop.
Naive Bayes from scratch in Python

This implementation is not fully optimised, however it's meant to be readable enough to understand how naive bayes works without gaussian estimation.
It has some utility functions as well, such as Shuffler for k-fold train-test division and a Metrics class for computing ROC, DET and CMC curves along with helper functions for computing confusion matrices.
You can test it with MNIST/FMNIST dataset.

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
class Shuffler(object):
def random_kfold(self, k, X, Y):
fold_size = len(X) // k
idxs = np.arange(len(X))
np.random.shuffle(idxs)
folds = []
for i in range(k):
folds.append((X[idxs[i*fold_size: (i+1)*fold_size]], Y[idxs[i*fold_size: (i+1)*fold_size]]))
return folds
def stratified_kfold(self, k, X, Y):
labels, counts = np.unique(Y, return_counts=True)
folds = []
idxs = [ list(*(np.where(Y == lbl))) for lbl in labels ]
# print(idxs)
for i in range(k):
tidx = []
for type_label in range(len(idxs)):
fsz = counts[type_label] // k
# print(tidx)
tidx.extend(idxs[type_label][i*fsz: (i+1)*fsz])
folds.append((X[tidx] , Y[tidx]))
return folds
class Metrics(object):
def detailed_confusion(self, valid_x, valid_y, model, label_dict_rev):
# FPR vs TPR values needed
thresholds = np.linspace(0, 1, num=100)
values = {}
for lbl in model.translator:
values[label_dict_rev[lbl]] = {
'tpos' : np.zeros(thresholds.shape),
'tneg' : np.zeros(thresholds.shape),
'fpos' : np.zeros(thresholds.shape),
'fneg' : np.zeros(thresholds.shape),
}
for i, thresh in enumerate(thresholds):
vals = model.predict(valid_x, thresh=thresh)
tru_pos = tru_neg = fal_pos = fal_neg = 0
for truth, pred in zip(valid_y, vals):
for lbl in model.translator:
if pred[model.translator[lbl]] == 1:
# predicted postive for lbl
if truth == lbl:
values[label_dict_rev[lbl]]['tpos'][i] += 1
else:
values[label_dict_rev[lbl]]['fpos'][i] += 1
else:
if truth == lbl:
values[label_dict_rev[lbl]]['fneg'][i] += 1
else:
values[label_dict_rev[lbl]]['tneg'][i] += 1
return values
def roc(self, valid_x, valid_y, model, label_dict_rev, ax, det_conf=None, get_EER=False):
if det_conf:
vals = det_conf
else:
vals = self.detailed_confusion(valid_x, valid_y, model, label_dict_rev)
EER = {
x : {} for x in vals
}
thresholds = np.linspace(0, 1, num=100)
# print(vals)
for lbl, val in vals.items():
roc_x = val['fpos'] / (val['tneg'] + val['fpos']) # FPR
roc_y = val['tpos'] / (val['tpos'] + val['fneg']) # TPR
if get_EER == True:
fpr = roc_x
fnr = 1 - roc_y
k = np.nanargmin(np.absolute((fnr - fpr)))
EER[lbl]['FPR'] = roc_x[k]
EER[lbl]['TPR'] = roc_y[k]
EER[lbl]['thresh'] = thresholds[k]
EER[lbl]['k'] = k
ax.plot(roc_x, roc_y, label=lbl)
if get_EER == True:
ax.plot(EER[lbl]['FPR'], EER[lbl]['TPR'], 'ro', label='{} EER'.format(lbl))
ax.set_title('ROC curve')
ax.set(xlabel='False Positive Rate (FPR = FP/(FP+TN))', ylabel='True Positive Rate (TPR = TP/(TP+FN))')
ax.legend()
if get_EER == True:
return EER
return ax
def det(self, valid_x, valid_y, model, label_dict_rev, ax, det_conf=None):
if det_conf:
vals = det_conf
else:
vals = self.detailed_confusion(valid_x, valid_y, model, label_dict_rev)
for lbl, val in vals.items():
roc_x = val['fpos'] / (val['tneg'] + val['fpos']) # FPR
roc_y = val['fneg'] / (val['tpos'] + val['fneg']) # FNR
ax.plot(roc_x, roc_y, label=lbl)
ax.set_title('DET curve')
ax.set(xlabel='False Positive Rate (FPR = FP/(FP+TN))', ylabel='False Negative Rate (FNR = FN/(TP+FN))')
ax.legend()
return ax
def cmc(self, valid_x, valid_y, model, ax):
preds = model.predict(valid_x, ret_order=True)
vals = np.zeros(np.unique(valid_y).shape[0])
for truth, orders in zip(valid_y, preds):
corr_rank = vals.shape[0]-1
for i in range(len(orders)-1, -1, -1):
vals[corr_rank] += 1
if orders[i][1] == truth:
break
corr_rank -= 1
vals /= valid_y.shape[0]
ax.plot(np.arange(1, vals.shape[0]+1, 1), vals, label='CMC')
ax.legend()
return ax
def precision_recall_acc(self, preds, truth, label_dict_rev):
values = pd.DataFrame()
unique_labels = np.unique(truth)
for lbl in unique_labels:
mat = self.confusion_matrix(preds, truth, focus_class=lbl)
total = mat.values.sum()
accuracy = (mat['actual_positive']['predicted_positive'] + mat['actual_negative']['predicted_negative']) / total
precision = (mat['actual_positive']['predicted_positive'])/ (mat['actual_positive']['predicted_positive'] + mat['actual_negative']['predicted_positive'])
recall = (mat['actual_positive']['predicted_positive']) / (mat['actual_positive']['predicted_positive'] + mat['actual_positive']['predicted_negative'])
f_score = 2 * precision * recall / (precision + recall)
values[label_dict_rev[lbl]] = [accuracy, precision, recall, f_score]
values.index = ['accuracy', 'precision', 'recall', 'f_score']
return values
def confusion_matrix(self, preds, truth, focus_class=None, label_dict_rev=None):
tru_pos = tru_neg = fal_pos = fal_neg = 0
if focus_class != None:
for y_dash, y in zip(preds, truth):
if y_dash == focus_class:
# predicted positives
if y == y_dash:
# actually positive, classified as positive
tru_pos += 1
else:
# actually negative, classified as positive
fal_pos += 1
else:
# predicted negatives
if y == focus_class:
# actually positive, classified as negative
fal_neg += 1
else:
# actually negative, classified as negative
tru_neg += 1
df = pd.DataFrame()
df['actual_positive'] = [tru_pos, fal_neg]
df['actual_negative'] = [fal_pos, tru_neg]
df.index = ['predicted_positive', 'predicted_negative']
else:
total_classes = np.unique(list(preds)+list(truth))
labels = [label_dict_rev[x] for x in total_classes]
conf_matrix = np.zeros((len(total_classes), len(total_classes)))
for y_dash, y in zip(preds, truth):
conf_matrix[y_dash][y] += 1
df = pd.DataFrame(conf_matrix, index=labels, columns=labels)
return df
class NaiveBayes(object):
class_priors = {}
likelihood_feature = {}
translator = {} # label value to label index
def train(self, features, labels):
label, counts = np.unique(labels, return_counts=True)
class_counts = {}
self.class_priors = {}
self.likelihood_feature = {}
self.translator = {}
# calculate prior probabilities
i = 0
for x, y in zip(label, counts):
self.translator[x] = i
i+=1
self.class_priors[x] = y/labels.shape[0]
class_counts[x] = y
self.likelihood_feature[x] = np.ones(features.shape[1])
# Smoothing assumption - Seen this class twice
# once with all zeros and once with all ones
# calculate likelihood
for x, y in zip(features, labels):
self.likelihood_feature[y] += x
for lbl in self.likelihood_feature:
self.likelihood_feature[lbl] = (self.likelihood_feature[lbl])/(class_counts[lbl]+2)
def predict(self, features, thresh=None, ret_order=False):
out = []
if thresh != None:
full_recs = np.zeros((features.shape[0], len(self.class_priors.keys())))
for i_rec, x in enumerate(features):
# ans = None
# maxll = None
records = []
for i, w_i in enumerate(self.likelihood_feature):
log_ll = np.log(self.class_priors[w_i])
positive_pr = x * np.log(self.likelihood_feature[w_i])
negative_pr = (1-x) * np.log(1-self.likelihood_feature[w_i])
log_ll += np.sum(positive_pr + negative_pr)
records.append((log_ll, w_i))
# print(w_i, log_ll)
if thresh != None:
full_recs[i_rec][self.translator[w_i]] = log_ll
# if (i == 0) or (log_ll > maxll):
# maxll = log_ll
# ans = w_i
records.sort(reverse=True)
if ret_order:
out.append(records)
else:
out.append(records[0][1])
if thresh != None:
# print(full_recs)
full_recs = full_recs + abs(np.min(full_recs, axis = 1).reshape(-1,1)) + 1
full_recs = np.divide(full_recs, np.sum(full_recs, axis=1).reshape(-1, 1))
full_recs[full_recs >= thresh] = 1
full_recs[full_recs < thresh] = 0
return full_recs
return out
if __name__ == '__main__':
trainX = np.array(
[[0,0,0],
[0,0,1],
[1,0,1],
[1,0,0],
[1,0,0],
[1,1,0],
[1,0,1],
[0,1,1]])
trainY = np.array([1,1,9,0,0,8,9,5])
testX = np.array([[0, 1, 1], [1, 0, 1]])
testY = np.array([5, 9])
obj = NaiveBayes()
obj.train(trainX, trainY)
# print(obj.class_priors)
# print(obj.likelihood_feature)
print(obj.predict(np.array([[0, 1, 1]])))
print(obj.predict(testX, thresh=0.255), obj.translator)
# m = Metrics()
# print(m.roc(np.array([[0, 1, 1], [1, 0, 1]]),
# np.array([9,9]),
# obj,
# {0:0, 1:1, 5:2, 8:3, 9:4}
# )
# )
# plt.show()
s = Shuffler()
folds = s.stratified_kfold(2, trainX, trainY)
for fold in folds:
for dpoint in fold:
print(dpoint)
print('\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment