Forked from gabrieleangeletti/rbm_after_refactor.py
Created February 14, 2017 11:25
Restricted Boltzmann Machine implementation in TensorFlow, before and after code refactoring. Blog post: http://blackecho.github.io/blog/programming/2016/02/21/refactoring-rbm-tensor-flow-implementation.html
rbm_after_refactor.py (the refactored implementation):
import tensorflow as tf
import numpy as np
import os

import zconfig as config  # project-level directory configuration (zconfig.py below)
import utils
class RBM(object):

    """ Restricted Boltzmann Machine implementation using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, num_visible, num_hidden, visible_unit_type='bin', main_dir='rbm', model_name='rbm_model',
                 gibbs_sampling_steps=1, learning_rate=0.01, batch_size=10, num_epochs=10, stddev=0.1, verbose=0):

        """
        :param num_visible: number of visible units
        :param num_hidden: number of hidden units
        :param visible_unit_type: type of the visible units ('bin' or 'gauss')
        :param main_dir: main directory to put the models, data and summary directories
        :param model_name: name of the model, used to save data
        :param gibbs_sampling_steps: optional, default 1
        :param learning_rate: optional, default 0.01
        :param batch_size: optional, default 10
        :param num_epochs: optional, default 10
        :param stddev: optional, default 0.1. Ignored if visible_unit_type is not 'gauss'
        :param verbose: level of verbosity. optional, default 0
        """

        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.visible_unit_type = visible_unit_type
        self.main_dir = main_dir
        self.model_name = model_name
        self.gibbs_sampling_steps = gibbs_sampling_steps
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.stddev = stddev
        self.verbose = verbose

        self.models_dir, self.data_dir, self.summary_dir = self._create_data_directories()
        self.model_path = self.models_dir + self.model_name

        self.W = None
        self.bh_ = None
        self.bv_ = None

        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None

        self.encode = None

        self.loss_function = None

        self.input_data = None
        self.hrand = None
        self.vrand = None
        self.validation_size = None

        self.tf_merged_summaries = None
        self.tf_summary_writer = None
        self.tf_session = None
        self.tf_saver = None
    def fit(self, train_set, validation_set=None, restore_previous_model=False):

        """ Fit the model to the training data.
        :param train_set: training set
        :param validation_set: validation set. optional, default None
        :param restore_previous_model:
            if true, a previously trained model
            with the same name as this model is restored from disk to continue training.
        :return: self
        """

        if validation_set is not None:
            self.validation_size = validation_set.shape[0]

        self._build_model()

        with tf.Session() as self.tf_session:
            self._initialize_tf_utilities_and_ops(restore_previous_model)
            self._train_model(train_set, validation_set)
            self.tf_saver.save(self.tf_session, self.model_path)

        return self
    def _initialize_tf_utilities_and_ops(self, restore_previous_model):

        """ Initialize TensorFlow operations: summaries, init operations, saver, summary_writer.
        Restore a previously trained model if the flag restore_previous_model is true.
        """

        self.tf_merged_summaries = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        self.tf_session.run(init_op)

        if restore_previous_model:
            self.tf_saver.restore(self.tf_session, self.model_path)

        self.tf_summary_writer = tf.train.SummaryWriter(self.summary_dir, self.tf_session.graph_def)
    def _train_model(self, train_set, validation_set):

        """ Train the model.
        :param train_set: training set
        :param validation_set: validation set. optional, default None
        :return: None
        """

        for i in range(self.num_epochs):
            self._run_train_step(train_set)

            if validation_set is not None:
                self._run_validation_error_and_summaries(i, validation_set)
    def _run_train_step(self, train_set):

        """ Run a training step. A training step consists of randomly shuffling the training set,
        dividing it into batches and running the variable update nodes for each batch.
        :param train_set: training set
        :return: None
        """

        np.random.shuffle(train_set)

        batches = list(utils.gen_batches(train_set, self.batch_size))
        updates = [self.w_upd8, self.bh_upd8, self.bv_upd8]

        for batch in batches:
            self.tf_session.run(updates, feed_dict=self._create_feed_dict(batch))
    def _run_validation_error_and_summaries(self, epoch, validation_set):

        """ Run the summaries and error computation on the validation set.
        :param epoch: current epoch
        :param validation_set: validation data
        :return: None
        """

        result = self.tf_session.run([self.tf_merged_summaries, self.loss_function],
                                     feed_dict=self._create_feed_dict(validation_set))

        summary_str = result[0]
        err = result[1]

        self.tf_summary_writer.add_summary(summary_str, epoch)

        if self.verbose == 1:
            print("Validation cost at step %s: %s" % (epoch, err))
    def _create_feed_dict(self, data):

        """ Create the dictionary of data to feed to TensorFlow's session during training.
        :param data: training/validation set batch
        :return: dictionary(self.input_data: data, self.hrand: random_uniform, self.vrand: random_uniform)
        """

        return {
            self.input_data: data,
            self.hrand: np.random.rand(data.shape[0], self.num_hidden),
            self.vrand: np.random.rand(data.shape[0], self.num_visible)
        }
    def _build_model(self):

        """ Build the Restricted Boltzmann Machine model in TensorFlow.
        :return: None
        """

        self.input_data, self.hrand, self.vrand = self._create_placeholders()
        self.W, self.bh_, self.bv_ = self._create_variables()

        hprobs0, hstates0, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(self.input_data)
        positive = self.compute_positive_association(self.input_data, hprobs0, hstates0)

        nn_input = vprobs

        for step in range(self.gibbs_sampling_steps - 1):
            hprobs, hstates, vprobs, hprobs1, hstates1 = self.gibbs_sampling_step(nn_input)
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        self.encode = hprobs1  # encoded data, used by the transform method

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.input_data - vprobs, 0))
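        # The three assign_add ops above implement the contrastive divergence
        # (CD-k) update. For the weights:
        #     W <- W + learning_rate * (positive - negative)
        # where positive = v_0^T h_0 is the data-driven association and
        # negative = v_k^T h_k is the association after k reconstruction steps.
        # The bias updates follow the analogous rule, averaged over the batch
        # via tf.reduce_mean.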
        self.loss_function = tf.sqrt(tf.reduce_mean(tf.square(self.input_data - vprobs)))

        _ = tf.scalar_summary("cost", self.loss_function)
    def _create_placeholders(self):

        """ Create the TensorFlow placeholders for the model.
        :return: tuple(input(shape(None, num_visible)),
                       hrand(shape(None, num_hidden)),
                       vrand(shape(None, num_visible)))
        """

        x = tf.placeholder('float', [None, self.num_visible], name='x-input')
        hrand = tf.placeholder('float', [None, self.num_hidden], name='hrand')
        vrand = tf.placeholder('float', [None, self.num_visible], name='vrand')

        return x, hrand, vrand
    def _create_variables(self):

        """ Create the TensorFlow variables for the model.
        :return: tuple(weights(shape(num_visible, num_hidden)),
                       hidden bias(shape(num_hidden)),
                       visible bias(shape(num_visible)))
        """

        W = tf.Variable(tf.random_normal((self.num_visible, self.num_hidden), mean=0.0, stddev=0.01), name='weights')
        bh_ = tf.Variable(tf.zeros([self.num_hidden]), name='hidden-bias')
        bv_ = tf.Variable(tf.zeros([self.num_visible]), name='visible-bias')

        return W, bh_, bv_
    def gibbs_sampling_step(self, visible):

        """ Perform one step of Gibbs sampling.
        :param visible: activations of the visible units
        :return: tuple(hidden probs, hidden states, visible probs,
                       new hidden probs, new hidden states)
        """

        hprobs, hstates = self.sample_hidden_from_visible(visible)
        vprobs = self.sample_visible_from_hidden(hprobs)
        hprobs1, hstates1 = self.sample_hidden_from_visible(vprobs)

        return hprobs, hstates, vprobs, hprobs1, hstates1
    def sample_hidden_from_visible(self, visible):

        """ Sample the hidden units from the visible units.
        This is the Positive phase of the Contrastive Divergence algorithm.
        :param visible: activations of the visible units
        :return: tuple(hidden probabilities, hidden binary states)
        """

        hprobs = tf.nn.sigmoid(tf.matmul(visible, self.W) + self.bh_)
        hstates = utils.sample_prob(hprobs, self.hrand)

        return hprobs, hstates
    def sample_visible_from_hidden(self, hidden):

        """ Sample the visible units from the hidden units.
        This is the Negative phase of the Contrastive Divergence algorithm.
        :param hidden: activations of the hidden units
        :return: visible probabilities
        """

        visible_activation = tf.matmul(hidden, tf.transpose(self.W)) + self.bv_

        if self.visible_unit_type == 'bin':
            vprobs = tf.nn.sigmoid(visible_activation)

        elif self.visible_unit_type == 'gauss':
            vprobs = tf.truncated_normal((1, self.num_visible), mean=visible_activation, stddev=self.stddev)

        else:
            vprobs = None

        return vprobs
    def compute_positive_association(self, visible, hidden_probs, hidden_states):

        """ Compute positive associations between visible and hidden units.
        :param visible: visible units
        :param hidden_probs: hidden units probabilities
        :param hidden_states: hidden units states
        :return: positive association = dot(visible.T, hidden)
        """

        if self.visible_unit_type == 'bin':
            positive = tf.matmul(tf.transpose(visible), hidden_states)

        elif self.visible_unit_type == 'gauss':
            positive = tf.matmul(tf.transpose(visible), hidden_probs)

        else:
            positive = None
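
        # Note: with binary visible units the sampled hidden states are used
        # for the positive statistics, while with Gaussian visible units the
        # hidden probabilities (a lower-variance choice) are used instead.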
        return positive
    def _create_data_directories(self):

        """ Create the three directories for storing, respectively, the models,
        the data generated by training and the TensorFlow summaries.
        :return: tuple of strings(models_dir, data_dir, summary_dir)
        """

        self.main_dir = self.main_dir + '/' if self.main_dir[-1] != '/' else self.main_dir

        models_dir = config.models_dir + self.main_dir
        data_dir = config.data_dir + self.main_dir
        summary_dir = config.summary_dir + self.main_dir

        for d in [models_dir, data_dir, summary_dir]:
            if not os.path.isdir(d):
                os.mkdir(d)

        return models_dir, data_dir, summary_dir
    def transform(self, data, name='train', save=False):

        """ Transform data according to the model.
        :type data: array_like
        :param data: Data to transform
        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded
        :type save: boolean, default False
        :param save: If true, save data to disk
        :return: transformed data
        """

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            encoded_data = self.encode.eval(self._create_feed_dict(data))

            if save:
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data
    def load_model(self, shape, gibbs_sampling_steps, model_path):

        """ Load a trained model from disk. The shape of the model
        (num_visible, num_hidden) and the number of Gibbs sampling steps
        must be known in order to restore the model.
        :param shape: tuple(num_visible, num_hidden)
        :param gibbs_sampling_steps: number of Gibbs sampling steps used by the saved model
        :param model_path: path to the saved model on disk
        :return: self
        """

        self.num_visible, self.num_hidden = shape[0], shape[1]
        self.gibbs_sampling_steps = gibbs_sampling_steps

        self._build_model()

        init_op = tf.initialize_all_variables()
        self.tf_saver = tf.train.Saver()

        with tf.Session() as self.tf_session:
            self.tf_session.run(init_op)
            self.tf_saver.restore(self.tf_session, model_path)

        return self
    def get_model_parameters(self):

        """ Return the model parameters in the form of numpy arrays.
        :return: model parameters
        """

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }
    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):

        """ Create and save images of the weights of the hidden units with
        respect to the visible units.
        :param width: width of each image in pixels
        :param height: height of each image in pixels
        :param outdir: output directory, relative to the model's data directory
        :param n_images: number of hidden units to visualize
        :param img_type: 'grey' or 'color' (see utils.gen_image)
        :return: None
        """

        outdir = self.data_dir + outdir

        with tf.Session() as self.tf_session:

            self.tf_saver.restore(self.tf_session, self.model_path)

            weights = self.W.eval()

            perm = np.random.permutation(self.num_hidden)[:n_images]

            for p in perm:
                w = np.array([i[p] for i in weights])
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
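A minimal usage sketch for the refactored class. The data here is random and purely illustrative; it assumes the file is saved as rbm_after_refactor.py, that utils.py and zconfig.py (both below) are importable, and that the base directories from zconfig.py exist:

import numpy as np

from rbm_after_refactor import RBM

# Random binary "images": 500 training and 100 validation examples, 784 pixels each
trX = (np.random.rand(500, 784) > 0.5).astype(np.float32)
vlX = (np.random.rand(100, 784) > 0.5).astype(np.float32)

rbm = RBM(num_visible=784, num_hidden=64, num_epochs=5, verbose=1)
rbm.fit(trX, validation_set=vlX)

# Encode the validation set with the trained model
encoded = rbm.transform(vlX, name='validation')
print(encoded.shape)  # (100, 64)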
The original implementation, before refactoring:
from tensorflow.python.framework import ops

import tensorflow as tf
import numpy as np
import os

import zconfig as config  # project-level directory configuration (zconfig.py below)
import utils
class RBM(object):

    """ Restricted Boltzmann Machine implementation using TensorFlow.
    The interface of the class is sklearn-like.
    """

    def __init__(self, nvis, nhid, vis_type='bin', directory_name='rbm', model_name='', gibbs_k=1, learning_rate=0.01,
                 batch_size=10, n_iter=10, stddev=0.1, verbose=0):
        self.nvis = nvis
        self.nhid = nhid
        self.vis_type = vis_type
        self.directory_name = directory_name
        self.model_name = model_name
        self.gibbs_k = gibbs_k
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.n_iter = n_iter
        self.stddev = stddev
        self.verbose = verbose

        # Directories paths
        self.directory_name = self.directory_name + '/' if self.directory_name[-1] != '/' else self.directory_name

        self.models_dir = config.models_dir + self.directory_name
        self.data_dir = config.data_dir + self.directory_name
        self.summary_dir = config.summary_dir + self.directory_name

        # Create dirs
        for d in [self.models_dir, self.data_dir, self.summary_dir]:
            if not os.path.isdir(d):
                os.mkdir(d)

        if self.model_name == '':
            # Assign model complete name
            self.model_name = 'rbm-{}-{}-{}-{}-{}-{}'.format(
                self.nvis, self.nhid, self.n_iter, self.batch_size, self.learning_rate, self.gibbs_k)
        # ############################# #
        #   Computational graph nodes   #
        # ############################# #

        # Model parameters
        self.W = None
        self.bh_ = None
        self.bv_ = None

        self.w_upd8 = None
        self.bh_upd8 = None
        self.bv_upd8 = None

        self.encode = None
        self.cost = None

        self.hrand = None
        self.vrand = None
        self.validation_size = None

        self.sess = None
        self.saver = None
    def _create_graph(self):
        # Symbolic variables
        self.x = tf.placeholder('float', [None, self.nvis], name='x-input')
        self.hrand = tf.placeholder('float', [None, self.nhid], name='hrand')
        self.vrand = tf.placeholder('float', [None, self.nvis], name='vrand-train')

        # Biases
        self.bh_ = tf.Variable(tf.zeros([self.nhid]), name='hidden-bias')
        self.bv_ = tf.Variable(tf.zeros([self.nvis]), name='visible-bias')

        self.W = tf.Variable(tf.random_normal((self.nvis, self.nhid), mean=0.0, stddev=0.01), name='weights')

        nn_input = self.x

        # Initialization
        hprobs0 = None
        hprobs = None
        positive = None
        vprobs = None
        hprobs1 = None
        hstates1 = None

        for step in range(self.gibbs_k):

            # Positive Contrastive Divergence phase
            hprobs = tf.nn.sigmoid(tf.matmul(nn_input, self.W) + self.bh_)
            hstates = utils.sample_prob(hprobs, self.hrand)

            # Compute positive associations in step 0
            if step == 0:
                hprobs0 = hprobs  # save the activation probabilities of the first step

                if self.vis_type == 'bin':
                    positive = tf.matmul(tf.transpose(nn_input), hstates)

                elif self.vis_type == 'gauss':
                    positive = tf.matmul(tf.transpose(nn_input), hprobs)

            # Negative Contrastive Divergence phase
            visible_activation = tf.matmul(hprobs, tf.transpose(self.W)) + self.bv_

            if self.vis_type == 'bin':
                vprobs = tf.nn.sigmoid(visible_activation)

            elif self.vis_type == 'gauss':
                vprobs = tf.truncated_normal((1, self.nvis), mean=visible_activation, stddev=self.stddev)

            # Sample again from the hidden units
            hprobs1 = tf.nn.sigmoid(tf.matmul(vprobs, self.W) + self.bh_)
            hstates1 = utils.sample_prob(hprobs1, self.hrand)

            # Use the reconstructed visible units as input for the next step
            nn_input = vprobs

        negative = tf.matmul(tf.transpose(vprobs), hprobs1)

        self.encode = hprobs  # encoded data

        self.w_upd8 = self.W.assign_add(self.learning_rate * (positive - negative))
        self.bh_upd8 = self.bh_.assign_add(self.learning_rate * tf.reduce_mean(hprobs0 - hprobs1, 0))
        self.bv_upd8 = self.bv_.assign_add(self.learning_rate * tf.reduce_mean(self.x - vprobs, 0))

        self.cost = tf.sqrt(tf.reduce_mean(tf.square(self.x - vprobs)))
        _ = tf.scalar_summary("cost", self.cost)
    def fit(self, trX, vlX=None, restore_previous_model=False):
        if vlX is not None:
            self.validation_size = vlX.shape[0]

        # Reset tensorflow's default graph
        ops.reset_default_graph()

        self._create_graph()

        merged = tf.merge_all_summaries()
        init_op = tf.initialize_all_variables()
        self.saver = tf.train.Saver()

        with tf.Session() as self.sess:

            self.sess.run(init_op)

            if restore_previous_model:
                # Restore previous model
                self.saver.restore(self.sess, self.models_dir + self.model_name)
                # Change model name
                self.model_name += '-restored{}'.format(self.n_iter)

            # Write tensorflow summaries to summary dir
            writer = tf.train.SummaryWriter(self.summary_dir, self.sess.graph_def)

            for i in range(self.n_iter):

                # Randomly shuffle the input
                np.random.shuffle(trX)

                batches = list(utils.gen_batches(trX, self.batch_size))

                for batch in batches:
                    self.sess.run([self.w_upd8, self.bh_upd8, self.bv_upd8],
                                  feed_dict={self.x: batch,
                                             self.hrand: np.random.rand(batch.shape[0], self.nhid),
                                             self.vrand: np.random.rand(batch.shape[0], self.nvis)})

                if i % 5 == 0:

                    # Record summary data
                    if vlX is not None:

                        feed = {self.x: vlX,
                                self.hrand: np.random.rand(self.validation_size, self.nhid),
                                self.vrand: np.random.rand(self.validation_size, self.nvis)}

                        result = self.sess.run([merged, self.cost], feed_dict=feed)
                        summary_str = result[0]
                        err = result[1]

                        writer.add_summary(summary_str, i)

                        if self.verbose == 1:
                            print("Validation cost at step %s: %s" % (i, err))

            # Save trained model
            self.saver.save(self.sess, self.models_dir + self.model_name)
    def transform(self, data, name='train', gibbs_k=1, save=False, models_dir=''):

        """ Transform data according to the model.
        :type data: array_like
        :param data: Data to transform
        :type name: string, default 'train'
        :param name: Identifier for the data that is being encoded
        :type gibbs_k: int, default 1
        :param gibbs_k: Gibbs sampling steps
        :type save: boolean, default False
        :param save: If true, save data to disk
        :return: transformed data
        """

        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            # Return the output of the encoding layer
            encoded_data = self.encode.eval({self.x: data,
                                             self.hrand: np.random.rand(data.shape[0], self.nhid),
                                             self.vrand: np.random.rand(data.shape[0], self.nvis)})

            if save:
                # Save transformation to output file
                np.save(self.data_dir + self.model_name + '-' + name, encoded_data)

            return encoded_data
    def load_model(self, shape, gibbs_k, model_path):

        """ Load a trained model from disk.
        :param shape: tuple(nvis, nhid)
        :param gibbs_k: number of Gibbs sampling steps used by the saved model
        :param model_path: path to the saved model on disk
        :return: None
        """

        self.nvis, self.nhid = shape[0], shape[1]
        self.gibbs_k = gibbs_k

        self._create_graph()

        # Initialize variables
        init_op = tf.initialize_all_variables()

        # Add ops to save and restore all the variables
        self.saver = tf.train.Saver()

        with tf.Session() as self.sess:
            self.sess.run(init_op)
            # Restore previous model
            self.saver.restore(self.sess, model_path)
    def get_model_parameters(self):

        """ Return the model parameters in the form of numpy arrays.
        :return: model parameters
        """

        with tf.Session() as self.sess:

            # Restore trained model
            self.saver.restore(self.sess, self.models_dir + self.model_name)

            return {
                'W': self.W.eval(),
                'bh_': self.bh_.eval(),
                'bv_': self.bv_.eval()
            }
    def get_weights_as_images(self, width, height, outdir='img/', n_images=10, img_type='grey'):
        outdir = self.data_dir + outdir

        with tf.Session() as self.sess:

            self.saver.restore(self.sess, self.models_dir + self.model_name)

            weights = self.W.eval()

            perm = np.random.permutation(self.nhid)[:n_images]

            for p in perm:
                w = np.array([i[p] for i in weights])
                image_path = outdir + self.model_name + '_{}.png'.format(p)
                utils.gen_image(w, width, height, image_path, img_type)
utils.py (shared helper functions):
from scipy import misc

import tensorflow as tf
import numpy as np


def sample_prob(probs, rand):

    """ Take a tensor of probabilities (as from a sigmoidal activation)
    and sample from all the distributions.
    :param probs: tensor of probabilities
    :param rand: tensor (of the same shape as probs) of random values
    :return: binary sample of probabilities
    """

    return tf.nn.relu(tf.sign(probs - rand))
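    # Example of the sampling trick above: with probs = 0.8 and rand = 0.3,
    # sign(0.8 - 0.3) = 1 and relu(1) = 1, so the unit fires; with rand = 0.9,
    # sign(-0.1) = -1 and relu(-1) = 0, so it does not. Each unit therefore
    # fires with probability equal to its activation.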
def gen_batches(data, batch_size):

    """ Divide input data into batches.
    :param data: input data
    :param batch_size: size of each batch
    :return: data divided into batches
    """

    data = np.array(data)

    for i in range(0, data.shape[0], batch_size):
        yield data[i:i + batch_size]
def gen_image(img, width, height, outfile, img_type='grey'):
    assert len(img) == width * height or len(img) == width * height * 3

    if img_type == 'grey':
        misc.imsave(outfile, img.reshape(width, height))

    elif img_type == 'color':
        misc.imsave(outfile, img.reshape(3, width, height))
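A quick sketch of sample_prob in action, with illustrative values evaluated in a throwaway session:

import numpy as np
import tensorflow as tf

import utils

probs = np.array([[0.9, 0.1, 0.5]])
rand = np.array([[0.3, 0.3, 0.7]])

with tf.Session() as sess:
    # Only the first unit has probs > rand, so only it fires
    print(sess.run(utils.sample_prob(probs, rand)))  # [[1. 0. 0.]]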
zconfig.py (directory configuration):
models_dir = 'models/'   # dir to save/restore models
data_dir = 'data/'       # directory to store algorithm data
summary_dir = 'logs/'    # directory to store tensorflow summaries
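Both RBM classes call os.mkdir to create their own subdirectory under each of these paths, which raises an error if a base directory itself is missing. A small setup snippet (my addition, not part of the original gist) makes sure they exist before constructing an RBM:

import os

import zconfig

# Create the base directories expected by the RBM classes, if needed
for d in (zconfig.models_dir, zconfig.data_dir, zconfig.summary_dir):
    if not os.path.isdir(d):
        os.makedirs(d)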