@c58 · Last active August 29, 2015
AForge.NET basic neural network, ported to Cython with improvements
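A minimal build sketch for the module below, assuming the usual Cython plus numpy toolchain (the setup.py filename and the build command are assumptions, not part of the original gist):

# filename: setup.py (assumed, not part of the original gist)
from distutils.core import setup
from Cython.Build import cythonize
import numpy

setup(
    ext_modules=cythonize("pyforge.pyx"),
    # pyforge.pyx cimports numpy, so numpy's headers must be on the include path
    include_dirs=[numpy.get_include()],
)
# build in place with: python setup.py build_ext --inplace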
# encoding: utf-8
# cython: profile=True
# cython: wraparound=False
# cython: boundscheck=False
# cython: infer_types=True
# filename: pyforge.pyx
from libc.stdlib cimport malloc, free, rand
cimport cython
from cython.view cimport array
cimport libc.math as cmath
import numpy as np
cimport numpy as np
##################################
##
## ACTIVATION
##
cdef class IActivationFunction(object):
    # "except *" lets the NotImplementedError propagate from methods
    # that return a plain C double
    cpdef double function(self, double x) except *:
        raise NotImplementedError()
    cpdef double derivative(self, double x) except *:
        raise NotImplementedError()
    cpdef double derivative2(self, double y) except *:
        raise NotImplementedError()
    def __repr__(self):
        return "No repr :("
cdef class TanhActivationFunction(IActivationFunction):
    cdef:
        public double beta
    def __init__(self, double beta = 1):
        self.beta = beta
    @cython.profile(False)
    cpdef double function(self, double x) except *:
        return cmath.tanh(self.beta * x)
    cpdef double derivative(self, double x) except *:
        cdef double f = self.function(x)
        # d/dx tanh(beta*x) = beta * (1 - tanh(beta*x)^2)
        return self.beta * (1 - f * f)
    @cython.profile(False)
    cpdef double derivative2(self, double y) except *:
        # derivative expressed through the already-computed output y = f(x)
        return self.beta * (1 - y * y)
    def __repr__(self):
        return "tanh(beta * x)"
##################################
##
## NEURON
##
cdef class Neuron(object):
    cdef:
        public int inputs_count
        public double[:] weights
        public double output
        public tuple rand_range
    def __init__(self, int inputs):
        # allocate weights
        self.inputs_count = max(1, inputs)
        self.weights = array(shape=(self.inputs_count,), itemsize=sizeof(double), format="d")
        self.rand_range = (-1.0, 1.0)
        self.output = 0
        # randomize the neuron
        self.randomize()
    cpdef randomize(self):
        # randomize weights
        cdef int i
        for i in range(self.inputs_count):
            self.weights[i] = np.random.uniform(self.rand_range[0], self.rand_range[1])
    cpdef double compute(self, double[:] inp) except *:
        raise NotImplementedError()
    def __repr__(self):
        return "\n\t\t\tOutput: "+str(self.output)+"; " \
               "Inputs count: "+str(self.inputs_count)+"; " \
               "Random range: "+str(self.rand_range)+"; " \
               "Weights: "+str(np.array(self.weights))+"; "
cdef class ActivationNeuron(Neuron):
    cdef:
        public IActivationFunction activation_function
        public double threshold
    def __init__(self, int inputs, IActivationFunction function):
        Neuron.__init__(self, inputs)
        self.activation_function = function
    cpdef randomize(self):
        Neuron.randomize(self)
        self.threshold = np.random.uniform(self.rand_range[0], self.rand_range[1])
    cpdef double compute(self, double[:] inp) except *:
        # output = f( sum_i(w_i * x_i) + threshold )
        cdef:
            double res_sum = 0.0
            int i
        for i in range(self.inputs_count):
            res_sum += self.weights[i] * inp[i]
        res_sum += self.threshold
        self.output = self.activation_function.function(res_sum)
        return self.output
    def __repr__(self):
        res = Neuron.__repr__(self)
        return res + \
               "Threshold: " + str(self.threshold)+"; " \
               "Act. func: " + str(self.activation_function)
##################################
##
## LAYER
##
cdef class Layer(object):
    cdef:
        public int inputs_count
        public int neurons_count
        public list neurons
        public double[:] output
    def __init__(self, int neurons_count, int inputs_count):
        self.inputs_count = max(1, inputs_count)
        self.neurons_count = max(1, neurons_count)
        self.neurons = list()
        self.output = array(shape=(self.neurons_count,), itemsize=sizeof(double), format="d")
    cpdef double[:] compute(self, double[:] inp):
        cdef:
            Neuron n
            int i = 0
        for n in self.neurons:
            self.output[i] = n.compute( inp )
            i += 1
        return self.output
    cpdef randomize(self):
        cdef Neuron n
        for n in self.neurons:
            n.randomize()
    def __repr__(self):
        return "\t\tOutput: "+str(np.array(self.output))+"\n" \
               "\t\tNeurons count: "+str(len(self.neurons))+"\n" \
               "\t\tNeurons: "+str(self.neurons)+"\n"
cdef class ActivationLayer(Layer):
    def __init__(self, int neurons_count, int inputs_count, IActivationFunction function):
        Layer.__init__(self, neurons_count, inputs_count)
        for i in range(neurons_count):
            self.neurons.append(ActivationNeuron( inputs_count, function ))
    cpdef set_activation_function(self, IActivationFunction function):
        for n in self.neurons:
            n.activation_function = function
##################################
##
## NETWORK
##
cdef class Network(object):
    cdef:
        public int layers_count
        public list layers
        public double[:] output
    def __init__(self, int layers_count):
        self.layers_count = layers_count
        self.layers = list()
    cpdef double[:] compute(self, double[:] inp):
        cdef:
            double[:] output = inp
            Layer l
        for l in self.layers:
            output = l.compute(output)
        self.output = output
        return output
    cpdef randomize(self):
        cdef int i
        for i in range(self.layers_count):
            self.layers[i].randomize()
    cpdef get_error_mse(self, double[:,:] inp, double[:,:] output):
        # mean squared error over the first output component
        cdef:
            double err_sum = 0, diff
            int i
        for i in range(len(inp)):
            diff = self.compute(inp[i])[0] - output[i, 0]
            err_sum += diff * diff
        err_sum /= len(inp)
        return err_sum
    cpdef get_error_sign(self, double[:,:] inp, double[:,:] output):
        # fraction of samples whose first output component has the wrong sign
        cdef:
            double err_sum = 0, value
            int i
        for i in range(len(inp)):
            value = self.compute(inp[i])[0]
            err_sum += 1 if (output[i, 0] > 0 and value > 0) or (output[i, 0] < 0 and value < 0) else 0
        err_sum /= len(inp)
        return 1 - err_sum
    def __repr__(self):
        return "NETWORK:\n" \
               "\tOutput: "+str(np.array(self.output))+"\n" \
               "\tLayers count: "+str(len(self.layers))+"\n" \
               "\tLayers: \n"+str(self.layers)
cdef class ActivationNetwork(Network):
    def __init__(self, IActivationFunction func, int inputs_count, *args):
        Network.__init__(self, len(args))
        cdef int i
        for i in range(0, self.layers_count):
            self.layers.append(ActivationLayer(
                # neurons count in the layer
                args[i],
                # inputs count of the layer
                inputs_count if i == 0 else args[i-1],
                # activation function of the layer
                func
            ))
    cpdef set_activation_function(self, IActivationFunction func):
        for l in self.layers:
            l.set_activation_function(func)
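# Editor's usage sketch (mirrors the demo at the end of this gist):
# ActivationNetwork(TanhActivationFunction(0.1), 1, 5, 1) builds a network
# with one input and two layers: a hidden layer of 5 neurons and an output
# layer of 1 neuron, all sharing the tanh(0.1 * x) activation.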
##################################
##
## LEARNING
##
cdef class ISupervisedLearning(object):
    # signature matches the BackPropagationLearning override below
    cpdef double run(self, double[:] inp, double[:] output, int repeat=1) except *:
        raise NotImplementedError()
    cpdef double run_epoch(self, double[:,:] inp, double[:,:] output, int epoch_count=1) except *:
        raise NotImplementedError()
cdef class BackPropagationLearning(ISupervisedLearning):
    cdef:
        # network to teach
        public ActivationNetwork _network
        # learning rate
        double _learning_rate
        # momentum
        double _momentum
        int _repeat_learning
        double _weight_decrease
        # layers count and neurons count per layer, cached for __dealloc__
        int _layers_count
        int * _layer_neurons
        # neuron's errors
        double ** _neuron_errors
        # weight's updates
        double *** _weights_updates
        # threshold's updates
        double ** _thresholds_updates
    property learning_rate:
        def __get__(self):
            return self._learning_rate
        def __set__(self, value):
            self._learning_rate = max( 0.0, min( 1.0, value ) )
    property momentum:
        def __get__(self):
            return self._momentum
        def __set__(self, value):
            self._momentum = max( 0.0, min( 1.0, value ) )
    property repeat_learning:
        def __get__(self):
            return self._repeat_learning
        def __set__(self, value):
            # at least 1: the value is used as a divisor in run_epoch
            self._repeat_learning = max( 1, value )
    property weight_decrease:
        def __get__(self):
            return self._weight_decrease
        def __set__(self, value):
            self._weight_decrease = max( 0.0, min( 1.0, value ) )
    def __init__(self, ActivationNetwork network, double momentum = 0, double learning_rate = 1,
                 int repeat_learning = 1, double weight_decrease = 0):
        self._network = network
        self.momentum = momentum
        self.learning_rate = learning_rate
        self.repeat_learning = repeat_learning
        self.weight_decrease = weight_decrease
        # create error and deltas arrays
        self._layers_count = network.layers_count
        self._layer_neurons = <int *> malloc( sizeof(int) * network.layers_count)
        self._neuron_errors = <double **> malloc( sizeof(double *) * network.layers_count)
        self._weights_updates = <double ***> malloc( sizeof(double **) * network.layers_count)
        self._thresholds_updates = <double **> malloc( sizeof(double *) * network.layers_count)
        # initialize errors and deltas arrays for each layer
        cdef:
            Layer layer
            int i, j, k
            int layer_neurons
        for i in range(network.layers_count):
            layer = network.layers[i]
            layer_neurons = layer.neurons_count
            self._layer_neurons[i] = layer_neurons
            self._neuron_errors[i] = <double *> malloc( sizeof(double) * layer_neurons)
            self._weights_updates[i] = <double **> malloc( sizeof(double *) * layer_neurons)
            self._thresholds_updates[i] = <double *> malloc( sizeof(double) * layer_neurons)
            # for each neuron
            for j in range(layer_neurons):
                self._neuron_errors[i][j] = 0.
                self._thresholds_updates[i][j] = 0.
                self._weights_updates[i][j] = <double *> malloc( sizeof(double) * layer.inputs_count)
                for k in range(layer.inputs_count):
                    self._weights_updates[i][j][k] = 0.
    def __dealloc__(BackPropagationLearning self):
        # free the per-neuron and per-layer buffers before the top-level arrays
        cdef int i, j
        for i in range(self._layers_count):
            for j in range(self._layer_neurons[i]):
                free(self._weights_updates[i][j])
            free(self._neuron_errors[i])
            free(self._weights_updates[i])
            free(self._thresholds_updates[i])
        free(self._layer_neurons)
        free(self._neuron_errors)
        free(self._weights_updates)
        free(self._thresholds_updates)
    @cython.profile(False)
    cpdef double run(self, double[:] inp, double[:] output, int repeat = 1) except *:
        cdef:
            double error = 0
            int i
        for i in range(repeat):
            # compute the network's output
            self._network.compute( inp )
            # calculate network error
            error = self.calculate_error( output )
            # calculate weights updates
            self.calculate_updates( inp )
            # update the network
            self.update_network()
        return error
    @cython.profile(False)
    cpdef double run_epoch(self, double[:,:] inp, double[:,:] output, int epoch_count=1) except *:
        cdef:
            double error = 0.0
            int i, sample, j
            int inp_len = len(inp)
            int run_count = inp_len // self._repeat_learning
        # run learning procedure; when repeat_learning > 1, samples are drawn
        # at random and each drawn sample is trained on repeat_learning times
        for j in range(epoch_count):
            error = 0.0
            for i in range(run_count):
                sample = np.random.randint(inp_len) if self._repeat_learning > 1 else i
                error += self.run( inp[sample], output[sample], self._repeat_learning)
        # return summary error of the last epoch
        return error
    cpdef double calculate_error(self, double[:] desired_output):
        cdef:
            # current and the next layers
            Layer layer, layer_next
            Neuron n1, n2
            # current and the next errors arrays
            double * errors
            double * errors_next
            # error values
            double error = 0, e, err_sum
            # neuron's output value
            double output
            # layers count
            int layers_count = self._network.layers_count
            int i=0, j=0, k=0
            # assume that all neurons of the network have the same activation function
            IActivationFunction function = (self._network.layers[0].neurons[0]).activation_function
        # calculate error values for the last layer first
        layer = self._network.layers[layers_count - 1]
        errors = self._neuron_errors[layers_count - 1]
        for n1 in layer.neurons:
            output = n1.output
            # error of the neuron
            e = desired_output[i] - output
            # error multiplied by the activation function's derivative
            errors[i] = e * function.derivative2( output )
            # square the error and sum it
            error += ( e * e )
            i += 1
        # calculate error values for the other layers
        for j in range(layers_count - 2, -1, -1):
            layer = self._network.layers[j]
            layer_next = self._network.layers[j + 1]
            errors = self._neuron_errors[j]
            errors_next = self._neuron_errors[j + 1]
            i = 0
            # for all neurons of the layer
            for n1 in layer.neurons:
                err_sum = 0.0
                k = 0
                # for all neurons of the next layer
                for n2 in layer_next.neurons:
                    err_sum += errors_next[k] * n2.weights[i]
                    k += 1
                errors[i] = err_sum * function.derivative2( n1.output )
                i += 1
        # return squared error of the last layer divided by 2
        return error / 2.0
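# Editor's note on the math above: calculate_error implements the standard
# backpropagation deltas. For an output neuron with target t and output o,
#     delta = (t - o) * f'(o)
# and for a hidden neuron,
#     delta = f'(o) * sum_k( delta_k * w_k ),
# where the sum runs over the next layer's neurons and w_k is the weight each
# of them attaches to this neuron's output; f' is evaluated via derivative2,
# which takes the neuron's output rather than its input.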
    cpdef double calculate_updates(self, double[:] inp) except *:
        cdef:
            # current neuron
            Neuron neuron, n2
            # current and previous layers
            Layer layer, layer_prev
            # layer's weights updates
            double ** layer_weights_updates
            # layer's thresholds updates
            double * layer_threshold_updates
            # layer's errors
            double * errors
            # neuron's weights updates
            double * neuron_weight_updates
        # 1 - calculate updates for the first layer
        layer = self._network.layers[0]
        errors = self._neuron_errors[0]
        layer_weights_updates = self._weights_updates[0]
        layer_threshold_updates = self._thresholds_updates[0]
        # cache frequently used values
        cdef:
            double cached_momentum = self._learning_rate * self._momentum
            double cached1m_momentum = self._learning_rate * ( 1 - self._momentum )
            double cached_error
            int i=0, j, k
        # for each neuron of the first layer
        for neuron in layer.neurons:
            cached_error = errors[i] * cached1m_momentum
            neuron_weight_updates = layer_weights_updates[i]
            # for each weight of the neuron
            for j in range(neuron.inputs_count):
                # calculate weight update
                neuron_weight_updates[j] = cached_momentum * neuron_weight_updates[j] + cached_error * inp[j]
            # calculate threshold update
            layer_threshold_updates[i] = cached_momentum * layer_threshold_updates[i] + cached_error
            i += 1
        # 2 - for all other layers
        for k in range(1, self._network.layers_count):
            layer_prev = self._network.layers[k - 1]
            layer = self._network.layers[k]
            errors = self._neuron_errors[k]
            layer_weights_updates = self._weights_updates[k]
            layer_threshold_updates = self._thresholds_updates[k]
            i = 0
            # for each neuron of the layer
            for neuron in layer.neurons:
                cached_error = errors[i] * cached1m_momentum
                neuron_weight_updates = layer_weights_updates[i]
                j = 0
                # for each synapse of the neuron
                for n2 in layer_prev.neurons:
                    # calculate weight update
                    neuron_weight_updates[j] = cached_momentum * neuron_weight_updates[j] + cached_error * n2.output
                    j += 1
                # calculate threshold update
                layer_threshold_updates[i] = cached_momentum * layer_threshold_updates[i] + cached_error
                i += 1
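# Editor's note on the update rule: with learning rate lr and momentum m,
# each weight update accumulates as
#     dw = lr * m * dw_prev + lr * (1 - m) * delta * x
# (x being the input on that synapse), and update_network below applies
#     w = (1 - weight_decrease) * w + dw
# so weight_decrease acts as a simple weight-decay term.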
    cpdef update_network(self):
        cdef:
            # current neuron
            ActivationNeuron neuron
            # current layer
            Layer layer
            # layer's weights updates
            double ** layer_weights_updates
            # layer's thresholds updates
            double * layer_threshold_updates
            # neuron's weights updates
            double * neuron_weight_updates
            int i=0, j=0, k=0
            double weight_decrease = 1 - self._weight_decrease
        # for each layer of the network
        for layer in self._network.layers:
            layer_weights_updates = self._weights_updates[i]
            layer_threshold_updates = self._thresholds_updates[i]
            j = 0
            # for each neuron of the layer
            for neuron in layer.neurons:
                neuron_weight_updates = layer_weights_updates[j]
                # for each weight of the neuron
                for k in range(neuron.inputs_count):
                    # update weight
                    neuron.weights[k] = weight_decrease * neuron.weights[k] + neuron_weight_updates[k]
                # update threshold
                neuron.threshold += layer_threshold_updates[j]
                j += 1
            i += 1
# demo (a separate script in the same gist): train the network to approximate
# f(x) = x * sin(x) / 10 and plot the fit and the training error live
import math
import matplotlib.pyplot as plt
import pyforge as pf
import numpy as np

plt.ion()
plt.subplot(2, 1, 1)
func_plot, = plt.plot([], [])
pred_plot, = plt.plot([], [])
plt.title('Function graph')
plt.ylabel('Y')
plt.xlabel('X')
plt.subplot(2, 1, 2)
err_plot, = plt.plot([], [], 'r.-')
plt.xlabel('Epoch')
plt.ylabel('MSE')

def update_line(hl, x, y, replace=False):
    if replace:
        hl.set_xdata(x)
        hl.set_ydata(y)
    else:
        hl.set_xdata(np.append(hl.get_xdata(), x))
        hl.set_ydata(np.append(hl.get_ydata(), y))
    hl.axes.relim()
    hl.axes.autoscale()
    plt.draw()

# Create data
func = lambda z: z * math.sin(z) / 10
inp = np.array([[x] for x in np.arange(0, 10.25, 0.25)])
output = np.array([[func(*x)] for x in inp])
update_line(func_plot, [x[0] for x in inp], [y[0] for y in output], replace=True)

# Create network and trainer
nn = pf.ActivationNetwork(pf.TanhActivationFunction(0.1), 1, 5, 1)
trainer = pf.BackPropagationLearning(nn)
trainer.learning_rate = 1
trainer.momentum = 0.3
trainer.repeat_learning = 1
trainer.weight_decrease = 0

# Train it, replotting the error and the prediction every 100 epochs
for i in range(500):
    for j in range(100):
        trainer.run_epoch(inp, output)
    update_line(err_plot, i, nn.get_error_mse(inp, output))
    update_line(pred_plot, [x[0] for x in inp], [nn.compute(x)[0] for x in inp], replace=True)
plt.ioff()
plt.show()