import numpy as np
from tensorflow.keras.datasets import mnist
from tqdm.auto import tqdm

(x_train, y_train), (x_test, y_test) = mnist.load_data()

### IMPORTANT: RUN THIS CELL ONLY ONCE !!! ###
# add a channel dimension to the images: (N, 28, 28) -> (N, 1, 28, 28)
x_train = np.expand_dims(x_train, axis=1)
x_test = np.expand_dims(x_test, axis=1)
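# Illustrative sanity check (not part of the original gist): after adding the
# channel axis the arrays should be in NCHW layout, i.e. (60000, 1, 28, 28)
# for training and (10000, 1, 28, 28) for test. Running the cell twice would
# add a second axis and trip these assertions.
assert x_train.shape == (60000, 1, 28, 28)
assert x_test.shape == (10000, 1, 28, 28)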
def conv_forward(x, w, b):
    """
    Perform convolutional forward pass.
    x: input of shape (N, C, H, W)
    w: filters of shape (F, C, FH, FW)
    b: bias terms of shape (F, )
    """
    N, C, H, W = x.shape
    F, _, FH, FW = w.shape
    # hyperparameters, chosen so that the spatial shape remains unchanged
    stride = 1                # stride to apply filter
    padding = (FH - 1) // 2   # padding on each side
    out = np.zeros((N, F, H, W))
    padded_x = np.pad(x, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant')
    _, _, padded_H, padded_W = padded_x.shape
    x_col = np.zeros((C * FH * FW, H * W))
    w_row = w.reshape(F, C * FH * FW)
    for i in range(N):
        c = 0
        for j in range(0, padded_H - FH + 1, stride):
            for k in range(0, padded_W - FW + 1, stride):
                x_col[:, c] = padded_x[i, :, j:j+FH, k:k+FW].reshape(C * FH * FW)
                c += 1
        out[i, :] = (np.dot(w_row, x_col) + b.reshape(-1, 1)).reshape(F, H, W)
    cache = (x, w, b, stride, padding)
    return out, cache
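# Illustrative sanity check (not part of the original gist): with stride 1 and
# "same" padding, conv_forward should preserve the spatial dimensions. The
# variable names below (toy_x, toy_w, toy_b) are made up for this example.
toy_x = np.random.randn(2, 1, 8, 8)   # N=2, C=1, H=W=8
toy_w = np.random.randn(4, 1, 3, 3)   # F=4 filters of size 3x3
toy_b = np.zeros(4)
toy_out, _ = conv_forward(toy_x, toy_w, toy_b)
assert toy_out.shape == (2, 4, 8, 8)  # F replaces C; H and W are unchanged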
def conv_backward(dout, cache):
    """
    Perform convolutional backpropagation.
    dout: downstream derivative
    cache: cache from forward pass
    """
    x, w, b, stride, padding = cache
    N, C, H, W = x.shape
    F, _, FH, FW = w.shape
    padded_x = np.pad(x, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant')
    _, _, padded_H, padded_W = padded_x.shape
    dx = np.zeros_like(x)
    dw = np.zeros_like(w)
    db = np.zeros_like(b)
    x_col = np.zeros((C * FH * FW, H * W))
    w_row = w.reshape(F, C * FH * FW)
    for i in range(N):
        curr_dout = dout[i, :, :, :].reshape(F, H * W)
        curr_out = np.dot(w_row.T, curr_dout)
        curr_dpx = np.zeros(padded_x.shape[1:])
        c = 0
        for j in range(0, padded_H - FH + 1, stride):
            for k in range(0, padded_W - FW + 1, stride):
                curr_dpx[:, j:j+FH, k:k+FW] += curr_out[:, c].reshape(C, FH, FW)
                x_col[:, c] = padded_x[i, :, j:j+FH, k:k+FW].reshape(C * FH * FW)
                c += 1
        # strip the padding (assumes padding > 0, which holds for 3x3 filters)
        dx[i] = curr_dpx[:, padding:-padding, padding:-padding]
        dw += np.dot(curr_dout, x_col.T).reshape(F, C, FH, FW)
        db += np.sum(curr_dout, axis=1)
    return dx, dw, db
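# Optional numerical gradient check for conv_backward (an illustrative sketch;
# the helper name num_grad and the g* variables are made up here). It compares
# the analytic dw against a centred finite-difference estimate on a tiny input.
def num_grad(f, p, h=1e-5):
    """Centred finite differences of scalar-valued f() with respect to array p."""
    grad = np.zeros_like(p)
    it = np.nditer(p, flags=['multi_index'])
    while not it.finished:
        idx = it.multi_index
        old = p[idx]
        p[idx] = old + h
        fp = f()
        p[idx] = old - h
        fm = f()
        p[idx] = old
        grad[idx] = (fp - fm) / (2 * h)
        it.iternext()
    return grad

gx = np.random.randn(1, 1, 5, 5)
gw = np.random.randn(2, 1, 3, 3)
gb = np.random.randn(2)
gout, gcache = conv_forward(gx, gw, gb)
gdout = np.random.randn(*gout.shape)
_, dw_analytic, _ = conv_backward(gdout, gcache)
dw_numeric = num_grad(lambda: np.sum(conv_forward(gx, gw, gb)[0] * gdout), gw)
# the analytic and numerical gradients should agree to several decimal places
assert np.allclose(dw_analytic, dw_numeric, atol=1e-5)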
def relu_forward(x):
    """
    ReLU activation forward pass.
    x: input of shape (N, C, H, W)
    """
    out = np.maximum(x, 0)
    cache = x
    return out, cache

def relu_backward(dout, cache):
    """
    ReLU backpropagation.
    dout: downstream derivative
    cache: cache from forward pass
    """
    x = cache
    dx = dout * (x > 0)
    return dx
def max_pool_forward(x, shape=(2, 2), stride=2):
    """
    Max pooling layer forward pass.
    x: input of shape (N, C, H, W)
    shape: shape of the pooling region
    stride: stride to apply pooling
    """
    N, C, H, W = x.shape
    pool_height, pool_width = shape
    out_H = 1 + (H - pool_height) // stride
    out_W = 1 + (W - pool_width) // stride
    out = np.zeros((N, C, out_H, out_W))
    for i in range(N):
        curr_out = np.zeros((C, out_H * out_W))
        c = 0
        for j in range(0, H - pool_height + 1, stride):
            for k in range(0, W - pool_width + 1, stride):
                curr_region = x[i, :, j:j+pool_height, k:k+pool_width].reshape(C, pool_height * pool_width)
                curr_max_pool = np.max(curr_region, axis=1)
                curr_out[:, c] = curr_max_pool
                c += 1
        out[i, :, :, :] = curr_out.reshape(C, out_H, out_W)
    cache = (x, pool_height, pool_width, stride)
    return out, cache
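# Illustrative usage (the toy_pool_* names are made up): 2x2 max pooling with
# stride 2 halves the spatial dimensions.
toy_pool_in = np.random.randn(2, 3, 8, 8)
toy_pool_out, _ = max_pool_forward(toy_pool_in)
assert toy_pool_out.shape == (2, 3, 4, 4)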
def max_pool_backward(dout, cache):
    """
    Max pooling layer backpropagation.
    dout: downstream derivative
    cache: cache from forward pass
    """
    x, pool_height, pool_width, stride = cache
    N, C, H, W = x.shape
    _, _, out_H, out_W = dout.shape
    dx = np.zeros_like(x)
    for i in range(N):
        curr_dout = dout[i, :].reshape(C, out_H * out_W)
        c = 0
        for j in range(0, H - pool_height + 1, stride):
            for k in range(0, W - pool_width + 1, stride):
                curr_region = x[i, :, j:j+pool_height, k:k+pool_width].reshape(C, pool_height * pool_width)
                curr_max_idx = np.argmax(curr_region, axis=1)
                curr_dout_region = curr_dout[:, c]
                curr_dpooling = np.zeros_like(curr_region)
                curr_dpooling[np.arange(C), curr_max_idx] = curr_dout_region
                # route the gradient to the argmax position of each pooling window
                dx[i, :, j:j+pool_height, k:k+pool_width] = curr_dpooling.reshape(C, pool_height, pool_width)
                c += 1
    return dx
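# Quick check that backpropagation routes the gradient only to the argmax
# position of each window (illustrative; the toy_* names are made up).
toy_p = np.array([[[[1., 2.],
                    [3., 4.]]]])             # single 2x2 window, max at (1, 1)
_, toy_p_cache = max_pool_forward(toy_p)
toy_p_dx = max_pool_backward(np.ones((1, 1, 1, 1)), toy_p_cache)
assert toy_p_dx[0, 0, 1, 1] == 1.0 and toy_p_dx.sum() == 1.0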
def fc_forward(x, w, b):
    """
    Fully-connected layer forward pass.
    x: input of shape (N, C, H, W)
    w: weight matrix of shape (D, M), where D = C * H * W
    b: bias of shape (M, )
    """
    N = x.shape[0]
    x_new = x.reshape(N, -1)
    out = np.dot(x_new, w) + b
    cache = (x, w, b)
    return out, cache
def fc_backward(dout, cache):
    """
    Fully-connected layer backpropagation.
    dout: downstream derivative
    cache: cache from forward pass
    """
    x, w, b = cache
    N = x.shape[0]
    x_new = x.reshape(N, -1)
    dx = np.dot(dout, w.T).reshape(x.shape)
    dw = np.dot(x_new.T, dout)
    db = np.sum(dout, axis=0)
    return dx, dw, db
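# Illustrative shape check for the fully-connected layer (toy_fc_* names are
# made up): the 4-D input is flattened internally, and the gradients come back
# with the shapes of the corresponding inputs.
toy_fc_in = np.random.randn(4, 2, 3, 3)      # flattened to (4, 18) internally
toy_fc_w = np.random.randn(18, 5)
toy_fc_b = np.zeros(5)
toy_fc_out, toy_fc_cache = fc_forward(toy_fc_in, toy_fc_w, toy_fc_b)
toy_fc_dx, toy_fc_dw, toy_fc_db = fc_backward(np.ones_like(toy_fc_out), toy_fc_cache)
assert toy_fc_out.shape == (4, 5)
assert toy_fc_dx.shape == toy_fc_in.shape and toy_fc_dw.shape == toy_fc_w.shape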
def softmax_loss(x, y):
    """
    Softmax loss and its gradient with respect to the scores.
    x: scores of shape (N, num_classes)
    y: integer labels of shape (N, )
    """
    N = x.shape[0]
    # stable softmax
    x = x - np.max(x, axis=1, keepdims=True)
    numerator = np.exp(x)
    probs = numerator / np.sum(numerator, axis=1, keepdims=True)
    # compute loss
    loss = -np.sum(np.log(probs[np.arange(N), y])) / N
    # compute derivative
    dx = probs.copy()
    dx[np.arange(N), y] -= 1
    dx /= N
    return loss, dx
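# Sanity check (illustrative; toy_* names are made up): with all-zero scores
# the softmax is uniform, so the loss is ln(10) for 10 classes, and each row
# of the gradient sums to zero.
toy_scores = np.zeros((4, 10))
toy_labels = np.array([0, 3, 5, 9])
toy_loss, toy_dscores = softmax_loss(toy_scores, toy_labels)
assert np.isclose(toy_loss, np.log(10))
assert np.allclose(toy_dscores.sum(axis=1), 0.0)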
"""## Model class ##""" | |
class ConvNet(object): | |
def __init__(self, | |
input_dim=(1, 28, 28), | |
hidden_dim=64, | |
num_classes=10, | |
weight_scale=0.01, | |
reg=0.0): | |
C, H, W = input_dim | |
self.W1 = np.random.normal(0.0, weight_scale, (64, C, 3, 3)) | |
self.b1 = np.zeros((64, )) | |
self.W2 = np.random.normal(0.0, weight_scale, (64, 64, 3, 3)) | |
self.b2 = np.zeros((64, )) | |
# spatial size after 2 max pooling layers | |
conv_out_H = 28 // 4 | |
conv_out_W = 28 // 4 | |
self.W3 = np.random.randn(64 * conv_out_H * conv_out_W, hidden_dim) * np.sqrt(2.0 / (64 * conv_out_H * conv_out_W)) | |
self.b3 = np.zeros((hidden_dim, )) | |
self.W4 = np.random.randn(hidden_dim, num_classes) * np.sqrt(2.0 / hidden_dim) | |
self.b4 = np.zeros((num_classes, )) | |
self.reg = reg | |
    def forward(self, x):
        # forward pass: [conv - relu - pool] x 2 - fc - relu - fc
        x, conv1_cache = conv_forward(x, self.W1, self.b1)
        x, relu1_cache = relu_forward(x)
        x, pool1_cache = max_pool_forward(x)
        x, conv2_cache = conv_forward(x, self.W2, self.b2)
        x, relu2_cache = relu_forward(x)
        x, pool2_cache = max_pool_forward(x)
        x, fc1_cache = fc_forward(x, self.W3, self.b3)
        x, relu3_cache = relu_forward(x)
        out, fc2_cache = fc_forward(x, self.W4, self.b4)
        caches = (conv1_cache, relu1_cache, pool1_cache, conv2_cache, relu2_cache, pool2_cache, fc1_cache, relu3_cache, fc2_cache)
        return out, caches
    def loss(self, x, y):
        # forward pass
        out, caches = self.forward(x)
        conv1_cache, relu1_cache, pool1_cache, conv2_cache, relu2_cache, pool2_cache, fc1_cache, relu3_cache, fc2_cache = caches
        # softmax loss
        loss, dout = softmax_loss(out, y)
        # backprop in reverse order of the forward pass
        dout, dW4, db4 = fc_backward(dout, fc2_cache)
        dout = relu_backward(dout, relu3_cache)
        dout, dW3, db3 = fc_backward(dout, fc1_cache)
        dout = max_pool_backward(dout, pool2_cache)
        dout = relu_backward(dout, relu2_cache)
        dout, dW2, db2 = conv_backward(dout, conv2_cache)
        dout = max_pool_backward(dout, pool1_cache)
        dout = relu_backward(dout, relu1_cache)
        dout, dW1, db1 = conv_backward(dout, conv1_cache)
        # note: the L2 penalty is applied to the gradients only;
        # it is not added to the reported loss value
        grads = {
            'W1': dW1 + self.reg * self.W1,
            'b1': db1,
            'W2': dW2 + self.reg * self.W2,
            'b2': db2,
            'W3': dW3 + self.reg * self.W3,
            'b3': db3,
            'W4': dW4 + self.reg * self.W4,
            'b4': db4
        }
        return loss, grads
    def create_minibatch(self, x, y, batch_size=128):
        mini_batches = []
        num_examples = x.shape[0]
        num_batches = num_examples // batch_size
        for i in range(num_batches):
            x_mini = x[i*batch_size:(i+1)*batch_size, :]
            y_mini = y[i*batch_size:(i+1)*batch_size]
            mini_batches.append((x_mini, y_mini))
        # put any leftover examples into one final, smaller batch
        if num_examples % batch_size != 0:
            x_mini = x[num_batches*batch_size:, :]
            y_mini = y[num_batches*batch_size:]
            mini_batches.append((x_mini, y_mini))
        return mini_batches
    def train(self, x, y, lr=1e-4, batch_size=128, epochs=10):
        mini_batches = self.create_minibatch(x, y, batch_size)
        print('Split the training set into {} mini-batches.\n'.format(len(mini_batches)))
        loss_history = []
        for epoch in tqdm(range(epochs)):
            # print('Epoch {}/{}: \n'.format(epoch + 1, epochs))
            for mini_batch in tqdm(mini_batches, leave=False):
                x_mini, y_mini = mini_batch
                loss, grads = self.loss(x_mini, y_mini)
                # vanilla SGD parameter update
                self.W1 -= lr * grads['W1']
                self.b1 -= lr * grads['b1']
                self.W2 -= lr * grads['W2']
                self.b2 -= lr * grads['b2']
                self.W3 -= lr * grads['W3']
                self.b3 -= lr * grads['b3']
                self.W4 -= lr * grads['W4']
                self.b4 -= lr * grads['b4']
                loss_history.append(loss)
                # print('Loss: {}'.format(loss))
        return loss_history
    def eval(self, x, y):
        # forward pass
        out, _ = self.forward(x)
        # softmax over the scores
        out = out - np.max(out, axis=1, keepdims=True)
        numerator = np.exp(out)
        probs = numerator / np.sum(numerator, axis=1, keepdims=True)
        # get predictions
        predictions = np.argmax(probs, axis=1)
        # get accuracy
        accuracy = np.mean(predictions == y)
        return accuracy
### Train CNN
model = ConvNet()
losses = model.train(x_train, y_train, epochs=5)

## Evaluation on the test set ##
eval_acc = model.eval(x_test, y_test)
print('Evaluation accuracy: {}'.format(eval_acc))
# Examples from the test set
indices = np.random.randint(0, len(x_test), 9)
x_sample = x_test[indices, :]
y_sample = y_test[indices]
out, _ = model.forward(x_sample)
out = out - np.max(out, axis=1, keepdims=True)
numerator = np.exp(out)
probs = numerator / np.sum(numerator, axis=1, keepdims=True)
sample_predictions = np.argmax(probs, axis=1)
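# Print the sampled predictions next to their ground-truth labels
# (illustrative addition; the original gist stops at computing sample_predictions).
for pred, true in zip(sample_predictions, y_sample):
    print('predicted: {}  actual: {}'.format(pred, true))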