import numpy as np
from typing import Callable, Optional, List
from pydantic import BaseModel
from tqdm.notebook import tqdm
from math import ceil, isnan, inf

class Activation(BaseModel):
    forward: Callable[[np.ndarray], np.ndarray]               # input -> output
    backward: Callable[[np.ndarray, np.ndarray], np.ndarray]  # (dA, X) -> gradients

linear = Activation(
    forward=lambda X: X,
    backward=lambda dA, X: (
        dA  # identity
        if isinstance(dA, np.ndarray)
        else np.dot(dA, np.ones(X.shape))  # broadcast to proper size if scalar
    )
)

relu = Activation(
    forward=lambda X: (X > 0) * X,
    backward=lambda dA, X: dA * (X > 0)
)
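
# Illustrative sanity check (not part of the original gist): relu zeroes out negative
# inputs on the forward pass and masks their gradients on the backward pass.
_z = np.array([[-1.0, 2.0], [3.0, -4.0]])
relu.forward(_z)                     # => [[0., 2.], [3., 0.]]
relu.backward(np.ones_like(_z), _z)  # => [[0., 1.], [1., 0.]]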

class Loss(BaseModel):
    forward: Callable[[np.ndarray, np.ndarray], np.ndarray]   # (y_true, y_pred) -> cost
    backward: Callable[[np.ndarray, np.ndarray], np.ndarray]  # (y_true, y_pred) -> gradient

class Layer(BaseModel):
    W: np.ndarray
    B: np.ndarray
    activation: Activation = linear
    Z: Optional[np.ndarray] = None  # pre-activation cached by the last forward pass
    A: Optional[np.ndarray] = None  # layer input cached by the last forward pass

    class Config:
        allow_mutation = True
        arbitrary_types_allowed = True


class Model(BaseModel):
    layers: List[Layer]
    loss: Loss

def mse_forward(y_true, y_pred):
    return np.sum(np.square(y_pred - y_true)) / len(y_true)


def mse_grad(y_true, y_pred):
    # d/dy_pred of the squared error, per sample: 2 * (y_pred - y_true).
    # The 1/N averaging happens later in backward() when dW and dB are computed.
    return 2 * (y_pred - y_true)


mse = Loss(forward=mse_forward, backward=mse_grad)
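
# Illustrative check (not part of the original gist) of the two MSE helpers on a tiny
# 1-D example, matching how train() calls them on a batch of targets.
_y_true = np.array([1.0, 2.0, 3.0])
_y_pred = np.array([1.5, 2.0, 2.0])
mse.forward(_y_true, _y_pred)   # => (0.25 + 0.0 + 1.0) / 3 ≈ 0.417
mse.backward(_y_true, _y_pred)  # => [ 1.,  0., -2.]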

def layer(n_in, n_out, activation="linear"):
    # He initialization: zero-mean normal weights scaled by sqrt(2 / fan_in)
    W = np.random.randn(n_out, n_in) * np.sqrt(2 / n_in)
    B = np.zeros((n_out, 1))
    activation_f = linear
    if activation == "relu":
        activation_f = relu
    return Layer(W=W, B=B, activation=activation_f)

def forward(model: Model, X: np.ndarray):
    # X holds one sample per row; internally samples are kept as columns
    A = X.T
    for layer in model.layers:
        layer.A = A  # cache the layer input for the backward pass
        layer.Z = np.dot(layer.W, A) + layer.B
        A = layer.activation.forward(layer.Z)
    return np.squeeze(A)
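
# Illustrative shape check (not part of the original gist): forward() takes one sample
# per row and returns a squeezed array with one prediction per sample.
_demo = Model(layers=[layer(2, 3, activation="relu"), layer(3, 1)], loss=mse)
forward(_demo, np.array([[1.0, 2.0], [3.0, 4.0]])).shape  # => (2,)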

def backward(model: Model, y_true: np.ndarray, y_pred: np.ndarray, alpha=0.01, inspect=False):
    assert len(y_pred.shape) > 1, "Please reshape y_pred to 2-D (outputs x samples)"
    assert y_pred.shape == y_true.shape, "y_pred matrix must match the size of the y matrix"
    dA = model.loss.backward(y_true, y_pred)
    N = y_true.shape[1]
    for idx, layer in reversed(list(enumerate(model.layers))):
        dZ = layer.activation.backward(dA, layer.Z)
        # Gradient is multiplied by the layer input (x, cached here as A) because of the chain rule
        # Division by N takes the mean, since the dot product already sums over samples
        dW = np.dot(dZ, layer.A.T) / N
        # Bias has no x term, so the chain rule just multiplies by 1
        # (when taking the partial derivative with respect to the bias)
        dB = np.sum(dZ, axis=1, keepdims=True) / N
        # Gradient for the next (earlier) layer, computed with the pre-update weights:
        # the derivative of the linear term (Wx + b) with respect to x is W
        dA = np.dot(layer.W.T, dZ)
        # Update weights and biases from the gradients
        layer.W -= dW * alpha
        layer.B -= dB * alpha
        if inspect:
            print(f"Layer {idx}\n=> W:\n{layer.W}\n=> B:\n{layer.B}\n\n")
    return model
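
# Illustrative single update (not part of the original gist): backward() expects a
# forward pass to have been run first (to cache A and Z) and 2-D (1, N) targets.
_m = Model(layers=[layer(2, 1)], loss=mse)
_Xb = np.array([[1.0, 2.0], [3.0, 4.0]])
_yb = np.array([5.0, 11.0])
_pred = forward(_m, _Xb)
backward(_m, y_true=_yb.reshape((1, -1)), y_pred=_pred.reshape((1, -1)), alpha=0.01)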

def train(model: Model, X_train: np.ndarray, y_train: np.ndarray, epochs=1, early_stopping_n=3, batch_size=16, learning_rate=0.01):
    N = X_train.shape[0]
    losses = []
    min_loss = inf
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}")
        for batch_i in tqdm(range(ceil(N / batch_size))):
            i_start, i_end = batch_i * batch_size, (batch_i + 1) * batch_size
            X_batch, y_batch = X_train[i_start:i_end], y_train[i_start:i_end]
            y_batch_pred = forward(model, X_batch)
            loss = model.loss.forward(y_batch, y_batch_pred)
            losses.append(loss)
            min_loss = min(min_loss, loss)
            if isnan(loss):
                print("Stopping due to nan loss")
                return
            # Stop if the loss has increased for the last `early_stopping_n` batches
            if len(losses) > early_stopping_n and (
                np.diff(losses[-early_stopping_n - 1:]) > 0
            ).all():
                print("Early stopping now.")
                return
            print(f"MSE: {loss:.1f}")
            backward(
                model,
                y_true=y_batch.reshape((1, -1)),
                y_pred=y_batch_pred.reshape((1, -1)),
                alpha=learning_rate
            )
    # Clear the cached activations once training finishes
    for layer in model.layers:
        layer.Z = None
        layer.A = None

def real_func(a, b):
    return a * b + 3


X = np.random.randint(20, size=(1000, 2))
y = np.array([real_func(x[0], x[1]) for x in X])

N = len(X)
split = 0.8
split_index = int(split * N)
X_train, X_test = X[:split_index], X[split_index:]
y_train, y_test = y[:split_index], y[split_index:]

model = Model(
    layers=[
        layer(2, 4, activation="relu"),
        layer(4, 4, activation="relu"),
        layer(4, 4, activation="relu"),
        layer(4, 1)
    ],
    loss=mse
)

train(
    model,
    X_train=X_train,
    y_train=y_train,
    epochs=100,
    learning_rate=1e-9,
    batch_size=16,
    early_stopping_n=3
)

forward(model, np.array([[1, 3], [2, 3]]))
# => e.g. array([6.8428677, 8.3032102])  (no random seed is set, so exact values vary run to run)
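
# Illustrative follow-up (not part of the original gist): evaluate the trained model
# on the held-out split with the same MSE helper used during training.
y_test_pred = forward(model, X_test)
print(f"Test MSE: {mse.forward(y_test, y_test_pred):.2f}")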