"""Equivalence and timing tests for upsample_conv2d and conv_pool2d."""
import numpy as np
import tensorflow as tf

from video_prediction.ops import pad2d_paddings, conv2d, deconv2d, upsample2d, upsample_conv2d, pool2d, conv_pool2d


def test_upsample_conv2d():
    sess = tf.Session()
    batch = 16
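    # sweep over strides, input sizes, kernel sizes, and channel/filter counts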
    for strides in ([2, 2], [3, 4]):
        for in_height, in_width in ([8, 8], [16, 8]):
            for kernel_size in ([3, 3], [4, 4], [5, 5], [3, 6], [3, 7]):
                for in_channels in (1, 3):
                    for filters in (1, 3):
                        inputs = np.random.random((batch, in_height, in_width, in_channels)).astype(np.float32)
                        kernel = np.random.random((kernel_size[0], kernel_size[1], in_channels, filters)).astype(np.float32)
                        bias = np.random.random(filters).astype(np.float32)
                        inputs_ph = tf.placeholder(tf.float32, shape=inputs.shape)
                        kernel_ph = tf.placeholder(tf.float32, shape=kernel.shape)
                        bias_ph = tf.placeholder(tf.float32, shape=bias.shape)
                        outputs = upsample_conv2d(inputs_ph, filters, kernel_size=kernel_size, strides=strides,
                                                  kernel=kernel_ph, bias=bias_ph)
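                        # reference computation: upsample and convolve explicitly, then crop to match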
                        # upsample with bilinear interpolation
                        inputs_up = upsample2d(inputs_ph, strides, padding='VALID')
                        # convolve upsampled input with kernel
                        outputs_up = conv2d(inputs_up, filters, kernel_size=kernel_size,
                                            kernel=kernel_ph, bias=bias_ph, padding='FULL')
                        # crop appropriately
                        same_paddings = pad2d_paddings(inputs_ph, kernel_size, strides=(1, 1), padding='SAME')
                        full_paddings = pad2d_paddings(inputs_ph, kernel_size, strides=(1, 1), padding='FULL')
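                        # crop offsets combine a stride-dependent shift with the
                        # difference between FULL and SAME paddings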
                        crop_top = (strides[0] - strides[0] % 2) // 2 + full_paddings[1][1] - same_paddings[1][1]
                        crop_left = (strides[1] - strides[1] % 2) // 2 + full_paddings[2][1] - same_paddings[2][1]
                        outputs_up = outputs_up[:, crop_top:crop_top + strides[0] * in_height,
                                                crop_left:crop_left + strides[1] * in_width, :]
                        feed_dict = {inputs_ph: inputs, kernel_ph: kernel, bias_ph: bias}
                        outputs_result, outputs_up_result = sess.run([outputs, outputs_up], feed_dict=feed_dict)
                        assert np.allclose(outputs_result, outputs_up_result, atol=1.0 / 255.0)


def test_conv_pool2d():
    sess = tf.Session()
    batch = 16
    for strides in ([2, 2], [3, 4]):
        for in_height, in_width in ([12, 8], [18, 16]):  # each dimension should be a multiple of the strides
            for kernel_size in ([3, 3], [4, 4], [5, 5], [3, 6], [3, 7]):
                for in_channels in (1, 3):
                    for filters in (1, 3):
                        inputs = np.random.random((batch, in_height, in_width, in_channels)).astype(np.float32)
                        kernel = np.random.random((kernel_size[0], kernel_size[1], in_channels, filters)).astype(np.float32)
                        bias = np.random.random(filters).astype(np.float32)
                        inputs_ph = tf.placeholder(tf.float32, shape=inputs.shape)
                        kernel_ph = tf.placeholder(tf.float32, shape=kernel.shape)
                        bias_ph = tf.placeholder(tf.float32, shape=bias.shape)
                        outputs = conv_pool2d(inputs_ph, filters, kernel_size=kernel_size, strides=strides,
                                              kernel=kernel_ph, bias=bias_ph, pool_mode='avg')
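                        # reference computation: a plain conv followed by average pooling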
                        inputs_conv = conv2d(inputs_ph, filters, kernel_size=kernel_size,
                                             kernel=kernel_ph, bias=bias_ph)
                        outputs_pool = pool2d(inputs_conv, pool_size=strides, strides=strides, pool_mode='avg')
                        feed_dict = {inputs_ph: inputs, kernel_ph: kernel, bias_ph: bias}
                        outputs_result, outputs_pool_result = sess.run([outputs, outputs_pool], feed_dict=feed_dict)
                        assert np.allclose(outputs_result, outputs_pool_result, atol=1.0 / 255.0)
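

# Why a fused conv_pool2d can exist at all: average pooling a stride-1
# convolution is itself a strided convolution with a slightly larger kernel.
# Below is a minimal single-channel numpy sketch of that identity (an
# illustration under simplified assumptions: 2x2 'avg' pooling with
# pool_size == strides and VALID padding; this is not the actual
# conv_pool2d implementation).
def _demo_conv_pool_fusion_identity():
    def conv_valid(x, k, stride=1):
        # single-channel VALID cross-correlation
        kh, kw = k.shape
        out_h = (x.shape[0] - kh) // stride + 1
        out_w = (x.shape[1] - kw) // stride + 1
        out = np.empty((out_h, out_w))
        for i in range(out_h):
            for j in range(out_w):
                out[i, j] = np.sum(x[i * stride:i * stride + kh, j * stride:j * stride + kw] * k)
        return out

    x = np.random.random((10, 10))
    k = np.random.random((3, 3))
    # conv (stride 1, VALID) followed by 2x2 average pooling
    y = conv_valid(x, k)
    pooled = y.reshape(y.shape[0] // 2, 2, y.shape[1] // 2, 2).mean(axis=(1, 3))
    # the fused kernel is the original kernel convolved (FULL) with a 2x2 averaging filter
    k_fused = np.zeros((k.shape[0] + 1, k.shape[1] + 1))
    for di in (0, 1):
        for dj in (0, 1):
            k_fused[di:di + k.shape[0], dj:dj + k.shape[1]] += k / 4.0
    # a single stride-2 conv with the fused kernel matches conv + avg pool
    assert np.allclose(pooled, conv_valid(x, k_fused, stride=2))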


def test_upsample_conv2d_speed():
    import time
    sess = tf.Session()
    batch = 16
    strides = [2, 2]
    in_height = 64
    in_width = 64
    in_channels = 128
    kernel_size = [4, 4]
    filters = 64
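    # build three variants of the same upsampled convolution for timing: the fused
    # upsample_conv2d, explicit bilinear upsampling followed by a conv, and a
    # strided transposed conv (deconv2d)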
    inputs = tf.get_variable('inputs_upsample_conv2d', (batch, in_height, in_width, in_channels), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel = tf.get_variable('kernel_upsample_conv2d', (kernel_size[0], kernel_size[1], in_channels, filters),
                             dtype=tf.float32, initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel_transposed = tf.get_variable('kernel_transposed_upsample_conv2d',
                                        (kernel_size[0], kernel_size[1], filters, in_channels), dtype=tf.float32,
                                        initializer=tf.truncated_normal_initializer(stddev=0.02))
    outputs = upsample_conv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                              kernel=kernel, use_bias=False)
    inputs_up = upsample2d(inputs, strides, padding='VALID')
    outputs_up = conv2d(inputs_up, filters, kernel_size=kernel_size, kernel=kernel,
                        use_bias=False)[:, :strides[0] * in_height, :strides[1] * in_width, :]
    outputs_stride = deconv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                              kernel=kernel_transposed, use_bias=False)
    outputs_sums = [tf.reduce_sum(outputs), tf.reduce_sum(outputs_up), tf.reduce_sum(outputs_stride)]
    train_ops = [tf.train.AdamOptimizer().minimize(outputs_sum, var_list=[kernel, kernel_transposed])
                 for outputs_sum in outputs_sums]
    sess.run(tf.global_variables_initializer())
    outputs_times = [0.0 for _ in outputs_sums]
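    # forward-pass timing: treat the first iterations (t <= 10) as warm-up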
    for t in range(1000):
        for i, outputs_sum in enumerate(outputs_sums):
            if t > 10:
                start_time = time.time()
            sess.run(outputs_sum)
            if t > 10:
                outputs_times[i] += time.time() - start_time
    print(outputs_times)
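    # forward + backward timing: one optimizer step per variant, same warm-up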
    outputs_times = [0.0 for _ in train_ops]
    for t in range(1000):
        for i, train_op in enumerate(train_ops):
            if t > 10:
                start_time = time.time()
            sess.run(train_op)
            if t > 10:
                outputs_times[i] += time.time() - start_time
    print(outputs_times)


def test_conv_pool2d_speed():
    import time
    sess = tf.Session()
    batch = 16
    strides = [2, 2]
    in_height = 128
    in_width = 128
    in_channels = 64
    kernel_size = [4, 4]
    filters = 128
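    # build three variants for timing: the fused conv_pool2d, explicit conv
    # followed by average pooling, and a plain strided conv as a baseline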
    inputs = tf.get_variable('inputs_conv_pool2d', (batch, in_height, in_width, in_channels), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel = tf.get_variable('kernel_conv_pool2d', (kernel_size[0], kernel_size[1], in_channels, filters),
                             dtype=tf.float32, initializer=tf.truncated_normal_initializer(stddev=0.02))
    outputs = conv_pool2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                          kernel=kernel, use_bias=False)
    inputs_conv = conv2d(inputs, filters, kernel_size=kernel_size, kernel=kernel,
                         use_bias=False, padding='SAME')
    outputs_pool = pool2d(inputs_conv, pool_size=strides, strides=strides, pool_mode='avg')
    outputs_stride = conv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                            kernel=kernel, use_bias=False)
    outputs_sums = [tf.reduce_sum(outputs), tf.reduce_sum(outputs_pool), tf.reduce_sum(outputs_stride)]
    train_ops = [tf.train.AdamOptimizer().minimize(outputs_sum, var_list=[kernel])
                 for outputs_sum in outputs_sums]
    sess.run(tf.global_variables_initializer())
    outputs_times = [0.0 for _ in outputs_sums]
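    # same timing protocol as above: forward passes first, then optimizer steps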
    for t in range(1000):
        for i, outputs_sum in enumerate(outputs_sums):
            if t > 10:
                start_time = time.time()
            sess.run(outputs_sum)
            if t > 10:
                outputs_times[i] += time.time() - start_time
    print(outputs_times)
    outputs_times = [0.0 for _ in train_ops]
    for t in range(1000):
        for i, train_op in enumerate(train_ops):
            if t > 10:
                start_time = time.time()
            sess.run(train_op)
            if t > 10:
                outputs_times[i] += time.time() - start_time
    print(outputs_times)


if __name__ == "__main__":
    test_upsample_conv2d()
    test_conv_pool2d()
    test_upsample_conv2d_speed()
    test_conv_pool2d_speed()