The 1st Programmers Cup "Ttiyong" Code Challenge ~ Who turned code that could be written in one line into 436 lines? ~
## Author: Jiun Bae (2019/08/18)
## See also (https://github.com/jiunbae/ITE4053/tree/master/NumpyNeuralNetwork)

from typing import Optional, List, Union, Tuple, Iterable, Callable
from functools import reduce

import numpy as np

## Class Definition
# _Module
# |-- Activation
# |   |-- ReLU
# |   |-- Sigmoid
# |-- Loss
# |   |-- MSELoss
# |   |-- BCELoss
# |-- Layer
#     |-- Dense
#     |-- Sequential
# Metric
# |-- Accuracy
# Optimizer
# |-- SGD

class _Module(object):
    def __init__(self, *args):
        self._last_input, self._last_output = None, None

    def __call__(self, X: np.ndarray) \
            -> np.ndarray:
        return self.forward(X)

    def forward(self, X: np.ndarray, grad: bool = True) \
            -> np.ndarray:
        if grad:
            if self._last_input is not None:
                pass  # backward was not called after the previous forward; overwrite the cache.
            self._last_input = X
        return self._last_input

    def after_forward(self, output: np.ndarray) \
            -> np.ndarray:
        self._last_output = output
        return self._last_output

    def backward(self, Y: np.ndarray) \
            -> np.ndarray:
        if self._last_input is None:
            raise Exception("backward must be called after forward.")
        result = self._last_input
        self._last_input = None
        return result

    def after_backward(self, output: np.ndarray) \
            -> np.ndarray:
        return output

    def update(self, *args):
        pass

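# Added note: _Module implements a tiny autograd-style contract. forward()
# caches its input in _last_input when grad=True, backward() pops that cache
# (and fails if forward was never called), and after_forward()/after_backward()
# are hooks for subclasses. Concrete modules call super().forward()/backward()
# to reuse this bookkeeping.
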
class Activation(_Module):
    pass


class Sigmoid(Activation):
    name = 'sigmoid'

    def forward(self, X: np.ndarray, grad: bool = True) -> np.ndarray:
        super(Sigmoid, self).forward(X, grad)
        return 1. / (1. + np.exp(-X))

    def backward(self, grad: np.ndarray) -> np.ndarray:
        last = super(Sigmoid, self).backward(grad)
        result = self.forward(last, grad=False)
        return grad * result * (1. - result)


class ReLU(Activation):
    name = 'relu'

    def forward(self, X: np.ndarray, grad: bool = True) -> np.ndarray:
        super(ReLU, self).forward(X, grad)
        return np.maximum(0, X)

    def backward(self, grad: np.ndarray) -> np.ndarray:
        last = super(ReLU, self).backward(grad)
        grad = grad.copy()
        grad[last <= 0] = 0
        return grad

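# Added sanity check (not in the original gist): the sigmoid derivative is
# sigma(x) * (1 - sigma(x)), so the gradient at x = 0 is 0.25, and the ReLU
# gradient passes through only where the cached input was positive.
_act = Sigmoid()
_act.forward(np.array([0.]))
assert np.allclose(_act.backward(np.array([1.])), 0.25)
_act = ReLU()
_act.forward(np.array([-1., 2.]))
assert np.array_equal(_act.backward(np.array([1., 1.])), np.array([0., 1.]))
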
class Loss(_Module):
    def __init__(self, *args):
        super(Loss, self).__init__()

    def __call__(self, inputs: np.ndarray, targets: np.ndarray) \
            -> np.ndarray:
        return self.forward(inputs, targets)

    def forward(self, inputs: np.ndarray, targets: np.ndarray) \
            -> np.ndarray:
        return np.zeros(0)


class MSELoss(Loss):
    name = 'mean_squared_error'

    def __init__(self, *args):
        super(MSELoss, self).__init__(*args)

    def forward(self, inputs: np.ndarray, targets: np.ndarray) \
            -> np.ndarray:
        super(MSELoss, self).forward(inputs, targets)
        cost = np.mean((inputs - targets) ** 2)
        return cost


class BCELoss(Loss):
    name = 'binary_crossentropy'

    def __init__(self, *args):
        super(BCELoss, self).__init__(*args)

    def forward(self, inputs: np.ndarray, targets: np.ndarray) \
            -> np.ndarray:
        super(BCELoss, self).forward(inputs, targets)
        size = np.size(inputs, -1)
        cost = -1 / size * (np.dot(targets, np.log(inputs).T) +
                            np.dot(1 - targets, np.log(1 - inputs).T))
        return np.squeeze(cost)

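# Added note: BCELoss computes the usual binary cross-entropy
#   L = -(1/m) * sum(y * log(y_hat) + (1 - y) * log(1 - y_hat)),
# with the batch size m taken from the last axis of `inputs`. A quick check
# (added, hypothetical values): two samples with y_hat = 0.5 give L = ln 2.
assert np.isclose(BCELoss()(np.array([[.5, .5]]), np.array([[1., 0.]])), np.log(2))
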
class Metric:
    def __call__(self, inputs: np.ndarray, targets: np.ndarray) \
            -> float:
        return 0


class Accuracy(Metric):
    name = 'accuracy'

    def __call__(self, inputs: np.ndarray, targets: np.ndarray) \
            -> float:
        return ((inputs * 81).round() == (targets * 81).round()).all(axis=0).mean()

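# Added note: the targets below are products a * b scaled into (0, 1] by
# dividing by 81, so Accuracy rescales both predictions and targets by 81 and
# rounds before comparing; .all(axis=0) requires every output dimension of a
# sample to match and .mean() turns the per-sample matches into a ratio.
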
class Optimizer(object):
    def __init__(self, *args, **kwargs):
        self.iteration = 0

    def __iter__(self):
        return self

    def __next__(self):
        self.iteration += 1

    def get_update(self, params: np.ndarray, grads: np.ndarray):
        pass


class SGD(Optimizer):
    def __init__(self, lr: float = .5, **kwargs):
        super(SGD, self).__init__(**kwargs)
        self.lr = lr

    def __next__(self):
        super(SGD, self).__next__()
        self.lr *= .9999999

    def get_update(self, params: np.ndarray, grads: np.ndarray):
        return params - self.lr * grads

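# Added note: SGD applies the plain gradient step params <- params - lr * grads,
# and every next(optimizer) call (see Sequential.update below) multiplies the
# learning rate by 0.9999999, a very slow exponential decay. A hypothetical
# standalone use:
#   opt = SGD(lr=0.1)
#   opt.get_update(np.array([1.0]), np.array([0.5]))   # -> array([0.95])
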
def collect(name: str, klass: type) \
        -> object:
    def caller_wrapper(f: Callable) \
            -> object:
        return lambda *args, **kwargs: f(*args, **kwargs)
    return type(name, (object, ), {
        getattr(k, 'name', k.__name__): caller_wrapper(k)
        for k in klass.__subclasses__()
    })


activations = collect('Activations', Activation)
optimizers = collect('Optimizers', Optimizer)
losses = collect('Losses', Loss)
metrics = collect('Metrics', Metric)

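# Added note: collect() builds a registry class whose attributes map each
# subclass's `name` (or the class name when `name` is missing, e.g. SGD) to a
# factory for that subclass. Sequential.compile() uses these registries so that
# strings can stand in for instances, e.g. (hypothetical usage):
#   getattr(losses, 'binary_crossentropy')()   # -> a BCELoss instance
#   getattr(activations, 'relu')()             # -> a ReLU instance
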
class Layer(_Module):
    def __init__(self, output_dim: int, input_dim: int,
                 activation: Optional[Activation] = None,
                 *args):
        super(Layer, self).__init__(*args)
        self.output_dim = output_dim
        self.input_dim = input_dim
        self.activation = activation and activation()
        self.dW, self.db = 0, 0

    def __call__(self, X: np.ndarray, grad: bool = True) \
            -> np.ndarray:
        return self.forward(X, grad)

    def after_forward(self, output: np.ndarray) \
            -> np.ndarray:
        output = super(Layer, self).after_forward(output)
        if self.activation is not None:
            output = self.activation.forward(output)
        return output

    def backward(self, grad: np.ndarray) \
            -> np.ndarray:
        if self.activation is not None:
            grad = self.activation.backward(grad)
        return grad

    def update(self, optimizer: Optimizer):
        super(Layer, self).update()
        # clear last input and output for the next step
        _ = self.last_input, self.last_output

    @property
    def last_input(self) \
            -> np.ndarray:
        result = self._last_input
        self._last_input = None
        return result

    @property
    def last_output(self) \
            -> np.ndarray:
        result = self._last_output
        self._last_output = None
        return result

    @property
    def parameters(self) \
            -> np.ndarray:
        return np.empty(0)

class Dense(Layer):
    def __init__(self, output_dim: int, input_dim: int,
                 activation: Optional[Activation] = None,
                 *args):
        super(Dense, self).__init__(output_dim, input_dim, activation, *args)
        self.params = np.random.randn(self.output_dim, self.input_dim) * .1
        self.bias = np.random.randn(self.output_dim, 1) * .1

    def forward(self, X: np.ndarray, grad: bool = True) \
            -> np.ndarray:
        super(Dense, self).forward(X, grad)
        result = np.dot(self.params, X) + self.bias
        return self.after_forward(result)

    def backward(self, grad: np.ndarray) \
            -> np.ndarray:
        grad = super(Dense, self).backward(grad)
        size = np.size(grad, -1)
        self.dW = np.dot(grad, self._last_input.T) / size
        self.db = np.sum(grad, axis=1, keepdims=True) / size
        result = np.dot(self.params.T, grad)
        return result

    def update(self, optimizer: Optimizer):
        super(Dense, self).update(optimizer)
        self.parameters = optimizer.get_update(self.parameters, np.hstack([self.dW, self.db]))
        self.dW, self.db = 0, 0

    @property
    def parameters(self) \
            -> np.ndarray:
        return np.hstack([
            self.params,
            self.bias,
        ])

    @parameters.setter
    def parameters(self, parameters: np.ndarray):
        self.params = parameters[:, :-1].reshape(self.params.shape)
        self.bias = parameters[:, -1].reshape(self.bias.shape)

    @property
    def size(self) \
            -> int:
        return self.parameters.size

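# Added note on shapes: Dense stores params with shape (output_dim, input_dim)
# and bias with shape (output_dim, 1), so forward() expects column-major input
# of shape (input_dim, batch) and returns (output_dim, batch). This is why
# fit() and evaluate() below pass X.T rather than X. The weight and bias
# gradients are concatenated with np.hstack so a single optimizer step updates
# both.
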
class Sequential(Layer):
    def __init__(self, layers: List[Layer]):
        # (output_dim, input_dim) ordering to match the Layer signature
        super(Sequential, self).__init__(
            layers[-1].output_dim, layers[0].input_dim)
        self.layers: Iterable[Layer] = layers
        self.optimizer: Optional[Optimizer] = None
        self.loss: Optional[Loss] = None
        self.metric: Optional[Metric] = None

    def forward(self, X: np.ndarray, grad: bool = True) \
            -> np.ndarray:
        super(Sequential, self).forward(X, grad)
        result = reduce(lambda inputs, layer: layer.forward(inputs, grad),
                        [X, *self.layers])
        result += 1e-8
        return self.after_forward(result)

    def backward(self, Y: np.ndarray) \
            -> np.ndarray:
        super(Sequential, self).backward(Y)
        grad = -(np.divide(Y, self._last_output) -
                 np.divide(1 - Y, 1 - self._last_output))
        result = reduce(lambda g, layer: layer.backward(g),
                        [grad, *reversed(self.layers)])
        return result

    def update(self):
        super(Sequential, self).update(self.optimizer)
        any(map(lambda layer: layer.update(self.optimizer),
                filter(lambda layer: issubclass(type(layer), Layer), self.layers)))
        next(self.optimizer)

    def compile(self,
                optimizer: Union[str, Optimizer],
                loss: Union[str, Loss],
                metric: Union[str, Metric]):
        self.optimizer = optimizer if isinstance(optimizer, Optimizer) \
            else getattr(optimizers, optimizer)()
        self.loss = loss if isinstance(loss, Loss) \
            else getattr(losses, loss)()
        self.metric = metric if isinstance(metric, Metric) \
            else getattr(metrics, metric)()

    def fit(self, X: np.ndarray, Y: np.ndarray,
            epochs: int = 1000):
        for epoch in range(epochs):
            forward = self.forward(X.T)
            loss = self.loss(forward, Y)
            self.backward(Y)
            self.update()

    def evaluate(self, X: np.ndarray, Y: np.ndarray) \
            -> Tuple[Union[float, np.ndarray], float]:
        forward = self.forward(X.T, grad=False)
        loss = self.loss(forward, Y)
        return loss, self.metric(forward, Y)

    @property
    def size(self) \
            -> int:
        return sum(map(lambda x: x.size, self.layers))

    @property
    def parameters(self) \
            -> Iterable[np.ndarray]:
        for layer in self.layers:
            yield layer.parameters

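# Added note: Sequential.forward chains the layers with reduce and nudges the
# result by 1e-8, which appears intended to keep the output away from exactly 0
# so the log/divisions stay finite. backward() seeds the chain with the binary
# cross-entropy derivative with respect to the network output,
#   dL/dy_hat = -(y / y_hat - (1 - y) / (1 - y_hat)),
# and the 1/m factor is applied inside Dense.backward.
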
## The answer to life the universe and everything
np.random.seed(42)

## Create training data 1 x 1 to 9 x 9
X = np.transpose(np.array(np.meshgrid(np.arange(1, 10), np.arange(1, 10))), (2, 1, 0)).reshape(-1, 2)
Y = (X[:, 0] * X[:, 1]) / 81

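# Added note: X has shape (81, 2) and enumerates every (a, b) pair with
# 1 <= a, b <= 9; Y holds the products a * b scaled into (0, 1] by dividing
# by 81 so the Sigmoid output layer can represent them.
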
model = Sequential([
    Dense(81, input_dim=2, activation=ReLU),
    Dense(81, input_dim=81, activation=ReLU),
    Dense(81, input_dim=81, activation=ReLU),
    Dense(1, input_dim=81, activation=Sigmoid),
])

## Model training takes at least 5 minutes.
model.compile(optimizer=SGD(), loss=BCELoss(), metric=Accuracy())
model.fit(X, Y, epochs=36000)

results = (model.forward(X.T, grad=False) * 81).round().squeeze()
for (i, j), k in zip(X, results):
    print(f'{i} x {j} = {int(k)}')

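# Added optional sanity check (not part of the original run): evaluate()
# performs a gradient-free forward pass and returns (loss, accuracy).
final_loss, final_accuracy = model.evaluate(X, Y)
print(f'loss: {float(final_loss):.4f}, accuracy: {final_accuracy:.4f}')
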
Wow~~ amazing~~