import numpy as np
import cv2
from validation import _blackboxed_image_iterator

image = (175.0 * np.ones((110, 105, 3))).astype(np.uint8)

for image_copy, i, j, _ in _blackboxed_image_iterator(image, 100):
    print(i, j)
    cv2.imshow("Image", image_copy)
    cv2.waitKey(50)

cv2.waitKey(0)
cv2.destroyAllWindows()
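
# A minimal sketch (not part of the original demo): the sliding-window variant
# `_blackboxed_image_iterator2` yields one blackboxed copy per pixel position,
# so it is much heavier than the tiled iterator above; use a small image to preview it.
from validation import _blackboxed_image_iterator2

small_image = (175.0 * np.ones((40, 40, 3))).astype(np.uint8)
for image_copy, i, j, _ in _blackboxed_image_iterator2(small_image, 20):
    cv2.imshow("Sliding black box", image_copy)
    cv2.waitKey(5)
cv2.destroyAllWindows()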
# validation.py
#
# Methods to validate a Caffe network by computing occlusion ('black box') heatmaps
#
# Python
from os.path import exists
import logging
# Numpy
import numpy as np
# Opencv
import cv2
# Caffe
import caffe
def _blackboxed_image_iterator(image, size):
    """
    Yields a copy of the image with a black (zeroed) box inside, the box position (x, y)
    and the progress as a fraction in [0, 1].
    Boxes are placed on a copy of the original without overlapping, as in a tiling procedure
    (a worked example follows this function).
    For example, in 1D the blackboxed output images are:
    - Original image : [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
    - black box size = 3
    -> [0,0,0,4,5,6,7,8,9,10,11,12,13,14,15]
    -> [1,2,3,0,0,0,7,8,9,10,11,12,13,14,15]
    -> [1,2,3,4,5,6,0,0,0,10,11,12,13,14,15]
    -> [1,2,3,4,5,6,7,8,9, 0, 0, 0,13,14,15]
    -> [1,2,3,4,5,6,7,8,9,10,11,12, 0, 0, 0]
    """
    total = np.ceil(image.shape[1] * 1.0 / size) * np.ceil(image.shape[0] * 1.0 / size)
    progress = 0
    for i in range(0, image.shape[1], size):
        # Clip the box width at the right image border
        sx = min(size, image.shape[1] - i)
        for j in range(0, image.shape[0], size):
            # Clip the box height at the bottom image border
            sy = min(size, image.shape[0] - j)
            image_copy = image.copy()
            image_copy[j:j + sy, i:i + sx, :] = 0
            yield image_copy, i, j, progress * 1.0 / total
            progress += 1
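
# Worked example (sketch, reusing the 110x105 dummy image shape from the demo script):
# with size=100 the tiled iterator yields ceil(105/100) * ceil(110/100) = 2 * 2 = 4 copies,
# with boxes anchored at (x, y) in (0, 0), (0, 100), (100, 0), (100, 100), clipped at the borders.
#   img = np.zeros((110, 105, 3), dtype=np.uint8)
#   anchors = [(x, y) for _, x, y, _ in _blackboxed_image_iterator(img, 100)]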
def _blackboxed_image_iterator2(image, size):
    """
    Yields a copy of the image with a black (zeroed) box inside, the box position (x, y)
    and the progress as a fraction in [0, 1].
    Boxes are placed on a copy of the original at every possible pixel location (sliding window).
    For example, in 1D the blackboxed output images are:
    - Original image : [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
    - black box size = 3
    -> [0,0,0,4,5,6,7,8,9,10,11,12,13,14,15]
    -> [1,0,0,0,5,6,7,8,9,10,11,12,13,14,15]
    -> [1,2,0,0,0,6,7,8,9,10,11,12,13,14,15]
    -> [1,2,3,0,0,0,7,8,9,10,11,12,13,14,15]
    -> [1,2,3,4,0,0,0,8,9,10,11,12,13,14,15]
    -> [1,2,3,4,5,0,0,0,9,10,11,12,13,14,15]
    -> [1,2,3,4,5,6,0,0,0,10,11,12,13,14,15]
    -> ...
    -> [1,2,3,4,5,6,7,8,9,10,11, 0, 0, 0,15]
    -> [1,2,3,4,5,6,7,8,9,10,11,12, 0, 0, 0]
    """
    # There are (width - size + 1) * (height - size + 1) possible box positions
    total = (image.shape[1] - size + 1) * (image.shape[0] - size + 1)
    progress = 0
    for i in range(image.shape[1] - size + 1):
        for j in range(image.shape[0] - size + 1):
            image_copy = image.copy()
            image_copy[j:j + size, i:i + size, :] = 0
            yield image_copy, i, j, progress * 1.0 / total
            progress += 1
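
# Worked example (sketch, mirroring the 1D docstring example with a 3-pixel-high image):
# a 3x15 image with size=3 yields (15 - 3 + 1) * (3 - 3 + 1) = 13 blackboxed copies.
#   img = np.zeros((3, 15, 3), dtype=np.uint8)
#   n = sum(1 for _ in _blackboxed_image_iterator2(img, 3))  # n == 13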
def _compute_distance(r1, r2):
    """
    Computes a scalar distance between two results of forward passes
    :param r1: an ndarray of shape (K, )
    :param r2: an ndarray of shape (K, )
    :return: a scalar value in [0, 1]
    """
    assert r1.shape == r2.shape, "Input arrays should have same shapes: {} != {}".format(r1.shape, r2.shape)
    # Normalized Euclidean distance between the two output vectors
    v = r1 - r2
    # For probability vectors the max distance value is sqrt(2) and the min distance value is 0
    return np.sqrt(np.dot(v, v)) / np.sqrt(2)
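
# Worked example (sketch): two one-hot probability vectors give the maximal distance,
# identical vectors give zero.
#   _compute_distance(np.array([1.0, 0.0, 0.0]), np.array([0.0, 1.0, 0.0]))  # -> 1.0
#   _compute_distance(np.array([0.2, 0.3, 0.5]), np.array([0.2, 0.3, 0.5]))  # -> 0.0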
class NetValidation(object):
    """
    :class: NetValidation is a structure to compute forward passes
    Usage:
        nv = NetValidation('/path/to/caffe/model.prototxt', '/path/to/caffe/weights.caffemodel')
        nv.mean_image = mean_image  # mean_image.shape = (height, width, channels)
        # nv.verbose = True
        output1 = nv.forward_pass(image)  # image.shape = (height, width, channels)
        output2 = nv.batch_forward_pass(images)  # images.shape = (batch_size, height, width, channels)
        # output1, output2 are dicts, e.g. {'prob': [[...], ...]}
        # resolution -> bbox_size = (1 - resolution) * (min(height, width) - 1) + 1
        output, heatmap = nv.tiled_heatmap(resolution=0.95, image=image, results_distance=_compute_distance, batch_size=5)
    """
    def __init__(self, model_path, weights_path):
        self.net = caffe.Net(model_path, weights_path, caffe.TEST)
        self._mean_image = None
        self._transformer = caffe.io.Transformer({'data': self.net.blobs['data'].data.shape})
        self._transformer.set_transpose('data', (2, 0, 1))  # move image channels to outermost dimension
        self._transformer.set_channel_swap('data', (2, 1, 0))  # swap channels from RGB to BGR
        self.verbose = False
    @property
    def mean_image(self):
        """ Mean image used to transform the input image during forward pass """
        return self._mean_image

    @mean_image.setter
    def mean_image(self, mean_image):
        """
        Set the mean image.
        :param: mean_image is a ndarray with shape (height, width, channels)
        """
        assert isinstance(mean_image, np.ndarray) and len(mean_image.shape) == 3, \
            "Parameter mean_image is a ndarray with shape (height, width, channels)"
        self._mean_image = mean_image
        if self.net.blobs['data'].data.shape[2:] != self._mean_image.shape[:2]:
            # Resize the mean image to the net input spatial size
            ms = self.net.blobs['data'].data.shape[2:]  # (B, C, H, W) -> (H, W)
            self._mean_image = cv2.resize(self._mean_image, (ms[1], ms[0]))
        # (H, W, C) -> (C, H, W)
        self._mean_image = self._mean_image.transpose((2, 0, 1))
        self._transformer.set_mean('data', self._mean_image)  # subtract the dataset-mean value in each channel
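
    # Example (sketch, hypothetical file path): load a dataset mean from a .binaryproto
    # file and pass it to the setter as (height, width, channels):
    #   blob = caffe.proto.caffe_pb2.BlobProto()
    #   blob.ParseFromString(open('/path/to/mean.binaryproto', 'rb').read())
    #   mean_chw = caffe.io.blobproto_to_array(blob)[0]  # (channels, height, width)
    #   nv.mean_image = mean_chw.transpose((1, 2, 0))    # -> (height, width, channels)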
    def forward_pass(self, image):
        """
        Compute a forward pass using the model from model_path with weights from weights_path
        :param: image is a ndarray with shape (height, width, channels)
        :return: net output
        """
        assert isinstance(image, np.ndarray) and len(image.shape) == 3, \
            "Parameter image is a ndarray with shape (height, width, channels)"
        transformed_image = self._transformer.preprocess('data', image)
        ms = self.net.blobs['data'].data.shape
        self.net.blobs['data'].reshape(*((1,) + ms[1:]))
        self.net.blobs['data'].data[...] = transformed_image
        # perform classification
        output = self.net.forward()
        return output
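
    # Example (sketch, assuming the net exposes a 'prob' output as in the Usage note above):
    #   output = nv.forward_pass(image)
    #   predicted_class = output['prob'][0].argmax()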
    def batch_forward_pass(self, images):
        """
        Compute a batched forward pass using the model from model_path with weights from weights_path
        :param: images is a ndarray with shape (batch_size, height, width, channels)
        :return: net output
        """
        assert isinstance(images, np.ndarray) and len(images.shape) == 4, \
            "Parameter images is a ndarray with shape (batch_size, height, width, channels)"
        ms = self.net.blobs['data'].data.shape
        k = images.shape[0]
        transformed_images = np.empty((k,) + ms[1:])
        for i in range(images.shape[0]):
            transformed_images[i, ...] = self._transformer.preprocess('data', images[i, ...])
        self.net.blobs['data'].reshape(*((k,) + ms[1:]))
        self.net.blobs['data'].data[...] = transformed_images
        # perform classification
        output = self.net.forward()
        return output
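
    # Example (sketch): stack several (height, width, channels) images and classify them in one pass.
    #   batch = np.stack([image, image[::-1, :, :]])  # shape (2, height, width, channels)
    #   outputs = nv.batch_forward_pass(batch)
    #   top_classes = outputs['prob'].argmax(axis=1)  # assumes a 'prob' output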
    def tiled_heatmap(self, resolution, image, results_distance=_compute_distance, batch_size=5):
        """
        Compute a 'heatmap' of the network on the image using the black box mask method:
        - A forward pass is computed on the original image and its output is stored
        - A black box zeroes a part of the image and a forward pass is computed on the modified image
        - A distance is computed between the two results
        - The black box is moved over the image (tiling) and the distance is computed at each position
        :param: resolution is a value between 0 and 1 used to compute the black box size:
            bbox_size = (1 - resolution) * (min(height, width) - 1) + 1
        :param: image is a ndarray with shape (height, width, channels)
        :param: results_distance is a function to compute a scalar value from two results of net forward passes
            The signature is func(ndarray, ndarray) -> real value in [0, 1]
        :return: forward pass result dictionary, heatmap
        """
        res0 = self.forward_pass(image)
        assert len(res0) > 0, "Net forward pass output is empty"
        # Take the first key from the net output
        key = list(res0.keys())[0]
        heatmap = np.zeros(image.shape[:-1])
        image_size = min(image.shape[0], image.shape[1])
        bbox_size = int((1 - resolution) * (image_size - 1) + 1)
        images = np.empty((batch_size,) + image.shape)
        count = 0
        x_vec = []
        y_vec = []

        def _compute():
            results = self.batch_forward_pass(images)
            assert len(results) > 0, "Net forward pass output is empty"
            assert key in results, "The first key of the original output is not found in the batch output"
            v0 = res0[key]
            v1 = results[key]
            assert v0.shape[0] == 1 and v1.shape[0] == batch_size, \
                "Unexpected output batch sizes: {} and {}".format(v0.shape[0], v1.shape[0])
            for i in range(min(v1.shape[0], count)):
                distance = results_distance(v0[0], v1[i, :])
                heatmap[y_vec[i]:y_vec[i] + bbox_size, x_vec[i]:x_vec[i] + bbox_size] = distance
            if self.verbose:
                cv2.imshow("Heatmap interactive computation ...", (255.0 * heatmap).astype(np.uint8))
                cv2.waitKey(20)

        for image_copy, x, y, progress in _blackboxed_image_iterator(image, bbox_size):
            logging.log(logging.INFO, "Net heatmap computation : [%s / 100]" % int(100 * progress))
            if count == batch_size:
                _compute()
                images = np.empty((batch_size,) + image.shape)
                count = 0
                x_vec = []
                y_vec = []
            images[count, ...] = image_copy
            x_vec.append(x)
            y_vec.append(y)
            count += 1
        if count > 0:
            _compute()
        if self.verbose:
            cv2.destroyAllWindows()
        return res0, heatmap
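
    # Example (sketch, hypothetical paths): compute a heatmap and display it with an OpenCV colormap.
    #   nv = NetValidation('/path/to/model.prototxt', '/path/to/weights.caffemodel')
    #   output, heatmap = nv.tiled_heatmap(resolution=0.95, image=image)
    #   vis = cv2.applyColorMap((255.0 * heatmap).astype(np.uint8), cv2.COLORMAP_JET)
    #   cv2.imshow("Heatmap", vis)
    #   cv2.waitKey(0)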
    def full_heatmap(self, bbox_size, image, results_distance=_compute_distance, batch_size=5):
        """
        Compute a 'heatmap' of the network on the image using the black box mask method:
        - A forward pass is computed on the original image and its output is stored
        - A black box zeroes a part of the image and a forward pass is computed on the modified image
        - A distance is computed between the two results
        - The black box is moved over every pixel position and the distance is computed each time
        :param: bbox_size is the black box size.
        :param: image is a ndarray with shape (height, width, channels)
        :param: results_distance is a function to compute a scalar value from two results of net forward passes
            The signature is func(ndarray, ndarray) -> real value in [0, 1]
        :return: forward pass result dictionary, heatmap
        This method computes about 'image.width * image.height' forward passes, so it can take a lot of time!
        """
        raise NotImplementedError("full_heatmap is not implemented yet")
        # res0 = self.forward_pass(image)
        # assert len(res0) > 0, "Net forward pass output is empty"
        # # Take the first key from the net output
        # key = res0.keys()[0]
        # heatmap = np.zeros(image.shape[:-1])
        # images = np.empty((batch_size, ) + image.shape)
        # count = 0
        # x_vec = []
        # y_vec = []
        # def _compute():
        #     results = self.batch_forward_pass(images)
        #     assert len(results) > 0, "Net forward pass output is empty"
        #     assert key in results, "The first key is not found in both inputs"
        #     v0 = res0[key]
        #     v1 = results[key]
        #     assert v0.shape[0] == 1 and v1.shape[0] == batch_size, "..."
        #     for i in range(min(v1.shape[0], count)):
        #         distance = results_distance(v0[0], v1[i, :])
        #         heatmap[y_vec[i]:y_vec[i]+bbox_size, x_vec[i]:x_vec[i]+bbox_size] = distance
        #     if verbose:
        #         cv2.imshow("Heatmap interactive computation ...", (255.0 * heatmap).astype(np.uint8))
        #         cv2.waitKey(20)
        # for image_copy, x, y, progress_percent in _blackboxed_image_iterator(image, bbox_size):
        #     logging.log(logging.INFO, "Net heatmap computation : [%s / 100]" % progress_percent)
        #     if count == batch_size:
        #         _compute()
        #         images = np.empty((batch_size, ) + image.shape)
        #         count = 0
        #         x_vec = []
        #         y_vec = []
        #     images[count, ...] = image_copy
        #     x_vec.append(x)
        #     y_vec.append(y)
        #     count += 1
        # if count > 0:
        #     _compute()
        # if verbose:
        #     cv2.destroyAllWindows()
        # return res0, heatmap
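
# End-to-end usage sketch (file paths and the 'prob' output name are hypothetical,
# mirroring the class Usage note above):
if __name__ == '__main__':
    img = cv2.imread('/path/to/image.jpg')
    nv = NetValidation('/path/to/model.prototxt', '/path/to/weights.caffemodel')
    # nv.mean_image = mean_image  # optionally set a dataset mean, see the setter above
    nv.verbose = True
    output, heatmap = nv.tiled_heatmap(resolution=0.95, image=img,
                                       results_distance=_compute_distance, batch_size=5)
    logging.log(logging.INFO, "Predicted class: %s" % output['prob'][0].argmax())
    cv2.imshow("Heatmap", (255.0 * heatmap).astype(np.uint8))
    cv2.waitKey(0)
    cv2.destroyAllWindows()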