Last active
September 18, 2016 17:06
-
-
Save p-i-/9eb0bf6c4960d7e9921908fa4785dcdc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Implements Assignment 3 for Geoffrey Hinton's Neural Networks Course offered through Coursera. | |
| * (Batch)Trains a simple Feedforward Neural Network with Backpropogation, for recognizing USPS handwritten digits. | |
| * Assignment looks into efficient optimization, and into effective regularization. | |
| * Recognizes USPS handwritten digits. | |
| Abstracts classifiers developed in the course into, a more pythonic Sklearn framework. And cleans up a lot of the | |
| given code. | |
| * NOTE Matrix sizes are given as <y> <x> i.e. <#rows> <#cols> | |
| RAISE | |
| """ | |
| import copy | |
| import os | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from numpy.testing import assert_array_equal | |
| from sklearn.base import BaseEstimator | |
| # courseraneuralnet.bla | |
| from utility.utils import loadmat, logistic, log_sum_exp_over_rows | |
| NUM_INPUT_UNITS = 256 | |
| NUM_CLASSES = 10 | |
| __all__ = ['A3Run'] | |
def model_to_theta(model):
    """Flatten a model (or a gradient in model form) into one long parameter vector.

    Args:
        model (dict): has 'inputToHid' and 'hidToClass' weight matrices.

    Returns:
        (numpy.ndarray): 1-D vector holding the input->hidden weights followed by
            the hidden->class weights. See also theta_to_model.
    """
    # ndarray.flatten() already returns a copy, so the original deepcopy of the
    # whole model was redundant work; the input model is never mutated here.
    return np.hstack((model['inputToHid'].flatten(), model['hidToClass'].flatten()))
def theta_to_model(theta):
    """Restore a flat parameter vector (maybe produced by model_to_theta) to
    structured form: a dict with 'inputToHid' and 'hidToClass' weight matrices.
    """
    total_size = np.size(theta, 0)
    # Each hidden unit contributes NUM_INPUT_UNITS incoming and NUM_CLASSES outgoing weights.
    n_hid = int(total_size / (NUM_INPUT_UNITS + NUM_CLASSES))
    split_point = NUM_INPUT_UNITS * n_hid
    input_to_hid = np.reshape(theta[:split_point], (n_hid, NUM_INPUT_UNITS))
    hid_to_class = np.reshape(theta[split_point:total_size], (NUM_CLASSES, n_hid))
    return {'inputToHid': input_to_hid, 'hidToClass': hid_to_class}
class FFNeuralNet(BaseEstimator):
    """Feedforward neural network from Assignment 3: one logistic hidden layer and a
    softmax output layer, trained with mini-batch gradient descent plus momentum
    (backpropagation).
    """

    def __init__(self,
                 training_iters,
                 validation_data,
                 wd_coeff=None,
                 learning_rate=0.02,
                 n_hid=300,
                 n_classes=10,
                 n_input_units=256,
                 train_momentum=0.9,
                 mini_batch_size=100,
                 early_stopping=False):
        """Initialize neural network.

        Args:
            training_iters (int) : number of training iterations
            validation_data (dict) : contains 'inputs' and 'targets' data matrices
            wd_coeff (float) : weight decay (L2) coefficient.
                NOTE(review): loss() multiplies by this, so the default None only
                works if a value is set before any loss computation — confirm
                callers always pass one.
            learning_rate (float) : learning rate for neural net classifier
            n_hid (int) : number of hidden units
            n_classes (int) : number of classes
            n_input_units (int) : number of input units
            train_momentum (float) : momentum used in training
            mini_batch_size (int) : size of training batches
            early_stopping (bool) : saves model at validation error minimum
        """
        super(FFNeuralNet, self).__init__()
        self.n_classes = n_classes
        self.wd_coeff = wd_coeff
        self.batch_size = mini_batch_size
        self.learning_rate = learning_rate
        self.n_iterations = training_iters
        self.train_momentum = train_momentum
        self.early_stopping = early_stopping
        self.validation_data = validation_data  # used for early stopping
        # Loss curves recorded during training.
        self.training_data_losses = []
        self.validation_data_losses = []
        # One weight per (input -> hidden) plus (hidden -> class) connection.
        self.n_params = (n_input_units + n_classes) * n_hid
        self.reset_classifier()
        # theta must be a flat (1-D) parameter vector.
        assert_array_equal(self.theta.flatten(), self.theta)

    def reset_classifier(self):
        """Reset the model parameters to their deterministic initial values.

        We don't use random initialization for this assignment; this way,
        everybody gets the same results.
        """
        # cos(0..n-1) * 0.1 gives a fixed, pseudo-random-looking initialization
        # (equivalent values to the original column_stack/transpose construction,
        # already flattened to 1-D).
        theta = np.cos(np.arange(self.n_params)) * 0.1 if self.n_params else np.array([])
        self.model = theta_to_model(theta)
        self.theta = model_to_theta(self.model)
        self.momentum_speed = self.theta * 0.0

    def train(self, sequences):
        """Implements optimize(..) from the assignment: trains the network using
        mini-batch gradient descent with momentum.

        Args:
            sequences (dict) : 'inputs' (<NUM_INPUT_UNITS> by <#cases>) and
                'targets' (<NUM_CLASSES> by <#cases>) training matrices; one
                data case per column.
        """
        self.reset_classifier()
        if self.early_stopping:
            best_so_far = {'theta': None, 'validationLoss': np.inf, 'afterNIters': None}
        n_training_cases = np.size(sequences['inputs'], 1)
        for i in range(self.n_iterations):
            batch_start = (i * self.batch_size) % n_training_cases
            training_batch_x = sequences['inputs'][:, batch_start: batch_start + self.batch_size]
            training_batch_y = sequences['targets'][:, batch_start: batch_start + self.batch_size]
            # Forward-pass batch, get error-deriv, backpropagate deltas for all network weights.
            gradient = self.calcWeightDeltas(inputs=training_batch_x, targets=training_batch_y)
            self.momentum_speed = self.momentum_speed * self.train_momentum - gradient
            self.theta += self.momentum_speed * self.learning_rate
            self.model = theta_to_model(self.theta)
            self.training_data_losses += [self.loss(sequences)]
            self.validation_data_losses += [self.loss(self.validation_data)]
            if self.early_stopping and self.validation_data_losses[-1] < best_so_far['validationLoss']:
                best_so_far['theta'] = copy.deepcopy(self.theta)  # deepcopy avoids aliasing self.theta
                best_so_far['validationLoss'] = self.validation_data_losses[-1]
                best_so_far['afterNIters'] = i
            # Progress report roughly n_classes times over the run; max(.., 1)
            # guards against a zero modulus when n_iterations < n_classes.
            if np.mod(i, max(round(self.n_iterations / float(self.n_classes)), 1)) == 0:
                print('After %d optimization iterations, training data loss is %.5f, and validation data loss is %.5f'
                      % (i, self.training_data_losses[-1], self.validation_data_losses[-1]))
            # BUGFIX: range(n) never yields n, so the original comparison
            # `i == self.n_iterations` could never fire; re-check the gradient
            # after the FINAL iteration instead, with a different data size.
            if i == self.n_iterations - 1:
                print('Now testing the gradient on just a mini-batch instead of the whole training set... ')
                training_batch = {'inputs': training_batch_x, 'targets': training_batch_y}
                self.test_gradient(training_batch)
        if self.early_stopping:
            print('Early stopping: validation loss was lowest after {0} iterations. '
                  'We chose the model that we had then.'.format(best_so_far['afterNIters']))
            self.theta = copy.deepcopy(best_so_far['theta'])  # deepcopy avoids aliasing
            # BUGFIX: the original restored theta but not the structured model,
            # so later classify()/loss() calls silently used the last-iteration
            # weights instead of the best ones.
            self.model = theta_to_model(self.theta)

    def classify(self, batch):
        """Return the predicted class index for each data case in the batch."""
        # <number of hidden units> by <number of data cases>
        hidden_outputs = logistic(np.dot(self.model['inputToHid'], batch['inputs']))
        classifier_preactivations = np.dot(self.model['hidToClass'], hidden_outputs)
        # argmax over preactivations equals argmax over softmax probabilities.
        return np.argmax(classifier_preactivations, axis=0)

    def loss(self, training_batch):
        """Evaluate the regularized cross-entropy loss on a batch of input vectors.

        Args:
            training_batch (dict):
                - 'inputs' is a matrix of size <NUM_INPUT_UNITS> by <number of data cases>.
                  Each column describes a different data case.
                - 'targets' is a matrix of size <NUM_CLASSES> by <number of data cases>.
                  Each column contains a one-of-N encoding of the class, i.e. one
                  element in every column is 1 and the others are 0.

        Returns:
            (float): mean cross-entropy over the batch plus the L2 weight penalty.
        """
        inputs, targets = training_batch['inputs'], training_batch['targets']
        hidden_outputs = logistic(np.dot(self.model['inputToHid'], inputs))
        classifier_preactivations = np.dot(self.model['hidToClass'], hidden_outputs)
        # Softmax in log space for numerical stability: log P(i) = z_i - log(Z),
        # where Z = sum_j exp(z_j) is the normalization constant.
        log_z = log_sum_exp_over_rows(classifier_preactivations)
        # Cost = -log P(target) = -(z_i - log Z).
        neg_log_probs = -(classifier_preactivations - log_z)
        # Zero out non-target neuron outputs and sum down the classifier axis,
        # keeping only the target neuron's term per column (np.sum, not builtin sum).
        cost = np.sum(neg_log_probs * targets, axis=0)
        average_batch_cost = np.mean(cost)
        # Weight decay: E = 1/2 * wd_coeff * sum over network of weight^2.
        weights_l2_penalty = 0.5 * np.sum(model_to_theta(self.model) ** 2) * self.wd_coeff
        return average_batch_cost + weights_l2_penalty

    def calcWeightDeltas(self, inputs, targets):
        """Compute dC/dW for every network weight via backpropagation.

        Args:
            inputs: a matrix of size <NUM_INPUT_UNITS> by <number of data cases>
            targets: a matrix of size <NUM_CLASSES> by <number of data cases>

        Returns:
            gradient: dC/dW for each weight, in flattened (long-vector) form.
        """
        batch_size = float(np.size(targets, axis=1))
        grad_model = dict()
        # ----- FORWARD PASS: push data through the network to get class probabilities -----
        hidden_outputs = logistic(np.dot(self.model['inputToHid'], inputs))
        classifier_preactivations = np.dot(self.model['hidToClass'], hidden_outputs)
        log_probs = classifier_preactivations - log_sum_exp_over_rows(classifier_preactivations)
        probs = np.exp(log_probs)
        # ----- BACKPROPAGATE deltas for all weights -----
        # For softmax with neg-log-prob cost C = -log P(target):
        #   dC/dz_i = P(i) - target_i   (y_i - t_i)
        d_cost_d_z_classifier = probs - targets
        # Weights INTO the classifier: dC/dW = sum_k dC/dz_k * hidden_outputs_k,
        # averaged over the batch, plus the weight-decay term.
        d_cost_d_w_classifier = np.dot(hidden_outputs, d_cost_d_z_classifier.T) / batch_size
        grad_model['hidToClass'] = self.model['hidToClass'] * self.wd_coeff + d_cost_d_w_classifier.T
        # Backpropagate into the classifier inputs (= hidden layer outputs):
        # dC/dx = sum_k dC/dz_k * hidToClass weights.
        d_cost_d_classifier_inputs = np.dot(self.model['hidToClass'].T, d_cost_d_z_classifier)
        # Weights INTO the hidden layer; logistic derivative is y * (1 - y).
        d_cost_d_w_hidden = np.dot(
            inputs,
            ((1.0 - hidden_outputs) * hidden_outputs * d_cost_d_classifier_inputs).T
        ) / batch_size
        grad_model['inputToHid'] = self.model['inputToHid'] * self.wd_coeff + d_cost_d_w_hidden.T
        return model_to_theta(grad_model)

    def classification_accuracy(self, data):
        """Return the fraction of data cases that is INCORRECTLY classified by the model.

        NOTE: despite the name, this is an error rate (as in the original
        assignment code), not an accuracy.
        """
        return np.mean(np.array(self.classify(data) != np.argmax(data['targets'], axis=0), dtype=float))

    def test_gradient(self, data):
        """Test calcWeightDeltas against a finite-difference approximation of the gradient.

        Notes:
            * Only ~100 pseudo-randomly selected elements of theta are tested;
              if an approximate gradient element is very different from the
              computed one, an Exception is raised.
        """
        base_theta = model_to_theta(self.model)
        h = 1e-2
        correctness_threshold = 1e-5
        gradient = self.calcWeightDeltas(data['inputs'], data['targets'])
        analytic_gradient_struct = theta_to_model(gradient)
        if len(analytic_gradient_struct.keys()) != 2:
            # Message fixed: the keys are 'inputToHid'/'hidToClass', not the
            # MATLAB-era '.input_to_hid'/'.hid_to_class' field names.
            raise Exception('The object returned by calcWeightDeltas should have exactly two field names: '
                            "'inputToHid' and 'hidToClass'")
        # Use np.size for BOTH shape checks; the original used len() for the
        # first one, which only compares the leading dimension.
        if np.size(analytic_gradient_struct['inputToHid']) != np.size(self.model['inputToHid']):
            raise Exception('The size of .inputToHid of the return value of calcWeightDeltas (currently {0}) '
                            'should be same as the size of model[\'inputToHid\'] '
                            '(currently {1})'.format(np.size(analytic_gradient_struct['inputToHid']),
                                                     np.size(self.model['inputToHid'])))
        if np.size(analytic_gradient_struct['hidToClass']) != np.size(self.model['hidToClass']):
            raise Exception('The size of .hidToClass of the return value of calcWeightDeltas (currently {0}) '
                            'should be same as the size of model[\'hidToClass\'] '
                            '(currently {1})'.format(np.size(analytic_gradient_struct['hidToClass']),
                                                     np.size(self.model['hidToClass'])))
        analytic_gradient = gradient
        if np.any(np.isnan(analytic_gradient)) or np.any(np.isinf(analytic_gradient)):
            raise Exception('Your gradient computation produced a NaN or infinity. That is an error.')
        # We don't test the gradient for every element of theta (too much work);
        # a few elements are usually enough to expose an error.
        input_to_hid_theta_size = np.size(self.model['inputToHid'])
        hid_to_class_theta_size = np.size(self.model['hidToClass'])
        big_prime = 1299721  # prime, ensures a somewhat random-like selection of indices
        # First 20 indices fall in the hidToClass part of theta, the next 80 in inputToHid.
        hid_to_class_indices_to_check = np.mod(big_prime * np.arange(20), hid_to_class_theta_size) + input_to_hid_theta_size
        input_to_hid_indices_to_check = np.mod(big_prime * np.arange(80), input_to_hid_theta_size)
        indices_to_check = np.hstack((hid_to_class_indices_to_check, input_to_hid_indices_to_check))
        for i, test_index in enumerate(indices_to_check):
            analytic_here = analytic_gradient[test_index]
            theta_step = base_theta * 0.0
            theta_step[test_index] = h
            # 8-point central difference coefficients:
            # https://en.wikipedia.org/wiki/Finite_difference_coefficient
            # http://www.holoborodko.com/pavel/numerical-methods/numerical-derivative/central-differences/#central_differences
            contribution_distances = [-4, -3, -2, -1, 1, 2, 3, 4]
            contribution_weights = [1. / 280, -4. / 105, 1. / 5, -4. / 5, 4. / 5, -1. / 5, 4. / 105, -1. / 280]
            temp = 0.
            for distance, weight in zip(contribution_distances, contribution_weights):
                self.model = theta_to_model(base_theta + theta_step * distance)  # temporarily perturb model
                temp += self.loss(data) * weight
            fd_here = temp / h
            diff = abs(analytic_here - fd_here)
            # Accept if either the absolute or the relative difference is small.
            if diff > correctness_threshold and diff / float(abs(analytic_here) + abs(fd_here)) > correctness_threshold:
                part_names = ['inputToHid', 'hidToClass']
                # i <= 19 selects index 1 ('hidToClass'): the first 20 checked
                # indices belong to the hidden-to-class weights.
                raise Exception('Theta element #{0} (part of {1}), with value {2}, has finite difference gradient {3} '
                                'but analytic gradient {4}. That looks like an error.'.format(test_index,
                                                                                              part_names[i <= 19],
                                                                                              base_theta[test_index],
                                                                                              fd_here,
                                                                                              analytic_here))
            if i == 19:
                print('Gradient test passed for hid_to_class.')
            if i == 99:
                print('Gradient test passed for input_to_hid.')
        self.model = theta_to_model(base_theta)  # make sure model is reset back to its unperturbed state
        # BUGFIX: the string is not %-formatted, so the original '%%' printed literally.
        print('Gradient test passed. That means that the gradient that your code computed is within 0.001% of the '
              'gradient that the finite difference approximation computed, so the gradient calculation procedure is '
              'probably correct (not certainly, but probably).')
class A3Run(object):
    """Runs assignment 3."""

    def __init__(self):
        """Load the data set used by all test cases for the assignment."""
        data = loadmat(os.path.join(os.getcwd(), 'Data/data.mat'))
        self.data_sets = data['data']

    def a3_main(self, wd_coeff, n_hid, n_iterations, learning_rate, train_momentum=0.9,
                early_stopping=False, mini_batch_size=100):
        """Runs training and computes error and loss of training, testing, and validation sets.

        Args:
            wd_coeff (float) : weight decay coefficient
            n_hid (int) : number of hidden units
            n_iterations (int) : number of training iterations
            learning_rate (float) : learning rate for neural net classifier
            train_momentum (float) : momentum used in training
            early_stopping (bool) : saves model at validation error minimum
            mini_batch_size (int) : size of training batches
        """
        nn = FFNeuralNet(
            training_iters=n_iterations,
            validation_data=self.data_sets['validation'],
            wd_coeff=wd_coeff,
            learning_rate=learning_rate,
            n_hid=n_hid,
            n_classes=10,
            n_input_units=256,
            train_momentum=train_momentum,
            mini_batch_size=mini_batch_size,
            early_stopping=early_stopping
        )
        if n_iterations != 0:
            print('Now testing the gradient on the whole training set... ')
            nn.test_gradient(self.data_sets['training'])
        nn.train(self.data_sets['training'])
        # The optimization is finished. Now do some reporting.
        if n_iterations != 0 and nn.training_data_losses and nn.validation_data_losses:
            # BUGFIX: plt.hold() was deprecated in matplotlib 2.x and removed in
            # 3.0 (it now raises AttributeError); successive plot() calls
            # accumulate on the current axes by default, so it is simply dropped.
            plt.yscale('log')
            plt.plot(nn.training_data_losses, 'b')
            plt.plot(nn.validation_data_losses, 'r')
            plt.legend(['training', 'validation'])
            plt.ylabel('loss')
            plt.xlabel('iteration number')
        for data_name, data_segment in self.data_sets.items():
            print('{Loss, Classification Error rate} on %s is { %.5f, %.5f }'
                  % (data_name, nn.loss(data_segment), nn.classification_accuracy(data_segment)))
        if wd_coeff != 0:
            # NOTE(review): reconstructed placement from flattened source --
            # this reports on the last data_segment of the loop above; confirm
            # against the original gist whether it belongs inside the loop.
            saved_wd_coeff = nn.wd_coeff
            nn.wd_coeff = 0  # drop the weight-decay term to get the pure classification loss
            print('Classification loss (i.e. without weight decay) is %.5f' % (nn.loss(data_segment)))
            nn.wd_coeff = saved_wd_coeff  # restore so nn stays usable afterwards
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment