AForge.NET port (Basic Neural Network) with improvements, in Cython
# encoding: utf-8
# cython: profile=True
# cython: wraparound=False
# cython: boundscheck=False
# cython: infer_types=True
# filename: pyforge.pyx
from libc.stdlib cimport malloc, free, rand
cimport cython
from cython.view cimport array
cimport libc.math as cmath
import numpy as np
cimport numpy as np

##################################
##
## ACTIVATION
##

cdef class IActivationFunction(object):
    # note: derivative2 takes the function's *output* y = f(x) rather than x,
    # so callers can reuse a cached output instead of recomputing f
    cpdef double function(self, double x) except *:
        raise NotImplementedError()
    cpdef double derivative(self, double x) except *:
        raise NotImplementedError()
    cpdef double derivative2(self, double y) except *:
        raise NotImplementedError()
    def __repr__(self):
        return "No repr :("

cdef class TanhActivationFunction(IActivationFunction):
    cdef:
        public double beta
    def __init__(self, double beta = 1):
        self.beta = beta
    @cython.profile(False)
    cpdef double function(self, double x) except *:
        return cmath.tanh(self.beta * x)
    cpdef double derivative(self, double x) except *:
        cdef double f = self.function(x)
        return self.beta * (1 - f * f)
    @cython.profile(False)
    cpdef double derivative2(self, double y) except *:
        return self.beta * (1 - y * y)
    def __repr__(self):
        return "tanh(beta * x)"

##################################
##
## NEURON
##

cdef class Neuron(object):
    cdef:
        public int inputs_count
        public double[:] weights
        public double output
        public tuple rand_range
    def __init__(self, int inputs):
        # allocate weights
        self.inputs_count = max(1, inputs)
        self.weights = array(shape=(self.inputs_count,), itemsize=sizeof(double), format="d")
        self.rand_range = (-1.0, 1.0)
        self.output = 0
        # randomize the neuron
        self.randomize()
    cpdef randomize(self):
        # randomize weights
        cdef int i
        for i in range(self.inputs_count):
            self.weights[i] = np.random.uniform(self.rand_range[0], self.rand_range[1])
    cpdef double compute(self, double[:] inp) except *:
        raise NotImplementedError()
    def __repr__(self):
        return "\n\t\t\tOutput: "+str(self.output)+"; " \
               "Inputs count: "+str(self.inputs_count)+"; " \
               "Random range: "+str(self.rand_range)+"; " \
               "Weights: "+str(np.array(self.weights))+"; "

cdef class ActivationNeuron(Neuron):
    cdef:
        public IActivationFunction activation_function
        public double threshold
    def __init__(self, int inputs, IActivationFunction function):
        Neuron.__init__(self, inputs)
        self.activation_function = function
    cpdef randomize(self):
        Neuron.randomize(self)
        self.threshold = np.random.uniform(self.rand_range[0], self.rand_range[1])
    cpdef double compute(self, double[:] inp) except *:
        cdef:
            double res_summ = 0.0
            int i
        for i in range(self.inputs_count):
            res_summ += self.weights[i] * inp[i]
        res_summ += self.threshold
        self.output = self.activation_function.function(res_summ)
        return self.output
    def __repr__(self):
        res = Neuron.__repr__(self)
        return res + \
               "Threshold: " + str(self.threshold)+"; " \
               "Act. func: " + str(self.activation_function)

##################################
##
## LAYER
##

cdef class Layer(object):
    cdef:
        public int inputs_count
        public int neurons_count
        public list neurons
        public double[:] output
    def __init__(self, int neurons_count, int inputs_count):
        self.inputs_count = max(1, inputs_count)
        self.neurons_count = max(1, neurons_count)
        self.neurons = list()
        self.output = array(shape=(self.neurons_count,), itemsize=sizeof(double), format="d")
    cpdef double[:] compute(self, double[:] inp):
        cdef:
            Neuron n
            int i = 0
        for n in self.neurons:
            self.output[i] = n.compute( inp )
            i += 1
        return self.output
    cpdef randomize(self):
        cdef Neuron n
        for n in self.neurons:
            n.randomize()
    def __repr__(self):
        return "\t\tOutput: "+str(np.array(self.output))+"\n" \
               "\t\tNeurons count: "+str(len(self.neurons))+"\n" \
               "\t\tNeurons: "+str(self.neurons)+"\n"

cdef class ActivationLayer(Layer):
    def __init__(self, int neurons_count, int inputs_count, IActivationFunction function ):
        Layer.__init__(self, neurons_count, inputs_count)
        for i in range(neurons_count):
            self.neurons.append(ActivationNeuron( inputs_count, function ))
    cpdef set_activation_function(self, IActivationFunction function ):
        cdef ActivationNeuron n
        for n in self.neurons:
            n.activation_function = function

##################################
##
## NETWORK
##

cdef class Network(object):
    cdef:
        public int layers_count
        public list layers
        public double[:] output
    def __init__(self, int layers_count):
        self.layers_count = layers_count
        self.layers = list()
    cpdef double[:] compute(self, double[:] inp):
        cdef:
            double[:] output = inp
            Layer l
        for l in self.layers:
            output = l.compute(output)
        self.output = output
        return output
    cpdef randomize(self):
        cdef int i
        for i in range(self.layers_count):
            self.layers[i].randomize()
    cpdef get_error_mse(self, double[:,:] inp, double[:,:] output):
        # NOTE: despite the name, this returns the mean *absolute* error
        # of the first output component, averaged over all samples
        cdef:
            double err_sum = 0
            int i
        for i in range(len(inp)):
            err_sum += abs(self.compute(inp[i])[0] - output[i, 0])
        err_sum /= len(inp)
        return err_sum
    cpdef get_error_sign(self, double[:,:] inp, double[:,:] output):
        # fraction of samples whose first output component has the wrong sign
        cdef:
            double err_sum = 0, value
            int i
        for i in range(len(inp)):
            value = self.compute(inp[i])[0]
            err_sum += 1 if (output[i, 0] > 0 and value > 0) or (output[i, 0] < 0 and value < 0) else 0
        err_sum /= len(inp)
        return 1 - err_sum
    def __repr__(self):
        return "NETWORK:\n" \
               "\tOutput: "+str(np.array(self.output))+"\n" \
               "\tLayers count: "+str(len(self.layers))+"\n" \
               "\tLayers: \n"+str(self.layers)

cdef class ActivationNetwork(Network):
    def __init__(self, IActivationFunction func, int inputs_count, *args):
        Network.__init__(self, len(args))
        cdef int i
        for i in range(self.layers_count):
            self.layers.append(ActivationLayer(
                # neurons count in the layer
                args[i],
                # inputs count of the layer
                inputs_count if i == 0 else args[i-1],
                # activation function of the layer
                func
            ))
    cpdef set_activation_function(self, IActivationFunction func):
        cdef ActivationLayer l
        for l in self.layers:
            l.set_activation_function(func)
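
# Construction example: the varargs list the neuron count of each layer, so
#   ActivationNetwork(TanhActivationFunction(0.1), 1, 5, 1)
# builds a network with one input, a 5-neuron hidden layer and a single
# output neuron -- the topology used by the demo script below.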

##################################
##
## LEARNING
##

cdef class ISupervisedLearning(object):
    # `repeat` is declared here as well, so the cpdef override in
    # BackPropagationLearning has a matching signature
    cpdef double run(self, double[:] inp, double[:] output, int repeat=1) except *:
        raise NotImplementedError()
    cpdef double run_epoch(self, double[:,:] inp, double[:,:] output, int epoch_count=1) except *:
        raise NotImplementedError()

cdef class BackPropagationLearning(ISupervisedLearning):
    cdef:
        # network to teach
        public ActivationNetwork _network
        # learning rate
        double _learning_rate
        # momentum
        double _momentum
        int _repeat_learning
        double _weight_decrease
        # neuron's errors
        double ** _neuron_errors
        # weight's updates
        double *** _weights_updates
        # threshold's updates
        double ** _thresholds_updates
    property learning_rate:
        def __get__(self):
            return self._learning_rate
        def __set__(self, value):
            self._learning_rate = max( 0.0, min( 1.0, value ) )
    property momentum:
        def __get__(self):
            return self._momentum
        def __set__(self, value):
            self._momentum = max( 0.0, min( 1.0, value ) )
    property repeat_learning:
        def __get__(self):
            return self._repeat_learning
        def __set__(self, value):
            # at least 1, otherwise run_epoch would divide by zero
            self._repeat_learning = max( 1, value )
    property weight_decrease:
        def __get__(self):
            return self._weight_decrease
        def __set__(self, value):
            self._weight_decrease = max( 0.0, min( 1.0, value ) )
    def __init__(self, ActivationNetwork network, double momentum = 0, double learning_rate = 1,
                 int repeat_learning = 1, double weight_decrease = 0):
        self._network = network
        self.momentum = momentum
        self.learning_rate = learning_rate
        self.repeat_learning = repeat_learning
        self.weight_decrease = weight_decrease
        # create error and deltas arrays
        self._neuron_errors = <double **> malloc( sizeof(double *) * network.layers_count)
        self._weights_updates = <double ***> malloc( sizeof(double **) * network.layers_count)
        self._thresholds_updates = <double **> malloc( sizeof(double *) * network.layers_count)
        # initialize errors and deltas arrays for each layer
        cdef:
            Layer layer
            int i, j, k
            int layer_neurons
        for i in range(network.layers_count):
            layer = network.layers[i]
            layer_neurons = layer.neurons_count
            self._neuron_errors[i] = <double *> malloc( sizeof(double) * layer_neurons)
            self._weights_updates[i] = <double **> malloc( sizeof(double *) * layer_neurons)
            self._thresholds_updates[i] = <double *> malloc( sizeof(double) * layer_neurons)
            # for each neuron
            for j in range(layer_neurons):
                self._neuron_errors[i][j] = 0.
                self._thresholds_updates[i][j] = 0.
                self._weights_updates[i][j] = <double *> malloc( sizeof(double) * layer.inputs_count)
                for k in range(layer.inputs_count):
                    self._weights_updates[i][j][k] = 0.
    def __dealloc__(BackPropagationLearning self):
        # free the nested buffers too, not just the outer pointers, to avoid leaks
        cdef int i, j
        for i in range(self._network.layers_count):
            for j in range((<Layer> self._network.layers[i]).neurons_count):
                free(self._weights_updates[i][j])
            free(self._neuron_errors[i]); free(self._weights_updates[i]); free(self._thresholds_updates[i])
        free(self._neuron_errors); free(self._weights_updates); free(self._thresholds_updates)
    @cython.profile(False)
    cpdef double run(self, double[:] inp, double[:] output, int repeat = 1) except *:
        cdef:
            double error = 0
            int i
        for i in range(repeat):
            # compute the network's output
            self._network.compute( inp )
            # calculate network error
            error = self.calculate_error( output )
            # calculate weights updates
            self.calculate_updates( inp )
            # update the network
            self.update_network()
        return error
    @cython.profile(False)
    cpdef double run_epoch(self, double[:,:] inp, double[:,:] output, int epoch_count=1) except *:
        cdef:
            double error = 0.0
            int i, sample, j
            int inp_len = len(inp)
            int run_count = inp_len // self._repeat_learning
        # run learning procedure
        for j in range(epoch_count):
            error = 0.0
            for i in range(run_count):
                # with repeat_learning > 1, train repeatedly on randomly drawn
                # samples; otherwise walk through the data set in order
                sample = np.random.randint(inp_len) if self._repeat_learning > 1 else i
                error += self.run( inp[sample], output[sample], self._repeat_learning)
        # return summary error of the last epoch
        return error
    cpdef double calculate_error(self, double[:] desired_output):
        cdef:
            # current and the next layers
            Layer layer, layer_next
            Neuron n1, n2
            # current and the next errors arrays
            double * errors
            double * errors_next
            # error values
            double error = 0, e, err_sum
            # neuron's output value
            double output
            # layers count
            int layers_count = self._network.layers_count
            int i=0, j=0, k=0
            # assume that all neurons of the network have the same activation function
            IActivationFunction function = (self._network.layers[0].neurons[0]).activation_function
        # calculate error values for the last layer first
        layer = self._network.layers[layers_count - 1]
        errors = self._neuron_errors[layers_count - 1]
        for n1 in layer.neurons:
            output = n1.output
            # error of the neuron
            e = desired_output[i] - output
            # error multiplied by the activation function's derivative
            errors[i] = e * function.derivative2( output )
            # square the error and sum it
            error += ( e * e )
            i += 1
        # calculate error values for other layers
        for j in range(layers_count - 2, -1, -1):
            layer = self._network.layers[j]
            layer_next = self._network.layers[j + 1]
            errors = self._neuron_errors[j]
            errors_next = self._neuron_errors[j + 1]
            i = 0
            # for all neurons of the layer
            for n1 in layer.neurons:
                err_sum = 0.0
                k = 0
                # for all neurons of the next layer
                for n2 in layer_next.neurons:
                    err_sum += errors_next[k] * n2.weights[i]
                    k += 1
                errors[i] = err_sum * function.derivative2( n1.output )
                i += 1
        # return squared error of the last layer divided by 2
        return error / 2.0
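
    # The pass above is standard error backpropagation:
    #   output layer:  delta_i = (d_i - y_i) * f'(net_i)
    #   hidden layers: delta_i = f'(net_i) * sum_k( delta_k * w[k][i] )
    # where f'(net_i) is evaluated via derivative2 on the cached output y_i,
    # and the return value is half the sum-of-squares error of the last layer.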
    cpdef calculate_updates(self, double[:] inp):
        cdef:
            # current neuron
            Neuron neuron, n2
            # current and previous layers
            Layer layer, layer_prev
            # layer's weights updates
            double ** layer_weights_updates
            # layer's thresholds updates
            double * layer_threshold_updates
            # layer's errors
            double * errors
            # neuron's weights updates
            double * neuron_weight_updates
        # 1 - calculate updates for the first layer
        layer = self._network.layers[0]
        errors = self._neuron_errors[0]
        layer_weights_updates = self._weights_updates[0]
        layer_threshold_updates = self._thresholds_updates[0]
        # cache for frequently used values
        cdef:
            double cached_momentum = self._learning_rate * self._momentum
            double cached1m_momentum = self._learning_rate * ( 1 - self._momentum )
            double cached_error
            int i=0, j, k
        # for each neuron of the first layer
        for neuron in layer.neurons:
            cached_error = errors[i] * cached1m_momentum
            neuron_weight_updates = layer_weights_updates[i]
            # for each weight of the neuron
            for j in range(neuron.inputs_count):
                # calculate weight update
                neuron_weight_updates[j] = cached_momentum * neuron_weight_updates[j] + cached_error * inp[j]
            # calculate threshold update
            layer_threshold_updates[i] = cached_momentum * layer_threshold_updates[i] + cached_error
            i += 1
        # 2 - for all other layers
        for k in range(1, self._network.layers_count):
            layer_prev = self._network.layers[k - 1]
            layer = self._network.layers[k]
            errors = self._neuron_errors[k]
            layer_weights_updates = self._weights_updates[k]
            layer_threshold_updates = self._thresholds_updates[k]
            i = 0
            # for each neuron of the layer
            for neuron in layer.neurons:
                cached_error = errors[i] * cached1m_momentum
                neuron_weight_updates = layer_weights_updates[i]
                j = 0
                # for each synapse of the neuron
                for n2 in layer_prev.neurons:
                    # calculate weight update
                    neuron_weight_updates[j] = cached_momentum * neuron_weight_updates[j] + cached_error * n2.output
                    j += 1
                # calculate threshold update
                layer_threshold_updates[i] = cached_momentum * layer_threshold_updates[i] + cached_error
                i += 1
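
    # Update rule accumulated above (momentum SGD, as in this AForge.NET port):
    #   dw[t]     = (lr * m) * dw[t-1] + (lr * (1 - m)) * delta * input
    #   dtheta[t] = (lr * m) * dtheta[t-1] + (lr * (1 - m)) * delta
    # with lr = learning_rate and m = momentum; update_network then applies
    #   w[t] = (1 - weight_decrease) * w[t-1] + dw[t]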
    cpdef update_network(self):
        cdef:
            # current neuron
            ActivationNeuron neuron
            # current layer
            Layer layer
            # layer's weights updates
            double ** layer_weights_updates
            # layer's thresholds updates
            double * layer_threshold_updates
            # neuron's weights updates
            double * neuron_weight_updates
            int i=0, j=0, k=0
            double weight_decrease = 1 - self._weight_decrease
        # for each layer of the network
        for layer in self._network.layers:
            layer_weights_updates = self._weights_updates[i]
            layer_threshold_updates = self._thresholds_updates[i]
            j = 0
            # for each neuron of the layer
            for neuron in layer.neurons:
                neuron_weight_updates = layer_weights_updates[j]
                # for each weight of the neuron
                for k in range(neuron.inputs_count):
                    # update weight
                    neuron.weights[k] = weight_decrease * neuron.weights[k] + neuron_weight_updates[k]
                # update threshold
                neuron.threshold += layer_threshold_updates[j]
                j += 1
            i += 1
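
To try the module, the .pyx file has to be compiled before the demo script below can import it. Here is a minimal build-script sketch, assuming the source above is saved as pyforge.pyx next to it (the setup.py itself is not part of the gist); build with `python setup.py build_ext --inplace`:

# setup.py -- minimal build sketch for the pyforge.pyx module above
from setuptools import setup
from Cython.Build import cythonize
import numpy

setup(
    name="pyforge",
    ext_modules=cythonize("pyforge.pyx"),
    include_dirs=[numpy.get_include()],  # needed because the module cimports numpy
)

The second file of the gist is a demo that trains the network on z * sin(z) / 10 and plots the fit and the error curve as training progresses: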
import math
import matplotlib.pyplot as plt
import pyforge as pf
import numpy as np

plt.ion()
plt.subplot(2, 1, 1)
func_plot, = plt.plot([], [])
pred_plot, = plt.plot([], [])
plt.title('Function graph')
plt.ylabel('Y')
plt.xlabel('X')
plt.subplot(2, 1, 2)
err_plot, = plt.plot([], [], 'r.-')
plt.xlabel('Epoch')
plt.ylabel('MSE')

def update_line(hl, x, y, replace=False):
    # replace the line's data entirely, or append the new points to it,
    # then rescale the axes and redraw
    if replace:
        hl.set_xdata(x)
        hl.set_ydata(y)
    else:
        hl.set_xdata(np.append(hl.get_xdata(), x))
        hl.set_ydata(np.append(hl.get_ydata(), y))
    hl.axes.relim()
    hl.axes.autoscale()
    plt.draw()

# Create data
func = lambda z: z * math.sin(z) / 10
inp = np.array([[x] for x in np.arange(0, 10.25, 0.25)])
output = np.array([[func(*x)] for x in inp])
update_line(func_plot, [x[0] for x in inp], [y[0] for y in output], replace=True)

# Create network and trainer
nn = pf.ActivationNetwork(pf.TanhActivationFunction(0.1), 1, 5, 1)
trainer = pf.BackPropagationLearning(nn)
trainer.learning_rate = 1
trainer.momentum = 0.3
trainer.repeat_learning = 1
trainer.weight_decrease = 0

# Train it, updating the error curve and the current fit every 100 epochs
for i in range(500):
    for j in range(100):
        trainer.run_epoch(inp, output)
    update_line(err_plot, i, nn.get_error_mse(inp, output))
    update_line(pred_plot, [x[0] for x in inp], [nn.compute(x)[0] for x in inp], replace=True)
plt.ioff()
plt.show()