Created March 25, 2016
from __future__ import division

from tqdm import tqdm

import chainer
from chainer import Chain
from chainer import functions as F
from chainer import links as L
import numpy as np


class FullyConnected(Chain):
    def __init__(self, layers):
        # Register one Linear link per consecutive pair of layer sizes,
        # keyed by a valid attribute name ('l0', 'l1', ...).
        links = {}
        for i, (n_in, n_out) in enumerate(zip(layers, layers[1:])):
            links['l{}'.format(i)] = L.Linear(n_in, n_out)
        super(FullyConnected, self).__init__(**links)
        self.layers = layers
        self.n_layers = len(layers) - 1

    def __call__(self, x, train=False):
        batch = x
        for i in range(self.n_layers):
            layer = getattr(self, 'l{}'.format(i))
            batch = F.dropout(F.relu(layer(batch)), train=train)
        return batch


class BinaryEncoder(Chain):
    def __init__(self, optimizer, enc=[1000, 400], code=16, dec=[400, 1000], gain=5000):
        super(BinaryEncoder, self).__init__(
            encoder=FullyConnected(layers=enc + [code]),
            decoder=FullyConnected(layers=[code] + dec),
        )
        self.gain = gain
        self.ramp_gain = None
        self.opt = optimizer
        self.opt.setup(self)

    def _binarize(self, x):
        # Steep clipped ReLU: squashes activations into [0, 1]; as the
        # gain grows, the code approaches a hard 0/1 step.
        return F.clipped_relu(x, 1 / self.gain) * self.gain

    def encode(self, x, train=False):
        code = self.encoder(x, train=train)
        code = self._binarize(code)
        return code

    def decode(self, x, train=False):
        out = self.decoder(x, train=train)
        return out

    def forward(self, x, train=False):
        code = self.encode(x, train=train)
        output = self.decode(code, train=train)
        return output

    def compute_loss(self, x, loss_function=F.mean_squared_error, y=None, func=None, train=True):
        reconstructed = self.forward(x, train=train)
        reconstructed = func(reconstructed) if func else reconstructed
        target = y if y is not None else x
        loss = loss_function(reconstructed, target)
        return loss

    def fit(self, X, y=None, epochs=60, batchsize=2 ** 16):
        datasize = len(X)
        for _ in tqdm(range(epochs)):
            if self.ramp_gain:
                self.gain *= self.ramp_gain
            # shuffle once per epoch, then slice minibatches
            idx = np.random.permutation(datasize)
            for i in range(0, datasize, batchsize):
                x = X.iloc[idx[i:i + batchsize]].values.astype(np.float32)
                # gradients
                self.zerograds()
                loss = self.compute_loss(x)
                loss.backward()
                self.opt.update()
        return self

    def save(self):
        raise NotImplementedError

    def load(self):
        raise NotImplementedError
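A minimal usage sketch (not part of the original gist), assuming the Chainer 1.x API the code above targets (dropout's train= flag) and a pandas DataFrame whose column count matches enc[0]; the data, optimizer choice, and hyperparameters here are illustrative:

import numpy as np
import pandas as pd
from chainer import optimizers

# Hypothetical data: 10,000 rows of 1000 features, matching enc=[1000, 400].
X = pd.DataFrame(np.random.rand(10000, 1000).astype(np.float32))

model = BinaryEncoder(optimizers.Adam())
model.ramp_gain = 1.05  # optionally anneal the gain upward each epoch
model.fit(X, epochs=10, batchsize=256)

# Encode a batch; .data unwraps the Variable into an ndarray of
# (approximately) binary codes.
codes = model.encode(X.iloc[:32].values.astype(np.float32)).data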
This currently does not implement saving, loading, or profiling. I'd also like to add some asserts to check data sizes (see the sketch below).
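One way the missing pieces could be filled in (a sketch, not from the gist): chainer.serializers.save_npz / load_npz are Chainer's standard NPZ serializers for a Chain's registered parameters, and the assert leans on the layers attribute that FullyConnected keeps. The path default and the _check_input helper name are hypothetical; these defs are meant to replace the save/load stubs on BinaryEncoder.

from chainer import serializers

def save(self, path='binary_encoder.npz'):
    # Writes all registered link parameters of this Chain to an NPZ file.
    serializers.save_npz(path, self)

def load(self, path='binary_encoder.npz'):
    # Restores parameters in place; the model must be constructed with
    # the same layer sizes it was saved with.
    serializers.load_npz(path, self)
    return self

def _check_input(self, x):
    # Example data-size assert: input width must match the first
    # encoder layer (enc[0], 1000 by default).
    n_in = self.encoder.layers[0]
    assert x.shape[1] == n_in, (
        'expected %d features, got %d' % (n_in, x.shape[1]))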