Created
September 6, 2016 23:36
-
-
Save jgranadof/a6703772ebdb4db6aec92eef55d861a5 to your computer and use it in GitHub Desktop.
Visualize deeper layers in Tensorflow by displaying images which gain the highest response from neurons. Written for cifar10 model.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# !!! Note for cifar10_train_and_eval.py !!! | |
# | |
# 1. Put this file into tensorflow/models/image/cifar10 directory. | |
# 2. For this file to work, you need to comment out the tf.image_summary()
#    call in tensorflow/models/image/cifar10/cifar10_input.py
# | |
# Copyright 2015 Google Inc. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# ============================================================================== | |
"""A binary to train CIFAR-10 using a single GPU. | |
Accuracy: | |
cifar10_train.py achieves ~86% accuracy after 100K steps (256 epochs of | |
data) as judged by cifar10_eval.py. | |
Speed: With batch_size 128. | |
System | Step Time (sec/batch) | Accuracy | |
------------------------------------------------------------------ | |
1 Tesla K20m | 0.35-0.60 | ~86% at 60K steps (5 hours) | |
1 Tesla K40m | 0.25-0.35 | ~86% at 100K steps (4 hours) | |
Usage: | |
Please see the tutorial and website for how to download the CIFAR-10 | |
data set, compile the program and train the model. | |
http://tensorflow.org/tutorials/deep_cnn/ | |
""" | |
from __future__ import absolute_import | |
from __future__ import division | |
from __future__ import print_function | |
from datetime import datetime | |
import os.path | |
import time | |
import math | |
import numpy as np | |
from six.moves import xrange # pylint: disable=redefined-builtin | |
import tensorflow as tf | |
from tensorflow.models.image.cifar10 import cifar10 | |
# Command-line flags of the training script.
FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string(
    'train_dir',
    '/tmp/cifar10_train',
    'Directory where to write event logs and checkpoint.')
tf.app.flags.DEFINE_integer(
    'max_steps',
    1000000,
    'Number of batches to run.')
tf.app.flags.DEFINE_boolean(
    'log_device_placement',
    False,
    'Whether to log device placement.')
def evaluate_set(sess, top_k_op, num_examples):
  """Run `top_k_op` batch by batch and return a single precision value.

  The op is evaluated ceil(num_examples / FLAGS.batch_size) times; the
  correct predictions of all runs are summed and divided by the number of
  examples actually evaluated (whole batches), not by `num_examples`.

  Args:
    sess: current Session
    top_k_op: tensor of type tf.nn.in_top_k
    num_examples: number of examples to evaluate

  Returns:
    Fraction of correct predictions over the evaluated batches.
  """
  batch_size = FLAGS.batch_size
  num_batches = int(math.ceil(num_examples / batch_size))
  # Whole batches are always run, so normalise by what was really evaluated.
  examples_seen = num_batches * batch_size

  correct = 0
  for _ in xrange(num_batches):
    batch_hits = sess.run([top_k_op])
    correct += np.sum(batch_hits)

  return correct / examples_seen
def train():
  """Train CIFAR-10 and periodically report train/eval precision.

  Builds the training network on `cifar10.distorted_inputs()` and a second
  network on `cifar10.inputs(eval_data=True)` that reuses the same variables.
  Inside the training loop:
    * every 10 steps the loss and throughput are printed and precision is
      measured on 1024 examples for both networks via `evaluate_set`;
    * every 100 steps the merged summaries (including the two precision
      values, fed through placeholders) are written;
    * every 1000 steps, and on the last step, a checkpoint is saved.

  NOTE(review): this function was recovered from a listing that had lost its
  indentation; the extent of the "model" variable scope below is a
  reconstruction -- confirm against the original file.
  """
  # Precision is measured every `eval_every` steps on `eval_num_examples`
  # examples. The summary branch (every 100 steps) relies on these values, so
  # `eval_every` has to divide 100.
  eval_every = 10
  eval_num_examples = 1024

  with tf.Graph().as_default():
    with tf.variable_scope("model") as scope:
      global_step = tf.Variable(0, trainable=False)

      # Input pipelines: distorted images for training, plain eval images.
      images, labels = cifar10.distorted_inputs()
      images_eval, labels_eval = cifar10.inputs(eval_data=True)

      # Two inference networks; the second one reuses the first one's
      # variables.
      logits = cifar10.inference(images)
      scope.reuse_variables()
      logits_eval = cifar10.inference(images_eval)

      loss = cifar10.loss(logits, labels)

      # Per-example correctness, consumed by evaluate_set().
      top_k = tf.nn.in_top_k(logits, labels, 1)
      top_k_eval = tf.nn.in_top_k(logits_eval, labels_eval, 1)

      # Precision is computed in Python, so it enters the summaries through
      # placeholders that are fed when the summary op is run.
      summary_train_prec = tf.placeholder(tf.float32)
      summary_eval_prec = tf.placeholder(tf.float32)
      tf.scalar_summary('precision/train', summary_train_prec)
      tf.scalar_summary('precision/eval', summary_eval_prec)

      # One optimisation step on one batch.
      train_op = cifar10.train(loss, global_step)

      saver = tf.train.Saver(tf.all_variables())
      summary_op = tf.merge_all_summaries()
      init = tf.initialize_all_variables()

      session_config = tf.ConfigProto(
          log_device_placement=FLAGS.log_device_placement)
      sess = tf.Session(config=session_config)
      sess.run(init)

      tf.train.start_queue_runners(sess=sess)

      summary_writer = tf.train.SummaryWriter(FLAGS.train_dir,
                                              graph_def=sess.graph_def)

      for step in xrange(FLAGS.max_steps):
        tic = time.time()
        _, loss_value = sess.run([train_op, loss])
        duration = time.time() - tic

        assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

        if step % 10 == 0:
          examples_per_sec = FLAGS.batch_size / duration
          progress_fmt = ('%s: step %d, loss = %.2f '
                          '(%.1f examples/sec; %.3f sec/batch)')
          print(progress_fmt % (datetime.now(), step, loss_value,
                                examples_per_sec, float(duration)))

        if step % eval_every == 0:
          prec_train = evaluate_set(sess, top_k, eval_num_examples)
          prec_eval = evaluate_set(sess, top_k_eval, eval_num_examples)
          print('%s: precision train = %.3f' % (datetime.now(), prec_train))
          print('%s: precision eval = %.3f' % (datetime.now(), prec_eval))

        if step % 100 == 0:
          precision_feed = {summary_train_prec: prec_train,
                            summary_eval_prec: prec_eval}
          summary_str = sess.run(summary_op, feed_dict=precision_feed)
          summary_writer.add_summary(summary_str, step)

        # Checkpoint periodically and on the very last step.
        is_last_step = (step + 1) == FLAGS.max_steps
        if step % 1000 == 0 or is_last_step:
          checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
          saver.save(sess, checkpoint_path, global_step=step)
def main(argv=None):  # pylint: disable=unused-argument
  """Fetch the data set, start from an empty train_dir, then train."""
  cifar10.maybe_download_and_extract()

  # Any previous contents of the training directory are discarded.
  train_dir = FLAGS.train_dir
  if tf.gfile.Exists(train_dir):
    tf.gfile.DeleteRecursively(train_dir)
  tf.gfile.MakeDirs(train_dir)

  train()


if __name__ == '__main__':
  tf.app.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2015 Google Inc. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# ============================================================================== | |
