Skip to content

Instantly share code, notes, and snippets.

@contrasting
Created October 16, 2022 14:38
Show Gist options
  • Select an option

  • Save contrasting/970503bf4db31fabeafd0bdad561c8db to your computer and use it in GitHub Desktop.

Select an option

Save contrasting/970503bf4db31fabeafd0bdad561c8db to your computer and use it in GitHub Desktop.
# Mini NN Library
import numpy as np
import pickle
def xavier_init(size, gain=1.0):
    """
    Sample an array of weights from the Xavier (Glorot) uniform distribution.

    Arguments:
        size {sequence of int} -- Shape of the array to create. The sum of
            its entries is used as the normalising term.
        gain {float} -- Multiplier applied to the sampling bound.

    Returns:
        {np.ndarray} -- Array of shape `size`, drawn uniformly between
            -bound and +bound with bound = gain * sqrt(6 / sum(size)).
    """
    # The interval is symmetric, so the bound only needs computing once.
    bound = gain * np.sqrt(6.0 / np.sum(size))
    return np.random.uniform(low=-bound, high=bound, size=size)
class Layer:
    """
    Common interface shared by every layer in the library.

    `__init__`, `forward` and `backward` raise NotImplementedError here and
    must be overridden by concrete layers. `update_params` does nothing by
    default, so layers without trainable parameters need not override it.
    Calling a layer instance is shorthand for calling its `forward` method.
    """

    def __init__(self, *args, **kwargs):
        raise NotImplementedError()

    def __call__(self, *args, **kwargs):
        # layer(x) simply delegates to layer.forward(x).
        return self.forward(*args, **kwargs)

    def forward(self, *args, **kwargs):
        raise NotImplementedError()

    def backward(self, *args, **kwargs):
        raise NotImplementedError()

    def update_params(self, *args, **kwargs):
        # Default: nothing to update.
        return None
class MSELossLayer(Layer):
    """
    MSELossLayer: mean-squared error between predictions and targets.

    `forward` stores its two arguments so that `backward` can later return
    the gradient of the loss with respect to the predictions.
    """

    def __init__(self, *args, **kwargs):
        # Holds the (y_pred, y_target) pair of the latest forward call.
        self._cache_current = None

    @staticmethod
    def _mse(y_pred, y_target):
        """Return the mean of the squared differences over every element."""
        residual = y_pred - y_target
        return np.mean(residual ** 2)

    @staticmethod
    def _mse_grad(y_pred, y_target):
        """
        Return 2 * (y_pred - y_target) / len(y_pred).

        NOTE: the divisor is the length of the first axis, whereas `_mse`
        averages over all elements.
        """
        n_rows = len(y_pred)
        return 2 * (y_pred - y_target) / n_rows

    def forward(self, y_pred, y_target):
        """Cache the inputs and return the scalar MSE loss."""
        self._cache_current = (y_pred, y_target)
        return self._mse(y_pred, y_target)

    def backward(self):
        """Return the loss gradient for the inputs of the last forward call."""
        y_pred, y_target = self._cache_current
        return self._mse_grad(y_pred, y_target)
class CrossEntropyLossLayer(Layer):
    """
    CrossEntropyLossLayer: softmax followed by the negative log-likelihood
    loss.

    `forward` caches the targets and the softmax probabilities; `backward`
    uses that cache to return the gradient with respect to the raw inputs.
    """

    def __init__(self):
        # Holds the (y_target, probs) pair of the latest forward call.
        self._cache_current = None

    @staticmethod
    def softmax(x):
        """Row-wise softmax of a 2-D array (normalised along axis 1)."""
        # Subtracting the row maximum keeps the exponentials from overflowing.
        shifted = x - x.max(axis=1, keepdims=True)
        exps = np.exp(shifted)
        return exps / exps.sum(axis=1, keepdims=True)

    def forward(self, inputs, y_target):
        """
        Return the cross-entropy loss averaged over the observations.

        Arguments:
            inputs {np.ndarray} -- Raw scores, one row per observation.
            y_target {np.ndarray} -- Targets with as many rows as `inputs`;
                multiplied elementwise with the log-probabilities.

        Returns:
            Scalar loss: -1 / n_obs * sum(y_target * log(softmax(inputs))).
        """
        assert len(inputs) == len(y_target)
        n_obs = len(y_target)
        probs = self.softmax(inputs)
        self._cache_current = (y_target, probs)
        return -1 / n_obs * np.sum(y_target * np.log(probs))

    def backward(self):
        """Return -1 / n_obs * (y_target - probs) from the cached forward pass."""
        y_target, probs = self._cache_current
        return -1 / len(y_target) * (y_target - probs)
class SigmoidLayer(Layer):
    """
    SigmoidLayer: applies the logistic sigmoid elementwise.

    The layer has no parameters. It caches the forward input so that
    `backward` can evaluate the sigmoid derivative at that input.
    """

    def __init__(self):
        # Input of the latest forward call.
        self._cache_current = None

    @staticmethod
    def _sigmoid(x):
        """Return 1 / (1 + exp(-x))."""
        return 1 / (1 + np.exp(-x))

    def forward(self, x):
        """Cache `x` and return sigmoid(x)."""
        self._cache_current = x
        return self._sigmoid(x)

    def backward(self, grad_z):
        """
        Return dL/dX = dL/dZ * f'(X) (elementwise product).

        Arguments:
            grad_z {np.ndarray} -- Gradient of the loss with respect to this
                layer's output.
        """
        # f'(x) = f(x) * (1 - f(x)), evaluated at the cached input.
        activation = self._sigmoid(self._cache_current)
        slope = activation * (1 - activation)
        return grad_z * slope
class ReluLayer(Layer):
    """
    ReluLayer: applies the ReLU function max(x, 0) elementwise.

    The layer has no parameters. It caches the forward input so that
    `backward` can mask the incoming gradient.

    Both helpers operate on whole arrays. Wrapping a scalar function in
    `np.vectorize` is avoided because it takes the output dtype from the
    first element: a batch starting with a non-positive value would come
    back as integers and truncate every positive activation. It also fails
    on empty arrays.
    """

    def __init__(self):
        # Input of the latest forward call.
        self._cache_current = None

    @staticmethod
    def _relu(x):
        """Return `x` where x > 0 and 0 elsewhere (scalars or arrays)."""
        x = np.asarray(x)
        # np.where keeps the rule "anything that is not > 0 becomes 0",
        # which also covers NaN entries.
        return np.where(x > 0, x, 0)

    @staticmethod
    def _relu_prime(x):
        """Return 1 where x > 0 and 0 elsewhere (scalars or arrays)."""
        return np.where(np.asarray(x) > 0, 1, 0)

    def forward(self, x):
        """
        Cache `x` and return relu(x).

        Arguments:
            x {np.ndarray} -- Input array of any shape.

        Returns:
            {np.ndarray} -- Array of the same shape with negatives set to 0.
        """
        self._cache_current = x
        return self._relu(x)

    def backward(self, grad_z):
        """
        Return dL/dX = dL/dZ * relu'(X) (elementwise product).

        Arguments:
            grad_z {np.ndarray} -- Gradient of the loss with respect to this
                layer's output.
        """
        return grad_z * self._relu_prime(self._cache_current)
class LinearLayer(Layer):
    """
    LinearLayer: affine transformation of the input, x @ W + b.
    """

    def __init__(self, n_in, n_out):
        """Constructor.

        Arguments:
            n_in {int} -- Number (or dimension) of inputs.
            n_out {int} -- Number (or dimension) of outputs.
        """
        self.n_in = n_in
        self.n_out = n_out

        # Weights are (n_in, n_out); the bias is kept as a (1, n_out) row.
        self._W = xavier_init([n_in, n_out])
        self._b = xavier_init([1, n_out])

        # Filled in by forward() and backward() respectively.
        self._cache_current = None
        self._grad_W_current = None
        self._grad_b_current = None

    def forward(self, x):
        """
        Performs forward pass through the layer (i.e. returns Wx + b) and
        stores the input in `_cache_current` for the backward pass.

        Arguments:
            x {np.ndarray} -- Input array of shape (batch_size, n_in).

        Returns:
            {np.ndarray} -- Output array of shape (batch_size, n_out)
        """
        # The input is needed later because dL/dW depends on it.
        self._cache_current = x

        # Stack the bias row once per sample so it lines up with x @ W.
        n_rows = len(x)
        stacked_bias = np.repeat(self._b, n_rows, axis=0)
        return x @ self._W + stacked_bias

    def backward(self, grad_z):
        """
        Given `grad_z`, the gradient of some scalar (e.g. loss) with respect
        to the output of this layer, stores the gradients with respect to
        the layer's parameters and returns the gradient with respect to the
        layer's input.

        Arguments:
            grad_z {np.ndarray} -- Gradient array of shape (batch_size, n_out).

        Returns:
            {np.ndarray} -- Array containing gradient with respect to layer
                input, of shape (batch_size, n_in).
        """
        cached_input = self._cache_current

        # dL/dW = X^T @ dL/dZ
        self._grad_W_current = np.transpose(cached_input) @ grad_z

        # dL/db = 1^T @ dL/dZ, i.e. the column sums kept as a (1, n_out) row.
        ones_row = np.ones((1, len(grad_z)))
        self._grad_b_current = ones_row @ grad_z

        # dL/dX = dL/dZ @ W^T
        return grad_z @ np.transpose(self._W)

    def update_params(self, learning_rate):
        """
        Performs one step of gradient descent with given learning rate on the
        layer's parameters using currently stored gradients.

        Arguments:
            learning_rate {float} -- Learning rate of update step.
        """
        weight_step = learning_rate * self._grad_W_current
        self._W = self._W - weight_step
        bias_step = learning_rate * self._grad_b_current
        self._b = self._b - bias_step
class MultiLayerNetwork(object):
    """
    MultiLayerNetwork: A network consisting of stacked linear layers and
    activation functions.
    """

    @staticmethod
    def _activation(a: str):
        """
        Build the activation layer named by `a`.

        Arguments:
            a {str} -- One of "relu", "sigmoid" or "identity".

        Returns:
            A ReluLayer or SigmoidLayer instance, or None for "identity"
            (no activation layer is needed in that case).

        Raises:
            ValueError -- If `a` is not a recognised activation name.
        """
        if a == "relu":
            return ReluLayer()
        if a == "sigmoid":
            return SigmoidLayer()
        if a == "identity":
            # Identity means "leave the linear output untouched".
            return None
        raise ValueError(
            "Unknown activation {!r}; expected 'relu', 'sigmoid' or "
            "'identity'".format(a)
        )

    def __init__(self, input_dim, neurons, activations):
        """Constructor.

        Arguments:
            input_dim {int} -- Dimension of input (excluding batch dimension).
            neurons {list} -- Number of neurons in each layer represented as a
                list (the length of the list determines the number of layers).
            activations {list} -- List of the activation function to use for
                each layer.

        Raises:
            ValueError -- If an activation name is not recognised.
            IndexError -- If `activations` is shorter than `neurons`.
        """
        self.input_dim = input_dim
        self.neurons = neurons
        self.activations = activations

        # Consecutive pairs of `dims` are the (n_in, n_out) of each linear
        # layer; an empty `neurons` list yields a network with no layers.
        dims = [input_dim, *neurons]
        layers = []
        for idx, (n_in, n_out) in enumerate(zip(dims, dims[1:])):
            layers.append(LinearLayer(n_in, n_out))
            activation = self._activation(activations[idx])
            if activation is not None:
                layers.append(activation)
        self._layers = layers

    def forward(self, x):
        """
        Performs forward pass through the network.

        Arguments:
            x {np.ndarray} -- Input array of shape (batch_size, input_dim).

        Returns:
            {np.ndarray} -- Output array of shape (batch_size,
                #_neurons_in_final_layer)
        """
        for layer in self._layers:
            x = layer.forward(x)
        return x

    def __call__(self, x):
        return self.forward(x)

    def backward(self, grad_z):
        """
        Performs backward pass through the network.

        Arguments:
            grad_z {np.ndarray} -- Gradient array of shape (batch_size,
                #_neurons_in_final_layer).

        Returns:
            {np.ndarray} -- Array containing gradient with respect to layer
                input, of shape (batch_size, input_dim).
        """
        # Gradients flow from the output layer back to the input layer.
        for layer in reversed(self._layers):
            grad_z = layer.backward(grad_z)
        return grad_z

    def update_params(self, learning_rate):
        """
        Performs one step of gradient descent with given learning rate on the
        parameters of all layers using currently stored gradients.

        Arguments:
            learning_rate {float} -- Learning rate of update step.
        """
        for layer in self._layers:
            layer.update_params(learning_rate)
def save_network(network, fpath):
    """
    Serialise `network` with pickle and write it to `fpath`.

    Arguments:
        network -- Object to pickle (e.g. a trained network).
        fpath {str} -- Destination path; the file is opened in binary write
            mode, so existing content is overwritten.
    """
    with open(fpath, "wb") as sink:
        pickle.dump(network, sink)
def load_network(fpath):
    """
    Read and return the pickled object stored at `fpath`.

    WARNING: unpickling can execute arbitrary code, so only load files that
    come from a trusted source.

    Arguments:
        fpath {str} -- Path of the pickle file, opened in binary read mode.

    Returns:
        The unpickled object.
    """
    with open(fpath, "rb") as source:
        return pickle.load(source)
class Trainer(object):
    """
    Trainer: Object that manages the training of a neural network.
    """

    def __init__(
        self,
        network,
        batch_size,
        nb_epoch,
        learning_rate,
        loss_fun,
        shuffle_flag,
    ):
        """Constructor.

        Arguments:
            network {MultiLayerNetwork} -- MultiLayerNetwork to be trained.
            batch_size {int} -- Training batch size.
            nb_epoch {int} -- Number of training epochs.
            learning_rate {float} -- SGD learning rate to be used in training.
            loss_fun {str} -- Loss function to be used. Possible values: mse,
                cross_entropy.
            shuffle_flag {bool} -- If True, training data is shuffled at the
                start of every epoch.

        Raises:
            ValueError -- If `loss_fun` is not one of the supported names.
        """
        self.network = network
        self.batch_size = batch_size
        self.nb_epoch = nb_epoch
        self.learning_rate = learning_rate
        self.loss_fun = loss_fun
        self.shuffle_flag = shuffle_flag

        if loss_fun == "mse":
            self._loss_layer = MSELossLayer()
        elif loss_fun == "cross_entropy":
            self._loss_layer = CrossEntropyLossLayer()
        else:
            # Fail here rather than later with a missing-attribute error.
            raise ValueError(
                "Unknown loss_fun {!r}; expected 'mse' or "
                "'cross_entropy'".format(loss_fun)
            )

    @staticmethod
    def shuffle(input_dataset, target_dataset):
        """
        Returns shuffled versions of the inputs, using the same random
        permutation for both so that rows stay paired.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_data_points rows.

        Returns: 2-tuple of np.ndarray: (shuffled inputs, shuffled_targets).

        Raises:
            ValueError -- If the two datasets have different lengths.
        """
        if len(input_dataset) != len(target_dataset):
            raise ValueError(
                "input_dataset and target_dataset must have the same length"
            )
        order = np.random.permutation(len(input_dataset))
        return input_dataset[order], target_dataset[order]

    def train(self, input_dataset, target_dataset):
        """
        Main training loop. Performs the following steps `nb_epoch` times:
            - Shuffles the input data (if `shuffle_flag` is True)
            - Splits the dataset into batches of size `batch_size` (the last
              batch is smaller when the dataset size is not a multiple of
              `batch_size`).
            - For each batch:
                - Performs forward pass through the network given the current
                  batch of inputs.
                - Computes loss.
                - Performs backward pass to compute gradients of loss with
                  respect to parameters of network.
                - Performs one step of gradient descent on the network
                  parameters.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_training_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_training_data_points rows.

        Raises:
            ValueError -- If `batch_size` is smaller than 1.
        """
        if self.batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        n_samples = len(input_dataset)
        for _ in range(self.nb_epoch):
            if self.shuffle_flag:
                inputs, targets = self.shuffle(input_dataset, target_dataset)
            else:
                inputs, targets = input_dataset, target_dataset

            # Batches are sliced afresh each epoch. A one-shot iterator built
            # outside this loop would be exhausted after the first epoch.
            for start in range(0, n_samples, self.batch_size):
                stop = start + self.batch_size
                batch_inputs = inputs[start:stop]
                batch_targets = targets[start:stop]

                predictions = self.network.forward(batch_inputs)
                # The forward call caches what backward() needs.
                self._loss_layer.forward(predictions, batch_targets)
                grad_z = self._loss_layer.backward()
                self.network.backward(grad_z)
                self.network.update_params(self.learning_rate)

    def eval_loss(self, input_dataset, target_dataset):
        """
        Function that evaluates the loss function for given data.

        Arguments:
            - input_dataset {np.ndarray} -- Array of input features, of shape
                (#_evaluation_data_points, n_features).
            - target_dataset {np.ndarray} -- Array of corresponding targets,
                with #_evaluation_data_points rows.

        Returns:
            The value returned by the loss layer's forward pass for the
            whole dataset.
        """
        predictions = self.network.forward(input_dataset)
        return self._loss_layer.forward(predictions, target_dataset)
class Preprocessor(object):
    """
    Preprocessor: Object used to apply "preprocessing" operation to datasets.
    The object can also be used to revert the changes.

    The operation is min-max scaling using the single smallest and largest
    value found in the dataset passed to the constructor.
    """

    def __init__(self, data):
        """
        Initializes the Preprocessor according to the provided dataset.
        (Does not modify the dataset.)

        Arguments:
            - data {np.ndarray} dataset used to determine the parameters for
                the normalization.
        """
        # Global extremes over every element of `data`.
        self.min = np.min(data)
        self.max = np.max(data)

    def _span(self):
        """Return max - min, or 1.0 when they are equal (constant data)."""
        span = self.max - self.min
        # A zero span would turn apply() into a division by zero. With a
        # unit span, constant data maps to 0 and revert() maps it back.
        return span if span != 0 else 1.0

    def apply(self, data):
        """
        Apply the pre-processing operations to the provided dataset.

        Arguments:
            - data {np.ndarray} dataset to be normalized.

        Returns:
            {np.ndarray} normalized dataset, (data - min) / (max - min).
        """
        return (data - self.min) / self._span()

    def revert(self, data):
        """
        Revert the pre-processing operations to retrieve the original dataset.

        Arguments:
            - data {np.ndarray} dataset for which to revert normalization.

        Returns:
            {np.ndarray} reverted dataset, data * (max - min) + min.
        """
        return data * self._span() + self.min
def example_main():
    """
    Train a small network on the data in "iris.dat" and print the training
    loss, validation loss and validation accuracy.

    The first four columns of the file are used as features and the
    remaining columns as targets; 80% of the shuffled rows are used for
    training and the rest for validation.
    """
    # Network: 4 inputs -> 16 units -> 3 outputs.
    net = MultiLayerNetwork(4, [16, 3], ["relu", "identity"])

    dat = np.loadtxt("iris.dat")
    np.random.shuffle(dat)

    x, y = dat[:, :4], dat[:, 4:]

    # 80/20 train/validation split.
    split_idx = int(0.8 * len(x))
    x_train, x_val = x[:split_idx], x[split_idx:]
    y_train, y_val = y[:split_idx], y[split_idx:]

    # Normalisation parameters come from the training inputs only.
    prep_input = Preprocessor(x_train)
    x_train_pre = prep_input.apply(x_train)
    x_val_pre = prep_input.apply(x_val)

    trainer = Trainer(
        network=net,
        batch_size=8,
        nb_epoch=1000,
        learning_rate=0.01,
        loss_fun="cross_entropy",
        shuffle_flag=True,
    )
    trainer.train(x_train_pre, y_train)

    train_loss = trainer.eval_loss(x_train_pre, y_train)
    print("Train loss = ", train_loss)
    val_loss = trainer.eval_loss(x_val_pre, y_val)
    print("Validation loss = ", val_loss)

    # Predicted and true classes are the positions of the row maxima.
    preds = net(x_val_pre).argmax(axis=1).squeeze()
    targets = y_val.argmax(axis=1).squeeze()
    accuracy = (preds == targets).mean()
    print("Validation accuracy: {}".format(accuracy))
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    example_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment