
@gabrieleangeletti
Last active October 15, 2019 15:16
Denoising Autoencoder implementation using TensorFlow.
import tensorflow as tf
import numpy as np
import os
import zconfig
import utils
class DenoisingAutoencoder(object):
""" Implementation of Denoising Autoencoders using TensorFlow.
The interface of the class is sklearn-like.
"""
def __init__(self, model_name='dae', n_components=256, main_dir='dae/', enc_act_func='tanh',
dec_act_func='none', loss_func='mean_squared', num_epochs=10, batch_size=10, dataset='mnist',
xavier_init=1, opt='gradient_descent', learning_rate=0.01, momentum=0.5, corr_type='none',
corr_frac=0., verbose=1, seed=-1):
"""
:param main_dir: main directory to put the models, data and summary directories
:param n_components: number of hidden units
:param enc_act_func: Activation function for the encoder. ['tanh', 'sigmoid']
:param dec_act_func: Activation function for the decoder. ['tanh', 'sigmoid', 'none']
:param loss_func: Loss function. ['mean_squared', 'cross_entropy']
:param xavier_init: Value of the constant for xavier weights initialization
:param opt: Which tensorflow optimizer to use. ['gradient_descent', 'momentum', 'ada_grad']
:param learning_rate: Initial learning rate
:param momentum: Momentum parameter
:param corr_type: Type of input corruption. ["none", "masking", "salt_and_pepper"]
:param corr_frac: Fraction of the input to corrupt.
:param verbose: Level of verbosity. 0 - silent, 1 - print accuracy.
:param num_epochs: Number of epochs
:param batch_size: Size of each mini-batch
:param dataset: Optional name for the dataset.
:param seed: positive integer for seeding random generators. Ignored if < 0.
"""
self.model_name = model_name
self.n_components = n_components
self.main_dir = main_dir
self.enc_act_func = enc_act_func
self.dec_act_func = dec_act_func
self.loss_func = loss_func
self.num_epochs = num_epochs
self.batch_size = batch_size
self.dataset = dataset
self.xavier_init = xavier_init
self.opt = opt
self.learning_rate = learning_rate
self.momentum = momentum
self.corr_type = corr_type
self.corr_frac = corr_frac
self.verbose = verbose
self.seed = seed
if self.seed >= 0:
np.random.seed(self.seed)
tf.set_random_seed(self.seed)
self.models_dir, self.data_dir, self.tf_summary_dir = self._create_data_directories()
self.model_path = self.models_dir + self.model_name
self.input_data = None
self.input_data_corr = None
self.W_ = None
self.bh_ = None
self.bv_ = None
self.encode = None
self.decode = None
self.train_step = None
self.cost = None
self.tf_session = None
self.tf_merged_summaries = None
self.tf_summary_writer = None
self.tf_saver = None
def fit(self, train_set, validation_set=None, restore_previous_model=False):
""" Fit the model to the data.
:param train_set: Training data.
:param validation_set: optional, default None. Validation data.
:param restore_previous_model:
if true, a previously trained model
with the same name as this model is restored from disk to continue training.
:return: self
"""
n_features = train_set.shape[1]
self._build_model(n_features)
with tf.Session() as self.tf_session:
self._initialize_tf_utilities_and_ops(restore_previous_model)
self._train_model(train_set, validation_set)
self.tf_saver.save(self.tf_session, self.models_dir + self.model_name)
def _initialize_tf_utilities_and_ops(self, restore_previous_model):
""" Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
Restore a previously trained model if the flag restore_previous_model is true.
"""
self.tf_merged_summaries = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
self.tf_saver = tf.train.Saver()
self.tf_session.run(init_op)
if restore_previous_model:
self.tf_saver.restore(self.tf_session, self.model_path)
self.tf_summary_writer = tf.train.SummaryWriter(self.tf_summary_dir, self.tf_session.graph_def)
def _train_model(self, train_set, validation_set):
"""Train the model.
:param train_set: training set
:param validation_set: validation set. optional, default None
:return: self
"""
corruption_ratio = np.round(self.corr_frac * train_set.shape[1]).astype(np.int)
for i in range(self.num_epochs):
self._run_train_step(train_set, corruption_ratio)
if i % 5 == 0:
if validation_set is not None:
self._run_validation_error_and_summaries(i, validation_set)
def _run_train_step(self, train_set, corruption_ratio):
""" Run a training step. A training step is made by randomly corrupting the training set,
randomly shuffling it, divide it into batches and run the optimizer for each batch.
:param train_set: training set
:param corruption_ratio: fraction of elements to corrupt
:return: self
"""
x_corrupted = self._corrupt_input(train_set, corruption_ratio)
shuff = list(zip(train_set, x_corrupted))  # list() so np.random.shuffle can take its length (see comments below)
np.random.shuffle(shuff)
batches = [_ for _ in utils.gen_batches(shuff, self.batch_size)]
for batch in batches:
x_batch, x_corr_batch = zip(*batch)
tr_feed = {self.input_data: x_batch, self.input_data_corr: x_corr_batch}
self.tf_session.run(self.train_step, feed_dict=tr_feed)
def _corrupt_input(self, data, v):
""" Corrupt a fraction 'v' of 'data' according to the
noise method of this autoencoder.
:return: corrupted data
"""
if self.corr_type == 'masking':
x_corrupted = utils.masking_noise(data, v)
elif self.corr_type == 'salt_and_pepper':
x_corrupted = utils.salt_and_pepper_noise(data, v)
elif self.corr_type == 'none':
x_corrupted = data
else:
x_corrupted = None
return x_corrupted
def _run_validation_error_and_summaries(self, epoch, validation_set):
""" Run the summaries and error computation on the validation set.
:param epoch: current epoch
:param validation_set: validation data
:return: self
"""
vl_feed = {self.input_data: validation_set, self.input_data_corr: validation_set}
result = self.tf_session.run([self.tf_merged_summaries, self.cost], feed_dict=vl_feed)
summary_str = result[0]
err = result[1]
self.tf_summary_writer.add_summary(summary_str, epoch)
if self.verbose == 1:
print("Validation cost at step %s: %s" % (epoch, err))
def _build_model(self, n_features):
""" Creates the computational graph.
:type n_features: int
:param n_features: Number of features.
:return: self
"""
self.input_data, self.input_data_corr = self._create_placeholders(n_features)
self.W_, self.bh_, self.bv_ = self._create_variables(n_features)
self._create_encode_layer()
self._create_decode_layer()
self._create_cost_function_node()
self._create_train_step_node()
def _create_placeholders(self, n_features):
""" Create the TensorFlow placeholders for the model.
:return: tuple(input_data(shape(None, n_features)),
input_data_corr(shape(None, n_features)))
"""
input_data = tf.placeholder('float', [None, n_features], name='x-input')
input_data_corr = tf.placeholder('float', [None, n_features], name='x-corr-input')
return input_data, input_data_corr
def _create_variables(self, n_features):
""" Create the TensorFlow variables for the model.
:return: tuple(weights(shape(n_features, n_components)),
hidden bias(shape(n_components)),
visible bias(shape(n_features)))
"""
W_ = tf.Variable(utils.xavier_init(n_features, self.n_components, self.xavier_init), name='enc-w')
bh_ = tf.Variable(tf.zeros([self.n_components]), name='hidden-bias')
bv_ = tf.Variable(tf.zeros([n_features]), name='visible-bias')
return W_, bh_, bv_
def _create_encode_layer(self):
""" Create the encoding layer of the network.
:return: self
"""
with tf.name_scope("W_x_bh"):
if self.enc_act_func == 'sigmoid':
self.encode = tf.nn.sigmoid(tf.matmul(self.input_data_corr, self.W_) + self.bh_)
elif self.enc_act_func == 'tanh':
self.encode = tf.nn.tanh(tf.matmul(self.input_data_corr, self.W_) + self.bh_)
else:
self.encode = None
def _create_decode_layer(self):
""" Create the decoding layer of the network.
:return: self
"""
with tf.name_scope("Wg_y_bv"):
if self.dec_act_func == 'sigmoid':
self.decode = tf.nn.sigmoid(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)
elif self.dec_act_func == 'tanh':
self.decode = tf.nn.tanh(tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_)
elif self.dec_act_func == 'none':
self.decode = tf.matmul(self.encode, tf.transpose(self.W_)) + self.bv_
else:
self.decode = None
def _create_cost_function_node(self):
""" create the cost function node of the network.
:return: self
"""
with tf.name_scope("cost"):
if self.loss_func == 'cross_entropy':
self.cost = - tf.reduce_sum(self.input_data * tf.log(self.decode))
_ = tf.scalar_summary("cross_entropy", self.cost)
elif self.loss_func == 'mean_squared':
self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - self.decode)))
_ = tf.scalar_summary("mean_squared", self.cost)
else:
self.cost = None
def _create_train_step_node(self):
""" create the training step node of the network.
:return: self
"""
with tf.name_scope("train"):
if self.opt == 'gradient_descent':
self.train_step = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.cost)
elif self.opt == 'ada_grad':
self.train_step = tf.train.AdagradOptimizer(self.learning_rate).minimize(self.cost)
elif self.opt == 'momentum':
self.train_step = tf.train.MomentumOptimizer(self.learning_rate, self.momentum).minimize(self.cost)
else:
self.train_step = None
def transform(self, data, name='train', save=False):
""" Transform data according to the model.
:param data: Data to transform
:param name: Identifier for the data that is being encoded
:param save: If true, save data to disk
:return: transformed data
"""
with tf.Session() as self.tf_session:
self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)
encoded_data = self.encode.eval({self.input_data_corr: data})
if save:
np.save(self.data_dir + self.model_name + '-' + name, encoded_data)
return encoded_data
def load_model(self, shape, model_path):
""" Restore a previously trained model from disk.
:param shape: tuple(n_features, n_components)
:param model_path: path to the trained model
:return: self, the trained model
"""
self.n_components = shape[1]
self._build_model(shape[0])
init_op = tf.initialize_all_variables()
self.tf_saver = tf.train.Saver()
with tf.Session() as self.tf_session:
self.tf_session.run(init_op)
self.tf_saver.restore(self.tf_session, model_path)
def get_model_parameters(self):
""" Return the model parameters in the form of numpy arrays.
:return: model parameters
"""
with tf.Session() as self.tf_session:
self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)
return {
'enc_w': self.W_.eval(),
'enc_b': self.bh_.eval(),
'dec_b': self.bv_.eval()
}
def _create_data_directories(self):
""" Create the three directories for storing respectively the models,
the data generated by training and the TensorFlow's summaries.
:return: tuple of strings(models_dir, data_dir, summary_dir)
"""
self.main_dir = self.main_dir + '/' if self.main_dir[-1] != '/' else self.main_dir
models_dir = zconfig.models_dir + self.main_dir
data_dir = zconfig.data_dir + self.main_dir
summary_dir = zconfig.summary_dir + self.main_dir
for d in [models_dir, data_dir, summary_dir]:
if not os.path.isdir(d):
os.mkdir(d)
return models_dir, data_dir, summary_dir
def get_weights_as_images(self, width, height, outdir='img/', max_images=10, model_path=None):
""" Save the weights of this autoencoder as images, one image per hidden unit.
Useful to visualize what the autoencoder has learned.
:type width: int
:param width: Width of the images
:type height: int
:param height: Height of the images
:type outdir: string, default 'img/'
:param outdir: Output directory for the images. This path is appended to self.data_dir
:type max_images: int, default 10
:param max_images: Number of images to generate.
"""
assert max_images <= self.n_components
outdir = self.data_dir + outdir
if not os.path.isdir(outdir):
os.mkdir(outdir)
with tf.Session() as self.tf_session:
if model_path is not None:
self.tf_saver.restore(self.tf_session, model_path)
else:
self.tf_saver.restore(self.tf_session, self.models_dir + self.model_name)
enc_weights = self.W_.eval()
perm = np.random.permutation(self.n_components)[:max_images]
for p in perm:
enc_w = np.array([i[p] for i in enc_weights])
image_path = outdir + self.model_name + '-enc_weights_{}.png'.format(p)
utils.gen_image(enc_w, width, height, image_path)
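For reference, a minimal usage sketch of the class above (not part of the original gist; it assumes MNIST-like row vectors already scaled to [0, 1] and the companion utils/zconfig modules on the import path):

import numpy as np
from autoencoder import DenoisingAutoencoder

# 1000 toy samples with 784 features each, values in [0, 1]
X = np.random.rand(1000, 784).astype('float32')

dae = DenoisingAutoencoder(model_name='toy-dae', n_components=64,
                           corr_type='masking', corr_frac=0.3,
                           num_epochs=5, batch_size=25, verbose=1)
dae.fit(X)                  # train on corrupted inputs, reconstruct the clean ones
codes = dae.transform(X)    # (1000, 64) hidden representations

The run_autoencoder.py script below wires the same calls to command-line flags.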
import tensorflow as tf
import autoencoder
import datasets
# #################### #
# Flags definition #
# #################### #
flags = tf.app.flags
FLAGS = flags.FLAGS
# Global configuration
flags.DEFINE_string('model_name', '', 'Model name.')
flags.DEFINE_string('dataset', 'mnist', 'Which dataset to use. ["mnist", "cifar10"]')
flags.DEFINE_string('cifar_dir', '', 'Path to the cifar 10 dataset directory.')
flags.DEFINE_integer('seed', -1, 'Seed for the random generators (>= 0). Useful for testing hyperparameters.')
flags.DEFINE_boolean('restore_previous_model', False, 'If true, restore previous model corresponding to model name.')
flags.DEFINE_boolean('encode_train', False, 'Whether to encode and store the training set.')
flags.DEFINE_boolean('encode_valid', False, 'Whether to encode and store the validation set.')
flags.DEFINE_boolean('encode_test', False, 'Whether to encode and store the test set.')
# Stacked Denoising Autoencoder specific parameters
flags.DEFINE_integer('n_components', 256, 'Number of hidden units in the dae.')
flags.DEFINE_string('corr_type', 'none', 'Type of input corruption. ["none", "masking", "salt_and_pepper"]')
flags.DEFINE_float('corr_frac', 0., 'Fraction of the input to corrupt.')
flags.DEFINE_integer('xavier_init', 1, 'Value for the constant in xavier weights initialization.')
flags.DEFINE_string('enc_act_func', 'tanh', 'Activation function for the encoder. ["sigmoid", "tanh"]')
flags.DEFINE_string('dec_act_func', 'none', 'Activation function for the decoder. ["sigmoid", "tanh", "none"]')
flags.DEFINE_string('main_dir', 'dae/', 'Directory to store data relative to the algorithm.')
flags.DEFINE_string('loss_func', 'mean_squared', 'Loss function. ["mean_squared" or "cross_entropy"]')
flags.DEFINE_integer('verbose', 0, 'Level of verbosity. 0 - silent, 1 - print accuracy.')
flags.DEFINE_integer('weight_images', 0, 'Number of weight images to generate.')
flags.DEFINE_string('opt', 'gradient_descent', '["gradient_descent", "ada_grad", "momentum"]')
flags.DEFINE_float('learning_rate', 0.01, 'Initial learning rate.')
flags.DEFINE_float('momentum', 0.5, 'Momentum parameter.')
flags.DEFINE_integer('num_epochs', 10, 'Number of epochs.')
flags.DEFINE_integer('batch_size', 10, 'Size of each mini-batch.')
assert FLAGS.dataset in ['mnist', 'cifar10']
assert FLAGS.enc_act_func in ['sigmoid', 'tanh']
assert FLAGS.dec_act_func in ['sigmoid', 'tanh', 'none']
assert FLAGS.corr_type in ['masking', 'salt_and_pepper', 'none']
assert 0. <= FLAGS.corr_frac <= 1.
assert FLAGS.loss_func in ['cross_entropy', 'mean_squared']
assert FLAGS.opt in ['gradient_descent', 'ada_grad', 'momentum']
if __name__ == '__main__':
if FLAGS.dataset == 'mnist':
# ################# #
# MNIST Dataset #
# ################# #
trX, vlX, teX = datasets.load_mnist_dataset(mode='unsupervised')
elif FLAGS.dataset == 'cifar10':
# ################### #
# Cifar10 Dataset #
# ################### #
trX, teX = datasets.load_cifar10_dataset(FLAGS.cifar_dir, mode='unsupervised')
vlX = teX[:5000] # Validation set is the first half of the test set
else: # cannot be reached, just for completeness
trX = None
vlX = None
teX = None
# Create the object
dae = autoencoder.DenoisingAutoencoder(
seed=FLAGS.seed, model_name=FLAGS.model_name, n_components=FLAGS.n_components,
enc_act_func=FLAGS.enc_act_func, dec_act_func=FLAGS.dec_act_func, xavier_init=FLAGS.xavier_init,
corr_type=FLAGS.corr_type, corr_frac=FLAGS.corr_frac, dataset=FLAGS.dataset,
loss_func=FLAGS.loss_func, main_dir=FLAGS.main_dir, opt=FLAGS.opt,
learning_rate=FLAGS.learning_rate, momentum=FLAGS.momentum,
verbose=FLAGS.verbose, num_epochs=FLAGS.num_epochs, batch_size=FLAGS.batch_size)
# Fit the model
dae.fit(trX, teX, restore_previous_model=FLAGS.restore_previous_model)
# Encode the training data and store it
dae.transform(trX, name='train', save=FLAGS.encode_train)
dae.transform(vlX, name='validation', save=FLAGS.encode_valid)
dae.transform(teX, name='test', save=FLAGS.encode_test)
# save images
dae.get_weights_as_images(28, 28, max_images=FLAGS.weight_images)
from scipy import misc
import tensorflow as tf
import numpy as np
# ############# #
# Utilities #
# ############# #
def xavier_init(fan_in, fan_out, const=1):
""" Xavier initialization of network weights.
https://stackoverflow.com/questions/33640581/how-to-do-xavier-initialization-on-tensorflow
:param fan_in: fan in of the network (n_features)
:param fan_out: fan out of the network (n_components)
:param const: multiplicative constant
"""
low = -const * np.sqrt(6.0 / (fan_in + fan_out))
high = const * np.sqrt(6.0 / (fan_in + fan_out))
return tf.random_uniform((fan_in, fan_out), minval=low, maxval=high)
def gen_batches(data, batch_size):
""" Divide input data into batches.
:param data: input data
:param batch_size: size of each batch
:return: data divided into batches
"""
data = np.array(data)
for i in range(0, data.shape[0], batch_size):
yield data[i:i+batch_size]
def masking_noise(X, v):
""" Apply masking noise to data in X, in other words a fraction v of elements of X
(chosen at random) is forced to zero.
:param X: array_like, Input data
:param v: int, fraction of elements to distort
:return: transformed data
"""
X_noise = X.copy()
n_samples = X.shape[0]
n_features = X.shape[1]
for i in range(n_samples):
mask = np.random.randint(0, n_features, v)
for m in mask:
X_noise[i][m] = 0.
return X_noise
def salt_and_pepper_noise(X, v):
""" Apply salt and pepper noise to data in X, in other words a fraction v of elements of X
(chosen at random) is set to its maximum or minimum value according to a fair coin flip.
If minimum or maximum are not given, the min (max) value in X is taken.
:param X: array_like, Input data
:param v: int, fraction of elements to distort
:return: transformed data
"""
X_noise = X.copy()
n_features = X.shape[1]
mn = X.min()
mx = X.max()
for i, sample in enumerate(X):
mask = np.random.randint(0, n_features, v)
for m in mask:
if np.random.random() < 0.5:
X_noise[i][m] = mn
else:
X_noise[i][m] = mx
return X_noise
def gen_image(img, width, height, outfile, img_type='grey'):
assert len(img) == width * height or len(img) == width * height * 3
if img_type == 'grey':
misc.imsave(outfile, img.reshape(width, height))
elif img_type == 'color':
misc.imsave(outfile, img.reshape(3, width, height))
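A quick, illustrative check of the two corruption helpers above (toy data made up for this example, not from the gist):

import numpy as np
from utils import masking_noise, salt_and_pepper_noise

X = np.tile(np.arange(1.0, 6.0), (4, 1))   # 4 samples, 5 features, values 1..5
Xm = masking_noise(X, 2)                   # about 2 entries per row forced to 0.
Xsp = salt_and_pepper_noise(X, 2)          # about 2 entries per row set to X.min() (1.0) or X.max() (5.0)

Note that np.random.randint can draw the same column twice, so occasionally fewer than 'v' entries per row end up corrupted.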
models_dir = 'models/' # dir to save/restore models
data_dir = 'data/' # directory to store algorithm data
summary_dir = 'logs/' # directory to store tensorflow summaries
@narmadhas19

Traceback (most recent call last):
File "C:\Users\Narmadha\AppData\Local\Programs\Python\Python35\run_autoencoder.py", line 9, in
import datasets
ImportError: No module named 'datasets'
What is the solution for this error? Please help.

@narmadhas19

How can we use the denoising autoencoder on a text file?

@alvinwan

alvinwan commented Apr 10, 2018

I believe the datasets import comes from this script (credits: blackecho): https://github.com/blackecho/Deep-Learning-TensorFlow/blob/ddeb1f2848da7b7bee166ad2152b4afc46bb2086/yadlt/utils/datasets.py.

Another caveat (as of the time of writing, 4/10/18): the script uses old TensorFlow syntax from r0.12 (https://stackoverflow.com/a/41066345/4855984).
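For anyone on TensorFlow 1.x, a rough (untested) mapping of the deprecated r0.x calls used in the gist to their 1.x names — my own sketch, not the author's code:

import tensorflow as tf  # assuming TensorFlow 1.x

# In _initialize_tf_utilities_and_ops:
merged = tf.summary.merge_all()                # replaces tf.merge_all_summaries()
init_op = tf.global_variables_initializer()    # replaces tf.initialize_all_variables()
writer = tf.summary.FileWriter('logs/', tf.get_default_graph())  # replaces tf.train.SummaryWriter(dir, graph_def)

# In _create_cost_function_node:
# tf.summary.scalar("mean_squared", cost)      # replaces tf.scalar_summary("mean_squared", cost)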

@2017develper

I have this error and I don't know how to solve it:
WARNING:tensorflow:From C:\Users\USER\Desktop\DAAE\autoencoder.py:108 in _initialize_tf_utilities_and_ops.: initialize_all_variables (from tensorflow.python.ops.variables) is deprecated and will be removed after 2017-03-02.
Instructions for updating:
Use tf.global_variables_initializer instead.
WARNING:tensorflow:Passing a GraphDef to the SummaryWriter is deprecated. Pass a Graph object instead, such as sess.graph.
Traceback (most recent call last):
File "run_autoencoder.py", line 82, in
dae.fit(trX, teX, restore_previous_model=FLAGS.restore_previous_model)
File "C:\Users\USER\Desktop\DAAE\autoencoder.py", line 98, in fit
self._train_model(train_set, validation_set)
File "C:\Users\USER\Desktop\DAAE\autoencoder.py", line 132, in _train_model
self._run_train_step(train_set, corruption_ratio)
File "C:\Users\USER\Desktop\DAAE\autoencoder.py", line 151, in _run_train_step
np.random.shuffle(shuff)
File "mtrand.pyx", line 4816, in mtrand.RandomState.shuffle
TypeError: object of type 'zip' has no len()

@wklock

wklock commented May 10, 2018

You need to wrap the zip(train_set, x_corrupted) in _run_train_step in a list. The line will become shuff = list(zip(train_set, x_corrupted)).

@louislawhk01

Great work!
But the MSE loss implementation in _create_cost_function_node of autoencoder.py seems to be wrong?
Should it be self.cost = tf.reduce_mean(tf.reduce_sum(tf.square(self.input_data - self.decode), 1)) ?

We should do reduce_sum before reduce_mean, and there is no need for the square root, I suppose?
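For what it's worth, here is a small side-by-side sketch of the two formulations (placeholder names are hypothetical, not taken from the gist's graph):

import tensorflow as tf

x = tf.placeholder('float', [None, 784])      # original input
x_hat = tf.placeholder('float', [None, 784])  # reconstruction

# As written in the gist: square root of the mean over every element (an RMSE).
cost_rmse = tf.sqrt(tf.reduce_mean(tf.square(x - x_hat)))

# The variant suggested above: sum of squared errors per sample, averaged over the batch.
cost_sse = tf.reduce_mean(tf.reduce_sum(tf.square(x - x_hat), 1))

Both are monotone functions of the same per-element squared error, so they share the same minimizers; in practice the choice mainly changes the gradient scale, and therefore which learning rate works well.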
