#!/usr/bin/python2
# coding=utf-8
#
# Copyright 2013 Johannes Bauer, Universitaet Hamburg
#
# This file is free software. Do with it whatever you like.
# It comes with no warranty, explicit or implicit, whatsoever.
#
# If you find this code useful or if you have any questions, do not
# hesitate to contact me at
# bauer at informatik dot uni dash hamburg dot de.
#
"""
Simple, no-frills implementation of an MLP (Multi-Layer Perceptron) and
the backpropagation of errors algorithm (backprop) based on numpy.
"""
import numpy
import logging

logger = logging.getLogger(__name__)

class Layer(object):
    """ Base class for a layer in an MLP. Implement methods transfer and
        transferd to use it.

        Note: in this implementation, every Layer creates a LogisticLayer as
        its next layer down the hierarchy.
    """
    def __init__(self, num_neurons, layer=0):
        """
        num_neurons : tuple of numbers of neurons in this layer and in the next ones
                      (going from output to input).
        layer       : the number of this layer, counting from the output layer. Used
                      in log messages.
        """
        logger.debug("Initializing layer %d", layer)
        self.layer = layer
        # small random weights centered around zero.
        self.weights = (numpy.random.random(size=(num_neurons[0], num_neurons[1])) - .5) * .1
        self.momentum = numpy.zeros_like(self.weights)
        if len(num_neurons) > 2:
            self.next_layer = LogisticLayer(num_neurons[1:], layer + 1)
        else:
            self.next_layer = None

    def transfer(self, a):
        """
        Transfer function of the neurons in this layer.

        a : The input activation.
            See compute() for a description.
        """
        raise NotImplementedError("This is an 'abstract base class'.")

    def transferd(self, e, a):
        """
        Partial derivative of the error function of the neurons in this layer.

        e : the error.
            See update() for a description.
        a : The _output_ activation.
            See update() for a description.
        """
        raise NotImplementedError("This is an 'abstract base class'.")

    def compute(self, inp):
        """
        Compute the output of this layer from the input.

        inp     : The input activation of this layer.
                  Array of shape (n,i), where n is the number of data points and i is
                  the number of inputs.
        returns : A list containing the output of each layer from top to bottom. The
                  last entry is the input.
        """
        logger.debug("Computing activation in layer %d", self.layer)
        if self.next_layer is not None:
            outp = self.next_layer.compute(inp)
        else:
            outp = [inp]
        # add a unitary dimension so numpy broadcasts the weights over all data points.
        inp = outp[0].reshape(outp[0].shape[0], 1, outp[0].shape[1])
        myoutp = self.transfer((inp * self.weights).sum(axis=2))
        return [myoutp] + outp

    def update(self, outputs, error, alpha, beta):
        """
        Update weights of this layer and the next one down recursively (using
        backpropagation).

        outputs : a list of output activations where outputs[0] is the output
                  activation of this layer and outputs[1] is the output activation
                  of the next one down, ie. this layer's input activation. Each
                  entry is an array of shape (n,o) where n is the number of data points,
                  and o is the number of neurons in this layer.
        error   : The error of this layer.
                  Array of the same shape as the first entry of outputs.
        alpha   : learning rate
        beta    : momentum rate
        """
        logger.debug("Updating weights in layer %d", self.layer)
        outp = outputs[0]
        inp = outputs[1]
        delta = self.transferd(error, outp)
        if self.next_layer is not None:
            # introduce another unitary dimension so numpy can do broadcasting.
            # first dimension: data point, second: outputs, third: inputs.
            d = delta.reshape(delta.shape[0], 1, delta.shape[1])
            next_error = (self.weights.T * d).sum(axis=2)
        # tile deltas once for each input and adjust dimensions.
        deltas = numpy.tile(delta, (inp.shape[1], 1, 1)).transpose((2, 1, 0))
        # weight change: learning rate times (delta * input), averaged over data
        # points, plus the momentum term from the previous update.
        change = alpha * (deltas * inp).mean(axis=1) + beta * self.momentum
        self.weights += change
        self.momentum = change
        if self.next_layer is not None:
            # recursively update next layer.
            self.next_layer.update(outputs[1:], next_error, alpha, beta)


class LogisticLayer(Layer):
    """ Implementation of a network layer with a logistic transfer function. """
    def __init__(self, num_neurons, layer=0):
        Layer.__init__(self, num_neurons, layer)

    def transfer(self, a):
        """ See Layer.transfer() for documentation. """
        return 1. / (1 + numpy.exp(-a))

    def transferd(self, error, a):
        """ See Layer.transferd() for documentation. """
        # derivative of the logistic function, expressed in terms of its output a.
        return error * a * (1. - a)


class LinearLayer(Layer):
    """ Implementation of a network layer with a linear transfer function. """
    def __init__(self, num_neurons, layer=0):
        Layer.__init__(self, num_neurons, layer)

    def transfer(self, a):
        """ See Layer.transfer() for documentation. """
        return a

    def transferd(self, error, activation):
        """ See Layer.transferd() for documentation. """
        return error / error.shape[1]


class Network(object):
    """
    Implementation of a back-propagating MLP. Basically just a wrapper for a
    linked list of Layers.
    """
    def __init__(self, shape, first_layer_class):
        """
        shape             : tuple of numbers of neurons in the network from top to bottom.
                            The first entry is the number of outputs, the last the number
                            of inputs.
        first_layer_class : Class of the first layer (or a function taking the shape as
                            input and returning a Layer.)
        """
        # add one extra input for the bias.
        shape = shape[:-1] + (shape[-1] + 1,)
        self.shape = shape
        self.layers = first_layer_class(shape)

    def compute(self, inp):
        """
        Compute the output for the given input.

        inp     : Array-like of shape (n,i) where n is the number of data points and i
                  is the number of inputs.
        returns : The output of the network. An array of shape (n,o) where n is the
                  number of data points and o is the number of outputs.
        """
        # append a constant -1 to every data point as the bias input.
        inp = numpy.concatenate((inp, -numpy.ones((inp.shape[0], 1))), axis=1)
        outp = self.layers.compute(inp)
        return outp[0]

    def update(self, inp, target, alpha, beta):
        """
        Update the weights of the network using backprop.

        inp     : Array-like of shape (n,i) where n is the number of data points and i
                  is the number of inputs.
        target  : The target values. An array of shape (n,o) where n is the number of
                  data points and o is the number of outputs.
        returns : The error of the network _before_ updating the weights.
        """
        inp = numpy.concatenate((inp, -numpy.ones((inp.shape[0], 1))), axis=1)
        outps = self.layers.compute(inp)
        error = target - outps[0]
        self.layers.update(outps, error, alpha, beta)
        return error


class BackpropTrainer(object):
    """ Backprop Training Manager: encapsulates the backprop training process:
        * Handles partitioning of data into training data and validation data and
          partitioning of training data into mini batches.
        * Trains the network by running backprop on mini batches until one of the
          stopping criteria is met.
    """
    def __init__(self, network, data, targets, validation_set_size=.1):
        """
        network             : the network to train
        data                : the input data
        targets             : the desired output data
        validation_set_size : the part of the data to set aside for validation.
                              (Should normally be between 0 and 0.5.)
        """
        self.network = network
        self.set_data(data, targets, validation_set_size)

    def set_data(self, data, targets, validation_set_size=.1):
        """
        Set the data to train on. Done by the constructor---only needs to be repeated
        if there's new data.

        data                : the input data
        targets             : the desired output data
        validation_set_size : the part of the data to set aside for validation.
                              (Should normally be between 0 and 0.5.)
        """
        # important: use a random ordering of data points. Otherwise there may be
        # a systematic difference between training data and validation data.
        indices = range(len(data))
        numpy.random.shuffle(indices)
        self.data = data[indices]
        self.targets = targets[indices]
        num_data_points = len(self.data)
        num_validation_data = int(validation_set_size * num_data_points)
        self.training_data = self.data[num_validation_data:]
        self.training_targets = self.targets[num_validation_data:]
        self.validation_data = self.data[:num_validation_data]
        self.validation_targets = self.targets[:num_validation_data]
        logger.debug("Number of training data points: %d", len(self.training_data))
        logger.debug("Number of validation data points: %d", len(self.validation_data))
        if len(self.validation_data) > len(self.training_data):
            logger.warn("More validation data than training data.")

    def __mini_batch__(self, mini_batchsize):
        if mini_batchsize < len(self.training_data):
            indices = range(len(self.training_data))
            # numpy.random.choice() is not available in my version of numpy.
            numpy.random.shuffle(indices)
            indices = indices[:mini_batchsize]
            mini_batch = self.training_data[indices]
            mini_batch_targets = self.training_targets[indices]
            return mini_batch, mini_batch_targets
        else:
            logger.warn("Mini-batch size (%d) is greater than number of data points (%d).",
                        mini_batchsize,
                        len(self.training_data))
            return self.training_data, self.training_targets

    def train(self,
              alpha, beta,
              max_iterations=None,
              validation_every=5,
              stop_after_no_improvement=3,
              mini_batchsize=None,
              min_mse=None):
        """
        Trains the network by feeding it mini batches until one of the stopping criteria is met.

        alpha                     : learning rate
        beta                      : momentum rate
        max_iterations            : maximum number of iterations to train.
                                    (Default is None: train forever.)
        validation_every          : number of iterations between two validation steps
        stop_after_no_improvement : number of validation steps without improvement
                                    before stopping
        mini_batchsize            : the number of training examples to feed to the net in
                                    each iteration
        min_mse                   : mean squared error to reach before stopping the training.
                                    (Default is None: train forever.)
        """
        step = 0
        stop = False
        current_best = None
        steps_since_current_best = None
        while not stop:
            step += 1
            stop = step == max_iterations
            training_data, training_targets = self.__mini_batch__(mini_batchsize)
            error_train = self.network.update(training_data, training_targets, alpha, beta)
            if logger.isEnabledFor(logging.DEBUG):
                mse_train = (error_train**2).mean()
                logger.debug("MSE on training set: %e", mse_train)
            if step % validation_every == 0:
                error_val = self.validation_targets - self.network.compute(self.validation_data)
                mse_val = (error_val**2).mean()
                logger.info("MSE on validation set: %e", mse_val)
                if current_best < mse_val and steps_since_current_best > stop_after_no_improvement:
                    logger.info("Stopping early---no more improvement.")
                    stop = True
                elif current_best is None or mse_val < current_best:
                    current_best = mse_val
                    steps_since_current_best = 0
                if min_mse >= mse_val:
                    logger.info("Stopping early---minimum MSE reached.")
                    stop = True
                steps_since_current_best += 1


if __name__ == '__main__':
    numpy.random.seed(42)
    logging.basicConfig(level=logging.INFO)

    def trainXor():
        import scipy.stats
        logger.info("Learning XOR function.")
        layers = (1, 3, 2)
        def actual(inp):
            return (inp[:, 0] != inp[:, 1]).astype(float).reshape((inp.shape[0], 1))
        net = Network(layers, LinearLayer)
        steps = 10000
        for step in range(steps):
            lr = .5 - .5 * step / steps
            momentum = .09
            inp = scipy.stats.bernoulli.rvs(numpy.tile(.5, (10, 2))).astype(float)
            expected = actual(inp)
            # Network.update() returns the error (target - output).
            error = net.update(inp, expected, lr, momentum)
            if step % 1000 == 999:
                squarerr = (error**2).mean()
                logger.info("MSE: %e", squarerr)
        logger.info("[0.,0.]: %s. Should be %s", net.compute(numpy.array([[0., 0.]])), actual(numpy.array([[0., 0.]])))
        logger.info("[1.,0.]: %s. Should be %s", net.compute(numpy.array([[1., 0.]])), actual(numpy.array([[1., 0.]])))
        logger.info("[0.,1.]: %s. Should be %s", net.compute(numpy.array([[0., 1.]])), actual(numpy.array([[0., 1.]])))
        logger.info("[1.,1.]: %s. Should be %s", net.compute(numpy.array([[1., 1.]])), actual(numpy.array([[1., 1.]])))

    def trainPolys():
        from matplotlib import pyplot as plt
        logger.info("Learning polynomials.")
        layers = (2, 40, 2)
        powers = [2, 3]
        def actual(inp):
            return numpy.power(inp, powers) / 1000
        net = Network(layers, LinearLayer)
        steps = 10000
        inp = numpy.array([numpy.linspace(-1, 1, 10000), numpy.linspace(0, 5, 10000)]).T * 10
        expected = numpy.array([actual(d) for d in inp])
        trainer = BackpropTrainer(net, inp, expected, .1)
        for _ in range(3):
            # Actually, this will never stop early because there is no noise and no overfitting, so the algorithm can
            # regress (almost) forever.
            trainer.train(alpha=.0005, beta=.9, validation_every=100, stop_after_no_improvement=10, max_iterations=steps, mini_batchsize=1000)
        fig, plots = plt.subplots(2)
        plots[0].plot(inp, net.compute(inp))
        plots[1].plot(inp, expected)
        plt.show()

    trainXor()
    trainPolys()
Multi-Layer Perceptron in NumPy
I recently decided to write my own MLP implementation. There are a few implementations out there, but I wanted my own for a number of reasons: I wanted something standalone, I wanted it to be in Python, I wanted it to be reasonably fast, and, for Pete's sake, I'm basically doing a PhD in computational neuroscience and I've never implemented an MLP.
It's fairly fast, easy to use, and documented. It also supports an arbitrary number of layers, which sets it apart from the two implementations listed below; a short usage sketch follows further down.
Other free MLP implementations I have found are the one linked to from the Wikipedia article and the one by Marsland in the supporting material to his book.
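If you want to try this one, usage looks roughly like the sketch below. The class and method names are the ones from the gist; everything else (the network shape, the toy target function, and the hyperparameters) is made up purely for illustration, and, like the gist itself, it assumes Python 2 and an old-ish numpy:

    import numpy

    # a toy regression problem: 1000 data points, 3 inputs, 1 output.
    data = numpy.random.random((1000, 3))
    targets = data.sum(axis=1).reshape(-1, 1) / 3.

    # the shape runs from outputs to inputs; Network appends the bias input itself.
    net = Network((1, 5, 4, 3), LinearLayer)
    trainer = BackpropTrainer(net, data, targets, validation_set_size=.1)
    trainer.train(alpha=.01, beta=.9,
                  max_iterations=5000,
                  validation_every=100,
                  stop_after_no_improvement=5,
                  mini_batchsize=100)

    predictions = net.compute(data)  # array of shape (1000, 1)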
As always: the code is provided as-is. I don't guarantee or even claim that it's good or functional code. Assume this code will break your computer and untie your shoes.