Created
June 10, 2016 09:24
-
-
Save denadai2/c2fb6b7e9f7453db2e51da6abc27fb8c to your computer and use it in GitHub Desktop.
Deep Learning Report Task1 - Scene recognition from images
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from keras.models import Sequential | |
import numpy as np | |
import pandas as pd | |
from keras.utils.np_utils import to_categorical | |
from keras.layers import Dense, Dropout, Activation, Flatten | |
from keras.optimizers import Adam, SGD | |
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D | |
from keras.callbacks import ModelCheckpoint, EarlyStopping | |
from PIL import Image | |
from keras.layers.normalization import BatchNormalization | |
import argparse | |
def shuffle_in_unison_inplace(a, b):
    """
    Reorder two equally long arrays with one shared random permutation.

    Despite the name, the inputs are left untouched: indexing with the
    permutation produces new arrays.
    @param a: numpy array
    @param b: numpy array with the same length as a
    @return: tuple (a, b) reordered by the same permutation
    """
    length = len(a)
    assert length == len(b)
    # A single permutation keeps element i of `a` paired with element i of `b`.
    order = np.random.permutation(length)
    return a[order], b[order]
def myGenerator(file_list, y_data, batch_size=32, vgg_norm=False, horizontal_flip=False):
    """
    Endless batch generator for keras.

    The samples are shuffled once up front and then cycled through forever.
    Every sample is yielded on each pass: the last batch of a pass may be
    smaller than batch_size instead of being dropped.
    @param file_list: file list of the images
    @param y_data: labels for the classification task
    @param batch_size: batch size for the ANN
    @param vgg_norm: Normalize for VGG
    @param horizontal_flip: after each batch, also yield the same batch flipped horizontally
    @raise ValueError: if file_list is empty
    """
    file_list, y_data = shuffle_in_unison_inplace(file_list, y_data)
    num = len(file_list)
    if num == 0:
        # Without this guard the endless loop below would spin without ever yielding.
        raise ValueError('myGenerator needs at least one sample')
    while True:
        # Stepping by batch_size covers the trailing partial batch and also
        # the case where there are fewer samples than batch_size.
        for start in range(0, num, batch_size):
            stop = start + batch_size
            batch_files = file_list[start:stop]
            batch_labels = y_data[start:stop]
            yield extract_images(batch_files, vgg_norm, False), batch_labels
            if horizontal_flip:
                yield extract_images(batch_files, vgg_norm, True), batch_labels
def flip_axis(x, axis):
    """
    Reverse an array along one axis.
    @param x: image (anything numpy can convert to an array)
    @param axis: axis where to flip
    @return: numpy array image with the entries along `axis` in reverse order
    """
    # Bring the target axis to the front, reverse it, then put it back.
    return np.asarray(x).swapaxes(axis, 0)[::-1, ...].swapaxes(0, axis)
def extract_images(file_list, vgg_norm=False, horizontal_flip=False):
    """
    Load every listed image and stack them into a single numpy array.

    Each path has '/frames/' rewritten to '/frames/crop/' before it is read.
    @param file_list: file list of the images
    @param vgg_norm: Normalize for VGG
    @param horizontal_flip: flip the image horizontally
    @return: numpy array of all the images, stacked along a new leading axis
    """
    batch = []
    for path in file_list:
        pixels = extract_image(path.replace('/frames/', '/frames/crop/'), vgg_norm)
        if horizontal_flip:
            pixels = flip_axis(pixels, 2)
        # The leading singleton axis lets vstack join the images along it.
        batch.append(pixels[None, ...])
    return np.vstack(batch)
def extract_image(filename, vgg_norm=False):
    """
    Read an image and return it as a float32 numpy array (channels, height, width)
    ref https://blog.eduardovalle.com/2015/08/25/input-images-theano/

    The image is always converted to 3-band RGB, so grayscale, palette,
    RGBA and CMYK files all produce three channels.
    @param filename: filename of the image
    @param vgg_norm: Normalize for VGG (subtract fixed per-channel constants);
                     otherwise the pixels are scaled by 1/255
    @return: float32 numpy array (channels, height, width)
    """
    # The context manager closes the file handle; convert() loads the pixel
    # data into a new image first, so `rgb` stays usable afterwards.
    with Image.open(filename) as image:
        rgb = image.convert('RGB')
    # np.array makes a writable float32 copy laid out as (height, width, 3).
    in_data = np.array(rgb, dtype='float32')
    if vgg_norm:
        # NOTE(review): these constants look like the VGG ImageNet means in
        # B, G, R order, while the data here is R, G, B - confirm whether the
        # channels should be reversed before changing anything.
        in_data[:, :, 0] -= 103.939
        in_data[:, :, 1] -= 116.779
        in_data[:, :, 2] -= 123.68
    else:
        in_data /= 255.
    # Move the channel axis to the front: (height, width, 3) -> (3, height, width).
    return in_data.transpose(2, 0, 1)
def create_network(WIDTH, HEIGHT, CLASSES, weights_path=None):
    """
    Build and compile the custom convolutional network.

    Four identical stages (zero padding, 3x3 convolution, 2x2 max pooling,
    batch normalization) with 16, 32, 64 and 128 filters are followed by two
    512-unit dense layers and a softmax layer with CLASSES outputs.
    @param WIDTH: input image's width
    @param HEIGHT: input image's height
    @param CLASSES: number of classes for the classification task
    @param weights_path: optional weights file loaded into the model before compiling
    @return: Keras network
    """
    model = Sequential()
    for stage, n_filters in enumerate((16, 32, 64, 128)):
        if stage == 0:
            # Only the very first layer declares the input shape.
            model.add(ZeroPadding2D((1, 1), input_shape=(3, WIDTH, HEIGHT)))
        else:
            model.add(ZeroPadding2D((1, 1)))
        model.add(Convolution2D(n_filters, 3, 3, activation='relu', init='he_normal'))
        model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        model.add(BatchNormalization())

    model.add(Flatten())
    model.add(Dense(512, activation='relu', init='he_normal'))
    model.add(BatchNormalization())
    model.add(Dense(512, activation='relu', init='he_normal'))
    model.add(Dropout(0.5))
    model.add(BatchNormalization())
    model.add(Dense(CLASSES, activation='softmax'))

    if weights_path:
        model.load_weights(weights_path)

    optimizer = Adam(lr=0.001)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model
def VGG_16(CLASSES, weights_path=None):
    """
    Build and compile the VGG-16 configuration with a new classification layer.

    The network is first assembled with a 1000-way softmax so that
    weights_path can be loaded into it; that last layer is then popped and a
    CLASSES-way softmax is added. All convolutions and the first 4096-unit
    dense layer are created with trainable=False.
    @param CLASSES: number of classes for the classification task
    @param weights_path: path to load the weights
    @return: Keras network
    """
    # (number of filters, number of padded 3x3 convolutions) per block;
    # each block ends with a 2x2 max pooling.
    conv_blocks = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))

    model = Sequential()
    first_layer = True
    for n_filters, n_convs in conv_blocks:
        for _ in range(n_convs):
            if first_layer:
                # Only the very first layer declares the input shape.
                model.add(ZeroPadding2D((1,1),input_shape=(3, 224, 224)))
                first_layer = False
            else:
                model.add(ZeroPadding2D((1,1)))
            model.add(Convolution2D(n_filters, 3, 3, activation='relu', trainable=False))
        model.add(MaxPooling2D((2,2), strides=(2,2)))

    model.add(Flatten())
    model.add(Dense(4096, activation='relu', trainable=False))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1000, activation='softmax'))

    if weights_path:
        model.load_weights(weights_path)

    # NOTE(review): popping from model.layers may not detach the old softmax
    # from the model's output in every Keras version - confirm that the new
    # Dense layer really sits on top of the second 4096-unit layer.
    model.layers.pop()
    model.add(Dense(CLASSES, activation='softmax'))

    optimizer = SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model
def _str2bool(value):
    """
    Convert a command-line token into a bool.

    Used instead of type=bool, which turns every non-empty string
    (including "False" and "0") into True.
    @param value: raw string taken from the command line
    @return: True or False
    """
    token = value.strip().lower()
    if token in ('true', 't', 'yes', 'y', '1', 'on'):
        return True
    if token in ('false', 'f', 'no', 'n', '0', 'off', ''):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' % (value,))


def make_argument_parser():
    """
    Creates an ArgumentParser to read the options for this script from
    sys.argv

    Options: -name (model name used when saving weights), -vgg (boolean,
    default False), -cont (weights path to continue training from) and
    -test (weights path to evaluate).
    :return: the configured argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(
        description="Deep simulation"
    )
    parser.add_argument('-name',
                        help='Name of the model (in order to save the weights with a name)')
    parser.add_argument('-vgg',
                        help='Do you want to use the pre-tained VGG?',
                        type=_str2bool,
                        default=False)
    parser.add_argument('-cont',
                        help='Load weights path and continue the training',
                        default=None)
    parser.add_argument('-test',
                        help='Load weights path and test',
                        default=None)
    return parser
def main():
    """
    Train and/or evaluate a scene-recognition network.

    Reads 'train.txt' and 'test.txt' (space separated "filename label"
    rows), holds out the first 20% of the shuffled training rows for
    validation, builds either the custom network or VGG-16, trains it unless
    -test is given, and finally prints the evaluation score on the test set.
    """
    parser = make_argument_parser()
    args = parser.parse_args()

    df = pd.read_csv('train.txt', names=['filename', 'label'], delimiter=' ')
    df_test = pd.read_csv('test.txt', names=['filename', 'label'], delimiter=' ')

    df = df.reindex(np.random.permutation(df.index))
    file_list = df['filename'].values
    label_list = to_categorical(df[['label']].values)

    WIDTH = 224
    HEIGHT = 224
    batch_size = 88
    num_examples = len(df['label'].values)
    EPOCHS = 60
    # Take the class count from the one-hot matrix so the softmax width
    # always matches the labels, even when label ids are not contiguous.
    CLASSES = label_list.shape[1]
    horizontal_flip = True

    validation_idx = int(num_examples*0.2)
    file_list_validation = file_list[:validation_idx]
    label_list_validation = label_list[:validation_idx]
    file_list_train = file_list[validation_idx:]
    label_list_train = label_list[validation_idx:]
    n_train = len(file_list_train)

    # -cont takes precedence over -test when both are given.
    weights_path = args.test
    if args.cont:
        weights_path = args.cont

    if args.vgg:
        model = VGG_16(CLASSES, 'vgg16_weights.h5')
        if weights_path:
            # VGG_16 only loads the base weights; restore the fine-tuned
            # weights requested through -cont / -test on top of them.
            model.load_weights(weights_path)
    else:
        model = create_network(WIDTH, HEIGHT, CLASSES, weights_path)

    if not args.test:
        from keras.utils.visualize_util import plot
        plot(model, to_file='model.png')

        # Fall back to a fixed name so a missing -name does not crash the
        # string concatenation below.
        model_name = args.name if args.name else 'model'
        callback = ModelCheckpoint('weights/weights_'+model_name+'.{epoch:02d}-{val_loss:.2f}.hdf5',
                                   monitor='val_loss')
        callback_early = EarlyStopping(monitor='val_loss', patience=3, verbose=0, mode='auto')

        if horizontal_flip:
            # The generator yields every batch twice (original + flipped).
            n_train *= 2

        model.fit_generator(myGenerator(file_list_train, label_list_train, batch_size=batch_size,
                                        vgg_norm=args.vgg, horizontal_flip=horizontal_flip),
                            n_train,
                            EPOCHS,
                            nb_val_samples=len(file_list_validation),
                            # Validation images must be normalized like the training ones.
                            validation_data=myGenerator(file_list_validation, label_list_validation,
                                                        batch_size=batch_size, vgg_norm=args.vgg),
                            callbacks=[callback, callback_early])

    print("READING TEST DATASET")
    # Same normalization and label width as used for training.
    X_test = extract_images(df_test['filename'].values, args.vgg)
    y_test = to_categorical(df_test[['label']].values, CLASSES)
    score = model.evaluate(X_test, y_test, batch_size=batch_size)
    print(score)
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment