@mlzxy
Created November 25, 2016 19:25
my tensorflow wrapper for neural network dev
import tensorflow as tf
from util import model_save_temp_folder__, join, isfile, datetime, list_prod, to_list, mkdir, tqdm
from tensorflow.python.ops import seq2seq as s2s
from tensorflow.contrib import layers
import numpy as np
import random
PADDING_SAME = 'SAME'
PADDING_VALID = 'VALID'
tf_sigmoid = tf.nn.sigmoid
tf_tanh = tf.nn.tanh
tf_elu = tf.nn.elu
tf_relu = tf.nn.relu
def tf_leaky_relu(leak):
    return lambda x: tf.maximum(x, leak*x)
def tf_dense(input_, output_size, stddev=0.02, bias_start=0.0, reuse=False, name=None,
             activation=None, bn=False):
    shape = input_.get_shape().as_list()
    scope = name
    with tf.variable_scope(scope or "Dense", reuse=reuse):
        matrix = tf.get_variable("matrix", [shape[1], output_size], tf.float32,
                                 tf.random_normal_initializer(stddev=stddev))
        bias = tf.get_variable("bias", [output_size],
                               initializer=tf.constant_initializer(bias_start, dtype=tf.float32))
        result = tf.matmul(input_, matrix) + bias
        if bn:
            result = tf_batch_norm(result, mode='1d')
        if activation is not None:
            result = activation(result)
        return result
dense = tf_dense
def tf_seq2seq(encoder_inputs, decoder_inputs, loop_function=None, cell=None, name="seq2seq",
               use_loop_function=False):
    """
    :param encoder_inputs: a list of 2D tensors [batch_size x input_size]
    :param decoder_inputs: a list of 2D tensors [batch_size x input_size]
    :param loop_function: forwarded to tied_rnn_seq2seq when use_loop_function is True
    :param cell: the RNN cell shared by the encoder and decoder
    :param use_loop_function: if True (and loop_function is given), build a tied seq2seq
    :param name: variable scope name
    :return: (outputs, state) from the underlying seq2seq call
    """
    scope = name
    if loop_function is None or use_loop_function is False:
        return s2s.basic_rnn_seq2seq(encoder_inputs, decoder_inputs, cell, scope=scope)
    else:
        return s2s.tied_rnn_seq2seq(encoder_inputs, decoder_inputs, cell,
                                    loop_function=loop_function, scope=scope)
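# Hedged usage sketch (added, not part of the original gist): tf_seq2seq expects
# per-timestep lists of [batch_size x input_size] tensors plus an RNN cell.
# The cell below assumes the TF 0.x tf.nn.rnn_cell module that this file's
# seq2seq import implies; all names are illustrative only.
#
#   cell = tf.nn.rnn_cell.GRUCell(128)
#   enc = [tf_input([None, 32], "enc_%d" % t) for t in range(5)]
#   dec = [tf_input([None, 32], "dec_%d" % t) for t in range(5)]
#   outputs, state = tf_seq2seq(enc, dec, cell=cell, name="demo_seq2seq")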
def tf_input(shape, name, dtype=tf.float32, scope=None):
    if scope is None:
        return tf.placeholder(dtype, shape=shape, name=name)
    else:
        with tf.variable_scope(scope):
            return tf.placeholder(dtype, shape=shape, name=name)
def tf_var(shape, name, dtype=tf.float32, scope=None,
           reuse=False,
           regularizer=None,
           initializer=tf.random_normal_initializer(0.001)):
    if scope is None:
        return tf.get_variable(name=name, shape=shape, dtype=dtype,
                               initializer=initializer, regularizer=regularizer)
    else:
        with tf.variable_scope(scope, reuse=reuse):
            return tf.get_variable(name=name, shape=shape, dtype=dtype,
                                   initializer=initializer, regularizer=regularizer)
def tf_basic_vae(input_dim=100, encoder=None, decoder=None, z_dim=100,
                 optimizer=tf.train.AdamOptimizer(0.005),
                 reuse=False, name="tf_basic_vae"):
    """
    Build a basic VAE graph from user-supplied encoder/decoder callables.
    :return: a dict of training ops, losses and tensors (see the keys below)
    """
    scope = name
    def n(message):
        return "{0}_{1}".format(scope, message)
    if isinstance(input_dim, int):
        input_dim = (input_dim, )
    if isinstance(input_dim, list):
        input_dim = tuple(input_dim)
    with tf.variable_scope(scope, reuse=reuse):
        input_var = tf_input([None] + to_list(input_dim), n('input_var'))
        batch_size = input_var.get_shape()[0]
        epsilon = tf_input((batch_size, z_dim), n('epsilon'))
        with tf.variable_scope("encoder", reuse=False):
            mu, sigma = encoder(input_var)
        z = mu + tf.mul(tf.exp(0.5 * sigma), epsilon)
        with tf.variable_scope("decoder", reuse=False):
            recon_input_var, recon_input_var_activated = decoder(z)
        with tf.variable_scope("decoder", reuse=True):
            gen_input_var, gen_input_var_activated = decoder(epsilon)
        kld = -0.5 * tf.reduce_sum(1 + sigma - tf.pow(mu, 2) - tf.exp(sigma), reduction_indices=1)
        bce = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(
            tf_flatten_for_dense(recon_input_var), tf_flatten_for_dense(input_var)),
            reduction_indices=1)
        loss = tf.reduce_mean(kld + bce)
        _ = tf.scalar_summary(n("loss"), loss)
        summary_op = tf.merge_all_summaries()
        train_op = optimizer.minimize(loss)
        return {
            'op': [train_op, summary_op],
            'loss': loss,
            'output': [recon_input_var, recon_input_var_activated,
                       gen_input_var, gen_input_var_activated],
            'input': [input_var, epsilon],
            'kld': kld,
            'bce': bce,
            'mu': mu,
            'sigma': sigma,
            'epsilon': epsilon,
            'input_var': input_var,
            'generation': gen_input_var_activated,
            'train': train_op
        }
class VaeModel:
    def __init__(self, **kwargs):
        m = tf_basic_vae(**kwargs)
        self.train_op = m['op'][0]
        self.generation_output = m['generation']
        self.input = m['input_var']
        self.mu = m['mu']
        self.sigma = m['sigma']
        self.epsilon = m['epsilon']
        self.loss = m['loss']
        self.summary_op = m['op'][1]
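# Hedged usage sketch (added, not in the original gist): tf_basic_vae expects
# `encoder` to return (mu, log-variance) and `decoder` to return both the
# pre-sigmoid logits and the activated reconstruction. The helpers below
# (my_encoder, my_decoder) are hypothetical, built only from tf_dense above.
#
#   def my_encoder(x):
#       h = tf_dense(x, 256, name="enc_h", activation=tf_relu)
#       return tf_dense(h, 20, name="enc_mu"), tf_dense(h, 20, name="enc_sigma")
#
#   def my_decoder(z):
#       h = tf_dense(z, 256, name="dec_h", activation=tf_relu)
#       logits = tf_dense(h, 784, name="dec_out")
#       return logits, tf.nn.sigmoid(logits)
#
#   vae = VaeModel(input_dim=784, encoder=my_encoder, decoder=my_decoder, z_dim=20)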
def tf_load_model(session, saver, model_name, log=print, save_dir=model_save_temp_folder__):
    model_dir = join(save_dir, model_name)
    mkdir(model_dir)
    model_path = join(model_dir, 'model.ckpt')
    ckpt_path = model_path
    if isfile(ckpt_path):
        log("Restoring saved parameters")
        saver.restore(session, ckpt_path)
    else:
        log("Initializing parameters")
        session.run(tf.initialize_all_variables())
def tf_save_model(session, saver, model_name, save_dir=model_save_temp_folder__, log=print):
    log("Saving model {0}".format(model_name))
    model_dir = join(save_dir, model_name)
    mkdir(model_dir)
    model_path = join(model_dir, 'model.ckpt')
    saver.save(session, model_path)
    log("finish!")
class ModelSaver:
    def __init__(self, session, model_name, log=print, save_dir=model_save_temp_folder__):
        self.saver = tf.train.Saver()
        self.model_name = model_name
        self.session = session
        self.save_dir = save_dir
        self.log = log
    def load(self):
        tf_load_model(self.session, self.saver, self.model_name, log=self.log, save_dir=self.save_dir)
    def save(self):
        tf_save_model(self.session, self.saver, self.model_name, log=self.log, save_dir=self.save_dir)
def tf_need_test(step, freq):
    r = step % freq
    return r == 0
def tf_time():
    return datetime.now()
def tf_log(i, loss, n_iter=1000, start_time=None, log=print, message=""):
    now = tf_time()
    run_time = now - start_time if start_time is not None else None
    log(message + "{0}/{1} iterations, loss = {2}, elapsed time = {3}".format(i, n_iter, loss, run_time))
def tf_dense_with_gated_activation(input_var, output_size=None, name="tf_dense_with_gated_activation",
                                   reuse=False, stddev=1,
                                   bias_start=0.0, after_activation=None, residual=False):
    scope = name
    if residual:
        output_size = input_var.get_shape()[1]
    else:
        if output_size is None:
            raise Exception("Output size has to be specified if it is not a residual layer")
    with tf.variable_scope(scope, reuse=reuse):
        branch_sigmoid = tf.nn.sigmoid(tf_dense(input_var, output_size,
                                                name="branch_sigmoid", reuse=reuse, stddev=stddev,
                                                bias_start=bias_start))
        branch_tanh = tf.nn.tanh(tf_dense(input_var, output_size,
                                          name="branch_tanh", reuse=reuse, stddev=stddev, bias_start=bias_start))
        result = tf.mul(branch_tanh, branch_sigmoid, name="gate")
        if residual:
            result = tf_dense(result, output_size, name="residual_connection",
                              reuse=reuse, stddev=stddev, bias_start=bias_start)
            result = tf.add(result, input_var)
            if after_activation is not None:
                result = after_activation(result)
        else:
            if after_activation is not None:
                raise Exception("after_activation only works with residual blocks!")
        return result
def tf_deconv2d(input_, output_shape,
                k_h=5, k_w=5, d_h=1, d_w=1, stddev=0.02, bn=False,
                name="deconv2d", padding='VALID', reuse=False, activation=None):
    """
    shape calculation in https://gist.github.com/BenBBear/19df8ac8ff17926f2659962f8b1dc5b1
    """
    with tf.variable_scope(name, reuse=reuse):
        # filter: [height, width, output_channels, in_channels]
        batch_size = tf_get_batch_size(input_)
        output_shape = to_list(output_shape)
        w = tf.get_variable('w', [k_h, k_w, output_shape[-1], input_.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))
        deconv = tf.nn.conv2d_transpose(input_, w, output_shape=[batch_size, ] + output_shape,
                                        strides=[1, d_h, d_w, 1], padding=padding)
        biases = tf.get_variable('b', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
        deconv = tf.nn.bias_add(deconv, biases)
        deconv = tf.reshape(deconv, [batch_size, ] + output_shape)
        if bn:
            deconv = tf_batch_norm(deconv, mode='2d')
        if activation is not None:
            deconv = activation(deconv)
        return deconv
def tf_conv2d(input_, output_dim, k_h=5, k_w=5, d_h=1, d_w=1, stddev=0.02,
              name="conv2d", reuse=False, activation=None, padding='SAME', bn=False):
    with tf.variable_scope(name, reuse=reuse):
        w = tf.get_variable('w', [k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, d_h, d_w, 1], padding=padding)
        biases = tf.get_variable('b', [output_dim], initializer=tf.constant_initializer(0.0))
        conv = tf.nn.bias_add(conv, biases)
        if bn:
            conv = tf_batch_norm(conv, mode='2d')
        if activation is not None:
            conv = activation(conv)
        return conv
def tf_get_batch_size(input_var):
    return tf.shape(input_var)[0]
def tf_flatten_for_dense(input_var):
    shape_list = list(input_var.get_shape()[1:])
    return tf.reshape(input_var, [-1, list_prod(shape_list)])
def tf_reshape_for_conv(input_var, shape):
    return tf.reshape(input_var, [-1, ] + list(shape))
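# Hedged sketch (added): tf_conv2d, tf_flatten_for_dense and tf_dense compose
# into a small conv classifier or GAN discriminator. my_d_net is a hypothetical
# helper; it returns (logits, sigmoid) as tf_gan below expects from d_net.
#
#   def my_d_net(x):
#       h = tf_conv2d(x, 32, d_h=2, d_w=2, name="d_conv1", activation=tf_leaky_relu(0.2))
#       h = tf_flatten_for_dense(h)
#       logits = tf_dense(h, 1, name="d_out")
#       return logits, tf.nn.sigmoid(logits)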
tf_flatten = layers.flatten
def int_shape(x):
    '''Returns the shape of a tensor as a tuple of
    integers or None entries.
    Note that this function only works with TensorFlow.
    '''
    shape = x.get_shape()
    return tuple([i.__int__() for i in shape])
def tf_up_sampling_2d(input_var, height_factor, width_factor):
    original_shape = int_shape(input_var)
    new_shape = tf.shape(input_var)[1:3]
    new_shape *= tf.constant(np.array([height_factor, width_factor]).astype('int32'))
    x = tf.image.resize_nearest_neighbor(input_var, new_shape)
    x.set_shape((None, original_shape[1] * height_factor if original_shape[1] is not None else None,
                 original_shape[2] * width_factor if original_shape[2] is not None else None, None))
    return x
def tf_repeat_elements(x, rep, axis):
    '''Repeats the elements of a tensor along an axis, like np.repeat.
    If x has shape (s1, s2, s3) and axis=1, the output
    will have shape (s1, s2 * rep, s3).
    '''
    x_shape = x.get_shape().as_list()
    # slices along the repeat axis
    splits = tf.split(axis, x_shape[axis], x)
    # repeat each slice the given number of reps
    x_rep = [s for s in splits for _ in range(rep)]
    return tf.concat(axis, x_rep)
def tf_up_sampling_3d(input_var, depth_factor, height_factor, width_factor):
    output = tf_repeat_elements(input_var, depth_factor, axis=1)
    output = tf_repeat_elements(output, height_factor, axis=2)
    output = tf_repeat_elements(output, width_factor, axis=3)
    return output
def tf_conv3d(input_, output_dim, k_d=5, k_h=5, k_w=5, d_d=1, d_h=1, d_w=1, stddev=0.02,
              name="conv3d", reuse=False, activation=None, padding='SAME', bn=True):
    with tf.variable_scope(name, reuse=reuse):
        w = tf.get_variable('w', [k_d, k_h, k_w, input_.get_shape()[-1], output_dim],
                            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv3d(input_, w, strides=[1, d_d, d_h, d_w, 1], padding=padding)
        biases = tf.get_variable('b', [output_dim], initializer=tf.constant_initializer(0.0))
        conv = tf.nn.bias_add(conv, biases)
        if bn:
            conv = tf_batch_norm(conv, mode='3d')
        if activation is not None:
            conv = activation(conv)
        return conv
def tf_deconv3d(input_, output_shape,
                k_d=5, k_h=5, k_w=5, d_d=1, d_h=1, d_w=1, stddev=0.02, bn=False,
                name="deconv3d", padding='VALID', reuse=False, activation=None):
    with tf.variable_scope(name, reuse=reuse):
        # filter: [depth, height, width, output_channels, in_channels]
        batch_size = tf_get_batch_size(input_)
        output_shape = to_list(output_shape)
        w = tf.get_variable('w', [k_d, k_h, k_w, output_shape[-1], input_.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))
        deconv = tf.nn.conv3d_transpose(input_, w, output_shape=[batch_size, ] + output_shape,
                                        strides=[1, d_d, d_h, d_w, 1], padding=padding)
        biases = tf.get_variable('b', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
        deconv = tf.nn.bias_add(deconv, biases)
        deconv = tf.reshape(deconv, [batch_size, ] + output_shape)
        if bn:
            deconv = tf_batch_norm(deconv, mode='3d')
        if activation is not None:
            deconv = activation(deconv)
        return deconv
def tf_gan(input_dim=100, z_dim=100, d_net=None, g_net=None,
           optimizer=lambda: tf.train.AdamOptimizer(0.0002, beta1=0.5),
           reuse=False, name="tf_basic_gan"):
    """
    second_train_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                          "scope/prefix/for/second/vars")
    """
    def n(message):
        return "{0}_{1}".format(name, message)
    input_shape = to_list(input_dim)
    input_data = tf_input([None, ] + input_shape, name=n("input_images"))
    # batch_size = tf_get_batch_size(input_data)
    # input_label = tf_input([None, 1], name=n("input_labels"))
    input_z = tf_input([None, z_dim], name=n("uniform_z"))
    g_net_scope = n("g_net")
    d_net_scope = n("d_net")
    with tf.variable_scope(g_net_scope, reuse=False) as vs:
        output_gen = g_net(input_z)
        variables_gen = tf.get_collection(tf.GraphKeys.VARIABLES, scope=vs.name)
    with tf.variable_scope(d_net_scope, reuse=False) as vs:
        discriminator_no_sigmoid, discriminator = d_net(input_data)
        variables_dis = tf.get_collection(tf.GraphKeys.VARIABLES, scope=vs.name)
    with tf.variable_scope(d_net_scope, reuse=True) as vs:
        gan_net_no_sigmoid, gan_net = d_net(output_gen)
    variables_all = variables_gen + variables_dis
    loss_dis_real = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(discriminator_no_sigmoid,
                                                                           tf.ones_like(discriminator)))
    loss_dis_fake = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(gan_net_no_sigmoid,
                                                                           tf.zeros_like(gan_net)))
    loss_dis = loss_dis_real + loss_dis_fake
    loss_gen = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(gan_net_no_sigmoid,
                                                                      tf.ones_like(gan_net_no_sigmoid)))
    train_gen_op = optimizer().minimize(loss_gen, var_list=variables_gen)
    train_dis_op = optimizer().minimize(loss_dis, var_list=variables_dis)
    train_whole_op = optimizer().minimize(loss_gen, var_list=variables_all)
    tf.scalar_summary(n("loss_gen"), loss_gen)
    tf.scalar_summary(n("loss_dis"), loss_dis)
    tf.scalar_summary(n("loss_dis_real"), loss_dis_real)
    tf.scalar_summary(n("loss_dis_fake"), loss_dis_fake)
    summary_op = tf.merge_all_summaries()
    return {
        'op': [train_dis_op, train_gen_op, train_whole_op, summary_op],
        'loss': [loss_dis, loss_gen, loss_dis_fake, loss_dis_real],
        'input': [input_data, input_z],
        'gan_net': [gan_net_no_sigmoid, gan_net],
        'generation': output_gen,
        'dis_net': [discriminator_no_sigmoid, discriminator],
        'variables': [variables_dis, variables_gen, variables_all]
    }
class GanModel:
    G = 0
    D = 1
    DG = 2
    Message = ["Gan - Generator: ", "Gan - Discriminator: ", "Gan - Dis + Gen: "]
    def __init__(self, **kwargs):
        k = kwargs
        m = tf_gan(input_dim=k['input_dim'], z_dim=k['z_dim'], d_net=k['d_net'], g_net=k['g_net'],
                   optimizer=k['optimizer'], reuse=k['reuse'], name=k['name'])
        self.z_dim = kwargs['z_dim']
        self.train_dis_op, self.train_gen_op, self.train_whole_op, self.summary_op = m['op']
        self.loss_dis, self.loss_gen, self.loss_dis_fake, self.loss_dis_real = m['loss']
        self.input_data, self.input_z = m['input']
        self.gan_net_no_sigmoid, self.gan_net = m['gan_net']
        self.discriminator_no_sigmoid, self.discriminator = m['dis_net']
        self.generation = m['generation']
        self.variables_dis, self.variables_gen, self.variables_all = m['variables']
        self.session = kwargs['session']
        self.next_batch = kwargs['next_batch']
        self.batch_size = kwargs['batch_size']
        self.input_shape = to_list(kwargs['input_dim'])
        self.log = kwargs['log']
        self.model_name = kwargs['name']
        self.test_fun = kwargs['test_fun']
        self.test_freq = kwargs['test_freq']
        self.saver = ModelSaver(self.session, self.model_name, log=self.log)
        self.saver.load()
    def generate_data(self, batch_size=None):
        if batch_size is None:
            batch_size = self.batch_size
        return self.session.run(self.generation, feed_dict={
            self.input_z: self.random_z(batch_size)
        })
    def random_z(self, batch_size):
        return np.random.normal(0, 1, (batch_size, self.z_dim)).astype(np.float32)
    def generate_mix_sample(self):
        real_images, _ = self.next_batch(self.batch_size)
        real_images = real_images.reshape([real_images.shape[0], ] + self.input_shape)
        generated_images = self.generate_data()
        label = np.zeros([2 * self.batch_size, 1])
        label[:self.batch_size, 0] = 1  # real images
        return tf_concat_batch(real_images, generated_images), label
    def train(self, steps=[('D', 100)] + 3*[('G', 1000, 0.5, 1), ('D', 400)], log=print, message="", save=False):
        d_loss_list = []
        d_loss_real_list = []
        d_loss_fake_list = []
        g_loss_list = []
        all_loss = [["d_loss", d_loss_list], ["g_loss", g_loss_list],
                    ["d_loss_real", d_loss_real_list], ["d_loss_fake", d_loss_fake_list]]
        g_loss = None  # defined before the first 'G' phase so the logging below never fails
        for step in steps:
            for i in tqdm(range(step[1]), desc=step[0]):
                start_time = tf_time()
                batch_z = self.random_z(self.batch_size)
                batch_images, _ = self.next_batch(self.batch_size)
                batch_images = batch_images.reshape([batch_images.shape[0], ] + self.input_shape)
                _, d_loss, d_loss_real, d_loss_fake = self.session.run(
                    [self.train_dis_op, self.loss_dis, self.loss_dis_real, self.loss_dis_fake],
                    feed_dict={
                        self.input_data: batch_images,
                        self.input_z: batch_z
                    })
                d_loss_list.append(d_loss)
                d_loss_fake_list.append(d_loss_fake)
                d_loss_real_list.append(d_loss_real)
                if step[0] == 'G':
                    train_op = self.train_whole_op
                    if random.random() < step[2]:
                        train_op = self.train_gen_op
                    for _ in range(step[3]):
                        _, g_loss = self.session.run([train_op, self.loss_gen], feed_dict={
                            self.input_z: batch_z
                        })
                        g_loss_list.append(g_loss)
                if tf_need_test(i, self.test_freq):
                    if save:
                        self.saver.save()
                    self.test_fun(model=self, n_iter=i, losses=all_loss)
                    if log is not None:
                        log("--- D-G --- ")
                        tf_log(i, d_loss, n_iter=step[1], start_time=start_time, log=log,
                               message=message + "Discriminator: ")
                        tf_log(i, g_loss, n_iter=step[1], start_time=start_time, log=log,
                               message=message + "GAN: ")
        if save:
            self.saver.save()
        self.test_fun(model=self, n_iter=i, losses=all_loss)
        return all_loss
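# Hedged usage sketch (added, not in the original gist): GanModel wires tf_gan to
# a session plus a data iterator. Every keyword below is required by __init__;
# sess, next_batch, test_fun, my_g_net and my_d_net are hypothetical user objects.
#
#   gan = GanModel(input_dim=[28, 28, 1], z_dim=100, d_net=my_d_net, g_net=my_g_net,
#                  optimizer=lambda: tf.train.AdamOptimizer(0.0002, beta1=0.5),
#                  reuse=False, name="mnist_gan", session=sess,
#                  next_batch=next_batch, batch_size=64, log=print,
#                  test_fun=test_fun, test_freq=100)
#   losses = gan.train(save=True)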
def tf_concat_batch(*args):
    return np.concatenate(args)
def tf_max_pool2d(input_, d_h=3, d_w=3, name="pool2d", reuse=False, padding='SAME'):
    window = [1, d_h, d_w, 1]
    return tf.nn.max_pool(input_, window, window, padding=padding, name=name)
def tf_max_pool3d(input_, d_d=3, d_h=3, d_w=3, name="pool3d", reuse=False, padding='SAME'):
    window = [1, d_d, d_h, d_w, 1]
    return tf.nn.max_pool3d(input_, window, window, padding=padding, name=name)
def tf_batch_norm(input_var, epsilon=1e-5, decay=0.9, mode='3d', name="batch_norm_2d", reuse=False):
    axes = [0, 1, 2]
    if mode == '3d':
        axes = [0, 1, 2, 3]
    elif mode == '2d':
        axes = [0, 1, 2]
    elif mode == '1d':
        axes = [0, 1]
    with tf.variable_scope(name, reuse=reuse):
        shape = input_var.get_shape().as_list()
        beta = tf.get_variable("beta", [shape[-1]],
                               initializer=tf.constant_initializer(0.))
        gamma = tf.get_variable("gamma", [shape[-1]],
                                initializer=tf.random_normal_initializer(1., 0.02))
        batch_mean, batch_var = tf.nn.moments(input_var, axes, name='moments')
        ema = tf.train.ExponentialMovingAverage(decay=decay)
        ema_apply_op = ema.apply([batch_mean, batch_var])
        with tf.control_dependencies([ema_apply_op]):
            mean, var = tf.identity(batch_mean), tf.identity(batch_var)
        normed = tf.nn.batch_normalization(input_var, mean, var, beta, gamma, epsilon, name="batch_normalization")
        return normed
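# Note (added): tf_batch_norm above always normalizes with the current batch's
# statistics; the ExponentialMovingAverage is updated but its shadow values are
# never read, so there is no separate inference-mode path.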
def tf_dropout(input_var, drop_prob, name="dropout"):
    return tf.nn.dropout(input_var, 1 - drop_prob, name=name)
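if __name__ == "__main__":
    # Minimal smoke test (added; a sketch assuming the TF 0.x-era APIs used
    # throughout this file, e.g. tf.initialize_all_variables).
    x = tf_input([None, 4], "demo_x")
    y = tf_dense(x, 2, name="demo_dense", activation=tf_relu)
    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        out = sess.run(y, feed_dict={x: np.random.rand(3, 4).astype(np.float32)})
        print(out.shape)  # expected: (3, 2)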