Skip to content

Instantly share code, notes, and snippets.

@jgranadof
Created September 6, 2016 23:36
Show Gist options
  • Save jgranadof/a6703772ebdb4db6aec92eef55d861a5 to your computer and use it in GitHub Desktop.
Save jgranadof/a6703772ebdb4db6aec92eef55d861a5 to your computer and use it in GitHub Desktop.
Visualize deeper layers in Tensorflow by displaying images which gain the highest response from neurons. Written for cifar10 model.
# !!! Note for cifar10_train_and_eval.py !!!
#
# 1. Put this file into tensorflow/models/image/cifar10 directory.
# 2. For this file to work, you need to comment out tf.image_summary() in
# file tensorflow/models/image/cifar_input.py
#
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A binary to train CIFAR-10 using a single GPU.
Accuracy:
cifar10_train.py achieves ~86% accuracy after 100K steps (256 epochs of
data) as judged by cifar10_eval.py.
Speed: With batch_size 128.
System | Step Time (sec/batch) | Accuracy
------------------------------------------------------------------
1 Tesla K20m | 0.35-0.60 | ~86% at 60K steps (5 hours)
1 Tesla K40m | 0.25-0.35 | ~86% at 100K steps (4 hours)
Usage:
Please see the tutorial and website for how to download the CIFAR-10
data set, compile the program and train the model.
http://tensorflow.org/tutorials/deep_cnn/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from datetime import datetime
import os.path
import time
import math
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.models.image.cifar10 import cifar10
# Command-line flags of the training script; values are read through FLAGS.
FLAGS = tf.app.flags.FLAGS
# Output directory. main() deletes it (if present) and recreates it before
# train() writes summaries and 'model.ckpt' checkpoints into it.
tf.app.flags.DEFINE_string('train_dir', '/tmp/cifar10_train',
"""Directory where to write event logs """
"""and checkpoint.""")
# Upper bound of the training loop in train().
tf.app.flags.DEFINE_integer('max_steps', 1000000,
"""Number of batches to run.""")
# Forwarded to tf.ConfigProto when train() creates its session.
tf.app.flags.DEFINE_boolean('log_device_placement', False,
"""Whether to log device placement.""")
def evaluate_set(sess, top_k_op, num_examples):
    """Evaluate `top_k_op` over several batches and return one precision value.

    The op is run once per batch; the number of correct predictions is
    accumulated and divided by the number of examples actually processed.

    Args:
      sess: current Session
      top_k_op: tensor of type tf.nn.in_top_k
      num_examples: number of examples to evaluate; rounded up to a whole
        number of batches of size FLAGS.batch_size

    Returns:
      Correct predictions divided by (number of batches * FLAGS.batch_size).
    """
    batch_size = FLAGS.batch_size
    num_batches = int(math.ceil(num_examples / batch_size))
    evaluated = num_batches * batch_size

    correct = 0
    for _ in xrange(num_batches):
        batch_hits = sess.run([top_k_op])
        correct += np.sum(batch_hits)

    return correct / evaluated
def train():
    """Train CIFAR-10 for a number of steps.

    Builds a training graph and an evaluation graph that share variables,
    then loops for FLAGS.max_steps steps. Every 10 steps the loss and the
    throughput are printed and precision is measured on training and
    evaluation batches; every 100 steps summaries are written; every 1000
    steps, and on the final step, a checkpoint is saved in FLAGS.train_dir.
    """
    # Precision is measured every `eval_every` steps on `eval_examples`
    # examples (rounded up to whole batches by evaluate_set).
    eval_every = 10
    eval_examples = 1024
    progress_format = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                       'sec/batch)')

    with tf.Graph().as_default():
        # NOTE(review): the source lost its indentation; the extent of this
        # variable_scope block was reconstructed -- confirm against the
        # original gist.
        with tf.variable_scope("model") as scope:
            global_step = tf.Variable(0, trainable=False)

            # Input pipelines: distorted images for training, plain
            # evaluation images for measuring precision.
            images, labels = cifar10.distorted_inputs()
            images_eval, labels_eval = cifar10.inputs(eval_data=True)

            # The second inference() call must come after reuse_variables()
            # so that it reuses the variables created by the first call.
            logits = cifar10.inference(images)
            scope.reuse_variables()
            logits_eval = cifar10.inference(images_eval)

            loss = cifar10.loss(logits, labels)

            # Per-example "label is the top-1 prediction" indicators.
            top_k = tf.nn.in_top_k(logits, labels, 1)
            top_k_eval = tf.nn.in_top_k(logits_eval, labels_eval, 1)

            # Precision is computed in Python, so it enters the summaries
            # through placeholders that are fed when summary_op is run.
            summary_train_prec = tf.placeholder(tf.float32)
            summary_eval_prec = tf.placeholder(tf.float32)
            tf.scalar_summary('precision/train', summary_train_prec)
            tf.scalar_summary('precision/eval', summary_eval_prec)

            # One optimisation step on one batch of examples.
            train_op = cifar10.train(loss, global_step)

        saver = tf.train.Saver(tf.all_variables())
        summary_op = tf.merge_all_summaries()
        init = tf.initialize_all_variables()

        session_config = tf.ConfigProto(
            log_device_placement=FLAGS.log_device_placement)
        sess = tf.Session(config=session_config)
        sess.run(init)

        tf.train.start_queue_runners(sess=sess)

        summary_writer = tf.train.SummaryWriter(FLAGS.train_dir,
                                                graph_def=sess.graph_def)

        final_step = FLAGS.max_steps - 1
        for step in xrange(FLAGS.max_steps):
            started = time.time()
            _, loss_value = sess.run([train_op, loss])
            duration = time.time() - started

            assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

            if step % 10 == 0:
                examples_per_sec = FLAGS.batch_size / duration
                sec_per_batch = float(duration)
                print(progress_format % (datetime.now(), step, loss_value,
                                         examples_per_sec, sec_per_batch))

            if step % eval_every == 0:
                prec_train = evaluate_set(sess, top_k, eval_examples)
                prec_eval = evaluate_set(sess, top_k_eval, eval_examples)
                print('%s: precision train = %.3f' % (datetime.now(), prec_train))
                print('%s: precision eval = %.3f' % (datetime.now(), prec_eval))

            if step % 100 == 0:
                # Step 0 always runs the evaluation above, so both precision
                # values are bound here; they hold the latest measurement.
                feed = {summary_train_prec: prec_train,
                        summary_eval_prec: prec_eval}
                summary_str = sess.run(summary_op, feed_dict=feed)
                summary_writer.add_summary(summary_str, step)

            # Save the model checkpoint periodically and on the last step.
            if step % 1000 == 0 or step == final_step:
                checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
                saver.save(sess, checkpoint_path, global_step=step)
def main(argv=None):  # pylint: disable=unused-argument
    """Fetch CIFAR-10 if needed, reset the training directory, and train."""
    cifar10.maybe_download_and_extract()

    # Always start from an empty training directory.
    train_dir = FLAGS.train_dir
    if tf.gfile.Exists(train_dir):
        tf.gfile.DeleteRecursively(train_dir)
    tf.gfile.MakeDirs(train_dir)

    train()


if __name__ == '__main__':
    tf.app.run()
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
'''
Visualize what pooling and convolutional neurons learned
by displaying images that gain highest response.
Motivation:
It is straightforward to visualize filters in the first convolutional layer,
but not in deeper layers. One way to visualize a neuron is to find images
that the neuron fires most on. Inspired by:
[1]: "Rich feature hierarchies for accurate object detection and semantic
segmentation" by Ross Girshick et al., CVPR, 2014, section 3.1
This file has two functions for visualizing high responses:
1) visualize_conv - for some channels in a convolutional layer.
2) visualize_pooling - for some neurons in a pooling layer
Note that for a convolutional filter, the max response is searched across
both images and x,y coordinates. At the same time, for a pooling neuron,
the max response is searched only across images because the coordinates
of pooling neurons are fixed (while conv. filter is shared across x,y.)
Implementation issues:
The search for maximum across images is approximate -- only one best image
from each batch can be included into the result. This is done for simplicity
-- please contribute by generalizing to several images per batch.
I use OpenCV for drawing. If you can change to PIL or whatever,
please propose a patch.
Usage:
0) Get python bindings to OpenCV
1) Examine function 'visualize_excitations'. It has an example of visualizing
conv2 and pool2 layers.
2) Change function inference() in cifar10.py so that it also returns
conv2 and pool2 tensors. See line 415 of this file.
3) Train cifar10 by running cifar10_train.py
4) Run this file.
'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
#import cv2
import numpy as np
import tensorflow as tf
from bisect import bisect_right
from math import ceil
from tensorflow.models.image.cifar10 import cifar10
# Command-line flags of the visualization script; values are read through FLAGS.
FLAGS = tf.app.flags.FLAGS
# visualize_excitations() passes eval_data=True to cifar10.inputs only when
# this flag equals 'test'.
tf.app.flags.DEFINE_string('eval_data', 'test',
"""Either 'test' or 'train_eval'.""")
# Looked up with tf.train.get_checkpoint_state in visualize_excitations().
tf.app.flags.DEFINE_string('checkpoint_dir', '/tmp/cifar10_train',
"""Directory where to read model checkpoints.""")
# Passed as num_images to visualize_conv / visualize_pooling.
tf.app.flags.DEFINE_integer('num_examples', 10000,
"""Number of examples to run.""")
# visualize_excitations() handles 'conv2' and 'pool2'; any other value raises.
tf.app.flags.DEFINE_string('excitation_layer', 'pool2',
"""Visualize excitations of this layer.""")
def _prepare_patch(img, response, y, x, dst_height, scale,
                   stride, accum_padding, half_receptive_field):
    """Scale a patch and overlay the response value and the receptive field.

    Args:
      img: image to draw on; it is resized by `scale` first.
      response: number printed on the image with one decimal place.
      y, x: position of the responding unit in the layer. Note that `y`
        drives the x_min/x_max extent of the rectangle and `x` the
        y_min/y_max extent; the callers in this file pass their column index
        as `y` and their row index as `x`.
      dst_height: target height, used to place and size the text.
      scale: resize factor applied to the image and to the rectangle.
      stride, accum_padding, half_receptive_field: receptive field geometry.
        If any of them is None, the rectangle is not drawn and a warning
        is logged.

    Returns:
      The resized image with the overlays drawn on it.
    """
    # NOTE(review): this function needs `cv2`, but 'import cv2' is commented
    # out at the top of this file -- confirm how cv2 gets into scope.
    overlay_color = (256, 256, 256)
    line_thickness = 2

    img = cv2.resize(img, dsize=(0, 0), fx=scale, fy=scale,
                     interpolation=cv2.INTER_NEAREST)

    # The response value goes near the bottom-left corner.
    label = '%0.1f' % response
    cv2.putText(img, label,
                org=(0, int(dst_height * 0.9)),
                fontFace=cv2.FONT_HERSHEY_DUPLEX,
                fontScale=dst_height * 0.008,
                color=overlay_color,
                thickness=line_thickness)

    # The receptive field can only be drawn when the user supplied all
    # three geometry parameters.
    geometry = (accum_padding, half_receptive_field, stride)
    if any(param is None for param in geometry):
        logging.warning('support displaying receptive field only with user input')
        return img

    x_min = int((y * stride + accum_padding - half_receptive_field) * scale)
    x_max = int((y * stride + accum_padding + half_receptive_field) * scale)
    y_min = int((x * stride + accum_padding - half_receptive_field + 1) * scale)
    y_max = int((x * stride + accum_padding + half_receptive_field + 1) * scale)
    cv2.rectangle(img, (x_min, y_min), (x_max, y_max),
                  color=overlay_color,
                  thickness=line_thickness)
    return img
def visualize_conv(sess, images, layer, channels,
                   half_receptive_field=None,
                   accum_padding=None,
                   stride=None,
                   num_excitations=16,
                   num_images=1024,
                   dst_height=96):
    '''
    TL;DR: display some 'images' that receive the strongest response
      from user-selected 'channels' of a convolutional 'layer'.

    A 64-channel convolutional layer consists of 64 filters.
    For each of the channels, the corresponding filter naturally fires
      differently on different pixels of different images. We're interested
      in the highest responses. For each filter, this function searches for
      such high responses, plus the corresponding images and the coordinates
      of those responses.
    We collect up to 'num_excitations' images for each filter and stack them
      into a row. Rows from all filters of interest are stacked vertically
      into the final map. For each image, the response value and the
      receptive field are visualized.
    Only the single best response of each batch is considered per channel.

    Args:
      sess: tensorflow session
      images: tensor for source images
      layer: tensor for a convolutional layer response,
        shape [FLAGS.batch_size, Y, X, num_channels]
      channels: ids of filters of interest, a 1D numpy array.
        Example: channels=np.asarray([0,1,2]) will result in 3 rows
        with responses from 0th, 1st, and 2nd filters.
      half_receptive_field: integer, half of the receptive field for this layer
      accum_padding: integer, accumulated padding w.r.t pixels of the input image.
        Equals 0 when all previous layers use 'SAME' padding
      stride: integer, equals to multiplication of strides of all prev. layers.
      num_excitations: number of images to collect for each channel
      num_images: number of input images to search
      dst_height: will resize each image to have this height
    Returns:
      excitation_map: a ready-to-show image, similar to R-CNN paper.

    * Suggestions on how to automatically infer half_receptive_field,
      accum_padding, and stride are welcome.
    '''
    assert isinstance(channels, np.ndarray), 'channels must be a numpy array'
    assert len(channels.shape) == 1, 'need 1D array [num_filters]'

    batch_size = FLAGS.batch_size

    # layer shape is [im_id, Y, X, ch]
    layer_shape = layer.get_shape()
    assert layer_shape[0].value == batch_size
    Y = layer_shape[1].value
    X = layer_shape[2].value
    num_ch = layer_shape[3].value
    logging.info('Y: %d, X: %d, num_ch: %d' % (Y, X, num_ch))

    # to shape [ch, Y, X, im_id], because we reduce on Y, X, and im_id
    layer0 = tf.transpose(layer, (3, 1, 2, 0))
    layer1 = tf.reshape(layer0, [num_ch, -1])
    # highest response (and its flat index) across batch, X, and Y
    responses, best_ids = tf.nn.top_k(layer1, k=1)

    # per-channel lists, kept sorted by ascending response
    resps = [[] for _ in channels]
    imges = [[] for _ in channels]
    yx = [[] for _ in channels]

    # Start the queue runners.
    coord = tf.train.Coordinator()
    threads = []
    try:
        for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
            threads.extend(qr.create_threads(sess, coord=coord, daemon=True,
                                             start=True))

        # the same as in cifar10_eval, split evaluation by batches
        num_iter = int(ceil(num_images / batch_size))
        for step in range(num_iter):
            logging.debug('==========')
            logging.info('step %d out of %d' % (step, num_iter))
            if coord.should_stop():
                break

            best_ids_vec, images_vec, responses_vec = sess.run(
                [best_ids, images, responses])
            # after this point everything is numpy and opencv

            # collect the best responding image of the batch for each channel
            for ch_id, ch in enumerate(channels):
                logging.debug('----------')
                logging.debug('ch_id: %d, ch: %s' % (ch_id, ch))

                best_response = responses_vec[ch, 0]
                best_id = best_ids_vec[ch, 0]
                logging.debug('best_id: %d, Y: %d, X: %d' % (best_id, Y, X))

                # Undo the reshape of [Y, X, im_id] into one axis:
                #   best_id == (y * X + x) * batch_size + im_id.
                # Floor division is required here: the file enables true
                # division, so '/' would yield a fractional row index.
                best_im = best_id % batch_size
                best_yx = best_id // batch_size
                best_y = best_yx // X
                best_x = best_yx % X

                best_image = images_vec[best_im, :, :, :]
                logging.debug(
                    'best_im,best_y,best_x: %d,%d,%d, best_response: %f' %
                    (best_im, best_y, best_x, best_response))

                # insertion point in the sorted list of responses
                i = bisect_right(resps[ch_id], best_response)

                # an identical response is taken to mean the same image
                if i > 0 and resps[ch_id][i - 1] == best_response:
                    logging.debug('got same response. Skip.')
                    continue

                resps[ch_id].insert(i, best_response)
                imges[ch_id].insert(i, best_image)
                yx[ch_id].insert(i, (best_y, best_x))

                # keep only the 'num_excitations' strongest: drop the weakest
                if len(resps[ch_id]) > num_excitations:
                    del resps[ch_id][0]
                    del imges[ch_id][0]
                    del yx[ch_id][0]

            logging.debug(resps)

    except Exception as e:  # pylint: disable=broad-except
        coord.request_stop(e)

    coord.request_stop()
    coord.join(threads, stop_grace_period_secs=10)

    # scale for resizing images
    src_height = images.get_shape()[1].value
    scale = float(dst_height) / src_height

    for ch_id, _ in enumerate(channels):
        # _prepare_patch receives the column index first and the row index
        # second, matching how it computes the rectangle.
        patches = [
            _prepare_patch(img, resp, pos[1], pos[0],
                           dst_height, scale,
                           stride, accum_padding, half_receptive_field)
            for img, resp, pos in zip(imges[ch_id], resps[ch_id], yx[ch_id])
        ]
        # concatenate images for this channel
        imges[ch_id] = np.concatenate(patches, axis=1)

    # concatenate stripes of all channels into one map
    excitation_map = np.concatenate(list(imges), axis=0)
    return excitation_map
def visualize_pooling(sess, images, layer, neurons,
                      half_receptive_field=None,
                      accum_padding=None,
                      stride=None,
                      num_excitations=16,
                      num_images=1024,
                      dst_height=96):
    '''
    TL;DR: display some 'images' that receive the strongest response
      from user-selected neurons of a pooling 'layer'.

    A pooling layer is of shape Y x X x Channels; a neuron is addressed by
      its [y, x, channel] position. For every requested neuron this function
      looks, batch by batch, for the image that gives the strongest response
      on that neuron.
    Up to 'num_excitations' images are collected for each neuron and stacked
      into a row. Rows of all neurons are stacked vertically into the final
      map. For each image, the response value and the receptive field are
      visualized.

    Args:
      sess: tensorflow session
      images: tensor for source images
      layer: tensor for the pooling layer response
      neurons: neurons to see best excitations for, a numpy array of
        shape [N, 3] with rows [y, x, channel].
        Example: neurons=np.asarray([[0,1,2],[58,60,4]])
      half_receptive_field: integer, half of the receptive field for this layer
      accum_padding: integer, accumulated padding w.r.t pixels of the input image.
        Equals 0 when all previous layers use 'SAME' padding
      stride: integer, equals to multiplication of strides of all prev. layers.
      num_excitations: number of images to collect for each neuron
      num_images: number of input images to search
      dst_height: will resize each image to have this height
    Returns:
      excitation_map: a ready-to-show image, similar to R-CNN paper.

    * Suggestions on how to automatically infer half_receptive_field,
      accum_padding, and stride are welcome.
    '''
    assert isinstance(neurons, np.ndarray), 'neurons must be a numpy array'
    assert len(neurons.shape) == 2 and neurons.shape[1] == 3, 'need shape [N,3]'

    # Move the batch axis last so that top_k picks, for each neuron, the
    # index of the image with the strongest response in the batch.
    batch_last = tf.transpose(layer, (1, 2, 3, 0))
    _, best_ids = tf.nn.top_k(batch_last, k=1)

    # For each neuron: responses sorted in ascending order, and the images
    # that produced them, in the same order.
    resps = [[] for _ in xrange(len(neurons))]
    imges = [[] for _ in xrange(len(neurons))]

    # Start the queue runners.
    coord = tf.train.Coordinator()
    try:
        threads = []
        for runner in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS):
            threads.extend(runner.create_threads(sess, coord=coord,
                                                 daemon=True, start=True))

        # the same as in cifar10_eval, split evaluation by batches
        num_iter = int(ceil(num_images / FLAGS.batch_size))
        for step in range(num_iter):
            logging.debug('==========')
            logging.info('step %d out of %d' % (step, num_iter))
            if coord.should_stop():
                break

            fetches = [best_ids, images, layer]
            best_ids_mat, images_mat, responses_mat = sess.run(fetches)
            # after this point everything is numpy and opencv

            for n_id, n in enumerate(neurons):
                logging.debug('----------')
                logging.debug('n_id: %d, n: %s' % (n_id, str(n)))

                row, col, channel = n[0], n[1], n[2]
                best_id = best_ids_mat[row, col, channel, 0]
                best_image = images_mat[best_id, :, :, :]
                best_response = responses_mat[best_id, row, col, channel]
                logging.debug('best_id: %d, best_response: %f' % (best_id, best_response))

                neuron_resps = resps[n_id]
                neuron_imges = imges[n_id]

                # where the new response belongs in the sorted list
                pos = bisect_right(neuron_resps, best_response)

                # an identical response is taken to mean the same image
                if pos > 0 and neuron_resps[pos - 1] == best_response:
                    logging.debug('got same response. Skip.')
                    continue

                neuron_resps.insert(pos, best_response)
                neuron_imges.insert(pos, best_image)

                # too many entries: drop the weakest one (front of the list)
                if len(neuron_resps) > num_excitations:
                    neuron_resps.pop(0)
                    neuron_imges.pop(0)

            logging.debug(resps)

    except Exception as e:  # pylint: disable=broad-except
        coord.request_stop(e)

    coord.request_stop()
    coord.join(threads, stop_grace_period_secs=10)

    # scale for resizing images
    src_height = images.get_shape()[1].value
    scale = float(dst_height) / src_height

    for n_id, n in enumerate(neurons):
        # _prepare_patch receives the column index first and the row index
        # second, matching how it computes the rectangle.
        patches = [
            _prepare_patch(img, resp, n[1], n[0],
                           dst_height, scale,
                           stride, accum_padding, half_receptive_field)
            for img, resp in zip(imges[n_id], resps[n_id])
        ]
        # one horizontal stripe per neuron
        imges[n_id] = np.concatenate(patches, axis=1)

    # stack the stripes of all neurons
    excitation_map = np.concatenate(list(imges), axis=0)
    return excitation_map
def visualize_excitations():
    '''Restore a trained model, and run one of the visualizations.

    The layer is chosen by FLAGS.excitation_layer ('conv2' or 'pool2').
    If no checkpoint is found in FLAGS.checkpoint_dir, a message is printed
    and the function returns without showing anything.
    '''
    with tf.Graph().as_default():
        # Get images for CIFAR-10.
        use_test_set = FLAGS.eval_data == 'test'
        images, _ = cifar10.inputs(eval_data=use_test_set)

        # inference() is expected to also return the conv2 and pool2 tensors
        # (see the usage notes in the module docstring).
        _, conv2, pool2 = cifar10.inference(images)

        # Restore the moving average version of the learned variables.
        averages = tf.train.ExponentialMovingAverage(
            cifar10.MOVING_AVERAGE_DECAY)
        saver = tf.train.Saver(averages.variables_to_restore())

        with tf.Session() as sess:
            ckpt = tf.train.get_checkpoint_state(FLAGS.checkpoint_dir)
            if not (ckpt and ckpt.model_checkpoint_path):
                print('No checkpoint file found')
                return
            saver.restore(sess, ckpt.model_checkpoint_path)

            layer_name = FLAGS.excitation_layer
            if layer_name == 'conv2':
                # first, 31st, and last channels
                channels = np.asarray([0, 31, 63])
                excitation_map = visualize_conv(
                    sess, images, conv2, channels,
                    half_receptive_field=5,
                    accum_padding=0,
                    stride=2,
                    dst_height=96,
                    num_images=FLAGS.num_examples)
            elif layer_name == 'pool2':
                neurons = np.asarray([
                    [0, 0, 0],    # top-left corner of first map
                    [5, 5, 63],   # bottom-right corner of last map
                    [3, 4, 5],    # in the middle of 5th map
                ])
                excitation_map = visualize_pooling(
                    sess, images, pool2, neurons,
                    half_receptive_field=6,
                    accum_padding=0,
                    stride=4,
                    dst_height=96,
                    num_images=FLAGS.num_examples)
            else:
                raise Exception('add your own layers and parameters')

            # OpenCV displays BGR; block until a key is pressed.
            excitation_map = cv2.cvtColor(excitation_map, cv2.COLOR_RGB2BGR)
            cv2.imshow('excitations', excitation_map)
            cv2.waitKey(-1)
def main(argv=None):  # pylint: disable=unused-argument
    """Set up INFO logging, fetch CIFAR-10 if needed, and visualize."""
    logging.basicConfig(level=logging.INFO)
    cifar10.maybe_download_and_extract()
    visualize_excitations()


if __name__ == '__main__':
    tf.app.run()
def put_kernels_on_grid(kernel, grid_Y, grid_X, pad=1):
    '''Visualize conv. features as an image (mostly for the 1st layer).
    Place kernels into a grid, with some padding between adjacent filters.

    Args:
      kernel: tensor of shape [Y, X, NumChannels, NumKernels]
      grid_Y, grid_X: shape of the grid. Require: NumKernels == grid_Y * grid_X
        User is responsible for how to break it into two multiples.
      pad: number of zero-valued rows/columns inserted before each filter
        along Y and X (so filters are separated from each other)

    Return:
      Tensor of shape [1, (Y+pad)*grid_Y, (X+pad)*grid_X, NumChannels],
      i.e. a one-image batch in tf.image_summary order, scaled to [0, 1].
    '''
    # pad X and Y (only on the leading side of each axis)
    x1 = tf.pad(kernel, tf.constant([[pad, 0], [pad, 0], [0, 0], [0, 0]]))

    # X and Y dimensions, w.r.t. padding
    kernel_shape = kernel.get_shape()
    Y = kernel_shape[0] + pad
    X = kernel_shape[1] + pad
    # Take the channel count from the kernel instead of assuming 3 (RGB).
    num_channels = kernel_shape[2]

    # put NumKernels to the 1st dimension
    x2 = tf.transpose(x1, (3, 0, 1, 2))
    # organize grid on Y axis
    x3 = tf.reshape(x2, tf.pack([grid_X, Y * grid_Y, X, num_channels]))

    # switch X and Y axes
    x4 = tf.transpose(x3, (0, 2, 1, 3))
    # organize grid on X axis
    x5 = tf.reshape(x4, tf.pack([1, X * grid_X, Y * grid_Y, num_channels]))

    # back to normal order (not combining with the next step for clarity)
    x6 = tf.transpose(x5, (2, 1, 3, 0))

    # to tf.image_summary order [batch_size, height, width, channels],
    #   where in this case batch_size == 1
    x7 = tf.transpose(x6, (3, 0, 1, 2))

    # scale to [0, 1]
    # NOTE(review): if every value is identical, x_max == x_min and this
    # divides by zero -- confirm whether that case needs handling.
    x_min = tf.reduce_min(x7)
    x_max = tf.reduce_max(x7)
    x8 = (x7 - x_min) / (x_max - x_min)

    return x8
#
# ... and somewhere inside "def train():" after calling "inference()"
#
# Visualize conv1 features
with tf.variable_scope('conv1') as scope_conv:
    # Re-enter the conv1 scope in reuse mode to fetch the existing weights.
    tf.get_variable_scope().reuse_variables()
    weights = tf.get_variable('weights')
    grid_x = grid_y = 8  # to get a square grid for 64 conv1 features
    # put_kernels_on_grid takes the grid height and width as two separate
    # positional arguments, not as one tuple.
    grid = put_kernels_on_grid(weights, grid_y, grid_x)
    tf.image_summary('conv1/features', grid, max_images=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment