Created
October 16, 2022 14:38
-
-
Save contrasting/970503bf4db31fabeafd0bdad561c8db to your computer and use it in GitHub Desktop.
Mini NN Library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import pickle | |
def xavier_init(size, gain=1.0):
    """
    Draw an array of weights using Xavier (Glorot) uniform initialization.

    Values are sampled uniformly from [-bound, bound] where
    bound = gain * sqrt(6 / sum(size)).

    Arguments:
        size {sequence of int} -- Shape of the array to create; its entries
            are summed to form the denominator of the bound.
        gain {float} -- Multiplier applied to the bound.

    Returns:
        {np.ndarray} -- Randomly initialized array of shape `size`.
    """
    bound = gain * np.sqrt(6.0 / np.sum(size))
    return np.random.uniform(low=-bound, high=bound, size=size)
class Layer:
    """
    Base class describing the interface shared by every layer.

    Subclasses supply their own `__init__`, `forward` and `backward`; the
    versions here raise NotImplementedError. `update_params` does nothing
    unless a subclass overrides it, and calling a layer instance is
    shorthand for calling its `forward`.
    """

    def __init__(self, *args, **kwargs):
        raise NotImplementedError()

    def forward(self, *args, **kwargs):
        raise NotImplementedError()

    def backward(self, *args, **kwargs):
        raise NotImplementedError()

    def update_params(self, *args, **kwargs):
        # Default: nothing to update.
        return None

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)
class MSELossLayer(Layer):
    """
    MSELossLayer: Computes mean-squared error between y_pred and y_target.

    `forward` caches its arguments so that `backward` can return the
    gradient of the loss with respect to `y_pred`.
    """

    def __init__(self, *args, **kwargs):
        # (y_pred, y_target) from the most recent forward pass.
        self._cache_current = None

    @staticmethod
    def _mse(y_pred, y_target):
        """Return the mean of the squared differences over all elements."""
        return np.mean((y_pred - y_target) ** 2)

    @staticmethod
    def _mse_grad(y_pred, y_target):
        """
        Return the gradient of `_mse` with respect to `y_pred`.

        `np.mean` averages over every element, so the derivative is divided
        by the total element count. Dividing by the number of rows only
        would over-scale the gradient whenever the output has more than
        one column.
        """
        return 2 * (y_pred - y_target) / np.size(y_pred)

    def forward(self, y_pred, y_target):
        """
        Compute the loss and cache the inputs for the backward pass.

        Arguments:
            y_pred {np.ndarray} -- Predictions.
            y_target {np.ndarray} -- Targets, broadcast-compatible with
                `y_pred`.

        Returns:
            {float} -- Mean-squared error.
        """
        self._cache_current = y_pred, y_target
        return self._mse(y_pred, y_target)

    def backward(self):
        """
        Return the gradient of the most recently computed loss with respect
        to `y_pred`. `forward` must have been called first.
        """
        return self._mse_grad(*self._cache_current)
class CrossEntropyLossLayer(Layer):
    """
    CrossEntropyLossLayer: Computes the softmax followed by the negative log-
    likelihood loss.
    """

    def __init__(self):
        # (y_target, probs) from the most recent forward pass.
        self._cache_current = None

    @staticmethod
    def softmax(x):
        """
        Row-wise softmax of a 2-D array.

        The row maximum is subtracted before exponentiating so that large
        inputs do not overflow.
        """
        numer = np.exp(x - x.max(axis=1, keepdims=True))
        denom = numer.sum(axis=1, keepdims=True)
        return numer / denom

    @staticmethod
    def _log_softmax(x):
        """
        Row-wise log-softmax computed with the log-sum-exp trick.

        Taking `np.log` of the softmax output yields -inf whenever a
        probability underflows to zero, and `0 * -inf` then turns the loss
        into NaN. Computing the logarithm directly keeps every entry finite
        for finite inputs.
        """
        shifted = x - x.max(axis=1, keepdims=True)
        return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

    def forward(self, inputs, y_target):
        """
        Compute the mean cross-entropy between softmax(inputs) and y_target.

        Arguments:
            inputs {np.ndarray} -- 2-D array of raw scores, one row per
                observation.
            y_target {np.ndarray} -- Target array with the same number of
                rows as `inputs`; multiplied elementwise with the
                log-probabilities.

        Returns:
            {float} -- Sum of -y_target * log(softmax(inputs)) divided by the
                number of observations.
        """
        assert len(inputs) == len(y_target)
        n_obs = len(y_target)
        probs = self.softmax(inputs)
        self._cache_current = y_target, probs

        return -1 / n_obs * np.sum(y_target * self._log_softmax(inputs))

    def backward(self):
        """
        Return the gradient of the most recently computed loss with respect
        to the raw scores passed to `forward`, i.e. (probs - y_target) / n_obs.
        """
        y_target, probs = self._cache_current
        n_obs = len(y_target)
        return -1 / n_obs * (y_target - probs)
class SigmoidLayer(Layer):
    """
    SigmoidLayer: Applies sigmoid function elementwise.
    """

    def __init__(self):
        # Input of the most recent forward pass; needed to evaluate f'(x).
        self._cache_current = None

    @staticmethod
    def _sigmoid(x):
        """Logistic function 1 / (1 + exp(-x))."""
        return 1 / (1 + np.exp(-x))

    def forward(self, x):
        """
        Apply the sigmoid to `x` and remember `x` for the backward pass.

        The layer has no weights, so the input is the only thing the
        backward pass needs.
        """
        self._cache_current = x
        return self._sigmoid(x)

    def backward(self, grad_z):
        """
        Given dL/dZ, return dL/dX = dL/dZ * f'(X) (elementwise product),
        using f'(x) = f(x) * (1 - f(x)).
        """
        activation = self._sigmoid(self._cache_current)
        slope = activation * (1 - activation)
        return grad_z * slope
class ReluLayer(Layer):
    """
    ReluLayer: Applies Relu function elementwise.
    """

    def __init__(self):
        # Input of the most recent forward pass; needed to evaluate f'(x).
        self._cache_current = None

    @staticmethod
    def _relu(x):
        """
        Elementwise max(x, 0) that keeps the dtype of `x`.

        Implemented with array operations rather than `np.vectorize` over a
        scalar function: `np.vectorize` infers the output dtype from the
        first element, so a float input whose first entry is non-positive
        would come back truncated to integers, and an empty input would
        raise.
        """
        x = np.asarray(x)
        return np.where(x > 0, x, 0)

    @staticmethod
    def _relu_prime(x):
        """Elementwise derivative of relu: 1 where x > 0, otherwise 0."""
        return (np.asarray(x) > 0).astype(int)

    def forward(self, x):
        """
        Apply relu to `x` and remember `x` for the backward pass.

        Arguments:
            x {np.ndarray} -- Input array.

        Returns:
            {np.ndarray} -- Array of the same shape as `x`.
        """
        self._cache_current = x
        return self._relu(x)

    def backward(self, grad_z):
        """
        Given dL/dZ, return dL/dX = dL/dZ * f'(X) (elementwise product).
        """
        return grad_z * self._relu_prime(self._cache_current)
class LinearLayer(Layer):
    """
    LinearLayer: Performs affine transformation of input.
    """

    def __init__(self, n_in, n_out):
        """Constructor.

        Arguments:
            n_in {int} -- Number (or dimension) of inputs.
            n_out {int} -- Number (or dimension) of outputs.
        """
        self.n_in = n_in
        self.n_out = n_out

        # Weight matrix of shape (n_in, n_out) and a single bias row of
        # shape (1, n_out), both Xavier-initialized.
        self._W = xavier_init([n_in, n_out])
        self._b = xavier_init([1, n_out])

        # Filled in by forward() / backward().
        self._cache_current = None
        self._grad_W_current = None
        self._grad_b_current = None

    def forward(self, x):
        """
        Forward pass: returns xW + b and stores `x` in `_cache_current`
        because the weight gradient computed in `backward` depends on it.

        Arguments:
            x {np.ndarray} -- Input array of shape (batch_size, n_in).

        Returns:
            {np.ndarray} -- Output array of shape (batch_size, n_out)
        """
        self._cache_current = x

        # Stack the bias row once per observation so it lines up with xW.
        stacked_bias = np.repeat(self._b, len(x), axis=0)
        return np.matmul(x, self._W) + stacked_bias

    def backward(self, grad_z):
        """
        Backward pass. Given `grad_z`, the gradient of some scalar (e.g. the
        loss) with respect to this layer's output, stores the gradients with
        respect to the weights and bias and returns the gradient with respect
        to the layer's input.

        Arguments:
            grad_z {np.ndarray} -- Gradient array of shape (batch_size, n_out).

        Returns:
            {np.ndarray} -- Array containing gradient with respect to layer
                input, of shape (batch_size, n_in).
        """
        cached_input = self._cache_current
        row_of_ones = np.ones([1, len(grad_z)])

        # dL/dW = X^T . dL/dZ
        self._grad_W_current = np.matmul(np.transpose(cached_input), grad_z)
        # dL/db = 1^T . dL/dZ, i.e. the column sums of dL/dZ as a (1, n_out) row
        self._grad_b_current = np.matmul(row_of_ones, grad_z)

        # dL/dX = dL/dZ . W^T
        return np.matmul(grad_z, np.transpose(self._W))

    def update_params(self, learning_rate):
        """
        Take one gradient-descent step on the weights and bias using the
        gradients stored by the last call to `backward`.

        Arguments:
            learning_rate {float} -- Learning rate of update step.
        """
        weight_step = learning_rate * self._grad_W_current
        bias_step = learning_rate * self._grad_b_current
        self._W = self._W - weight_step
        self._b = self._b - bias_step
class MultiLayerNetwork(object):
    """
    MultiLayerNetwork: A network consisting of stacked linear layers and
    activation functions.
    """

    @staticmethod
    def _activation(a: str):
        """
        Map an activation name to a new layer instance.

        Arguments:
            a {str} -- One of "relu", "sigmoid" or "identity".

        Returns:
            A ReluLayer or SigmoidLayer, or None for "identity" (no
            activation layer is needed because the linear output is passed
            through unchanged).

        Raises:
            ValueError -- If `a` is not a recognised activation name.
        """
        if a == "relu":
            return ReluLayer()
        if a == "sigmoid":
            return SigmoidLayer()
        if a == "identity":
            return None
        raise ValueError("Unknown activation function: {!r}".format(a))

    def __init__(self, input_dim, neurons, activations):
        """Constructor.

        Arguments:
            input_dim {int} -- Dimension of input (excluding batch dimension).
            neurons {list} -- Number of neurons in each layer represented as a
                list (the length of the list determines the number of layers).
            activations {list} -- List of the activation function to use for
                each layer ("relu", "sigmoid" or "identity").
        """
        self.input_dim = input_dim
        self.neurons = neurons
        self.activations = activations

        # Each linear layer maps the previous layer's width to its own.
        widths = [input_dim] + list(neurons)

        layers = []
        for idx, n_out in enumerate(neurons):
            layers.append(LinearLayer(widths[idx], n_out))
            activation_layer = self._activation(activations[idx])
            if activation_layer is not None:
                layers.append(activation_layer)
        self._layers = layers

    def forward(self, x):
        """
        Performs forward pass through the network.

        Arguments:
            x {np.ndarray} -- Input array of shape (batch_size, input_dim).

        Returns:
            {np.ndarray} -- Output array of shape (batch_size,
                #_neurons_in_final_layer)
        """
        for layer in self._layers:
            x = layer.forward(x)
        return x

    def __call__(self, x):
        return self.forward(x)

    def backward(self, grad_z):
        """
        Performs backward pass through the network.

        Arguments:
            grad_z {np.ndarray} -- Gradient of the loss with respect to the
                network output, of shape (batch_size,
                #_neurons_in_final_layer).

        Returns:
            {np.ndarray} -- Array containing gradient with respect to the
                network input, of shape (batch_size, input_dim).
        """
        # Walk from the output layer back to the input layer.
        for layer in reversed(self._layers):
            grad_z = layer.backward(grad_z)
        return grad_z

    def update_params(self, learning_rate):
        """
        Performs one step of gradient descent with given learning rate on the
        parameters of all layers using currently stored gradients.

        Arguments:
            learning_rate {float} -- Learning rate of update step.
        """
        for layer in self._layers:
            layer.update_params(learning_rate)
def save_network(network, fpath):
    """
    Serialize `network` with pickle and write it to the file at `fpath`.

    The file is opened in binary write mode, so any existing content is
    replaced.
    """
    with open(fpath, "wb") as handle:
        pickle.dump(network, handle)
def load_network(fpath):
    """
    Read and return the pickled object stored in the file at `fpath`.

    NOTE: unpickling can execute arbitrary code, so only load files that
    come from a trusted source.
    """
    with open(fpath, "rb") as handle:
        return pickle.load(handle)
class Trainer(object):
    """
    Trainer: Object that manages the training of a neural network.
    """

    def __init__(
        self,
        network,
        batch_size,
        nb_epoch,
        learning_rate,
        loss_fun,
        shuffle_flag,
    ):
        """Constructor.

        Arguments:
            network {MultiLayerNetwork} -- MultiLayerNetwork to be trained.
            batch_size {int} -- Training batch size.
            nb_epoch {int} -- Number of training epochs.
            learning_rate {float} -- SGD learning rate to be used in training.
            loss_fun {str} -- Loss function to be used. Possible values: mse,
                cross_entropy.
            shuffle_flag {bool} -- If True, training data is shuffled at the
                start of every epoch.

        Raises:
            ValueError -- If `loss_fun` is not a supported loss name.
        """
        self.network = network
        self.batch_size = batch_size
        self.nb_epoch = nb_epoch
        self.learning_rate = learning_rate
        self.loss_fun = loss_fun
        self.shuffle_flag = shuffle_flag

        if loss_fun == "mse":
            self._loss_layer = MSELossLayer()
        elif loss_fun == "cross_entropy":
            self._loss_layer = CrossEntropyLossLayer()
        else:
            # Fail here rather than with an AttributeError deep inside train().
            raise ValueError("Unknown loss function: {!r}".format(loss_fun))

    @staticmethod
    def shuffle(input_dataset, target_dataset):
        """
        Returns shuffled versions of the inputs.

        Both arrays are indexed with the same random permutation, so each
        input row stays paired with its target regardless of the target's
        dimensionality.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_data_points rows.

        Returns: 2-tuple of np.ndarray: (shuffled inputs, shuffled_targets).
        """
        assert len(input_dataset) == len(target_dataset)
        order = np.random.permutation(len(input_dataset))
        return input_dataset[order], target_dataset[order]

    def train(self, input_dataset, target_dataset):
        """
        Main training loop. Performs the following steps `nb_epoch` times:
            - Shuffles the input data (if `shuffle_flag` is True)
            - Splits the dataset into batches of size `batch_size` (the last
              batch is smaller when the dataset size is not a multiple of
              `batch_size`).
            - For each batch:
                - Performs forward pass through the network given the current
                batch of inputs.
                - Computes loss.
                - Performs backward pass to compute gradients of loss with
                respect to parameters of network.
                - Performs one step of gradient descent on the network
                parameters.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_training_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_training_data_points rows.
        """
        n_samples = len(input_dataset)

        for _ in range(self.nb_epoch):
            if self.shuffle_flag:
                inputs, targets = self.shuffle(input_dataset, target_dataset)
            else:
                inputs, targets = input_dataset, target_dataset

            # Slice consecutive windows of `batch_size` rows. The batches are
            # rebuilt every epoch, so no epoch sees an exhausted iterator.
            for start in range(0, n_samples, self.batch_size):
                stop = start + self.batch_size
                batch_inputs = inputs[start:stop]
                batch_targets = targets[start:stop]

                output = self.network.forward(batch_inputs)
                # The loss value itself is not needed here, but forward()
                # caches what the loss layer's backward() requires.
                self._loss_layer.forward(output, batch_targets)
                grad_z = self._loss_layer.backward()
                self.network.backward(grad_z)
                self.network.update_params(self.learning_rate)

    def eval_loss(self, input_dataset, target_dataset):
        """
        Function that evaluate the loss function for given data.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_evaluation_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_evaluation_data_points rows.

        Returns:
            The value returned by the loss layer's `forward` for the network
            output on `input_dataset`.
        """
        output = self.network.forward(input_dataset)
        return self._loss_layer.forward(output, target_dataset)
class Preprocessor(object):
    """
    Preprocessor: Object used to apply "preprocessing" operation to datasets.
    The object can also be used to revert the changes.

    The operation is min-max scaling: every feature (column) is mapped
    linearly so that the minimum and maximum seen in the reference dataset
    become 0 and 1 respectively.
    """

    def __init__(self, data):
        """
        Initializes the Preprocessor according to the provided dataset.
        (Does not modify the dataset.)

        Arguments:
            - data {np.ndarray} dataset used to determined the parameters for
            the normalization.
        """
        # Reduce along axis 0 so each feature gets its own minimum and
        # maximum; a single global pair would squash features whose scale is
        # much smaller than the largest one.
        self.min = np.min(data, axis=0)
        self.max = np.max(data, axis=0)

    def _span(self):
        """
        Return max - min, with zero ranges replaced by 1.

        A constant feature has max == min; substituting 1 avoids a division
        by zero, maps that feature to 0 in `apply` and restores it exactly in
        `revert`.
        """
        span = np.asarray(self.max - self.min, dtype=float)
        return np.where(span == 0, 1.0, span)

    def apply(self, data):
        """
        Apply the pre-processing operations to the provided dataset.

        Arguments:
            - data {np.ndarray} dataset to be normalized.

        Returns:
            {np.ndarray} normalized dataset.
        """
        return (data - self.min) / self._span()

    def revert(self, data):
        """
        Revert the pre-processing operations to retrieve the original dataset.

        Arguments:
            - data {np.ndarray} dataset for which to revert normalization.

        Returns:
            {np.ndarray} reverted dataset.
        """
        return data * self._span() + self.min
def example_main():
    """
    Demo: train a small two-layer network on the data in "iris.dat" and
    print the training loss, validation loss and validation accuracy.

    The first four columns of the file are used as features and the
    remaining columns as targets; 80% of the shuffled rows are used for
    training and the rest for validation.
    """
    input_dim = 4
    layer_sizes = [16, 3]
    layer_activations = ["relu", "identity"]
    net = MultiLayerNetwork(input_dim, layer_sizes, layer_activations)

    dat = np.loadtxt("iris.dat")
    np.random.shuffle(dat)

    features, labels = dat[:, :4], dat[:, 4:]

    # 80/20 train/validation split.
    cut = int(0.8 * len(features))
    x_train, x_val = features[:cut], features[cut:]
    y_train, y_val = labels[:cut], labels[cut:]

    # Normalization parameters come from the training inputs only.
    prep_input = Preprocessor(x_train)
    x_train_pre = prep_input.apply(x_train)
    x_val_pre = prep_input.apply(x_val)

    trainer = Trainer(
        network=net,
        batch_size=8,
        nb_epoch=1000,
        learning_rate=0.01,
        loss_fun="cross_entropy",
        shuffle_flag=True,
    )
    trainer.train(x_train_pre, y_train)

    train_loss = trainer.eval_loss(x_train_pre, y_train)
    print("Train loss = ", train_loss)
    val_loss = trainer.eval_loss(x_val_pre, y_val)
    print("Validation loss = ", val_loss)

    # Predicted / true class = index of the largest entry in each row.
    preds = net(x_val_pre).argmax(axis=1).squeeze()
    targets = y_val.argmax(axis=1).squeeze()
    accuracy = (preds == targets).mean()
    print("Validation accuracy: {}".format(accuracy))


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    example_main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment