Python NN with numpy only (MNIST dataset)
#####################################################################################
# This code trains a model that predicts which digit from 0 to 9 is drawn on an
# MNIST picture of 28x28 pixels (784 values). There are 60000 training examples
# and 10000 testing examples.
# Techniques used: mini-batching, weight decay, momentum, dropout,
# Xavier initialization.
# Number of hidden layers: adjustable via array_hidden (2 in this configuration).
# Mini-batch and hidden-layer sizes are easily adjustable.
# Activation functions: sigmoid, scaled ("funny") tanh, ReLU.
#
# NOTE: The dataset was already unpacked, ordered and repacked into the pickle
# file that is loaded below.
#####################################################################################
import numpy as np
import pickle
import math
import random
from random import gauss
################### Shuffle the examples: random mini-batch order speeds up learning ###
def shuffle_set(images, labels):
    shuffled_array = np.arange(len(images))
    np.random.shuffle(shuffled_array)
    images = images[shuffled_array]
    labels = labels[shuffled_array]
    return images, labels
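# The pickle is expected to hold the full 70000-example MNIST set as two arrays:
# flattened images of 784 pixels each and one-hot labels of length 10 (this is
# implied by how the arrays are split, reshaped and indexed below).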
data_images, data_labels = pickle.load(open("MNIST.p", "rb"))
data_images, data_labels = shuffle_set(data_images, data_labels)
train_images = data_images[:60000]
train_labels = data_labels[:60000]
test_images = data_images[60000:]
test_labels = data_labels[60000:]
############################### Non-Linear Functions #################################
def sigmoid(elem):
    return 1.0 / (1 + np.exp(-elem))
def softmax(x):
    # Subtract the row maximum before exponentiating so np.exp cannot overflow
    numerator = np.exp(x - np.max(x, axis=1, keepdims=True))
    return (numerator.T / (np.sum(numerator, axis=1) + epsilon)).T
######################################################################################
############################### Cross-Entropy Error ##################################
def CEE(output, teacher):
    # teacher is one-hot, so the inner sum picks out the predicted probability of the
    # true class; epsilon keeps log() finite if that probability underflows to zero
    return -np.sum(np.log(np.sum(np.multiply(output, teacher), axis=1) + epsilon))
###### Xavier initialization of arrays
# m - bottom (fan-in) layer size
# n - top (fan-out) layer size
# y - indicator that we are initializing weights, not biases
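# Drawing each entry from N(0, 1/(m + n)) keeps the variance of activations roughly
# constant from layer to layer, which is the point of Xavier initialization.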
def Xavier_generator(m, n, y):
    if y != 0:
        return np.array([[gauss(0, math.sqrt(1.0 / (m + n))) for dummy1 in range(n)] for dummy2 in range(m)])
    else:
        return np.array([gauss(0, math.sqrt(1.0 / (m + n))) for dummy1 in range(n)])
######################################################################################
# Initialize weight, bias and momentum arrays using Xavier initialization
def initialize_weights_bias_moment(array_hidden):
    weights_sets = len(array_hidden) + 1
    bias = []
    moment_b = []
    weights = [0] * weights_sets
    moment = [0] * weights_sets
    init_bias = 0
    init_weights = 1
    prev_layer = number_pixels
    for i in range(len(array_hidden)):
        bias.append(Xavier_generator(prev_layer, array_hidden[i], init_bias))
        moment_b.append(Xavier_generator(prev_layer, array_hidden[i], init_bias))
        weights[i] = Xavier_generator(prev_layer, array_hidden[i], init_weights)
        moment[i] = Xavier_generator(prev_layer, array_hidden[i], init_weights)
        prev_layer = array_hidden[i]
    bias.append(Xavier_generator(prev_layer, classes, init_bias))
    moment_b.append(Xavier_generator(prev_layer, classes, init_bias))
    weights[weights_sets - 1] = Xavier_generator(prev_layer, classes, init_weights)
    moment[weights_sets - 1] = Xavier_generator(prev_layer, classes, init_weights)
    # dtype=object because the per-layer arrays have different shapes (ragged)
    return (np.array(weights, dtype=object), np.array(bias, dtype=object),
            np.array(moment, dtype=object), np.array(moment_b, dtype=object))
##################### Initialize constants and parameters of the DNN #########################
number_pixels = 784  # CONSTANT
classes = 10  # CONSTANT
epsilon = 10 ** (-5)  # CONSTANT
array_hidden = [100, 150]
num_layers = len(array_hidden) + 2
grad_w = [0] * (num_layers - 1)
grad_b = [0] * (num_layers - 1)
hidden_layers = []
activated_hidden_layers = []
dropout_matrix = []
delta_previous = []
minibatch_size = 120
epoch = 10
alpha = 0.001
act_func_type = 1
momentum = 0.9
decay = 0.1
dropout = 0
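# act_func_type selects the hidden-layer activation: 0 = sigmoid, 1 = scaled tanh,
# 2 = ReLU. dropout toggles the binary masks below; note that each mask is sampled
# once here and then reused for every mini-batch, rather than resampled per batch.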
for i in array_hidden:
    hidden_layers.append(np.array([0] * i))
    activated_hidden_layers.append(np.array([0] * i))
if dropout == 1:
    for i in array_hidden:
        dropout_matrix.append(np.array([int(random.getrandbits(1)) for j in range(i)]))
w, b, moment, moment_b = initialize_weights_bias_moment(array_hidden)
################## Standardize values so they do not blow up or shrink ##################
def z_score(some_matrix):
    mean = np.mean(some_matrix)
    std_dev = np.std(some_matrix)
    mean_matrix = np.full(some_matrix.shape, mean)
    some_matrix = some_matrix - mean_matrix
    some_matrix = some_matrix / (std_dev + epsilon)
    return some_matrix
################## Divide the data into mini-batches ###############################
def minibatch_producer(images, labels, minibatch_size):
    global number_pixels, classes
    images, labels = shuffle_set(images, labels)
    remainder = len(images) % minibatch_size
    number_of_minibatches = len(images) // minibatch_size
    # Drop the remainder, then split into number_of_minibatches chunks of minibatch_size
    minibatch_images = np.array_split(images[:len(images) - remainder], number_of_minibatches)
    minibatch_labels = np.array_split(labels[:len(labels) - remainder], number_of_minibatches)
    return np.array(minibatch_images).reshape(number_of_minibatches, minibatch_size, number_pixels),\
           np.array(minibatch_labels).reshape(number_of_minibatches, minibatch_size, classes)
######################### Forward Propagation #############################
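# Forward pass: z-score the input batch, then for each hidden layer do an affine
# transform, optionally apply the fixed dropout mask, z-score the pre-activations
# and apply the chosen activation; the final affine layer returns raw class scores
# (the softmax is applied by the caller).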
def forward_prop(mb_image):
    global activated_hidden_layers
    global hidden_layers
    global w
    global b
    prev_layer = z_score(mb_image)
    for i in range(len(array_hidden)):
        hidden_layers[i] = np.dot(prev_layer, w[i]) + b[i]
        if dropout == 1:
            hidden_layers[i] = hidden_layers[i] * dropout_matrix[i]
        hidden_layers[i] = z_score(hidden_layers[i])
        activated_hidden_layers[i] = activation_function(hidden_layers[i], act_func_type)
        prev_layer = activated_hidden_layers[i]
    return np.add(np.dot(prev_layer, w[len(array_hidden)]), b[len(array_hidden)])
################## Return the derivative of the activated layer ################
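# Each branch expects the *activated* values (the output of activation_function),
# not the pre-activations, and returns the derivative expressed in terms of them.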
def derivative_activation_function(act_hidden, func):
    if func == 0:
        # sigmoid: with s = sigmoid(x), ds/dx = s * (1 - s)
        return act_hidden * (1 - act_hidden)
    elif func == 1:
        # scaled tanh: with a = 1.7159 * tanh(2x/3), da/dx = (2/3) * (1.7159 - a^2 / 1.7159)
        return 0.6666667 * (1.7159 - act_hidden ** 2 / 1.7159)
    elif func == 2:
        # ReLU: 1 where the activation is positive, 0 elsewhere
        return (act_hidden > 0).astype(float)
#################### Return the activated layers ##############################
def activation_function(hidden, func):
    if func == 0:
        # sigmoid, applied element-wise
        return sigmoid(hidden)
    elif func == 1:
        # scaled "funny" tanh: 1.7159 * tanh(2x / 3)
        return 1.7159 * np.tanh(hidden * 0.6666667)
    elif func == 2:
        # ReLU
        return np.maximum(np.zeros(hidden.shape), hidden)
####################### Back Propagation ###########################################
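# Back-propagation: starting from the output delta (softmax probabilities minus the
# one-hot labels, computed in calc_grad), walk down the layers accumulating weight
# and bias gradients, multiplying each delta by the transposed weights and by the
# derivative of the activation below; the gradients are scaled by alpha on return.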
def back_prop(delta_output, mb_image):
    global alpha
    global delta_previous
    global activated_hidden_layers
    global grad_w
    global grad_b
    delta_previous = delta_output
    # ONE OR MANY HIDDEN LAYERS CASE
    if len(array_hidden) > 0:
        for i in reversed(range(1, len(w))):
            grad_w[i] = np.dot(activated_hidden_layers[i - 1].T, delta_previous)
            grad_b[i] = np.sum(delta_previous, axis=0)
            delta_previous = np.multiply(np.dot(delta_previous, w[i].T),
                                         derivative_activation_function(activated_hidden_layers[i - 1], act_func_type))
    # Only the lowest gradient is left to calculate; the first layer saw the
    # z-scored input, so use it here as well
    grad_w[0] = np.dot(z_score(mb_image).T, delta_previous)
    grad_b[0] = np.sum(delta_previous, axis=0)
    return [gw * alpha for gw in grad_w], [gb * alpha for gb in grad_b]
########################## Return the gradient of one mini-batch #########################
def calc_grad(mb_image, mb_label):
    output = forward_prop(mb_image)
    probability = softmax(output)  # class probabilities, a (minibatch_size x 10) matrix
    delta_output = np.subtract(probability, mb_label)
    return back_prop(delta_output, mb_image)
################## Make a prediction of the digit based on probability ##################
def prediction_function(probability):
    prediction = np.argmax(probability, axis=1)
    return prediction
##########################################################################################
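# Evaluation: run every mini-batch through the network, accumulate the cross-entropy
# and count a prediction as correct when its argmax index hits the 1 in the one-hot label.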
def test_NN(images, labels):
    correct_answers = 0
    sumCEE = 0
    minibatch_images, minibatch_labels = minibatch_producer(images, labels, minibatch_size)
    for mb_image, mb_label in zip(minibatch_images, minibatch_labels):
        output = forward_prop(mb_image)
        probability = softmax(output)
        sumCEE += CEE(probability, mb_label)
        prediction = prediction_function(probability)
        for i in range(minibatch_size):
            if mb_label[i][prediction[i]] == 1:
                correct_answers += 1
    print("Cross-Entropy = ", sumCEE)
    # Divide by the number of examples actually evaluated (the remainder that does
    # not fill a whole mini-batch is dropped by minibatch_producer)
    accuracy = correct_answers * 100.0 / (len(minibatch_images) * minibatch_size)
    print("Prediction Accuracy ", accuracy)
def test(train_im, train_la, test_im, test_la):
    print("Train set")
    test_NN(train_im, train_la)
    print("Test Set")
    test_NN(test_im, test_la)
########################################################################################
# Split the data into mini-batches and calculate the gradient in calc_grad.
# Update the weights after every mini-batch.
# Use the weight decay and momentum principles.
#########################################################################################
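# Update scheme per mini-batch: first take a look-ahead step in the direction of the
# current momentum (Nesterov-style), compute the gradients at that point, then apply
# the gradient step and the weight-decay step, and finally refresh the momentum as
# momentum * (new weights - previous weights).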
def train_NN():
    global w
    global b
    global moment
    global moment_b
    global train_images
    global train_labels
    global alpha
    global minibatch_size
    global momentum
    for ep in range(epoch):
        # Drop the learning rate by a factor of 10 every 11 epochs
        if ep > 0 and ep % 11 == 0:
            alpha = alpha / 10
        minibatch_images, minibatch_labels = minibatch_producer(train_images, train_labels, minibatch_size)
        for mb_image, mb_label in zip(minibatch_images, minibatch_labels):
            w_prev = np.copy(w)
            b_prev = np.copy(b)
            w = w_prev + moment
            b = b_prev + moment_b
            grad_w, grad_b = calc_grad(mb_image, mb_label)
            weight_decay = w * (decay * alpha)
            weight_decay_b = b * (decay * alpha)
            for i in reversed(range(len(w))):
                w[i] = w[i] - grad_w[i]
                b[i] = b[i] - grad_b[i]
                w[i] = w[i] - weight_decay[i]
                b[i] = b[i] - weight_decay_b[i]
            moment = momentum * (w - w_prev)
            moment_b = momentum * (b - b_prev)
        # Test every 2 epochs
        if ep % 2 == 0:
            test(train_images, train_labels, test_images, test_labels)
        train_images, train_labels = shuffle_set(train_images, train_labels)
        print("Epoch # ", ep)
    print("Done.")
# START
train_NN()
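# To run: place the MNIST.p pickle (images and one-hot labels as described above)
# in the working directory and execute this script with Python; numpy is the only
# third-party dependency.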