#!/usr/bin/python2
# coding=utf-8
#
# Copyright 2013 Johannes Bauer, Universitaet Hamburg
#
# This file is free software. Do with it whatever you like.
# It comes with no warranty, explicit or implicit, whatsoever.
#
# If you find this code useful or if you have any questions, do not
# hesitate to contact me at
# bauer at informatik dot uni dash hamburg dot de.
#
"""
Simple, no-frills implementation of an MLP (Multi-Layer Perceptron) and
the backpropagation of errors algorithm (backprop) based on numpy.
"""
import numpy
import logging
logger = logging.getLogger(__name__)
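
# Typical use (see the __main__ block at the bottom of this file for complete,
# runnable examples; `data` and `targets` below stand in for (n, 2) and (n, 1)
# numpy arrays):
#
#     net = Network((1, 5, 2), LinearLayer)     # 1 output, 5 hidden neurons, 2 inputs
#     trainer = BackpropTrainer(net, data, targets)
#     trainer.train(alpha=.1, beta=.9, max_iterations=1000, mini_batchsize=100)
#     output = net.compute(data)
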
class Layer(object):
    """ Base class for a layer in an MLP. Implement the methods transfer and
        transferd to use it.

        Note: in this implementation, every Layer creates a LogisticLayer as
        its next layer down the hierarchy.
    """
    def __init__(self, num_neurons, layer=0):
        """
            num_neurons : tuple of numbers of neurons in this layer and in the next ones
                          (going from output to input).
            layer       : the number of this layer, counting from the output layer. Used
                          in log messages.
        """
        logger.debug("Initializing layer %d", layer)
        self.layer = layer
        self.weights = (numpy.random.random(size=(num_neurons[0], num_neurons[1])) - .5) * .1
        self.momentum = numpy.zeros_like(self.weights)
        if len(num_neurons) > 2:
            self.next_layer = LogisticLayer(num_neurons[1:], layer + 1)
        else:
            self.next_layer = None

    def transfer(self, a):
        """
            Transfer function of the neurons in this layer.

            a : The input activation.
                See compute() for a description.
        """
        raise NotImplementedError("This is an 'abstract base class'.")

    def transferd(self, e, a):
        """
            Partial derivative of the error function of the neurons in this layer.

            e : the error.
                See update() for a description.
            a : The _output_ activation.
                See update() for a description.
        """
        raise NotImplementedError("This is an 'abstract base class'.")

    def compute(self, inp):
        """
            Compute the output of this layer from the input.

            inp     : The input activation of this layer.
                      Array of shape (n,i), where n is the number of data points and i is
                      the number of inputs.
            returns : A list containing the output of each layer from top to bottom. The
                      last entry is the input.
        """
        logger.debug("Computing activation in layer %d", self.layer)
        if self.next_layer is not None:
            outp = self.next_layer.compute(inp)
        else:
            outp = [inp]
        inp = outp[0].reshape(outp[0].shape[0], 1, outp[0].shape[1])
        myoutp = self.transfer((inp * self.weights).sum(axis=2))
        return [myoutp] + outp
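
    # Note on the broadcasting above: with the input reshaped to (n, 1, i) and
    # self.weights of shape (o, i), (inp * self.weights).sum(axis=2) is an (n, o)
    # array: each (i,)-row of the input multiplied by self.weights.T, i.e. a
    # batched matrix-vector product.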

    def update(self, outputs, error, alpha, beta):
        """
            Update weights of this layer and the next one down recursively (using
            backpropagation).

            outputs : a list of output activations where outputs[0] is the output
                      activation of this layer and outputs[1] is the output activation
                      of the next one down, ie. this layer's input activation. Each
                      entry is an array of shape (n,o) where n is the number of data points
                      and o is the number of neurons in the respective layer.
            error   : The error of this layer.
                      Array of the same shape as the first entry of outputs.
            alpha   : learning rate
            beta    : momentum rate
        """
        logger.debug("Updating weights in layer %d", self.layer)
        outp = outputs[0]
        inp = outputs[1]
        delta = self.transferd(error, outp)
        if self.next_layer is not None:
            # introduce another unitary dimension so numpy can do broadcasting.
            # dimensions of the product: first: data point, second: inputs, third: outputs.
            d = delta.reshape(delta.shape[0], 1, delta.shape[1])
            next_error = (self.weights.T * d).sum(axis=2)
        # tile deltas once for each input and adjust dimensions.
        deltas = numpy.tile(delta, (inp.shape[1], 1, 1)).transpose((2, 1, 0))
        change = alpha * (deltas * inp).mean(axis=1) + beta * self.momentum
        self.weights += change
        self.momentum = change
        if self.next_layer is not None:
            # recursively update the next layer.
            self.next_layer.update(outputs[1:], next_error, alpha, beta)
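
# In more conventional notation, Layer.update() implements the usual backprop rules:
#     delta_l  = transferd(e_l, y_l)
#     e_{l-1}  = W_l^T . delta_l
#     W_l     += alpha * mean_n(delta_l outer x_l) + beta * (previous weight change)
# where e_l is the error arriving at layer l, y_l its output activation, x_l its input
# activation, and mean_n averages the outer products over the n data points in the batch.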

class LogisticLayer(Layer):
    """ Implementation of a network layer with a logistic transfer function. """
    def __init__(self, num_neurons, layer=0):
        Layer.__init__(self, num_neurons, layer)

    def transfer(self, a):
        """ See Layer.transfer() for documentation. """
        return 1. / (1 + numpy.exp(-a))

    def transferd(self, error, a):
        """ See Layer.transferd() for documentation. """
        return error * a * (1. - a)


class LinearLayer(Layer):
    """ Implementation of a network layer with a linear transfer function. """
    def __init__(self, num_neurons, layer=0):
        Layer.__init__(self, num_neurons, layer)

    def transfer(self, a):
        """ See Layer.transfer() for documentation. """
        return a

    def transferd(self, error, activation):
        """ See Layer.transferd() for documentation. """
        return error / error.shape[1]

class Network(object):
    """
        Implementation of a back-propagating MLP. Basically just a wrapper for a
        linked list of Layers.
    """
    def __init__(self, shape, first_layer_class):
        """
            shape             : tuple of numbers of neurons in the network from top to bottom.
                                The first entry is the number of outputs, the last the number
                                of inputs.
            first_layer_class : Class of the first layer (or a function taking shape as
                                input and returning a Layer.)
        """
        shape = shape[:-1] + (shape[-1] + 1,)
        self.shape = shape
        self.layers = first_layer_class(shape)

    def compute(self, inp):
        """
            Compute the output for the given input.

            inp     : Array-like of shape (n,i) where n is the number of data points and i
                      is the number of inputs.
            returns : The output of the network. An array of shape (n,o) where n is the
                      number of data points and o is the number of outputs.
        """
        inp = numpy.concatenate((inp, -numpy.ones((inp.shape[0], 1))), axis=1)
        outp = self.layers.compute(inp)
        return outp[0]
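
    # Note: compute() and update() append a constant -1 "input" to each data point
    # (and __init__() increments shape[-1] accordingly), so the last weight column
    # of the bottom layer acts as a per-neuron bias/threshold.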

    def update(self, inp, target, alpha, beta):
        """
            Update the weights of the network using backprop.

            inp     : Array-like of shape (n,i) where n is the number of data points and i
                      is the number of inputs.
            target  : The target values. An array of shape (n,o) where n is the number of
                      data points and o is the number of outputs.
            returns : The error of the network _before_ updating the weights.
        """
        inp = numpy.concatenate((inp, -numpy.ones((inp.shape[0], 1))), axis=1)
        outps = self.layers.compute(inp)
        error = target - outps[0]
        self.layers.update(outps, error, alpha, beta)
        return error

class BackpropTrainer(object):
    """ Backprop training manager: encapsulates the backprop training process:
        * Handles partitioning of the data into training data and validation data, and
          partitioning of the training data into mini batches.
        * Trains the network by running backprop on mini batches until one of the
          stopping criteria is met.
    """
    def __init__(self, network, data, targets, validation_set_size=.1):
        """
            network             : the network to train
            data                : the input data
            targets             : the desired output data
            validation_set_size : the part of the data to set aside for validation.
                                  (Should normally be between 0 and 0.5.)
        """
        self.network = network
        self.set_data(data, targets, validation_set_size)

    def set_data(self, data, targets, validation_set_size=.1):
        """
            Set the data to train on. Done by the constructor---this needs to be repeated
            only if there is new data.

            data                : the input data
            targets             : the desired output data
            validation_set_size : the part of the data to set aside for validation.
                                  (Should normally be between 0 and 0.5.)
        """
        # important: use a random ordering of data points. Otherwise there may be
        # a systematic difference between training data and validation data.
        indices = range(len(data))
        numpy.random.shuffle(indices)
        self.data = data[indices]
        self.targets = targets[indices]
        num_data_points = len(self.data)
        num_validation_data = int(validation_set_size * num_data_points)
        self.training_data = self.data[num_validation_data:]
        self.training_targets = self.targets[num_validation_data:]
        self.validation_data = self.data[:num_validation_data]
        self.validation_targets = self.targets[:num_validation_data]
        logger.debug("Number of training data points: %d", len(self.training_data))
        logger.debug("Number of validation data points: %d", len(self.validation_data))
        if len(self.validation_data) > len(self.training_data):
            logger.warn("More validation data than training data.")

    def __mini_batch__(self, mini_batchsize):
        if mini_batchsize < len(self.training_data):
            indices = range(len(self.training_data))
            # numpy.random.choice() is not available in my version of numpy.
            numpy.random.shuffle(indices)
            indices = indices[:mini_batchsize]
            mini_batch = self.training_data[indices]
            mini_batch_targets = self.training_targets[indices]
            return mini_batch, mini_batch_targets
        elif mini_batchsize >= len(self.training_data):
            logger.warn("Mini-batch size (%d) is not smaller than the number of training data points (%d).",
                        mini_batchsize,
                        len(self.training_data))
            return self.training_data, self.training_targets

    def train(self,
              alpha, beta,
              max_iterations=None,
              validation_every=5,
              stop_after_no_improvement=3,
              mini_batchsize=None,
              min_mse=None):
        """
            Trains the network by feeding it mini batches until one of the stopping criteria is met.

            alpha                     : learning rate
            beta                      : momentum rate
            max_iterations            : maximum number of iterations to train.
                                        (Default is None: no iteration limit.)
            validation_every          : number of iterations between two validation steps
            stop_after_no_improvement : number of validation steps without improvement
                                        before stopping
            mini_batchsize            : the number of training examples to feed to the net in
                                        each iteration
            min_mse                   : mean squared error to reach before stopping the training.
                                        (Default is None: no MSE-based stopping.)
        """
        step = 0
        stop = False
        current_best = None
        steps_since_current_best = None
        while not stop:
            step += 1
            stop = step == max_iterations
            training_data, training_targets = self.__mini_batch__(mini_batchsize)
            error_train = self.network.update(training_data, training_targets, alpha, beta)
            if logger.isEnabledFor(logging.DEBUG):
                mse_train = (error_train ** 2).mean()
                logger.debug("MSE on training set: %e", mse_train)
            if step % validation_every == 0:
                error_val = self.validation_targets - self.network.compute(self.validation_data)
                mse_val = (error_val ** 2).mean()
                logger.info("MSE on validation set: %e", mse_val)
                if current_best < mse_val and steps_since_current_best > stop_after_no_improvement:
                    logger.info("Stopping early---no more improvement.")
                    stop = True
                elif current_best is None or mse_val < current_best:
                    current_best = mse_val
                    steps_since_current_best = 0
                if min_mse >= mse_val:
                    logger.info("Stopping early---minimum MSE reached.")
                    stop = True
                steps_since_current_best += 1

if __name__ == '__main__':
    numpy.random.seed(42)
    logging.basicConfig(level=logging.INFO)

    def trainXor():
        import scipy.stats
        logger.info("Learning XOR function.")
        layers = (1, 3, 2)
        def actual(inp):
            return (inp[:, 0] != inp[:, 1]).astype(float).reshape((inp.shape[0], 1))
        net = Network(layers, LinearLayer)
        steps = 10000
        for step in range(steps):
            lr = .5 - .5 * step / steps
            lrr = .09
            inp = scipy.stats.bernoulli.rvs(numpy.tile(.5, (10, 2))).astype(float)
            expected = actual(inp)
            output = net.update(inp, expected, lr, lrr)
            if step % 1000 == 999:
                squarerr = ((expected - output) ** 2).mean()
                logger.info("MSE: %e", squarerr)
        logger.info("[0.,0.]: %s. Should be %s", net.compute(numpy.array([[0., 0.]])), actual(numpy.array([[0., 0.]])))
        logger.info("[1.,0.]: %s. Should be %s", net.compute(numpy.array([[1., 0.]])), actual(numpy.array([[1., 0.]])))
        logger.info("[0.,1.]: %s. Should be %s", net.compute(numpy.array([[0., 1.]])), actual(numpy.array([[0., 1.]])))
        logger.info("[1.,1.]: %s. Should be %s", net.compute(numpy.array([[1., 1.]])), actual(numpy.array([[1., 1.]])))

    def trainPolys():
        from matplotlib import pyplot as plt
        logger.info("Learning polynomials.")
        layers = (2, 40, 2)
        powers = [2, 3]
        def actual(inp):
            return numpy.power(inp, powers) / 1000
        net = Network(layers, LinearLayer)
        steps = 10000
        inp = numpy.array([numpy.linspace(-1, 1, 10000), numpy.linspace(0, 5, 10000)]).T * 10
        expected = numpy.array([actual(d) for d in inp])
        trainer = BackpropTrainer(net, inp, expected, .1)
        for _ in range(3):
            # Actually, this will never stop early because there is no noise and no overfitting, so the algorithm can
            # regress (almost) forever.
            trainer.train(alpha=.0005, beta=.9, validation_every=100, stop_after_no_improvement=10, max_iterations=steps, mini_batchsize=1000)
        fig, plots = plt.subplots(2)
        plots[0].plot(inp, net.compute(inp))
        plots[1].plot(inp, expected)
        plt.show()

    trainXor()
    trainPolys()

tatome commented Feb 11, 2016

Multi-Layer Perceptron in NumPy

I recently decided to write my own MLP implementation. There are a few implementations out there, but I wanted my own for a number of reasons: I wanted something standalone, I wanted it to be in Python, I wanted it to be reasonably fast, and, for Pete's sake, I'm basically doing a PhD in computational neuroscience and I've never implemented an MLP.

It's fairly fast, easy to use, and documented. Also, it supports an arbitrary number of layers, which sets it apart from the two implementations listed below.
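
For example, a network with three hidden layers is built and trained like this (the toy data and hyper-parameters below are made up purely for illustration and not tuned; the class names are the ones defined in the gist):

    import numpy

    # toy data, just for illustration: 4 inputs, 1 output per data point
    inputs = numpy.random.random((1000, 4))
    targets = inputs.sum(axis=1).reshape(-1, 1) / 4

    # shape is given from output to input: 1 output, three hidden layers of 20, 4 inputs;
    # the first (output) layer is linear, the layers below it are logistic.
    net = Network((1, 20, 20, 20, 4), LinearLayer)
    trainer = BackpropTrainer(net, inputs, targets, validation_set_size=.1)
    trainer.train(alpha=.1, beta=.9, max_iterations=5000,
                  validation_every=100, stop_after_no_improvement=10,
                  mini_batchsize=100)
    predictions = net.compute(inputs)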

Other free MLP implementations I have found are the one linked to from the Wikipedia article and the one by Marsland in the supporting material to his book.

As always: the code is provided as-is. I don't guarantee or even claim it's good or functional code. Assume this code will break your computer and untie your shoes.