''' | |
Visualize what pooling and convolutional neurons learned | |
by displaying images that gain highest response. | |
Motivation: | |
It is straightforward to visualize filters in the first convolutional layer, | |
but not in deeper layers. One way to visualize a neuron is to find images
that the neuron fires most on. Inspired by:
[1]: "Rich feature hierarchies for accurate object detection and semantic | |
segmentation" by Ross Girshick et al., CVPR, 2014, section 3.1 | |
This file has two functions for visualizing high responses: | |
1) visualize_conv - for some channels in a convolutional layer. | |
2) visualize_pooling - for some neurons in a pooling layer | |
Note that for a convolutional filter, the max response is searched across | |
both images and x,y coordinates. At the same time, for a pooling neuron, | |
the max response is searched only across images because the coordinates
of pooling neurons are fixed (while conv. filter is shared across x,y.) | |
Implementation issues: | |
The search for maximum across images is approximate -- only one best image | |
from each batch can be included into the result. This is done for simplicity | |
-- please contribute by generalizing to several images per batch. | |
I use OpenCV for drawing. If you can change to PIL or whatever, | |
please propose a patch. | |
Usage: | |
0) Get python bindings to OpenCV | |
1) Examine function 'visualize_excitations'. It has an example of visualizing | |
conv2 and pool2 layers. | |
2) Change function inference() in cifar10.py so that it also returns | |
conv2 and pool2 tensors. See line 415 of this file. | |
3) Train cifar10 by running cifar10_train.py | |
4) Run this file. | |
''' | |
from __future__ import absolute_import | |
from __future__ import division | |
from __future__ import print_function | |
import logging | |
#import cv2 | |
import numpy as np | |
import tensorflow as tf | |
from bisect import bisect_right | |
from math import ceil | |
from tensorflow.models.image.cifar10 import cifar10 | |
# Command-line flags of the visualization script.
FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string(
    'eval_data',
    'test',
    "Either 'test' or 'train_eval'.")
tf.app.flags.DEFINE_string(
    'checkpoint_dir',
    '/tmp/cifar10_train',
    'Directory where to read model checkpoints.')
tf.app.flags.DEFINE_integer(
    'num_examples',
    10000,
    'Number of examples to run.')
tf.app.flags.DEFINE_string(
    'excitation_layer',
    'pool2',
    'Visualize excitations of this layer.')
def _prepare_patch(img, response, y, x, dst_height, scale,
                   stride, accum_padding, half_receptive_field):
  '''Resize one image and annotate it with a response value and a rectangle.

  The image is enlarged by `scale` (nearest-neighbour), the response is
  written near the bottom-left corner, and, when all three geometry
  parameters are given, a rectangle marking the receptive field is drawn.

  Note that the horizontal extent of the rectangle is derived from `y` and
  the vertical extent from `x`; the callers in this file pass the column
  index as `y` and the row index as `x`.

  NOTE(review): this function needs `cv2` in module scope, but the module's
  `import cv2` line is commented out in this listing -- confirm it is
  imported before use.

  Args:
    img: image to annotate
    response: number printed on the image with one decimal
    y, x: coordinates of the responding unit in the layer (see note above)
    dst_height: target image height; positions and sizes the text
    scale: resize factor applied to both image axes
    stride, accum_padding, half_receptive_field: geometry of the layer
        w.r.t. input pixels; if any is None the rectangle is skipped and a
        warning is logged
  Returns:
    The resized, annotated image.
  '''
  draw_color = (256, 256, 256)
  line_width = 2

  # enlarge without smoothing
  img = cv2.resize(img, dsize=(0, 0), fx=scale, fy=scale,
                   interpolation=cv2.INTER_NEAREST)

  # print the response value
  label = '%0.1f' % response
  cv2.putText(img, label,
              org=(0, int(dst_height * 0.9)),
              fontFace=cv2.FONT_HERSHEY_DUPLEX,
              fontScale=dst_height * 0.008,
              color=draw_color,
              thickness=line_width)

  # the receptive field can only be drawn when the caller supplied geometry
  geometry_missing = (accum_padding is None or
                      half_receptive_field is None or
                      stride is None)
  if geometry_missing:
    logging.warning ('support displaying receptive field only with user input')
    return img

  horizontal = y * stride + accum_padding
  vertical = x * stride + accum_padding
  top_left = (int((horizontal - half_receptive_field) * scale),
              int((vertical - half_receptive_field + 1) * scale))
  bottom_right = (int((horizontal + half_receptive_field) * scale),
                  int((vertical + half_receptive_field + 1) * scale))
  cv2.rectangle(img, top_left, bottom_right,
                color=draw_color,
                thickness=line_width)
  return img
def visualize_conv (sess, images, layer, channels,
                    half_receptive_field=None,
                    accum_padding=None,
                    stride=None,
                    num_excitations=16,
                    num_images=1024,
                    dst_height=96):
  '''
  TL;DR: display some 'images' that receive the strongest response
    from user-selected 'channels' of a convolutional 'layer'.

  A 64-channel convolutional layer consists of 64 filters.
  For each of the channels, the corresponding filter naturally fires
  differently on different pixels of different images. We are interested in
  the highest responses. For each filter, this function searches for such
  high responses, plus the corresponding images and the coordinates of those
  responses.

  From every batch only the single best response per channel is considered.
  We keep at most 'num_excitations' images for each filter and stack them
  into a row, weakest first. Rows from all filters of interest are stacked
  vertically into the final map. For each image, the response value and the
  receptive field are visualized.

  Args:
    sess: tensorflow session
    images: tensor for source images
    layer: tensor for a convolutional layer response, [batch, Y, X, channels];
        its batch dimension must equal FLAGS.batch_size
    channels: ids of filters of interest, a 1D numpy array.
        Example: channels=np.asarray([0,1,2]) will result in 3 rows
        with responses from 0th, 1st, and 2nd filters.
    half_receptive_field: integer, half of the receptive field for this layer
    accum_padding: integer, accumulated padding w.r.t pixels of the input
        image. Equals 0 when all previous layers use 'SAME' padding
    stride: integer, equals to multiplication of strides of all prev. layers.
    num_excitations: number of images to collect for each channel
    num_images: number of input images to search
    dst_height: will resize each image to have this height
  Returns:
    excitation_map: a ready-to-show image, similar to R-CNN paper.

  * Suggestions on how to automatically infer half_receptive_field,
    accum_padding, and stride are welcome.
  '''
  assert isinstance(channels, np.ndarray), 'channels must be a numpy array'
  assert len(channels.shape) == 1, 'need 1D array [num_filters]'

  batch_size = FLAGS.batch_size

  # layer shape is [im_id, Y, X, ch]
  assert layer.get_shape()[0].value == batch_size
  Y = layer.get_shape()[1].value
  X = layer.get_shape()[2].value
  num_ch = layer.get_shape()[3].value
  logging.info ('Y: %d, X: %d, num_ch: %d' % (Y, X, num_ch))

  # to shape [ch, Y, X, im_id], then flatten everything but the channel,
  # because the maximum is taken over Y, X, and im_id together
  per_channel = tf.reshape(tf.transpose(layer, (3,1,2,0)), [num_ch, -1])
  # value and flat index of the highest response for every channel
  responses, best_ids = tf.nn.top_k(per_channel, k=1)

  # per-channel lists, kept sorted by ascending response and in lockstep
  resps = [[] for _ in channels]
  imges = [[] for _ in channels]
  yx = [[] for _ in channels]

  # Start the queue runners.
  coord = tf.train.Coordinator()
  try:
    threads = []
    for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
      threads.extend(qr.create_threads(sess, coord=coord, daemon=True,
                                       start=True))

    # the same as in cifar10_eval, split evaluation by batches
    num_iter = int(ceil(num_images / batch_size))
    for step in range(num_iter):
      logging.debug ('==========')
      logging.info ('step %d out of %d' % (step, num_iter))
      if coord.should_stop():
        break

      best_ids_vec, images_vec, responses_vec = \
          sess.run([best_ids, images, responses])
      # after this point everything is numpy and opencv

      # collect best responding image from the batch for each filter=channel
      for ch_id, ch in enumerate(channels):
        logging.debug ('----------')
        logging.debug ('ch_id: %d, ch: %s' % (ch_id, ch))

        best_response = responses_vec [ch,0]
        best_id = int(best_ids_vec [ch,0])
        logging.debug ('best_id: %d, Y: %d, X: %d' % (best_id, Y, X))

        # undo the reshape: flat index == (y * X + x) * batch_size + im.
        # Integer arithmetic is required here -- this module enables true
        # division, so '/' would yield a fractional row index.
        spatial_id, best_im = divmod(best_id, batch_size)
        best_y, best_x = divmod(spatial_id, X)

        best_image = images_vec [best_im,:,:,:]
        logging.debug ('best_im,best_y,best_x: %d,%d,%d, best_response: %f' %
                       (best_im, best_y, best_x, best_response))

        # look up the insertion point in the sorted responses list
        i = bisect_right (resps[ch_id], best_response)

        # an exactly equal response is treated as the same image seen again
        if i > 0 and resps[ch_id][i-1] == best_response:
          logging.debug ('got same response. Skip.')
          continue

        # insert response, image and coordinates at the same position
        resps[ch_id].insert(i, best_response)
        imges[ch_id].insert(i, best_image)
        yx[ch_id].insert (i, (best_y, best_x))

        # keep only the strongest num_excitations: drop the current minimum
        if len(resps[ch_id]) > num_excitations:
          del resps[ch_id][0]
          del imges[ch_id][0]
          del yx[ch_id][0]

      logging.debug (resps)

  except Exception as e:  # pylint: disable=broad-except
    coord.request_stop(e)

  coord.request_stop()
  coord.join(threads, stop_grace_period_secs=10)

  # scale for resizing images
  src_height = images.get_shape()[1].value
  scale = float(dst_height) / src_height

  rows = []
  for ch_id in range(len(channels)):
    patches = []
    for img, resp, (row, col) in zip(imges[ch_id], resps[ch_id], yx[ch_id]):
      # _prepare_patch takes the column first and the row second
      patches.append(_prepare_patch(img, resp, col, row,
                                    dst_height, scale,
                                    stride, accum_padding,
                                    half_receptive_field))
    # one horizontal stripe per channel
    rows.append(np.concatenate(patches, axis=1))

  # concatenate stripes of all channels into one map
  excitation_map = np.concatenate(rows, axis=0)
  return excitation_map
def visualize_pooling (sess, images, layer, neurons,
                       half_receptive_field=None,
                       accum_padding=None,
                       stride=None,
                       num_excitations=16,
                       num_images=1024,
                       dst_height=96):
  '''
  TL;DR: display some 'images' that receive the strongest response
    from user-selected neurons of a pooling 'layer'.

  A pooling layer is of shape Y x X x Channels.
  Each neuron from that layer is connected to a pixel in the output feature
  map. This function visualizes what a neuron has learned by displaying
  images which receive the strongest responses on that neuron.

  From every batch only the single best image per neuron is considered.
  We keep at most 'num_excitations' images for each neuron and stack them
  into a row, weakest first. Rows from all neurons of interest are stacked
  vertically into the final map. For each image, the response value and the
  receptive field are visualized.

  Args:
    sess: tensorflow session
    images: tensor for source images
    layer: tensor for a pooling layer response, [batch, Y, X, channels]
    neurons: neurons to see best excitations for, a numpy array of shape
        [N, 3] with one [y, x, channel] triple per row.
        It's probably only a fraction of the layer neurons.
        Example: neurons=np.asarray([[0,1,2],[58,60,4]])
    half_receptive_field: integer, half of the receptive field for this layer
    accum_padding: integer, accumulated padding w.r.t pixels of the input
        image. Equals 0 when all previous layers use 'SAME' padding
    stride: integer, equals to multiplication of strides of all prev. layers.
    num_excitations: number of images to collect for each neuron
    num_images: number of input images to search
    dst_height: will resize each image to have this height
  Returns:
    excitation_map: a ready-to-show image, similar to R-CNN paper.

  * Suggestions on how to automatically infer half_receptive_field,
    accum_padding, and stride are welcome.
  '''
  assert isinstance(neurons, np.ndarray), 'neurons must be a numpy array'
  assert len(neurons.shape) == 2 and neurons.shape[1] == 3, 'need shape [N,3]'

  # batch index of the "most exciting" image, for each neuron:
  # the batch axis is moved last so that top_k reduces over it
  _, best_ids = tf.nn.top_k(tf.transpose(layer, (1,2,3,0)), k=1)

  # per-neuron lists, kept sorted by ascending response and in lockstep
  resps = [[] for _ in neurons]
  imges = [[] for _ in neurons]

  # Start the queue runners.
  coord = tf.train.Coordinator()
  try:
    threads = []
    for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
      threads.extend(qr.create_threads(sess, coord=coord, daemon=True,
                                       start=True))

    # the same as in cifar10_eval, split evaluation by batches
    num_iter = int(ceil(num_images / FLAGS.batch_size))
    for step in range(num_iter):
      logging.debug ('==========')
      logging.info ('step %d out of %d' % (step, num_iter))
      if coord.should_stop():
        break

      best_ids_mat, images_mat, responses_mat = sess.run(
          [best_ids, images, layer])
      # after this point everything is numpy and opencv

      # collect best responding image from the batch for each neuron=[y,x,ch]
      for n_id, n in enumerate(neurons):
        logging.debug ('----------')
        logging.debug ('n_id: %d, n: %s' % (n_id, str(n)))

        best_id = best_ids_mat [n[0],n[1],n[2],0]
        best_image = images_mat [best_id,:,:,:]
        best_response = responses_mat [best_id,n[0],n[1],n[2]]
        logging.debug ('best_id: %d, best_response: %f' %
                       (best_id, best_response))

        # look up the insertion point in the sorted responses list
        i = bisect_right (resps[n_id], best_response)

        # an exactly equal response is treated as the same image seen again
        if i > 0 and resps[n_id][i-1] == best_response:
          logging.debug ('got same response. Skip.')
          continue

        # insert response and image at the same position
        resps[n_id].insert(i, best_response)
        imges[n_id].insert(i, best_image)

        # keep only the strongest num_excitations: drop the current minimum
        if len(resps[n_id]) > num_excitations:
          del resps[n_id][0]
          del imges[n_id][0]

      logging.debug (resps)

  except Exception as e:  # pylint: disable=broad-except
    coord.request_stop(e)

  coord.request_stop()
  coord.join(threads, stop_grace_period_secs=10)

  # scale for resizing images
  src_height = images.get_shape()[1].value
  scale = float(dst_height) / src_height

  rows = []
  for n_id, n in enumerate(neurons):
    patches = []
    for img, resp in zip(imges[n_id], resps[n_id]):
      # _prepare_patch takes the column (n[1]) first and the row (n[0]) second
      patches.append(_prepare_patch(img, resp, n[1], n[0],
                                    dst_height, scale,
                                    stride, accum_padding,
                                    half_receptive_field))
    # one horizontal stripe per neuron
    rows.append(np.concatenate(patches, axis=1))

  # stack the stripes of all neurons into one map
  excitation_map = np.concatenate(rows, axis=0)
  return excitation_map
def visualize_excitations():
  '''Restore a trained model, run one of the visualizations and show it.

  The layer is chosen with FLAGS.excitation_layer ('conv2' or 'pool2').
  `cifar10.inference` is expected to return three values here, the last two
  being the conv2 and pool2 tensors. If no checkpoint is found in
  FLAGS.checkpoint_dir a message is printed and nothing is shown.

  NOTE(review): recovered from a listing without indentation; the nesting
  of the `with` blocks is a reconstruction.

  Raises:
    Exception: if FLAGS.excitation_layer names an unsupported layer.
  '''
  with tf.Graph().as_default():
    # Input images for CIFAR-10: the test split iff eval_data == 'test'.
    use_test_split = FLAGS.eval_data == 'test'
    images, _ = cifar10.inputs(eval_data=use_test_split)

    # Network outputs of interest.
    _, conv2, pool2 = cifar10.inference(images)

    # The moving-average versions of the learned variables are restored.
    variable_averages = tf.train.ExponentialMovingAverage(
        cifar10.MOVING_AVERAGE_DECAY)
    saver = tf.train.Saver(variable_averages.variables_to_restore())

    with tf.Session() as sess:
      ckpt = tf.train.get_checkpoint_state(FLAGS.checkpoint_dir)
      if not (ckpt and ckpt.model_checkpoint_path):
        print('No checkpoint file found')
        return
      saver.restore(sess, ckpt.model_checkpoint_path)

      layer_name = FLAGS.excitation_layer
      if layer_name == 'conv2':
        # channels 0, 31 and 63
        channels = np.asarray([0, 31, 63])
        excitation_map = visualize_conv(sess, images, conv2, channels,
                                        half_receptive_field=5,
                                        accum_padding=0,
                                        stride=2,
                                        dst_height=96,
                                        num_images=FLAGS.num_examples)
      elif layer_name == 'pool2':
        # one [y, x, channel] triple per neuron
        neurons = np.asarray([[0, 0, 0],
                              [5, 5, 63],
                              [3, 4, 5]])
        excitation_map = visualize_pooling(sess, images, pool2, neurons,
                                           half_receptive_field=6,
                                           accum_padding=0,
                                           stride=4,
                                           dst_height=96,
                                           num_images=FLAGS.num_examples)
      else:
        raise Exception ('add your own layers and parameters')

    # OpenCV displays BGR, the map is RGB.
    excitation_map = cv2.cvtColor(excitation_map, cv2.COLOR_RGB2BGR)
    cv2.imshow('excitations', excitation_map)
    cv2.waitKey(-1)
def main(argv=None):  # pylint: disable=unused-argument
  '''Set up INFO-level logging, fetch the data set, run the visualization.'''
  logging.basicConfig(level=logging.INFO)
  cifar10.maybe_download_and_extract()
  visualize_excitations()


if __name__ == '__main__':
  tf.app.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def put_kernels_on_grid (kernel, grid_Y, grid_X, pad=1):
  '''Visualize conv. features as an image (mostly for the 1st layer).
  Place kernels into a grid, with some padding between adjacent filters.

  Args:
    kernel: tensor of shape [Y, X, NumChannels, NumKernels]
    grid_Y, grid_X: shape of the grid. Require: NumKernels == grid_Y * grid_X
        User is responsible of how to break into two multiples.
    pad: number of zero-valued pixels added before each filter along Y and X
        (i.e. on one side only, which separates adjacent filters)
  Return:
    Tensor of shape [1, (Y+pad)*grid_Y, (X+pad)*grid_X, NumChannels],
    i.e. in tf.image_summary order, min-max scaled to [0, 1].
  '''
  # pad X and Y on the leading side only
  padded = tf.pad(kernel, tf.constant( [[pad,0],[pad,0],[0,0],[0,0]] ))

  # X and Y dimensions, w.r.t. padding
  Y = kernel.get_shape()[0] + pad
  X = kernel.get_shape()[1] + pad
  # taken from the kernel rather than assumed to be 3 (RGB)
  num_channels = kernel.get_shape()[2]

  # put NumKernels to the 1st dimension
  by_kernel = tf.transpose(padded, (3, 0, 1, 2))
  # organize grid on Y axis
  columns = tf.reshape(by_kernel,
                       tf.pack([grid_X, Y * grid_Y, X, num_channels]))

  # switch X and Y axes
  columns_t = tf.transpose(columns, (0, 2, 1, 3))
  # organize grid on X axis
  grid_t = tf.reshape(columns_t,
                      tf.pack([1, X * grid_X, Y * grid_Y, num_channels]))

  # swap the two spatial axes back, which directly yields the
  # tf.image_summary order [batch_size, height, width, channels],
  # where in this case batch_size == 1
  grid = tf.transpose(grid_t, (0, 2, 1, 3))

  # scale to [0, 1]
  x_min = tf.reduce_min(grid)
  x_max = tf.reduce_max(grid)
  return (grid - x_min) / (x_max - x_min)
# | |
# ... and somewhere inside "def train():" after calling "inference()" | |
# | |
# Visualize conv1 features | |
with tf.variable_scope('conv1') as scope_conv:
  # re-open the existing conv1 scope to fetch its already-created weights
  tf.get_variable_scope().reuse_variables()
  weights = tf.get_variable('weights')
  grid_x = grid_y = 8   # to get a square grid for 64 conv1 features
  # grid height and width are separate positional arguments
  grid = put_kernels_on_grid(weights, grid_y, grid_x)
  tf.image_summary('conv1/features', grid, max_images=1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment