Last active
June 7, 2018 14:56
-
-
Save manashmandal/b687b6b6bcefe832feefef9fb5780b94 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from tqdm import tqdm | |
import cv2 | |
import numpy as np | |
from pprint import pprint | |
from os.path import join | |
import glob | |
from sklearn.utils import shuffle | |
from keras.utils import to_categorical | |
# Seed for the module-level random generator below.
SEED = 20
# Default target size for resized images.
img_shape = (50, 50)

# Seeded generator; `choice` is its bound sampling method so that draws made
# through it are reproducible for a given SEED.
random_state = np.random.RandomState(SEED)
choice = random_state.choice

# Paths of the per-class image folders, relative to the working directory.
APPLE_DIRECTORY, BALL_DIRECTORY, BANANA_DIRECTORY = "./apple/", "./ball/", "./banana/"

# Class name -> integer label.
LABELS_DICTIONARY = dict(apple=0, ball=1, banana=2)
def get_image_count(directory, index=0):
    """Count the entries stored inside the sub-directories of `directory`.

    Args:
        directory: top-level directory whose entries are sub-directories.
        index: position (in ``os.listdir(directory)`` order) of the
            sub-directory to count, or ``-1`` to sum over all of them.

    Returns:
        Number of entries in the selected sub-directory, or the total over
        every sub-directory when ``index == -1``.

    Raises:
        ValueError: if `index` is not smaller than the number of entries in
            `directory`.
    """
    entries = os.listdir(directory)

    if index >= len(entries):
        raise ValueError("Index must be less than number of subdirectory in the top level directory")

    if index != -1:
        return len(os.listdir(join(directory, entries[index])))

    # index == -1: add up the content of every sub-directory.
    return sum(len(os.listdir(join(directory, name))) for name in entries)
def get_image_from_directory_by_index(
        directory, index, index1, index2, shape=(224, 224)):
    """Load a slice of the images found in one sub-directory of `directory`.

    Args:
        directory: top-level directory whose entries are sub-directories.
        index: position (in ``os.listdir(directory)`` order) of the
            sub-directory to read from.
        index1: start of the slice applied to the globbed file list.
        index2: end (exclusive) of the slice applied to the globbed file list.
        shape: size handed to ``cv2.resize`` for every image.

    Returns:
        ``np.ndarray`` stacking the loaded images, each converted to RGB,
        resized to `shape` and divided by 255.0. Empty if the slice selects
        no files.

    Raises:
        ValueError: if `index` is not smaller than the number of entries in
            `directory`.
        IOError: if OpenCV cannot decode one of the selected files.
    """
    subdirs = os.listdir(directory)
    if index >= len(subdirs):
        raise ValueError("Index must be less than number of subdirectories in the top level directory")

    imagepaths = glob.glob(join(directory, subdirs[index], '*'))[index1:index2]

    images = []
    for imgpath in imagepaths:
        # imread takes an IMREAD_* flag; a COLOR_* conversion code is not a
        # valid read mode, so decode as 3-channel BGR and convert explicitly.
        img = cv2.imread(imgpath, cv2.IMREAD_COLOR)
        if img is None:
            raise IOError("Could not read image: {}".format(imgpath))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # The interpolation must be passed by keyword: the third positional
        # argument of cv2.resize is the destination buffer.
        img = cv2.resize(img, shape, interpolation=cv2.INTER_CUBIC)
        images.append(img / 255.0)

    return np.array(images)
def generate_label(label, count):
    """Return a float array of length `count` filled with the id of `label`.

    The id is looked up in LABELS_DICTIONARY; an unknown `label` raises
    KeyError.
    """
    class_id = LABELS_DICTIONARY[label]
    return np.full(count, class_id, dtype=np.float64)
def get_image_from_directory(directory, image_count, shape=(224, 224)):
    """Load the first `image_count` images found under `directory`.

    Files are collected sub-directory by sub-directory (in
    ``os.listdir(directory)`` order) with a ``*`` glob.

    Args:
        directory: top-level directory whose entries are sub-directories.
        image_count: number of images to load.
        shape: size handed to ``cv2.resize`` for every image.

    Returns:
        ``np.ndarray`` stacking `image_count` images, each converted to RGB,
        resized to `shape` and divided by 255.0.
        If fewer than `image_count` files are available, the tuple
        ``(False, available_count)`` is returned instead; callers must check
        for it.

    Raises:
        IOError: if OpenCV cannot decode one of the selected files.
    """
    imagepaths = [
        path
        for subdir in os.listdir(directory)
        for path in glob.glob(join(directory, subdir, '*'))
    ]

    # Image count can't be greater than what is actually on disk.
    if image_count > len(imagepaths):
        return False, len(imagepaths)

    images = []
    for imagepath in imagepaths[:max(image_count, 0)]:
        # imread takes an IMREAD_* flag; a COLOR_* conversion code is not a
        # valid read mode, so decode as 3-channel BGR and convert explicitly.
        img = cv2.imread(imagepath, cv2.IMREAD_COLOR)
        if img is None:
            raise IOError("Could not read image: {}".format(imagepath))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, shape, interpolation=cv2.INTER_CUBIC)
        images.append(img / 255.0)

    # Always return an array here; the loop can no longer fall through and
    # return None (e.g. for image_count == 0).
    return np.array(images)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from sklearn.utils import shuffle | |
from numpy.random import choice | |
from imageutils import get_image_from_directory | |
# Column layout of one triplet row: three image indices (query, positive,
# negative) followed by the three matching class ids.
Q_IMG_INDEX, P_IMG_INDEX, N_IMG_INDEX = 0, 1, 2
Q_INDEX, P_INDEX, N_INDEX = 3, 4, 5

# Images loaded per class and default generator batch size.
NUM_SAMPLES = 1000
BATCH_SIZE = 64
# Example
# One independently shuffled list of image positions per class.
apple_indices = shuffle(list(range(NUM_SAMPLES)))
ball_indices = shuffle(list(range(NUM_SAMPLES)))
banana_indices = shuffle(list(range(NUM_SAMPLES)))
cap_indices = shuffle(list(range(NUM_SAMPLES)))
camera_indices = shuffle(list(range(NUM_SAMPLES)))
scissors_indices = shuffle(list(range(NUM_SAMPLES)))
tomatoes_indices = shuffle(list(range(NUM_SAMPLES)))
cell_phone_indices = shuffle(list(range(NUM_SAMPLES)))


def _load_class_images(banner, directory):
    """Load NUM_SAMPLES 50x50 images from `directory`, failing loudly if short.

    get_image_from_directory reports "not enough images" by returning a
    ``(False, available)`` tuple; storing that as image data would only blow
    up much later, when the triplet arrays are indexed.
    """
    print("LOADING {} IMAGES".format(banner))
    images = get_image_from_directory(directory, NUM_SAMPLES, shape=(50, 50))
    if images is None or isinstance(images, tuple):
        raise ValueError(
            "Could not load {} images from {}: {!r}".format(NUM_SAMPLES, directory, images)
        )
    return images


apple = _load_class_images("APPLE", './apple')
ball = _load_class_images("BALL", './ball')
banana = _load_class_images("BANANA", './banana')
cap = _load_class_images("CAP", './cap')
camera = _load_class_images("CAMERA", './camera')
scissors = _load_class_images("SCISSORS", './scissors')
tomatoes = _load_class_images("TOMATO", './tomato')
cell_phone = _load_class_images("CELL PHONE", './cell_phone')

classes = ['apple', 'ball', 'banana', 'cap', 'tomato', 'scissors', 'cell_phone', 'camera']

# Class name -> shuffled image positions.
data_dict_indices = {
    'apple': apple_indices,
    'ball': ball_indices,
    'banana': banana_indices,
    'cap': cap_indices,
    'camera': camera_indices,
    'scissors': scissors_indices,
    'tomato': tomatoes_indices,
    'cell_phone': cell_phone_indices,
}

# Class name -> loaded image array.
data_dict = {
    'apple': apple,
    'ball': ball,
    'banana': banana,
    'cap': cap,
    'tomato': tomatoes,
    'scissors': scissors,
    'camera': camera,
    'cell_phone': cell_phone,
}

# Explicit class order used for the numeric ids. It reproduces the key order
# of data_dict_indices but does not rely on dict iteration order, which older
# Python versions do not guarantee.
_CLASS_ORDER = (
    'apple', 'ball', 'banana', 'cap', 'camera', 'scissors', 'tomato', 'cell_phone',
)

CLASS_INDEX = {name: class_id for class_id, name in enumerate(_CLASS_ORDER)}

INDEX2CLASS = {class_id: name for name, class_id in CLASS_INDEX.items()}
def onehot(labels, max=8):
    """Turn class ids into one-hot rows.

    Args:
        labels: array-like of class ids; values are cast to int32.
        max: number of classes, i.e. the width of each row.

    Returns:
        Float array with one row of length `max` per label.
    """
    class_ids = np.asarray(labels, dtype=np.int32)
    identity = np.eye(max)
    # Row k of the identity matrix is the one-hot vector for class k.
    return identity[class_ids]
# Make sure indices are shuffled before fed into this function
def generate_one_sample(data_class):
    """Draw one (query, positive, negative) triplet of image indices.

    Args:
        data_class: mapping of class name -> sequence of image indices.

    Returns:
        ``((query_idx, positive_idx, negative_idx),
        (query_label, positive_label, negative_label))`` where query and
        positive share a class and the negative comes from a different one.

    Raises:
        ValueError: if `data_class` holds fewer than two classes.
    """
    class_names = list(data_class.keys())
    class_ids = list(range(len(class_names)))
    if len(class_ids) < 2:
        raise ValueError("At least two classes are required to build a triplet")

    # Pick the class shared by query/positive, then a different one for the negative.
    query_positive_class = choice(class_ids, 1)[0]
    other_class_ids = [c for c in class_ids if c != query_positive_class]
    negative_class = choice(other_class_ids, 1)[0]

    query_positive_class_label = class_names[query_positive_class]
    negative_class_label = class_names[negative_class]

    # Sample without replacement so query and positive are two different
    # images; an identical pair has zero positive distance and teaches the
    # triplet loss nothing. Fall back to replacement for single-image classes.
    pool = data_class[query_positive_class_label]
    query_positive_indices = choice(pool, 2, replace=len(pool) < 2).tolist()
    negative_index = choice(data_class[negative_class_label], 1).tolist()[0]

    return (
        (query_positive_indices[0], query_positive_indices[1], negative_index),
        (query_positive_class_label, query_positive_class_label, negative_class_label),
    )
def generate_triplet_batch_numpy(data_dict_indices, batch_size=32):
    """Build `batch_size` triplet rows as one array.

    Each row is ``[query_idx, positive_idx, negative_idx, query_class,
    positive_class, negative_class]``, with class names translated to ids
    through CLASS_INDEX.

    Args:
        data_dict_indices: mapping of class name -> sequence of image indices.
        batch_size: number of rows to generate.

    Returns:
        ``np.ndarray`` with one row per sampled triplet.
    """
    rows = []
    for _ in range(batch_size):
        image_indices, class_names = generate_one_sample(data_dict_indices)
        class_ids = [CLASS_INDEX[name] for name in class_names]
        rows.append(list(image_indices) + class_ids)
    return np.array(rows)
def triplet(data_dict, train_index):
    """Materialise the query/positive/negative image arrays for index rows.

    Args:
        data_dict: mapping of class name -> image array for that class.
        train_index: 2-D integer array whose columns follow the
            Q_IMG_INDEX..N_INDEX layout (image indices, then class ids).

    Returns:
        Tuple ``(query, positive, negative)`` of arrays shaped
        ``(len(train_index), 50, 50, 3)``.
    """
    n_rows = len(train_index)
    q_placeholder, p_placeholder, n_placeholder = (
        np.zeros((n_rows, 50, 50, 3)) for _ in range(3)
    )

    # (output array, class-id column, image-index column) for each role.
    roles = (
        (q_placeholder, Q_INDEX, Q_IMG_INDEX),
        (p_placeholder, P_INDEX, P_IMG_INDEX),
        (n_placeholder, N_INDEX, N_IMG_INDEX),
    )

    for class_name in CLASS_INDEX.keys():
        class_id = CLASS_INDEX[class_name]
        for target, label_col, image_col in roles:
            # Rows whose class in this role is `class_name` ...
            rows = np.where(train_index[:, label_col] == class_id)[0]
            # ... are filled with the referenced images of that class.
            target[rows] = data_dict[class_name][train_index[:, image_col][rows]]

    return (q_placeholder, p_placeholder, n_placeholder)
def triplet_generator(data_dict, train_indices, batch_size=64, _set='TRAIN'):
    """Yield triplet batches forever, cycling over `train_indices`.

    Args:
        data_dict: mapping of class name -> image array for that class.
        train_indices: 2-D integer array of triplet rows (see `triplet`).
        batch_size: maximum number of rows per yielded batch.
        _set: label of the data split; not used by the generator itself.

    Yields:
        ``(query, positive, negative, query_onehot, positive_onehot,
        negative_onehot)`` for consecutive windows of the data; the last
        window of a pass may be shorter than `batch_size`.
    """
    query, positive, negative = triplet(data_dict, train_indices)

    label_columns = [
        train_indices[:, column] for column in (Q_INDEX, P_INDEX, N_INDEX)
    ]

    n_samples = len(query)

    while True:
        for start in range(0, n_samples, batch_size):
            window = slice(start, min(start + batch_size, n_samples))
            images = (query[window], positive[window], negative[window])
            labels = tuple(onehot(column[window]) for column in label_columns)
            yield images + labels
# Earlier experiments used 'train_indices.npy' / 'test_indices.npy' and
# 'train_2500.npy' / 'test_2500.npy'; the 8-class tables below are the ones
# currently in use.
def _load_index_table(path):
    """Read a saved triplet index array and return it as int32."""
    return np.asarray(np.load(path), dtype=np.int32)


train_indices = _load_index_table('train_8_class_6k_samples.npy')
train_gen = triplet_generator(data_dict, train_indices)

test_indices = _load_index_table('test_8_class_4k_samples.npy')
test_gen = triplet_generator(data_dict, test_indices, _set='TEST')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tensorflow as tf | |
import numpy as np | |
from tqdm import tqdm | |
from datetime import datetime | |
import matplotlib.pyplot as plt | |
from imageutils import get_image_from_directory | |
# When True, the module loads every class image for feature extraction;
# otherwise it imports the training/test generators instead.
PREDICTION = True

if PREDICTION:
    NUM_SAMPLES = 1000

    data_dict = {
        'apple': 'apple',
        'ball': 'ball',
        'banana': 'banana',
        'cap': 'cap',
        'tomato': 'tomatoes',
        'scissors': 'scissors',
        'camera': 'camera',
        'cell_phone': 'cell_phone',
    }

    # Class ids must agree with the ids used by the training data module
    # (camera=4, scissors=5, tomato=6), otherwise the saved labels do not
    # line up with the network outputs. The order is spelled out explicitly
    # instead of being derived from dict iteration order.
    _CLASS_ORDER = (
        'apple', 'ball', 'banana', 'cap', 'camera', 'scissors', 'tomato', 'cell_phone',
    )

    CLASS_INDEX = {name: class_id for class_id, name in enumerate(_CLASS_ORDER)}

    INDEX2CLASS = {class_id: name for name, class_id in CLASS_INDEX.items()}

    print("LOADING APPLE IMAGES")
    apple = get_image_from_directory('./apple', NUM_SAMPLES, shape=(50, 50))
    print("LOADING BALL IMAGES")
    ball = get_image_from_directory('./ball', NUM_SAMPLES, shape=(50, 50))
    print("LOADING BANANA IMAGES")
    banana = get_image_from_directory('./banana', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CAP IMAGES")
    cap = get_image_from_directory('./cap', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CAMERA IMAGES")
    camera = get_image_from_directory('./camera', NUM_SAMPLES, shape=(50, 50))
    print("LOADING SCISSORS IMAGES")
    scissors = get_image_from_directory('./scissors', NUM_SAMPLES, shape=(50, 50))
    print("LOADING TOMATO IMAGES")
    tomatoes = get_image_from_directory('./tomato', NUM_SAMPLES, shape=(50, 50))
    print("LOADING CELL PHONE IMAGES")
    cell_phone = get_image_from_directory('./cell_phone', NUM_SAMPLES, shape=(50, 50))

    # Images of all classes stacked along the first axis.
    to_extract_feature = np.vstack((
        apple, ball, banana, cap, camera, scissors, tomatoes, cell_phone
    ))

    # One label per stacked image, in the same class order as above. The
    # per-class label vectors are concatenated end to end (1-D, one entry per
    # image); stacking them as rows would give an (8, NUM_SAMPLES) matrix
    # that cannot be sliced in step with the images.
    to_extract_feature_labels = np.concatenate([
        np.ones(NUM_SAMPLES) * CLASS_INDEX[name]
        for name in (
            'apple', 'ball', 'banana', 'cap', 'camera', 'scissors', 'tomato', 'cell_phone',
        )
    ])
else:
    from tg3 import train_gen, test_gen, BATCH_SIZE
# Input geometry and number of output classes.
HEIGHT, WIDTH, CHANNEL = 50, 50, 3
CLASSES = 8


def _image_placeholder():
    """Float placeholder for a batch of HEIGHT x WIDTH x CHANNEL images."""
    return tf.placeholder(tf.float32, [None, HEIGHT, WIDTH, CHANNEL])


def _label_placeholder():
    """Float placeholder for a batch of CLASSES-wide label rows."""
    return tf.placeholder(tf.float32, [None, CLASSES])


# Stand-alone input used for feature extraction.
IMAGE_INPUT = _image_placeholder()

# Query / positive / negative images with their labels.
X1 = _image_placeholder()
y1 = _label_placeholder()
X2 = _image_placeholder()
y2 = _label_placeholder()
X3 = _image_placeholder()
y3 = _label_placeholder()
# Hyper-parameters of every layer. For convolutions, `*_shape` on the kernel
# is [height, width, in_channels, out_channels] and `conv*_shape` holds the
# strides handed to tf.nn.conv2d.

# Conv 1_1
kernel11_name, kernel11_shape = 'kernel11', [3, 3, 3, 64]
biases11_name, biases11_shape = 'biases11', [64]
conv11_shape, conv11_padding = [1, 1, 1, 1], 'SAME'

# Conv 1_2
kernel12_name, kernel12_shape = 'kernel12', [3, 3, 64, 64]
biases12_name, biases12_shape = 'biases12', [64]
conv12_shape, conv12_padding = [1, 1, 1, 1], 'SAME'

# Pool 1
pool1_name, pool1_padding = 'pool1', 'SAME'
pool1_ksize, pool1_strides = [1, 2, 2, 1], [1, 2, 2, 1]

# Conv 2_1
kernel21_name, kernel21_shape = 'kernel21', [3, 3, 64, 128]
biases21_name, biases21_shape = 'biases21', [128]
conv21_shape, conv21_padding = [1, 1, 1, 1], 'SAME'

# Conv 2_2
kernel22_name, kernel22_shape = 'kernel22', [3, 3, 128, 128]
biases22_name, biases22_shape = 'biases22', [128]
conv22_shape, conv22_padding = [1, 1, 1, 1], 'SAME'

# Pool 2
pool2_name, pool2_padding = 'pool2', 'SAME'
pool2_ksize, pool2_strides = [1, 2, 2, 1], [1, 2, 2, 1]

# Conv 3_1
kernel31_name, kernel31_shape = 'kernel31', [3, 3, 128, 256]
biases31_name, biases31_shape = 'biases31', [256]
conv31_shape, conv31_padding = [1, 1, 1, 1], 'SAME'

# Conv 3_2
kernel32_name, kernel32_shape = 'kernel32', [3, 3, 256, 256]
biases32_name, biases32_shape = 'biases32', [256]
conv32_shape, conv32_padding = [1, 1, 1, 1], 'SAME'

# Conv 3_3
kernel33_name, kernel33_shape = 'kernel33', [3, 3, 256, 256]
biases33_name, biases33_shape = 'biases33', [256]
conv33_shape, conv33_padding = [1, 1, 1, 1], 'SAME'

# Pool 3
pool3_name, pool3_padding = 'pool3', 'SAME'
pool3_ksize, pool3_strides = [1, 2, 2, 1], [1, 2, 2, 1]

# Conv 4_1
kernel41_name, kernel41_shape = 'kernel41', [3, 3, 256, 512]
biases41_name, biases41_shape = 'biases41', [512]
conv41_shape, conv41_padding = [1, 1, 1, 1], 'SAME'

# Conv 4_2
kernel42_name, kernel42_shape = 'kernel42', [3, 3, 512, 512]
biases42_name, biases42_shape = 'biases42', [512]
conv42_shape, conv42_padding = [1, 1, 1, 1], 'SAME'

# Conv 4_3
kernel43_name, kernel43_shape = 'kernel43', [3, 3, 512, 512]
biases43_name, biases43_shape = 'biases43', [512]
conv43_shape, conv43_padding = [1, 1, 1, 1], 'SAME'

# Pool 4
pool4_name, pool4_padding = 'pool4', 'SAME'
pool4_ksize, pool4_strides = [1, 2, 2, 1], [1, 2, 2, 1]

# Conv 5_1
kernel51_name, kernel51_shape = 'kernel51', [3, 3, 512, 512]
biases51_name, biases51_shape = 'biases51', [512]
conv51_shape, conv51_padding = [1, 1, 1, 1], 'SAME'

# Conv 5_2
kernel52_name, kernel52_shape = 'kernel52', [3, 3, 512, 512]
biases52_name, biases52_shape = 'biases52', [512]
conv52_shape, conv52_padding = [1, 1, 1, 1], 'SAME'

# Conv 5_3
kernel53_name, kernel53_shape = 'kernel53', [3, 3, 512, 512]
biases53_name, biases53_shape = 'biases53', [512]
conv53_shape, conv53_padding = [1, 1, 1, 1], 'SAME'

# Pool 5
pool5_name, pool5_padding = 'pool5', 'SAME'
pool5_ksize, pool5_strides = [1, 2, 2, 1], [1, 2, 2, 1]

# Fully connected 1
# NOTE(review): 2048 is the flattened pool5 size, presumably 2 * 2 * 512 for
# a 50x50 input after five stride-2 SAME pools -- confirm if the input size
# ever changes.
fc1w_name, fc1w_shape = 'fc1', [2048, 4096]
fc1b_name, fc1b_shape = 'fb1', [4096]

# Fully connected 2
fc2w_name, fc2w_shape = 'fc2', [4096, 4096]
fc2b_name, fc2b_shape = 'fb2', [4096]

# Fully connected 3 (class logits)
fc3w_name, fc3w_shape = 'fc3', [4096, CLASSES]
fc3b_name, fc3b_shape = 'fb3', [CLASSES]
triplet_variables = None
classification_variables = None


def _conv_kernel(name, shape):
    """Create a convolution kernel drawn from a truncated normal (stddev 0.1)."""
    return tf.get_variable(name=name, shape=shape, dtype=tf.float32,
                           initializer=tf.truncated_normal_initializer(stddev=1e-1))


def _conv_bias(name, shape):
    """Create a zero-initialised convolution bias."""
    return tf.get_variable(name=name, shape=shape, dtype=tf.float32,
                           initializer=tf.zeros_initializer())


def _dense_weight(name, shape):
    """Create a Glorot-normal initialised fully connected weight matrix."""
    return tf.get_variable(name=name, shape=shape, dtype=tf.float32,
                           initializer=tf.glorot_normal_initializer())


def _dense_bias(name, shape):
    """Create a ones-initialised fully connected bias."""
    return tf.get_variable(name=name, shape=shape, dtype=tf.float32,
                           initializer=tf.ones_initializer())


# All network variables live in the 'vgg' scope and are created once here;
# extract_feature later fetches them by name.
with tf.variable_scope('vgg') as scope:
    kernel11 = _conv_kernel(kernel11_name, kernel11_shape)
    biases11 = _conv_bias(biases11_name, biases11_shape)
    kernel12 = _conv_kernel(kernel12_name, kernel12_shape)
    biases12 = _conv_bias(biases12_name, biases12_shape)

    kernel21 = _conv_kernel(kernel21_name, kernel21_shape)
    biases21 = _conv_bias(biases21_name, biases21_shape)
    kernel22 = _conv_kernel(kernel22_name, kernel22_shape)
    biases22 = _conv_bias(biases22_name, biases22_shape)

    kernel31 = _conv_kernel(kernel31_name, kernel31_shape)
    biases31 = _conv_bias(biases31_name, biases31_shape)
    kernel32 = _conv_kernel(kernel32_name, kernel32_shape)
    biases32 = _conv_bias(biases32_name, biases32_shape)
    kernel33 = _conv_kernel(kernel33_name, kernel33_shape)
    biases33 = _conv_bias(biases33_name, biases33_shape)

    kernel41 = _conv_kernel(kernel41_name, kernel41_shape)
    biases41 = _conv_bias(biases41_name, biases41_shape)
    kernel42 = _conv_kernel(kernel42_name, kernel42_shape)
    biases42 = _conv_bias(biases42_name, biases42_shape)
    kernel43 = _conv_kernel(kernel43_name, kernel43_shape)
    biases43 = _conv_bias(biases43_name, biases43_shape)

    kernel51 = _conv_kernel(kernel51_name, kernel51_shape)
    biases51 = _conv_bias(biases51_name, biases51_shape)
    kernel52 = _conv_kernel(kernel52_name, kernel52_shape)
    biases52 = _conv_bias(biases52_name, biases52_shape)
    kernel53 = _conv_kernel(kernel53_name, kernel53_shape)
    biases53 = _conv_bias(biases53_name, biases53_shape)

    fc1w = _dense_weight(fc1w_name, fc1w_shape)
    fc1b = _dense_bias(fc1b_name, fc1b_shape)
    fc2w = _dense_weight(fc2w_name, fc2w_shape)
    fc2b = _dense_bias(fc2b_name, fc2b_shape)
    fc3w = _dense_weight(fc3w_name, fc3w_shape)
    fc3b = _dense_bias(fc3b_name, fc3b_shape)

    # Everything up to the second fully connected layer (the embedding).
    triplet_variables = [
        kernel11, biases11, kernel12, biases12,
        kernel21, biases21, kernel22, biases22,
        kernel31, biases31, kernel32, biases32, kernel33, biases33,
        kernel41, biases41, kernel42, biases42, kernel43, biases43,
        kernel51, biases51, kernel52, biases52, kernel53, biases53,
        fc1w, fc1b, fc2w, fc2b,
    ]

    # The embedding variables plus the final classification layer.
    classification_variables = triplet_variables + [fc3w, fc3b]
def extract_feature(image):
    """Run `image` through the network and return embedding and logits.

    The variables are fetched by name with ``tf.get_variable``, so this must
    be called inside the 'vgg' variable scope with reuse enabled.

    Args:
        image: image batch tensor fed to the first convolution.

    Returns:
        Tuple ``(fc2_activation, fc3_logits)``: the ReLU output of the second
        fully connected layer and the un-normalised output of the third.
    """

    def conv_relu(inputs, kernel_name, biases_name, strides, padding):
        # convolution -> bias -> ReLU, using the shared variables
        kernel = tf.get_variable(kernel_name)
        conv = tf.nn.conv2d(inputs, kernel, strides, padding)
        biases = tf.get_variable(biases_name)
        return tf.nn.relu(tf.nn.bias_add(conv, biases))

    def dense(inputs, weight_name, bias_name):
        # affine layer without activation, using the shared variables
        weights = tf.get_variable(weight_name)
        bias = tf.get_variable(bias_name)
        return tf.nn.bias_add(tf.matmul(inputs, weights), bias)

    # Block 1
    net = conv_relu(image, kernel11_name, biases11_name, conv11_shape, conv11_padding)
    net = conv_relu(net, kernel12_name, biases12_name, conv12_shape, conv12_padding)
    net = tf.nn.max_pool(net, pool1_ksize, pool1_strides, pool1_padding, name=pool1_name)

    # Block 2
    net = conv_relu(net, kernel21_name, biases21_name, conv21_shape, conv21_padding)
    net = conv_relu(net, kernel22_name, biases22_name, conv22_shape, conv22_padding)
    net = tf.nn.max_pool(net, pool2_ksize, pool2_strides, pool2_padding, name=pool2_name)

    # Block 3
    net = conv_relu(net, kernel31_name, biases31_name, conv31_shape, conv31_padding)
    net = conv_relu(net, kernel32_name, biases32_name, conv32_shape, conv32_padding)
    net = conv_relu(net, kernel33_name, biases33_name, conv33_shape, conv33_padding)
    net = tf.nn.max_pool(net, pool3_ksize, pool3_strides, pool3_padding, name=pool3_name)

    # Block 4
    net = conv_relu(net, kernel41_name, biases41_name, conv41_shape, conv41_padding)
    net = conv_relu(net, kernel42_name, biases42_name, conv42_shape, conv42_padding)
    net = conv_relu(net, kernel43_name, biases43_name, conv43_shape, conv43_padding)
    net = tf.nn.max_pool(net, pool4_ksize, pool4_strides, pool4_padding, name=pool4_name)

    # Block 5
    net = conv_relu(net, kernel51_name, biases51_name, conv51_shape, conv51_padding)
    net = conv_relu(net, kernel52_name, biases52_name, conv52_shape, conv52_padding)
    net = conv_relu(net, kernel53_name, biases53_name, conv53_shape, conv53_padding)
    pool5 = tf.nn.max_pool(net, pool5_ksize, pool5_strides, pool5_padding, name=pool5_name)

    # FC1
    # Prints the flattened size of pool5; it has to equal fc1w_shape[0].
    print(np.prod(pool5.get_shape()[1:]))
    pool5_flat = tf.reshape(pool5, [-1, fc1w_shape[0]])
    fc1l_activation = tf.nn.relu(dense(pool5_flat, fc1w_name, fc1b_name))

    # FC2 (embedding)
    fc2l_activation = tf.nn.relu(dense(fc1l_activation, fc2w_name, fc2b_name))

    # FC3 (logits; softmax is applied by the loss, not here)
    fc3l_out = dense(fc2l_activation, fc3w_name, fc3b_name)

    return fc2l_activation, fc3l_out
def triplet_loss(anchor, positive, negative, alpha):
    """Calculate the triplet loss according to the FaceNet paper.

    Args:
        anchor: the embeddings for the anchor images.
        positive: the embeddings for the positive images.
        negative: the embeddings for the negative images.
        alpha: margin added to (positive distance - negative distance)
            before clamping at zero.

    Returns:
        The triplet loss as a float tensor: the mean over the batch of
        ``max(pos_dist - neg_dist + alpha, 0)``, where the distances are
        squared differences summed over axis 1.
    """

    def squared_distance(left, right):
        # sum of squared differences along the embedding axis
        return tf.reduce_sum(tf.square(tf.subtract(left, right)), 1)

    with tf.variable_scope('triplet_loss'):
        pos_dist = squared_distance(anchor, positive)
        neg_dist = squared_distance(anchor, negative)

        margin_violation = tf.add(tf.subtract(pos_dist, neg_dist), alpha)
        hinge = tf.maximum(margin_violation, 0.0)
        loss = tf.reduce_mean(hinge, 0)

    return loss
# Load weights | |
def load_weights(sess):
    """Copy pre-trained weights from 'vgg16_weights.npz' into the network.

    The archive keys are sorted and all but the last six are assigned, in
    order, to the leading entries of `classification_variables`.

    Args:
        sess: session in which the assign operations are run.
    """
    with tf.variable_scope('vgg', reuse=True):
        print(len(classification_variables))

        pretrained = np.load('vgg16_weights.npz')
        # NOTE(review): the six dropped keys are presumably the fully
        # connected weights/biases of the archive -- confirm against the file.
        selected_keys = sorted(pretrained.keys())[:-6]

        for position, key in enumerate(selected_keys):
            print("Loading {} to {}".format(position, key))
            target = classification_variables[position]
            sess.run(target.assign(pretrained[key]))
# Build the graph. Everything that calls extract_feature has to run inside
# the 'vgg' scope with reuse enabled so the shared variables are found.
with tf.variable_scope('vgg', reuse=tf.AUTO_REUSE):
    # Feature extraction operation: (fc2 activation, fc3 logits) for IMAGE_INPUT.
    image_features = extract_feature(IMAGE_INPUT)

    query_features, query_output = extract_feature(X1)
    positive_features, positive_output = extract_feature(X2)
    negative_features, negative_output = extract_feature(X3)

    _triplet_loss = triplet_loss(query_features, positive_features, negative_features, 0.1)

    classification_loss1 = tf.losses.softmax_cross_entropy(onehot_labels=y1, logits=query_output)
    classification_loss2 = tf.losses.softmax_cross_entropy(onehot_labels=y2, logits=positive_output)
    classification_loss3 = tf.losses.softmax_cross_entropy(onehot_labels=y3, logits=negative_output)

    # Weight of the triplet term and of the L2 penalty in the total loss.
    beta = 0.01
    regularization_strength = 0.2

    # NOTE(review): the filter only skips variables whose name contains
    # 'biases'; the fully connected biases are named 'fb*' and are therefore
    # regularised too -- confirm this is intended.
    l2_loss = tf.add_n([
        tf.nn.l2_loss(v) for v in tf.trainable_variables() if 'biases' not in v.name
    ]) * regularization_strength

    all_loss = tf.reduce_sum(
        [classification_loss1, classification_loss2, classification_loss3,
         beta * _triplet_loss, l2_loss],
        name='total_loss')

    train_classification = tf.train.RMSPropOptimizer(0.00001).minimize(all_loss)  # , var_list=classification_variables)

    # Reuse the logits computed above instead of calling extract_feature
    # again: the weights are shared, so the predictions are identical, and
    # three redundant copies of the network are kept out of the graph.
    correct_prediction = tf.cast(tf.squeeze([
        tf.equal(tf.argmax(query_output, axis=1), tf.argmax(y1, axis=1)),
        tf.equal(tf.argmax(positive_output, axis=1), tf.argmax(y2, axis=1)),
        tf.equal(tf.argmax(negative_output, axis=1), tf.argmax(y3, axis=1)),
    ]), tf.float32)

    # Fraction of correct predictions over query, positive and negative images.
    accuracy = tf.reduce_mean(correct_prediction)

# Saver
saver = tf.train.Saver()
def train(max_epochs=10, train_sample_count=6000):
    """Train the network and checkpoint it after every epoch.

    Requires `train_gen`, `test_gen` and `BATCH_SIZE`, which this module only
    imports when PREDICTION is False. The './logs' and './models' directories
    must already exist.

    Args:
        max_epochs: number of passes to run.
        train_sample_count: number of training triplets; together with
            BATCH_SIZE it determines the number of steps per epoch.
    """
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        # Load weights
        load_weights(sess)

        steps_per_epoch = train_sample_count // BATCH_SIZE + 1

        # Tracking loss; the lists keep growing across epochs.
        train_accuracy = []
        train_triplet_losses = []
        test_accuracy = []
        test_triplet_losses = []

        with tqdm(total=max_epochs) as pbar_global:
            for k in range(max_epochs):
                with tqdm(total=steps_per_epoch) as pbar:
                    for i in range(steps_per_epoch):
                        query, positive, negative, label1, label2, label3 = next(train_gen)
                        _, a, tl = sess.run(
                            [train_classification, accuracy, _triplet_loss],
                            feed_dict={
                                X1: query, y1: label1,
                                X2: positive, X3: negative, y2: label2, y3: label3
                            })
                        pbar.update(1)
                        # {1} shows the triplet loss; using {0} twice printed
                        # the accuracy in both fields.
                        pbar.set_description("Train Acc: {0:.2f} - Triplet: {1:.2f}".format(a, tl))
                        train_accuracy.append(a)
                        train_triplet_losses.append(tl)

                # One test batch per epoch.
                tquery, tpositive, tnegative, tlabel1, tlabel2, tlabel3 = next(test_gen)
                ta, ttl = sess.run(
                    [accuracy, _triplet_loss],
                    feed_dict={X1: tquery, y1: tlabel1, X2: tpositive, y2: tlabel2, X3: tnegative, y3: tlabel3})
                test_accuracy.append(ta)
                test_triplet_losses.append(ttl)

                pbar_global.update(1)
                pbar_global.set_description("Test Acc: {0:.2f} - Triplet: {1:.2f}".format(ta, ttl))

                np.save('./logs/train_accuracy_{}.npy'.format(k), np.array(train_accuracy))
                np.save('./logs/train_triplet_loss_{}.npy'.format(k), np.array(train_triplet_losses))
                np.save('./logs/test_accuracy_{}.npy'.format(k), np.array(test_accuracy))
                np.save('./logs/test_triplet_loss_{}.npy'.format(k), np.array(test_triplet_losses))

                saver.save(sess, './models/model_{}.ckpt'.format(k))
# # Second session | |
def test():
    """Restore './models/model_0.ckpt' and report metrics on one test batch.

    Prints the accuracy and the triplet loss obtained on the next batch from
    `test_gen`.
    """
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver.restore(sess, './models/model_0.ckpt')

        # The generator yields images first, then labels, in
        # query/positive/negative order.
        batch = next(test_gen)
        feed = dict(zip((X1, X2, X3, y1, y2, y3), batch))

        ta, ttl = sess.run([accuracy, _triplet_loss], feed_dict=feed)
        print("Accuracy {} - Loss - {}".format(ta, ttl))
def extract_features_from_images(images, labels, filename='features', batch_size=1000,
                                 checkpoint_path='./models/model_9.ckpt'):
    """Run `images` through a restored model and save the features to disk.

    For every batch starting at offset ``i`` two files are written into
    './features/' (which must already exist): ``<filename>_<i>.npy`` with the
    second fully connected layer's activations and
    ``<filename>_labels_<i>.npy`` with the matching slice of `labels`.

    Args:
        images: array of input images, indexable along the first axis.
        labels: array with one entry per image, sliced in step with `images`.
        filename: prefix of the output files.
        batch_size: number of images processed per session run.
        checkpoint_path: checkpoint to restore; defaults to the model saved
            after the tenth training epoch.
    """
    with tf.Session() as sess:
        saver.restore(sess, checkpoint_path)
        n_samples = len(images)

        for i in tqdm(range(0, n_samples, batch_size)):
            upper_limit = min(i + batch_size, n_samples)
            features = sess.run(image_features, feed_dict={IMAGE_INPUT: images[i: upper_limit]})
            # image_features is the (fc2 activation, logits) pair; only the
            # activation is stored.
            np.save('./features/' + filename + '_{}.npy'.format(i), np.array(features[0]))
            np.save('./features/' + filename + '_labels_{}.npy'.format(i), labels[i: upper_limit])
if __name__ == '__main__':
    # train() and test() are currently disabled; the script only extracts
    # features for the images loaded in prediction mode.
    extract_features_from_images(
        images=to_extract_feature,
        labels=to_extract_feature_labels,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment