"""A small feed-forward neural network (sigmoid activations, log loss) trained
with batch gradient descent and verified with numerical gradient checking."""

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split  # sklearn.cross_validation has been removed.
from itertools import product

import numpy as np
import pandas as pd


class NeuralNetwork:
    g = np.vectorize(lambda z: 1 / (1 + np.exp(-z)))  # Sigmoid activation.
    log = np.vectorize(lambda x: np.log(x))

    def __init__(self, test=False):
        self.test = test  # Are we in test mode?

    def fit(self, X, y, architecture, num_iter=100, alpha=0.01, animate=False):
        """Train the network with batch gradient descent.

        architecture: defines the structure of our neural network and also
            encodes the number of classes (its last entry).
        num_iter: number of iterations of gradient descent.
        alpha: gradient descent learning rate.
        """
        self.L = len(architecture) + 1
        self.K = architecture[-1]
        self.d = X.shape[1]
        self.arch = [self.d, *architecture]

        self.Ws = []
        for i in range(self.L - 1):
            if self.test:
                np.random.seed(2016)  # Deterministic weights in test mode.
            else:
                np.random.seed()
            # +1 for the bias neuron.
            W = np.random.rand(self.arch[i] + 1, self.arch[i + 1])
            self.Ws.append(W)

        self.n = X.shape[0]
        self.all_ones = pd.Series([1] * self.n)
        X = X.copy()  # Do not modify the data set.
        X.reset_index(drop=True, inplace=True)
        self.X = pd.concat([self.all_ones, X], axis=1)
        # One-hot encode the labels (as integers so the loss arithmetic works).
        self.Y = pd.get_dummies(y).astype(int).reset_index(drop=True)

        if animate:
            grids = []  # For the animation of the decision regions.
            grid_x = np.linspace(-3, 3, 50)
            grid_y = np.linspace(-3, 3, 50)
            grid = pd.DataFrame(list(product(grid_x, grid_y)))
            grid.columns = ['x', 'y']
            accuracies = []

        # Gradient descent.
        if self.test:
            num_iter = 2
        for i in range(num_iter):
            if not self.test and i % 10 == 0:
                acc = self.score(X, y)
                # Print the actual loss every 10 iterations.
                print('Iteration: %i. Loss: %0.2f. Accuracy: %0.5f'
                      % (i, self._calc_loss(self.Ws), acc))
                if animate:
                    grid_predictions = self.predict(grid)
                    # grid_predictions = np.random.random(grid.shape[0])
                    grids.append(grid_predictions)
                    accuracies.append(acc)

            jacs = self._calc_dLoss_dWs()  # Matrices of partial derivatives.

            if self.test:
                # Numerically approximate the partial derivatives.
                jacs1 = self._num_calc_dLoss_dWs()
                for l in range(len(jacs)):
                    assert np.linalg.norm(jacs[l] - jacs1[l]) < 1e-3

            for j, jac in enumerate(jacs):
                self.Ws[j] -= alpha * jac  # Update rule.

        if animate:
            return grid, grids, accuracies

    def _calc_loss(self, Ws):
        # Extract the last set of activations (i.e. hypotheses).
        A = self.forward_pass(self.X, Ws)[0][-1]
        # Compute the cost function (log loss).
        J = -(np.multiply(self.Log(1 - A), 1 - self.Y) + np.multiply(self.Log(A), self.Y))
        loss = J.sum().sum()
        return loss

    def score(self, X, y):
        y_predictions = self.predict(X)
        is_correct = [y_pred == y_true for y_pred, y_true in zip(y_predictions, list(y))]
        num_correct = len([i for i in is_correct if i])
        return num_correct / len(y)

    def predict(self, X):
        H = self.predict_proba(X)
        y = []
        for row in range(H.shape[0]):
            proba = H.iloc[row, :].tolist()
            y.append(proba.index(max(proba)))  # Most probable class for this row.
        return y

    def predict_proba(self, X):
        all_ones = pd.Series([1] * X.shape[0])
        X = X.copy()  # Do not modify the data set.
        X.reset_index(drop=True, inplace=True)
        X = pd.concat([all_ones, X], axis=1)
        H = self.forward_pass(X, self.Ws)[0][-1]
        return H

    def forward_pass(self, X, Ws):
        # Compute all activations for each layer.
        As = []  # Activations.
        Zs = []  # Values before the activation function is applied.
        A = X
        As.append(A)
        for i in range(self.L - 1):
            Z = np.dot(A, Ws[i])
            A = pd.DataFrame(self.G(Z))
            all_ones = pd.Series([1] * A.shape[0])
            if i != self.L - 2:  # Add bias units for all except the last layer.
                A = pd.concat([all_ones, A], axis=1)
            As.append(A)
            Zs.append(Z)
        return As, Zs

    def _num_calc_dLoss_dWs(self):
        # Gradient checking: approximate each partial derivative with a forward difference.
        jacs = []
        for l in range(self.L - 1):
            dx = 1e-5
            jac = np.zeros(self.Ws[l].shape)
            for i in range(self.Ws[l].shape[0]):
                for j in range(self.Ws[l].shape[1]):
                    loss = self._calc_loss(self.Ws)
                    Ws1 = [W.copy() for W in self.Ws]
                    Ws1[l][i, j] += dx
                    loss1 = self._calc_loss(Ws1)
                    rise = loss1 - loss
                    jac[i, j] = rise / dx
            jacs.append(jac)
        return jacs

    def _calc_dLoss_dWs(self):
        # Compute all partial derivatives (backpropagation) for each layer.
        Ds = []  # Error terms (one for each z).
        As, Zs = self.forward_pass(self.X, self.Ws)
        D = As[-1] - self.Y  # Errors in the last layer.
        Ds.append(D)
        for i in reversed(range(1, self.L - 1)):  # Backpropagate.
            # Error term for all but the last layer.
            D = np.multiply(np.dot(Ds[-1], self.Ws[i].T)[:, 1:],
                            np.multiply(self.G(Zs[i - 1]), 1 - self.G(Zs[i - 1])))
            Ds.append(pd.DataFrame(D))
        Ds = Ds[::-1]  # Reverse the list (since we were appending).

        jacs = []  # Jacobian matrices (matrices of partial derivatives).
        for i in range(self.L - 1):
            # Sum derivatives over the examples.
            jac = np.zeros(self.Ws[i].shape)
            # TODO: Vectorize over the examples.
            for j in range(self.n):
                activations_col = As[i].iloc[j, :].values.reshape((As[i].shape[1], 1))
                errors_row = Ds[i].iloc[j, :].values.reshape((1, Ds[i].shape[1]))
                # Partial derivatives for this example and this layer.
                outer_prod = np.dot(activations_col, errors_row)
                jac += outer_prod
            jacs.append(jac)
        return jacs

    @classmethod
    def G(cls, z):
        return cls.g(z)

    @classmethod
    def Log(cls, x):
        return cls.log(x)


def test():
    # Test the NeuralNetwork with gradient checking.
    X, y = make_classification(n_samples=50, n_features=2, n_redundant=0, n_repeated=0,
                               n_classes=2, n_clusters_per_class=1, random_state=206)
    X = pd.DataFrame(X)
    y = pd.Series(y)

    model = NeuralNetwork(test=True)
    model.fit(X, y, [3, 2])


if __name__ == '__main__':
    test()  # Do gradient checking (did we write down the derivative correctly?).

    X, y = make_classification(n_samples=50, n_features=2, n_redundant=0, n_repeated=0,
                               n_classes=2, n_clusters_per_class=2, random_state=2019)
    X = pd.DataFrame(X)
    y = pd.Series(y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)

    model = NeuralNetwork()
    model.fit(X_train, y_train, architecture=[3, 2], animate=True)
    print('Accuracy after training (train): %0.2f' % model.score(X_train, y_train))
    print('Accuracy after training (test): %0.2f' % model.score(X_test, y_test))
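
    # Illustrative usage sketch (the variable names below are only for this
    # example, not part of the class API): predict_proba returns the final-layer
    # sigmoid activations, one column per class (rows need not sum to 1), and
    # predict takes the argmax of each row.
    test_probas = model.predict_proba(X_test)
    test_classes = model.predict(X_test)
    print('First 5 predicted classes: %s' % test_classes[:5])
    print('First 5 rows of output activations:')
    print(test_probas.head())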