Skip to content

Instantly share code, notes, and snippets.

@ijkilchenko
Created April 18, 2017 22:43
Show Gist options
  • Save ijkilchenko/75d9d82b049f1b242c658e8a4a9aafae to your computer and use it in GitHub Desktop.
Save ijkilchenko/75d9d82b049f1b242c658e8a4a9aafae to your computer and use it in GitHub Desktop.
from sklearn.datasets import make_classification
from sklearn.cross_validation import train_test_split
from itertools import product
import numpy as np
import pandas as pd
class NeuralNetwork:
g = np.vectorize(lambda z: 1 / (1 + np.exp(-z)))
log = np.vectorize(lambda x: np.log(x))
def __init__(self, test=False):
self.test = test # Are we in test mode?
def fit(self, X, y, architecture, num_iter=100, alpha=0.01, animate=False):
"""architecture: defines the structure of our neural network
and also encodes the number of classes.
num_iter: number of iterations of gradient descent.
alpha: gradient descent learning rate.
"""
self.L = len(architecture)+1
self.K = architecture[-1]
self.d = X.shape[1]
self.arch = [self.d, *architecture]
self.Ws = []
for i in range(self.L - 1):
if self.test:
np.random.seed(2016)
# +1 for the bias neuron.
W = np.random.rand(self.arch[i] + 1, self.arch[i + 1])
else:
np.random.seed()
# +1 for the bias neuron.
W = np.random.rand(self.arch[i] + 1, self.arch[i + 1])
self.Ws.append(W)
self.n = X.shape[0]
self.all_ones = pd.Series([1]*self.n)
X = X.copy() # Do not modify the data set.
X.reset_index(drop=True, inplace=True)
self.X = pd.concat([self.all_ones, X], axis=1)
self.Y = pd.get_dummies(y).reset_index(drop=True)
if animate:
grids = [] # For the animation of the regions.
grid_x = np.linspace(-3, 3, 50)
grid_y = np.linspace(-3, 3, 50)
grid = pd.DataFrame(list(product(grid_x, grid_y)))
grid.columns = ['x', 'y']
accuracies = []
# Gradient descent.
if self.test:
num_iter = 2
for i in range(num_iter):
if not self.test and i % 10 == 0:
acc = self.score(X, y)
# Print the actual loss every 10 iterations.
print('Iteration: %i. Loss: %0.2f. Accuracy: %0.5f' %
(i, self._calc_loss(self.Ws), acc))
if animate:
grid_predictions = self.predict(grid)
#grid_predictions = np.random.random(grid.shape[0])
grids.append(grid_predictions)
accuracies.append(acc)
jacs = self._calc_dLoss_dWs() # Matrix of partial derivatives.
if self.test:
# Numerically approximate the partial derivatives.
jacs1 = self._num_calc_dLoss_dWs()
for l in range(len(jacs)):
assert np.linalg.norm(jacs[l] - jacs1[l]) < 1e-3
for j, jac in enumerate(jacs):
self.Ws[j] -= alpha * jac # Update rule.
if animate:
return grid, grids, accuracies
def _calc_loss(self, Ws):
# Extract the last set of activations (i.e. hypotheses)
A = self.forward_pass(self.X, Ws)[0][-1]
# Compute cost function (log loss).
J = -(np.multiply(self.Log(1-A), 1-self.Y) +
np.multiply(self.Log(A), self.Y))
loss = J.sum().sum()
return loss
def score(self, X, y):
y_predictions = self.predict(X)
is_correct = [y_pred == y_true for y_pred, y_true in
zip(y_predictions, list(y))]
num_correct = len([i for i in is_correct if i])
return num_correct/len(y)
def predict(self, X):
H = self.predict_proba(X)
y = []
for row in range(H.shape[0]):
proba = H.iloc[row,:].tolist()
y.append(proba.index(max(proba)))
return y
def predict_proba(self, X):
all_ones = pd.Series([1]*X.shape[0])
X = X.copy() # Do not modify the data set.
X.reset_index(drop=True, inplace=True)
X = pd.concat([all_ones, X], axis=1)
H = self.forward_pass(X, self.Ws)[0][-1]
return H
def forward_pass(self, X, Ws): # Compute all activations for each layer
As = [] # Activations.
Zs = [] # Values before the activation function is applied.
A = X
As.append(A)
for i in range(self.L - 1):
Z = np.dot(A, Ws[i])
A = pd.DataFrame(self.G(Z))
all_ones = pd.Series([1] * A.shape[0])
if i != self.L - 2: # Add bias units for all except the last layer.
A = pd.concat([all_ones, A], axis=1)
As.append(A)
Zs.append(Z)
return As, Zs
def _num_calc_dLoss_dWs(self): # Gradient checking.
jacs = []
for l in range(self.L-1):
# Sum derivatives for each example.
dx = 1e-5
jac = np.zeros(self.Ws[l].shape)
for i in range(self.Ws[l].shape[0]):
for j in range(self.Ws[l].shape[1]):
loss = self._calc_loss(self.Ws)
Ws1 = [W.copy() for W in self.Ws]
Ws1[l][i,j] += dx
loss1 = self._calc_loss(Ws1)
rise = loss1 - loss
jac[i,j] = rise/dx
jacs.append(jac)
return jacs
def _calc_dLoss_dWs(self): # Compute all partial derivatives for each layer.
Ds = [] # Error terms (one for each z).
As, Zs = self.forward_pass(self.X, self.Ws)
D = As[-1] - self.Y # Errors in the last layer.
Ds.append(D)
for i in reversed(range(1, self.L-1)): # Backpropagate.
# Expression for the Error term for all but the last layer.
D = np.multiply(np.dot(Ds[-1], self.Ws[i].T)[:,1:],
np.multiply(self.G(Zs[i-1]), 1-self.G(Zs[i-1])))
Ds.append(pd.DataFrame(D))
Ds = Ds[::-1] # Reverse the list (since we were appending).
jacs = [] # Jacobian matrixes (matrixes of partial derivatives).
for i in range(self.L-1):
# Sum derivatives for each example.
jac = np.zeros(self.Ws[i].shape)
# TODO: Vectorize over the examples.
for j in range(self.n):
activations_col = As[i].iloc[j,:].T.reshape((As[i].shape[1],1))
errors_row = Ds[i].iloc[j,:].reshape((1,Ds[i].shape[1]))
# Partial derivatives for this example and this layer.
outer_prod = np.dot(activations_col, errors_row)
jac += outer_prod
jacs.append(jac)
return jacs
@classmethod
def G(cls, z):
return cls.g(z)
@classmethod
def Log(cls, x):
return cls.log(x)
def test():
# Test the NeuralNetwork with gradient checking.
X, y = make_classification(n_samples=50,
n_features=2, n_redundant=0, n_repeated=0,
n_classes=2, n_clusters_per_class=1,
random_state=206)
X = pd.DataFrame(X)
y = pd.Series(y)
model = NeuralNetwork(test=True)
model.fit(X, y, [3, 2])
if __name__ == '__main__':
test() # Do gradient checking (did we write down the derivative correctly?)
X, y = make_classification(n_samples=50,
n_features=2, n_redundant=0, n_repeated=0,
n_classes=2, n_clusters_per_class=2,
random_state=2019)
X = pd.DataFrame(X)
y = pd.Series(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20,
random_state=42)
model = NeuralNetwork()
model.fit(X_train, y_train, architecture=[3, 2], animate=True)
print('Accuracy after training (train): %0.2f' %
model.score(X_train, y_train))
print('Accuracy after training (test): %0.2f' %
model.score(X_test, y_test))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment