Train a fully connected NN to classify MNIST digits. Implemented using numpy only.
""" | |
Class to define a fully connected neural net. Written using | |
numpy only. | |
The number and size of hidden layers is defined by providing | |
a list to the constructor, for example | |
``` | |
net = FullyConnectedNet([1024, 450, 10]) | |
``` | |
In this case, `net` represents a 3-layer neural net, with an input | |
unit of size 1024, a hidden unit of size 450, and an output | |
unit of size 10. | |
The following defaults are hardcoded (for now): | |
- The hidden layers have RELU activation | |
- The output layer has SOFTMAX activation | |
- The cost function is the cross-entropy cost | |
- Weights use He initialisation | |
- The update optimizer rule is ADAM (but you can choose the parameters) | |
- Dropout regularization is used (but you can choose the parameter) | |
Also implements *gradient checking* to check that backprop is | |
working correctly. | |
""" | |
import os | |
import numpy as np | |
import matplotlib.pyplot as plt | |
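
# A minimal usage sketch (not executed here; `main()` below does the real
# MNIST run). Shapes are illustrative: inputs are laid out one column per
# sample, and labels may be class indices or one-hot columns.
#
#     net = FullyConnectedNet([784, 200, 10])
#     net.train(train_images, train_labels,         # train_images: (784, m)
#               mini_batch_size=100, num_epochs=10, shuffle=True)
#     predictions = net.predict(test_images)        # one class index per column
#     accuracy = net.test(test_images, test_labels)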


class FullyConnectedNet:

    def __init__(self, layer_spec, dropout_prob=0.25, learning_rate=0.003,
                 beta1=0.9, beta2=0.999, epsilon=1e-8):
        """Initialise learning hyperparameters"""
        self.layer_spec = layer_spec
        self.dropout_prob = dropout_prob
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon

    def train_init(self):
        """Initialise all the state required for training"""
        self.bias_init()
        self.weight_init_he()
        self.adam_init()
        self.A, self.Z, self.D, self.costs = [], [], [], []
        self.dW, self.db = [], []

    def bias_init(self):
        """Initialise biases to 0"""
        self.b = [np.zeros((sz_layer, 1)) for sz_layer in self.layer_spec[1:]]

    def weight_init_he(self):
        """Initialise weights using He initialisation"""
        np.random.seed(8)
        self.W = [np.random.randn(self.layer_spec[i], self.layer_spec[i - 1])
                  * np.sqrt(2 / self.layer_spec[i - 1])
                  for i in range(1, len(self.layer_spec))]

    def weight_init_random(self):
        """Random initialisation of weights, used for debugging"""
        np.random.seed(3)
        self.W = [np.random.randn(self.layer_spec[i], self.layer_spec[i - 1]) * 0.01
                  for i in range(1, len(self.layer_spec))]

    def weight_init_zeros(self):
        """Zeros initialisation of weights, used for debugging"""
        self.W = [np.zeros((self.layer_spec[i], self.layer_spec[i - 1]))
                  for i in range(1, len(self.layer_spec))]
        # only used for debugging!

    def adam_init(self):
        """Initialise Adam moving averages"""
        self.t = 1
        self.vdW = [np.zeros(Wi.shape) for Wi in self.W]
        self.vdb = [np.zeros(bi.shape) for bi in self.b]
        self.sdW = [np.zeros(Wi.shape) for Wi in self.W]
        self.sdb = [np.zeros(bi.shape) for bi in self.b]

    def relu(self, x):
        return np.maximum(x, 0)

    def softmax(self, z):
        # subtract the column-wise max before exponentiating, for numerical
        # stability; this leaves the softmax output unchanged
        y = np.exp(z - z.max(axis=0, keepdims=True))
        y_soft = y / y.sum(axis=0)
        return y_soft

    def cross_entropy_cost(self, y_soft, labels):
        assert (y_soft.shape == labels.shape)  # labels need to be one-hot encoded
        m = labels.shape[1]  # number of samples
        cost = -np.multiply(np.log(y_soft), labels).sum() / m
        return cost

    def relu_grad(self, x):
        return x > 0  # boolean mask; numpy promotes it when multiplied with floats

    def cross_entropy_with_softmax_grad(self, y_soft, labels):
        """Gradient of the cross-entropy cost with respect to the pre-softmax Z"""
        grad = y_soft - labels
        return grad
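
    # Why `cross_entropy_with_softmax_grad` is simply `y_soft - labels`:
    # with softmax output y = softmax(Z) and one-hot labels t, the
    # cross-entropy cost for one sample is
    #     J = -sum_k t_k * log(y_k)
    # and the standard softmax + cross-entropy identity gives
    #     dJ/dZ_k = y_k - t_k,
    # so backprop can skip dA for the output layer and start from dZ directly.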

    def forward(self, batch, labels=None, dropout=False):
        """
        Do a full forward pass over all layers, for a batch of inputs.
        Computes and stores the cost in `self.costs`.
        :param batch: The batch, dimensions: dim_input x num_samples
        :param labels: The labels, one-hot encoded
        :param dropout: `True` in order to use dropout
        """
        def single_forward(prev_layer, weights, bias, activation_func, with_dropout=False):
            """Given the activations of one layer, do a forward pass and
            compute the activations of the next"""
            Z = np.dot(weights, prev_layer) + bias
            A = activation_func(Z)
            if with_dropout:
                D = np.random.rand(*A.shape) > self.dropout_prob
                # apply the dropout mask and rescale by the keep probability
                # ("inverted dropout"), so activations have the same expected
                # magnitude at train and test time
                A = np.multiply(A, D) / (1 - self.dropout_prob)
            self.Z.append(Z)
            self.A.append(A)
            if with_dropout:
                self.D.append(D)

        self.A.append(batch)  # populate the initial 'activations'
        for Wi, bi in zip(self.W[:-1], self.b[:-1]):
            Aprev = self.A[-1]
            single_forward(Aprev, Wi, bi, self.relu, with_dropout=dropout)
            # compute ReLU activations for the hidden layers
        single_forward(self.A[-1], self.W[-1], self.b[-1], self.softmax)
        # compute softmax activations for the final layer
        cost = None
        if labels is not None:
            cost = self.cross_entropy_cost(self.A[-1], labels)
            self.costs.append(cost)  # keep track of costs, for debugging and checking
        return self.A[-1], cost  # return final activations and cost

    def backward(self, labels, dropout=False):
        """
        Do a full backward pass over all layers.
        :param labels: The labels used for forward propagation, one-hot encoded.
        :param dropout: `True` if dropout was used in the forward pass.
        """
        self.dW, self.db = [], []  # reset the gradient lists
        m = labels.shape[1]  # number of samples in the batch

        def single_backward(dA, weights, with_dropout=False, labels=None):
            """Do a backward pass for a single layer.
            The case `labels is not None` is used for the last layer, where the
            softmax + cross-entropy gradient is used instead of dA.
            Otherwise, relu_grad is used."""
            if with_dropout and dA is not None:
                D = self.D.pop()
                # apply the dropout mask created in the forward pass, with the
                # same inverted-dropout rescaling
                dA = np.multiply(dA, D) / (1 - self.dropout_prob)
            if labels is not None:
                dZ = self.cross_entropy_with_softmax_grad(self.A.pop(), labels)  # gradient for the last layer
                self.Z.pop()  # final 'Z' not required, throw it away
            else:
                dZ = np.multiply(self.relu_grad(self.Z.pop()), dA)
            dW = np.dot(dZ, self.A.pop().T) / m
            db = np.sum(dZ, axis=1, keepdims=True) / m
            self.dW.insert(0, dW)
            self.db.insert(0, db)
            return np.dot(weights.T, dZ)  # return dA for the previous layer

        dA_current = single_backward(None, self.W[-1], labels=labels)
        # backward pass through the softmax output layer
        for Wi in reversed(self.W[:-1]):
            dA_current = single_backward(dA_current, Wi, with_dropout=dropout)
            # backward passes through the ReLU hidden layers

    def check_gradient(self, batch, labels, eps=1e-4, check_thresh=1e-6):
        """Gradient checking:
        compute dW by backprop and compare it against the centred
        finite-difference approximation
            dJ/dW_i ~= (cost(W_i + eps) - cost(W_i - eps)) / (2 * eps)
        computed entry by entry. Raise if the maximum difference between
        the two exceeds `check_thresh`."""
        self.forward(batch, labels)
        self.backward(labels)  # computes dW
        max_diff = 0.0
        for Wi, dWi in zip(self.W, self.dW):
            dWi_manual = np.zeros(Wi.shape)
            for iter_W in range(len(dWi_manual.flat)):
                Wi.flat[iter_W] += eps
                _, cost_plus = self.forward(batch, labels)
                Wi.flat[iter_W] -= 2 * eps
                _, cost_minus = self.forward(batch, labels)
                Wi.flat[iter_W] += eps  # restore the original weight
                dWi_manual.flat[iter_W] = (cost_plus - cost_minus) / (2 * eps)
            max_diff = max(max_diff, abs(dWi_manual - dWi).max())
        if max_diff > check_thresh:
            raise RuntimeError("Net not passing gradient checking, max_diff = {:.3e}. "
                               "Something in the code is probably wrong!".format(max_diff))

    def gradient_update_simple(self):
        """Simple gradient update, for debugging"""
        self.W = [Wi - self.learning_rate * dWi
                  for Wi, dWi in zip(self.W, self.dW)]
        self.b = [bi - self.learning_rate * dbi
                  for bi, dbi in zip(self.b, self.db)]
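
    # The Adam update implemented below, for each parameter theta with
    # gradient g (this is the standard Adam rule; beta1, beta2, epsilon and
    # learning_rate are the constructor arguments):
    #     v = beta1 * v + (1 - beta1) * g        # first-moment running average
    #     s = beta2 * s + (1 - beta2) * g**2     # second-moment running average
    #     v_hat = v / (1 - beta1**t),  s_hat = s / (1 - beta2**t)   # bias correction
    #     theta = theta - learning_rate * v_hat / (sqrt(s_hat) + epsilon)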

    def gradient_update_adam(self):
        """Use the stored gradients dW, db to perform an Adam update of
        the weights W and biases b"""
        def v_compute_running_average(vd, grads):
            return [self.beta1 * vdi + (1 - self.beta1) * gradi
                    for vdi, gradi in zip(vd, grads)]
        self.vdW = v_compute_running_average(self.vdW, self.dW)
        self.vdb = v_compute_running_average(self.vdb, self.db)

        def v_compute_correction(vd):
            return [vdi / (1 - np.power(self.beta1, self.t))
                    for vdi in vd]
        vdW_corrected = v_compute_correction(self.vdW)
        vdb_corrected = v_compute_correction(self.vdb)

        def s_compute_running_average(sd, grads):
            return [self.beta2 * sdi + (1 - self.beta2) * np.power(gradi, 2)
                    for sdi, gradi in zip(sd, grads)]
        self.sdW = s_compute_running_average(self.sdW, self.dW)
        self.sdb = s_compute_running_average(self.sdb, self.db)

        def s_compute_correction(sd):
            return [sdi / (1 - np.power(self.beta2, self.t))
                    for sdi in sd]
        sdW_corrected = s_compute_correction(self.sdW)
        sdb_corrected = s_compute_correction(self.sdb)

        def update_params(params, v_corrected, s_corrected):
            return [param - self.learning_rate * v_corr / (np.sqrt(s_corr) + self.epsilon)
                    for param, v_corr, s_corr in zip(params, v_corrected, s_corrected)]
        self.W = update_params(self.W, vdW_corrected, sdW_corrected)
        self.b = update_params(self.b, vdb_corrected, sdb_corrected)
        self.t = self.t + 1

    def check_inputs(self, inputs):
        """Check whether the inputs have the correct shape"""
        if inputs.shape[0] != self.layer_spec[0]:
            raise RuntimeError("Inputs have incorrect shape! "
                               "Expected: {}, "
                               "received: {}".format(self.layer_spec[0],
                                                     inputs.shape[0]))

    def to_one_hot(self, labels):
        """Compute the one-hot encoding of a vector of class labels"""
        one_hot_labels = np.zeros((labels.max() + 1, labels.shape[0]))
        for i, label in enumerate(labels):
            one_hot_labels[label, i] = 1
        return one_hot_labels
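
    # For example, to_one_hot(np.array([2, 0, 1])) produces one column per
    # label (shape: num_classes x num_samples):
    #     [[0., 1., 0.],
    #      [0., 0., 1.],
    #      [1., 0., 0.]]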

    def check_labels(self, labels):
        """Validate the labels and convert them to one-hot encoding if required"""
        if isinstance(labels, list):
            labels = np.array(labels)
        if labels.shape[0] == self.layer_spec[-1]:
            return labels  # assume they are already one-hot encoded
        labels = labels.flatten()  # otherwise, assume class numbers 0 to num_classes - 1
        if labels.max() == self.layer_spec[-1] - 1:
            return self.to_one_hot(labels)
        raise RuntimeError("Training set labels are in the wrong format!")

    def train(self, train_set, labels, mini_batch_size=100, num_epochs=10, shuffle=False):
        """
        Run the full training loop.
        :param train_set: The training set, one column per input
        :param labels: The training labels, either as class numbers from
            0 to num_classes - 1, or one-hot encoded
        :param mini_batch_size: Mini-batch size
        :param num_epochs: Number of epochs to run
        :param shuffle: Whether to reshuffle the training set at each epoch
        """
        self.check_inputs(train_set)
        labels = self.check_labels(labels)  # converts to one-hot, if required
        self.train_init()  # initialise the arrays needed for training
        np.random.seed(38)

        def split_into_batches(train_s):
            """Split the training set into mini-batches using `np.split`.
            `np.split` requires the split to result in an equal division,
            so the last, possibly smaller batch is extracted manually;
            see the worked example below."""
            m = train_s.shape[1]  # number of samples
            last_batch_start = -(m % mini_batch_size)
            num_full_batches = (m + last_batch_start) // mini_batch_size
            if last_batch_start == 0:
                last_batch_start = m
            last_batch = train_s[:, last_batch_start:]  # empty if m % mini_batch_size == 0
            batches = np.split(train_s[:, :last_batch_start], num_full_batches, axis=1)
            if last_batch.shape[1] > 0:  # ignore the last batch if it is empty
                batches.append(last_batch)
            return batches
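
        # Worked example of the splitting above: with m = 1050 samples and
        # mini_batch_size = 100, last_batch_start = -50 and num_full_batches = 10,
        # so np.split yields 10 batches of 100 columns and the remaining 50
        # columns form the final, smaller batch.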

        count = 0
        for epoch in range(num_epochs):
            if shuffle:
                # reshuffle the training set (and labels) differently for each epoch
                rand_permutation = np.random.permutation(train_set.shape[1])
                train_set = train_set[:, rand_permutation]
                labels = labels[:, rand_permutation]
            train_batches = split_into_batches(train_set)
            labels_batches = split_into_batches(labels)
            for mini_batch, mini_batch_labels in zip(train_batches, labels_batches):
                self.forward(mini_batch, mini_batch_labels, dropout=True)
                self.backward(mini_batch_labels, dropout=True)
                self.gradient_update_adam()
                # backward should have consumed all cached activations and masks
                assert (not self.A and not self.Z and not self.D)
                count += 1
                if count % 500 == 0:
                    print("Current cost: {}".format(self.costs[-1]))

    def predict(self, inputs):
        """Predict class labels by taking the argmax of the output activations"""
        self.check_inputs(inputs)
        activations, _ = self.forward(inputs)
        return np.argmax(activations, axis=0)

    def test(self, test_set, labels):
        """Compute the accuracy on a labelled test set"""
        self.check_inputs(test_set)
        labels = self.check_labels(labels)  # now one-hot encoded
        labels = np.argmax(labels, axis=0)  # convert back to numeric class labels
        predictions = self.predict(test_set)
        accuracy = (labels == predictions).mean()
        return accuracy


def download_mnist(mnist_dir):
    """
    Download and gunzip the following files to `mnist_dir`:
    http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
    http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
    http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
    http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
    """
    import requests
    import gzip
    print("Downloading MNIST data")
    training_files = ('train-images-idx3-ubyte', 'train-labels-idx1-ubyte',
                      't10k-images-idx3-ubyte', 't10k-labels-idx1-ubyte')
    url_home = 'http://yann.lecun.com/exdb/mnist/'
    for training_file in training_files:
        gzipped_file = training_file + ".gz"
        print("Downloading and unzipping {}".format(gzipped_file))
        url = url_home + gzipped_file
        resp = requests.get(url)  # download
        path_gzipped = os.path.join(mnist_dir, gzipped_file)
        with open(path_gzipped, "wb") as fh:
            fh.write(resp.content)  # save the gzipped file to disk
        path_file = os.path.join(mnist_dir, training_file)
        with gzip.open(path_gzipped, 'rb') as fh_gzipped:
            with open(path_file, 'wb') as fh:
                data = fh_gzipped.read()  # gunzip
                fh.write(data)  # save the gunzipped version to disk


def load_mnist(mnist_dir):
    """Load the MNIST data from the filesystem"""

    def load_images(path_images):
        with open(path_images, 'rb') as fh:
            header_buf = fh.read(4 * 4)  # the first 4 integers are a header
            header = np.frombuffer(header_buf, dtype='>i4')  # read big-endian integers ('>i4')
            assert (header[0] == 2051)  # check that the magic number is correct
            assert (header[1] == 60000 or header[1] == 10000)  # 60000 train or 10000 test images
            assert (header[2] == 28)  # check that the image width is 28
            assert (header[3] == 28)  # check that the image height is 28
            images_buf = fh.read()
            images = np.frombuffer(images_buf, dtype=np.uint8).reshape((-1, 28 * 28)).T
            images_normalized = images / 255.
            return images_normalized

    def load_labels(path_labels):
        with open(path_labels, 'rb') as fh:
            header_buf = fh.read(2 * 4)  # the first 2 integers are a header
            header = np.frombuffer(header_buf, dtype='>i4')
            assert (header[0] == 2049)  # check that the magic number is correct
            assert (header[1] == 60000 or header[1] == 10000)
            labels_buf = fh.read()
            labels = np.frombuffer(labels_buf, dtype=np.uint8)
            return labels

    train_images = load_images(os.path.join(mnist_dir, "train-images-idx3-ubyte"))
    train_labels = load_labels(os.path.join(mnist_dir, "train-labels-idx1-ubyte"))
    test_images = load_images(os.path.join(mnist_dir, "t10k-images-idx3-ubyte"))
    test_labels = load_labels(os.path.join(mnist_dir, "t10k-labels-idx1-ubyte"))
    return train_images, train_labels, test_images, test_labels
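
# Note on shapes (as produced by `load_mnist` above): train_images is
# (784, 60000) with float pixel values in [0, 1], train_labels is (60000,)
# with uint8 class labels 0-9; the t10k arrays are the same but with
# 10000 columns/entries.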


def show_images(images, labels):
    """Plot a few training images and their labels to check everything is OK"""
    count = 1
    for image_ind in [34, 2302, 34598]:
        image = images[:, image_ind].reshape(28, 28)
        ax = plt.subplot(130 + count)
        ax.set_title("Should be {}".format(labels[image_ind]))
        plt.imshow(image)
        count += 1
    plt.suptitle("Showing some training images and their labels.\n"
                 "Check whether the labels are accurate! If not, something's wrong!\n"
                 "Close the window to start training.")
    plt.show()


def check_with_small_net():
    """Creates a small net with random inputs
    to check backprop using gradient checking"""
    net = FullyConnectedNet([100, 25, 10, 2])
    net.train_init()
    inputs = np.random.rand(100, 1)
    labels = np.array([1, 0]).reshape(2, 1)
    net.check_gradient(inputs, labels)


def main():
    check_with_small_net()  # check that backprop is implemented correctly

    import argparse
    parser = argparse.ArgumentParser(description="""
    Train a fully connected NN to classify MNIST digits.
    Example usage:

        python fully_connected_net.py --mnist_dir mnist_data --layer_spec 784 200 10

    This trains on the MNIST data in `mnist_data` using a fully connected net with an
    input layer of size 784, a hidden layer of size 200, and an output layer of size 10.
    If the data is not already in `mnist_data`, it will be downloaded.
    """)
    parser.add_argument("--mnist_dir", help="Directory for the MNIST data. "
                        "If it doesn't exist, the data will be downloaded into it",
                        default='mnist-data')
    parser.add_argument("--layer_spec", help="The structure of the net, a sequence of integers. "
                        "The first element MUST be 784 and the last element MUST be 10",
                        type=int, nargs='+')
    parser.add_argument("--num_epochs", help="Number of epochs to train", type=int, default=10)
    args = parser.parse_args()

    MNIST_INPUT = 784
    MNIST_OUTPUT = 10
    layer_spec = [MNIST_INPUT, 1000, 100, 30, MNIST_OUTPUT]  # default: a 4-layer NN
    if args.layer_spec:
        layer_spec = args.layer_spec
    if layer_spec[0] != MNIST_INPUT:
        raise RuntimeError("Input layer must have dimension {}".format(MNIST_INPUT))
    if layer_spec[-1] != MNIST_OUTPUT:
        raise RuntimeError("Output layer must have dimension {}".format(MNIST_OUTPUT))
    print("Layer spec: {}".format(layer_spec))
    net = FullyConnectedNet(layer_spec)

    print("MNIST directory: {}".format(args.mnist_dir))
    if not os.path.isdir(args.mnist_dir):
        os.mkdir(args.mnist_dir)
        download_mnist(args.mnist_dir)
    train_images, train_labels, test_images, test_labels = load_mnist(args.mnist_dir)
    show_images(train_images, train_labels)

    # train and plot the decreasing cost
    print("About to train for {} epochs".format(args.num_epochs))
    net.train(train_images, train_labels, shuffle=True, num_epochs=args.num_epochs)
    print("Plotting cost per iteration")
    plt.plot(np.array(net.costs))
    plt.ylabel('Cross-entropy cost')
    plt.xlabel('Iteration number')
    plt.title("Cost per iteration")
    plt.show()

    train_accuracy = net.test(train_images, train_labels)
    print("Train accuracy: {:.2f}%".format(100 * train_accuracy))
    test_accuracy = net.test(test_images, test_labels)
    print("Test accuracy: {:.2f}%".format(100 * test_accuracy))


if __name__ == "__main__":
    main()