Created
December 31, 2017 09:13
-
-
Save giuseppebonaccorso/ae4b38ff88d21898dca421359a7bce36 to your computer and use it in GitHub Desktop.
Stories with Convolutional Hetero-Encoders
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
import multiprocessing | |
import numpy as np | |
import tensorflow as tf | |
from keras.datasets import cifar10 | |
# Set random seed (for reproducibility)
np.random.seed(1000)
# Graph-level TF seed — tf.set_random_seed is TF 1.x API, so this file
# targets TensorFlow 1.x (tf.layers / tf.contrib below confirm this).
tf.set_random_seed(1000)

# CIFAR-10 image geometry and training hyper-parameters
width = 32          # image width in pixels
height = 32         # image height in pixels
batch_size = 10     # mini-batch size; also baked into the decoder's reshapes
nb_epochs = 500     # training epochs over the 50-sample subset
code_length = 1024  # dimensionality of the latent code
use_gpu = True      # place model ops on /gpu:0 when True

# Load the dataset
(X_train, Y_train), (X_test, Y_test) = cifar10.load_data()

# Select 50 samples: X_source are the inputs, X_dest is the same set
# shuffled, so the model learns a source-image -> different-image mapping
# (a "hetero"-encoder rather than an auto-encoder).
X_source = X_train[0:50]
X_dest = X_source.copy()
np.random.shuffle(X_dest)
def encoder(encoder_input):
    """Map an image batch to a `code_length`-dimensional latent code.

    Architecture: 3x3 conv (32 filters, tanh) -> flatten ->
    dense 1024 (tanh) -> dense `code_length` (tanh).
    """
    # Single convolutional stage
    features = tf.layers.conv2d(inputs=encoder_input,
                                filters=32,
                                kernel_size=(3, 3),
                                kernel_initializer=tf.contrib.layers.xavier_initializer(),
                                activation=tf.nn.tanh)

    # Flatten the convolutional feature maps into a vector per sample
    flat_features = tf.contrib.layers.flatten(features)

    # Fully connected stage
    hidden = tf.layers.dense(inputs=flat_features,
                             units=1024,
                             activation=tf.nn.tanh)

    # Latent code
    return tf.layers.dense(inputs=hidden,
                           units=code_length,
                           activation=tf.nn.tanh)
def decoder(code_sequence, bs):
    """Reconstruct an image batch from latent codes.

    Returns a pair (reconstruction, display_batch): the float
    reconstruction in [0, 1] with shape (bs, height, width, 3), and the
    same tensor rescaled to uint8 in [0, 255] for display.
    """
    # Fully connected stage mirroring the encoder
    hidden = tf.layers.dense(inputs=code_sequence,
                             units=1024,
                             activation=tf.nn.tanh)

    # Dense layer sized to a (height-2, width-2) RGB image
    flat_pixels = tf.layers.dense(inputs=hidden,
                                  units=(height - 2) * (width - 2) * 3,
                                  activation=tf.nn.tanh)

    # Shape the dense output as a small image for the transposed conv
    small_images = tf.reshape(flat_pixels, (bs, height - 2, width - 2, 3))

    # A 3x3 transposed convolution restores the full (height, width) size
    reconstruction = tf.layers.conv2d_transpose(inputs=small_images,
                                                filters=3,
                                                kernel_size=(3, 3),
                                                kernel_initializer=tf.contrib.layers.xavier_initializer(),
                                                activation=tf.sigmoid)

    # uint8 view of the reconstruction (pixel values in [0, 255])
    display_batch = tf.cast(tf.reshape(reconstruction, (bs, height, width, 3)) * 255.0,
                            tf.uint8)

    return reconstruction, display_batch
def create_batch(t):
    """Build one training batch starting at sample index `t`.

    Returns:
        (X, Y): float32 arrays of shape (batch_size, height, width, 3)
        with pixel values scaled to [0, 1]. X holds source images and Y
        the shuffled destination images. If fewer than `batch_size`
        samples remain past `t`, the trailing rows stay zero-filled so
        the graph's fixed-size reshapes still work.
    """
    X = np.zeros((batch_size, height, width, 3), dtype=np.float32)
    Y = np.zeros((batch_size, height, width, 3), dtype=np.float32)

    # Clamp the slice end to the dataset size — equivalent to (and
    # simpler than) the original if/else on t vs. shape[0] - batch_size.
    tmax = min(t + batch_size, X_source.shape[0])
    n = tmax - t

    # Vectorized copy + rescale instead of per-image Python loops
    X[:n] = X_source[t:tmax] / 255.0
    Y[:n] = X_dest[t:tmax] / 255.0

    return X, Y
# Create a Tensorflow Graph
graph = tf.Graph()

with graph.as_default():
    with tf.device('/cpu:0'):
        # Global step counter. Must be incremented by the optimizer on
        # every training step so the exponential learning-rate decay
        # below actually advances.
        global_step = tf.Variable(0, trainable=False)

    with tf.device('/gpu:0' if use_gpu else '/cpu:0'):
        # Input batch (source images, scaled to [0, 1])
        input_images = tf.placeholder(tf.float32, shape=(None, height, width, 3))

        # Output batch (target images, scaled to [0, 1])
        output_images = tf.placeholder(tf.float32, shape=(None, height, width, 3))

        # Batch size as a scalar placeholder — consumed by the reshapes
        # inside decoder(), which need a concrete leading dimension
        t_batch_size = tf.placeholder(tf.int32, shape=())

        # Encoder
        code_layer = encoder(encoder_input=input_images)

        # Decoder
        deconv_output, output_batch = decoder(code_sequence=code_layer,
                                              bs=t_batch_size)

        # Reconstruction L2 loss
        loss = tf.nn.l2_loss(output_images - deconv_output)

        # Training operations
        learning_rate = tf.train.exponential_decay(learning_rate=0.00025,
                                                   global_step=global_step,
                                                   decay_steps=int(X_source.shape[0] / (2 * batch_size)),
                                                   decay_rate=0.9,
                                                   staircase=True)
        trainer = tf.train.AdamOptimizer(learning_rate=learning_rate)
        # BUG FIX: pass global_step to minimize() so it is incremented on
        # every step. The original never incremented it, so
        # exponential_decay always saw step 0 and the learning rate
        # stayed constant at 0.00025 for the whole run.
        training_step = trainer.minimize(loss, global_step=global_step)
def predict(X, bs=1):
    """Run image data through the trained hetero-encoder.

    Args:
        X: array reshapable to (bs, height, width, 3), pixel values in
           [0, 255] (it is divided by 255 before being fed).
        bs: batch size to feed through the graph.

    Returns:
        uint8 array of shape (bs, height, width, 3): the reconstruction.
    """
    feed_dict = {
        # BUG FIX: reshape with `bs` instead of the hard-coded 1, so the
        # function honors its batch-size parameter; the original would
        # feed a 1-image tensor while telling the graph the batch was bs.
        input_images: X.reshape((bs, height, width, 3)) / 255.0,
        # The loss target is unused at inference time; zeros suffice.
        output_images: np.zeros((bs, height, width, 3), dtype=np.float32),
        t_batch_size: bs
    }

    return session.run([output_batch], feed_dict=feed_dict)[0]
def story(t):
    """Iteratively re-encode source image `t` and plot the 20-step chain.

    Frame 0 is the original image; each subsequent frame is the model's
    reconstruction of the previous frame. The 20 frames are shown on a
    2x10 grid of axes with ticks hidden.
    """
    frames = np.zeros(shape=(20, height, width, 3), dtype=np.uint8)
    frames[0] = X_source[t]

    # Feed each frame back through the model to produce the next one
    for step in range(1, 20):
        frames[step] = predict(frames[step - 1])

    fig, axes = plt.subplots(2, 10, figsize=(18, 4))

    for row in range(2):
        for col in range(10):
            cell = axes[row, col]
            cell.get_xaxis().set_visible(False)
            cell.get_yaxis().set_visible(False)
            cell.imshow(frames[(10 * row) + col])

    plt.show()
if __name__ == '__main__':
    # Create a Tensorflow Session pinned to all available CPU threads;
    # allow_soft_placement lets TF fall back to CPU for unsupported ops.
    config = tf.ConfigProto(intra_op_parallelism_threads=multiprocessing.cpu_count(),
                            inter_op_parallelism_threads=multiprocessing.cpu_count(),
                            allow_soft_placement=True,
                            device_count={'CPU': 1,
                                          'GPU': 1 if use_gpu else 0})
    session = tf.InteractiveSession(graph=graph, config=config)

    # Initialize all variables
    tf.global_variables_initializer().run()

    # Train the model: full passes over the 50-sample subset
    for e in range(nb_epochs):
        total_loss = 0.0

        for t in range(0, X_source.shape[0], batch_size):
            X, Y = create_batch(t)

            feed_dict = {
                input_images: X,
                output_images: Y,
                t_batch_size: batch_size
            }

            _, t_loss = session.run([training_step, loss], feed_dict=feed_dict)
            total_loss += t_loss

        # BUG FIX: average over the samples actually trained on
        # (X_source, 50 images), not the full 50000-image X_train —
        # the original divisor under-reported the per-sample loss 1000x.
        print('Epoch {} - Loss: {}'.
              format(e + 1,
                     total_loss / float(X_source.shape[0])))

    # Show some stories
    story(0)
    # story(1)
    # story(9)
    # ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment