Variational Auto-Encoder introduction
import numpy as np
import theano
import theano.tensor as T
from collections import OrderedDict

# small constant for numerical stability in the Adam update (see get_adam_updates below)
epsilon = 1e-8


# relu activation function
def relu(x):
    return T.switch(x < 0, 0, x)
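
# A quick sanity check of relu, not part of the original gist: compile a tiny Theano function
# and confirm that negative entries are clamped to zero (`x_demo` and `f_relu` are illustrative
# names introduced only here).
x_demo = T.vector('x_demo')
f_relu = theano.function([x_demo], relu(x_demo))
print(f_relu(np.asarray([-2.0, 0.0, 3.0], dtype=theano.config.floatX)))   # -> [ 0.  0.  3.]
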
class VAE:
    """This class implements the Variational Auto-Encoder"""
    def __init__(self, continuous, hu_encoder, hu_decoder, n_latent, x_train, b1=0.95, b2=0.999, batch_size=100, learning_rate=0.001, lam=0):
        # let us keep the discussion to the non-continuous (binary) case
        self.continuous = continuous
        # number of hidden units in the encoder
        self.hu_encoder = hu_encoder
        # number of hidden units in the decoder
        self.hu_decoder = hu_decoder
        # number of latent units
        self.n_latent = n_latent
        # number of training samples and number of features
        [self.N, self.features] = x_train.shape
        # a fixed random state so that you can reproduce the results
        self.prng = np.random.RandomState(42)
        # Adam's exponential decay rates for the moment estimates (b1, b2), the learning rate,
        # and the weight-decay strength (lam)
        self.b1 = b1
        self.b2 = b2
        self.learning_rate = learning_rate
        self.lam = lam
        self.batch_size = batch_size
        # standard deviation for weight initialization
        sigma_init = 0.01
        # these two are lambda functions:
        # create_weight takes (dim_input, dim_output) as arguments and returns a matrix of size (dim_input, dim_output)
        # whose values are drawn from a normal distribution with mean 0 and standard deviation `sigma_init`
        create_weight = lambda dim_input, dim_output: self.prng.normal(0, sigma_init, (dim_input, dim_output)).astype(theano.config.floatX)
        # create_bias takes dim_output as argument and returns a vector of size dim_output filled with zeros
        create_bias = lambda dim_output: np.zeros(dim_output).astype(theano.config.floatX)
        # ENCODER
        # weight matrix and bias for the pass from the input features to the encoder hidden layer
        W_xh = theano.shared(create_weight(self.features, hu_encoder), name='W_xh')
        b_xh = theano.shared(create_bias(hu_encoder), name='b_xh')
        # weight matrices and biases for the inference network (it outputs the parameters of a normal distribution)
        W_hmu = theano.shared(create_weight(hu_encoder, n_latent), name='W_hmu')
        b_hmu = theano.shared(create_bias(n_latent), name='b_hmu')
        W_hsigma = theano.shared(create_weight(hu_encoder, n_latent), name='W_hsigma')
        b_hsigma = theano.shared(create_bias(n_latent), name='b_hsigma')
        # DECODER
        # weight matrix and bias for the pass from the latent variable to the decoder hidden layer
        W_zh = theano.shared(create_weight(n_latent, hu_decoder), name='W_zh')
        b_zh = theano.shared(create_bias(hu_decoder), name='b_zh')
        # a dict of parameters, used for back-propagation
        self.params = OrderedDict([("W_xh", W_xh), ("b_xh", b_xh), ("W_hmu", W_hmu), ("b_hmu", b_hmu),
                                   ("W_hsigma", W_hsigma), ("b_hsigma", b_hsigma), ("W_zh", W_zh),
                                   ("b_zh", b_zh)])
        if self.continuous:
            # the continuous case: the decoder outputs the mean and log variance of a Gaussian over x
            # (let us leave this aside for now)
            W_hxmu = theano.shared(create_weight(hu_decoder, self.features), name='W_hxmu')
            b_hxmu = theano.shared(create_bias(self.features), name='b_hxmu')
            W_hxsig = theano.shared(create_weight(hu_decoder, self.features), name='W_hxsigma')
            b_hxsig = theano.shared(create_bias(self.features), name='b_hxsigma')
            self.params.update({'W_hxmu': W_hxmu, 'b_hxmu': b_hxmu, 'W_hxsigma': W_hxsig, 'b_hxsigma': b_hxsig})
        else:
            # weight matrix and bias for reconstructing the input features from the decoder hidden layer
            W_hx = theano.shared(create_weight(hu_decoder, self.features), name='W_hx')
            b_hx = theano.shared(create_bias(self.features), name='b_hx')
            # update the dict of parameters
            self.params.update({'W_hx': W_hx, 'b_hx': b_hx})

        # Adam parameters
        self.m = OrderedDict()
        self.v = OrderedDict()
        # Adam is used to optimize each parameter; as per Adam's initialization, the first and
        # second moment estimates start as zero vectors
        for key, value in self.params.items():
            self.m[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='m_' + key)
            self.v[key] = theano.shared(np.zeros_like(value.get_value()).astype(theano.config.floatX), name='v_' + key)

        x_train = theano.shared(x_train.astype(theano.config.floatX), name="x_train")
        # go to create_gradientfunctions first
        self.update, self.likelihood, self.encode, self.decode = self.create_gradientfunctions(x_train)
    def encoder(self, x):
        # h_encoder = activation of (input features 'x' times the input-to-hidden weight matrix, plus the bias)
        h_encoder = relu(T.dot(x, self.params['W_xh']) + self.params['b_xh'].dimshuffle('x', 0))
        # calculate the mean and log variance from the inference network
        mu = T.dot(h_encoder, self.params['W_hmu']) + self.params['b_hmu'].dimshuffle('x', 0)
        log_sigma = T.dot(h_encoder, self.params['W_hsigma']) + self.params['b_hsigma'].dimshuffle('x', 0)
        return mu, log_sigma
    def sampler(self, mu, log_sigma):
        seed = 42
        if "gpu" in theano.config.device:
            srng = theano.sandbox.cuda.rng_curand.CURAND_RandomStreams(seed=seed)
        else:
            srng = T.shared_randomstreams.RandomStreams(seed=seed)
        eps = srng.normal(mu.shape)
        # Reparameterize (see: http://blog.shakirm.com/2015/10/machine-learning-trick-of-the-day-4-reparameterisation-tricks/)
        # This is essential for backpropagation: the sample is expressed as a deterministic function of
        # mu, log_sigma and parameter-free noise, so gradients can flow through mu and log_sigma.
        # (A small NumPy sketch after the class illustrates this step.)
        z = mu + T.exp(0.5 * log_sigma) * eps
        return z
    def decoder(self, x, z):
        # calculate the decoder hidden layer values
        h_decoder = relu(T.dot(z, self.params['W_zh']) + self.params['b_zh'].dimshuffle('x', 0))
        if self.continuous:
            reconstructed_x = T.dot(h_decoder, self.params['W_hxmu']) + self.params['b_hxmu'].dimshuffle('x', 0)
            log_sigma_decoder = T.dot(h_decoder, self.params['W_hxsigma']) + self.params['b_hxsigma']
            # Gaussian log-likelihood of x under the decoder distribution
            logpxz = (-(0.5 * np.log(2 * np.pi) + 0.5 * log_sigma_decoder) -
                      0.5 * ((x - reconstructed_x)**2 / T.exp(log_sigma_decoder))).sum(axis=1)
        else:
            # multiply the decoder hidden layer with the output matrix (the last one) to reconstruct the input,
            # and measure the reconstruction loss with binary cross-entropy
            # (binary cross-entropy because the author uses binarized MNIST)
            reconstructed_x = T.nnet.sigmoid(T.dot(h_decoder, self.params['W_hx']) + self.params['b_hx'].dimshuffle('x', 0))
            logpxz = - T.nnet.binary_crossentropy(reconstructed_x, x).sum(axis=1)
        return reconstructed_x, logpxz
    def create_gradientfunctions(self, x_train):
        x = T.matrix("x")
        epoch = T.scalar("epoch")
        batch_size = x.shape[0]
        # calculate the mean and log variance of the approximate posterior (a normal distribution) for the input
        mu, log_sigma = self.encoder(x)
        # sample a latent variable from that normal distribution
        z = self.sampler(mu, log_sigma)
        # use the latent variable to reconstruct the input
        reconstructed_x, logpxz = self.decoder(x, z)
        # The expectation of (logpz - logqz_x) over qz_x is the negative KL divergence between the
        # approximate posterior and the standard normal prior; the closed form below is from
        # appendix B of the VAE paper. (A small NumPy check after the class verifies this identity.)
        KLD = 0.5 * T.sum(1 + log_sigma - mu**2 - T.exp(log_sigma), axis=1)
        # Average over the batch dimension to get the variational lower bound (ELBO).
        # The KL term acts as a regularizer: it keeps the encoder's posterior close to the prior,
        # so the encoder cannot simply memorize each input in the latent code.
        logpx = T.mean(logpxz + KLD)
        # compute the gradients of the lower bound with respect to all parameters
        gradients = T.grad(logpx, list(self.params.values()))
        # Adam implemented as Theano updates
        updates = self.get_adam_updates(gradients, epoch)

        batch = T.iscalar('batch')
        givens = {
            x: x_train[batch*self.batch_size:(batch+1)*self.batch_size, :]
        }
        # define a bunch of functions for convenience
        update = theano.function([batch, epoch], logpx, updates=updates, givens=givens)
        likelihood = theano.function([x], logpx)
        encode = theano.function([x], z)
        decode = theano.function([z], reconstructed_x)

        return update, likelihood, encode, decode
    # a simple Adam optimizer implementation
    def get_adam_updates(self, gradients, epoch):
        updates = OrderedDict()
        # gamma folds the bias corrections for both moment estimates into a single factor,
        # with `epoch` playing the role of the Adam time step (so it must start at 1).
        # (A small NumPy sketch of one such step follows the class.)
        gamma = T.sqrt(1 - self.b2**epoch) / (1 - self.b1**epoch)

        values_iterable = zip(self.params.keys(), self.params.values(), gradients,
                              self.m.values(), self.v.values())

        for name, parameter, gradient, m, v in values_iterable:
            new_m = self.b1 * m + (1. - self.b1) * gradient
            new_v = self.b2 * v + (1. - self.b2) * (gradient**2)

            # gradient *ascent*: the lower bound logpx is maximized, hence the plus sign
            updates[parameter] = parameter + self.learning_rate * gamma * new_m / (T.sqrt(new_v) + epsilon)

            if 'W' in name:
                # MAP on the weights (same as L2 regularization)
                updates[parameter] -= self.learning_rate * self.lam * (parameter * np.float32(self.batch_size / self.N))

            updates[m] = new_m
            updates[v] = new_v

        return updates
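
The reparameterization step in `sampler` can be checked with a few lines of NumPy. This is a minimal sketch with made-up values for `mu` and `log_sigma` (where, as in the code above, `log_sigma` stands for the log of the variance); it only shows that `mu + exp(0.5 * log_sigma) * eps` produces samples with the intended mean and standard deviation.

import numpy as np

rng = np.random.RandomState(0)
mu_val, log_sigma_val = 1.5, np.log(0.25)       # i.e. sigma^2 = 0.25, sigma = 0.5
eps = rng.normal(size=100000)                   # eps ~ N(0, 1), independent of the parameters
z = mu_val + np.exp(0.5 * log_sigma_val) * eps  # the same expression as in sampler()
print(z.mean(), z.std())                        # ~1.5 and ~0.5, i.e. samples from N(mu, sigma^2)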
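
Similarly, the closed-form term used for `KLD` in `create_gradientfunctions` can be compared against a Monte Carlo estimate of the expectation of (log p(z) - log q(z|x)) under q. This is a small verification sketch with made-up values, not code from the gist:

import numpy as np

rng = np.random.RandomState(1)
mu_val, log_sigma_val = 0.7, np.log(0.3)        # example values; log_sigma_val is log(sigma^2)
# closed form, the same expression as in create_gradientfunctions (per latent dimension)
closed_form = 0.5 * (1 + log_sigma_val - mu_val**2 - np.exp(log_sigma_val))

z = mu_val + np.sqrt(np.exp(log_sigma_val)) * rng.normal(size=1000000)
log_p = -0.5 * (np.log(2 * np.pi) + z**2)                               # log N(z; 0, 1)
log_q = -0.5 * (np.log(2 * np.pi) + log_sigma_val
                + (z - mu_val)**2 / np.exp(log_sigma_val))              # log N(z; mu, sigma^2)
print(closed_form, np.mean(log_p - log_q))      # the two values agree up to Monte Carlo noise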
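
One iteration of the Adam update in `get_adam_updates` can also be written out in plain NumPy. The values below (parameter, gradient, time step) are made up purely for illustration; the point is the combined bias-correction factor `gamma` and the plus sign, since the lower bound is being maximized:

import numpy as np

b1, b2, learning_rate, epsilon = 0.95, 0.999, 0.001, 1e-8
theta, m, v, t = 0.5, 0.0, 0.0, 1               # a parameter, its moment estimates, and the time step
g = 0.2                                         # a made-up gradient of the lower bound w.r.t. theta

gamma = np.sqrt(1 - b2**t) / (1 - b1**t)        # folds both Adam bias corrections into one scalar
m = b1 * m + (1 - b1) * g
v = b2 * v + (1 - b2) * g**2
theta = theta + learning_rate * gamma * m / (np.sqrt(v) + epsilon)   # ascent step on the lower bound
print(theta)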
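
Finally, a hypothetical usage sketch, not included in the gist itself: it trains the non-continuous (Bernoulli) model on random binary data standing in for binarized MNIST, then decodes a random latent vector. All names and hyperparameter values here are assumptions for illustration only.

import numpy as np
import theano

x_train = (np.random.RandomState(0).rand(1000, 784) > 0.5).astype(theano.config.floatX)   # stand-in for MNIST
model = VAE(continuous=False, hu_encoder=400, hu_decoder=400, n_latent=20, x_train=x_train)

n_batches = x_train.shape[0] // model.batch_size
for epoch in range(1, 11):                      # start at 1: Adam's bias correction needs epoch >= 1
    for batch in range(n_batches):
        lower_bound = model.update(batch, float(epoch))
    print("epoch", epoch, "ELBO estimate", lower_bound)

# decode a random latent vector into pixel probabilities (values in (0, 1) after the sigmoid)
z_sample = np.random.RandomState(1).normal(size=(1, 20)).astype(theano.config.floatX)
generated = model.decode(z_sample)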