Skip to content

Instantly share code, notes, and snippets.

@bmabir17
Last active July 19, 2021 14:20
Show Gist options
  • Save bmabir17/f37bc164c3f45c1c1b6ec3db0aeb231d to your computer and use it in GitHub Desktop.
Save bmabir17/f37bc164c3f45c1c1b6ec3db0aeb231d to your computer and use it in GitHub Desktop.
Transfer Learning Models for Training Classifier
import numpy as np
from PIL import Image
from pathlib import Path
from collections import defaultdict
import keras
from tensorflow.keras.utils import Sequence
import logging
import matplotlib.pyplot as plt
import cv2
from random import shuffle
from tensorflow.keras.utils import to_categorical
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(levelname)s:%(message)s')
logger = logging.getLogger(__name__)
class SmartDataGenerator(Sequence):
def __init__(self, data, batch_size, target_resolution, n_channels, n_classes, balancing_augmentation,
class_balancing = True, shuffle=True, resize_technique = Image.BICUBIC):
"""initialization"""
self.target_resolution = target_resolution
self.n_channels = n_channels
self.batch_size = batch_size
self.list_IDs, self.labels = data
self.n_classes = n_classes
self.shuffle = shuffle
self.resize_technique = resize_technique
self.class_balancing = class_balancing
self.b_augment = balancing_augmentation
self.balance_class()
self.on_epoch_end()
def __len__(self):
ln = int(np.floor(len(self.list_IDs) / self.batch_size))
return ln
def __getitem__(self, pivot_index):
"""Generate one batch of data"""
indexes = self.indexes[pivot_index * self.batch_size : (pivot_index+1)*self.batch_size]
list_IDs_tmp = [self.list_IDs[k] for k in indexes]
X, y = self.__data_generation(list_IDs_tmp)
return X, y
def read_img(self, path):
"""read, resize and convert into numpy array"""
img_obj = Image.open(path)
img_obj = img_obj.resize(self.target_resolution, Image.BICUBIC)
img = np.array(img_obj)
return img
def __data_generation(self, list_IDs_temp):
"""Generates data from disk files"""
X = np.empty((self.batch_size, *self.target_resolution, self.n_channels ))
y = np.empty((self.batch_size), dtype=int)
for i, img_ID in enumerate(list_IDs_temp):
if not Path(img_ID).exists() and Path(img_ID).name == '__aug__':
tmp_img_ID = Path(img_ID).parent
img = self.read_img(tmp_img_ID)
img = self.b_augment(image=img)["image"]
img = img.reshape((img.shape[0], img.shape[1], self.n_channels))
img = img.astype('float32') / 255
X[i,] = img
y[i] = self.labels[str(img_ID)]
elif Path(img_ID).exists():
img = self.read_img(img_ID)
img = img.reshape((img.shape[0], img.shape[1], self.n_channels ))
img = img.astype('float32') / 255
X[i,] = img
y[i] = self.labels[str(img_ID)]
else:
raise Exception('File not found at {}th iteration. Please check {}'.format(i, img_ID))
y_cat = to_categorical(y, num_classes=self.n_classes)
return X, y_cat
def balance_class(self):
if self.class_balancing == False:
logging.info('Skipping data balancing')
return
else:
logging.info('-------------------\n\nStarting Datasets balancing using SMOT...\n')
# first track all file name in its belonging class
cls_filename_map = defaultdict(list)
for path in self.list_IDs:
file_name = Path(path)
cls = self.labels[str(path)]
cls_filename_map[cls].append(file_name)
# calculate how many synthetic file is needed in each class
total = 0
max_cls_file_num = 0
cls_file_count_map = {}
for cls in cls_filename_map.keys():
file_num = len(cls_filename_map[cls])
cls_file_count_map[cls] = file_num
total += file_num
if max_cls_file_num < file_num:
max_cls_file_num = file_num
dominate_pct = 4 # imbalce pct of the dominate class. dominate_pct = 0 means all other class are equal
new_estimated_total = int(len(cls_filename_map) * (max_cls_file_num - (max_cls_file_num * dominate_pct / 100.0)))
logging.info('Found {} data. More {} synthetic data will be generated (new total = {}).\t{}/class\n'
.format(total, new_estimated_total-total, new_estimated_total, (new_estimated_total/len(cls_filename_map))))
for cls, cls_file_count in cls_file_count_map.items():
current_cls_filelist = cls_filename_map[cls]
current_cls_file_count = len(current_cls_filelist)
if current_cls_file_count < max_cls_file_num:
more_needed = int((new_estimated_total / len(cls_filename_map)) - current_cls_file_count)
logging.info('class-{} will need more {}.\tcurrently have {}'.format(cls, more_needed, current_cls_file_count))
for i in range(more_needed):
index = i % current_cls_file_count
tmp_file = current_cls_filelist[index]
new_filename = Path(tmp_file) / '__aug__' # __aug__ is our defined flag to distinguish from real
self.list_IDs.append(new_filename)
self.labels[str(new_filename)] = cls
else:
logging.info(
'class-{} need not any synthetic data. Already it has {}'.format(cls, current_cls_file_count))
def on_epoch_end(self):
"""Updates indexes after each epoch"""
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle == True:
np.random.shuffle(self.indexes)
class TukiTaki:
def get_class_name_from_hotencoding(hot_encoding):
import json
i = np.argmax(hot_encoding, axis=0)
with open('Reverse_class_index_mapping.txt') as f:
data = json.load(f)
key = str(i)
return data[key]
class KfoldMaker:
def __init__(self, dataset_dir, image_extensions):
self.dataset_dir = dataset_dir
self.image_extensions = image_extensions
self.all_path_list, self.all_labels_dic = self.scan_dataset()
def generate_folds(self, K):
np.random.shuffle(self.all_path_list)
# filename_grpups = self.person_wise_grouping(K)
num_validation_samples = len(self.all_path_list) // K
fold_list = []
logging.info('\n\nDividing dataset into {} folds...'.format(K))
for fold in range(K):
# For general case
validation_data_x = self.all_path_list[num_validation_samples*fold : num_validation_samples*(fold+1)]
training_data_x = self.all_path_list[ : num_validation_samples*fold] \
+ self.all_path_list[num_validation_samples*(fold+1) : ]
validation_data_y = {}
training_data_y = {}
for x in validation_data_x:
validation_data_y[str(x)] = self.all_labels_dic[str(x)]
for x in training_data_x:
training_data_y[str(x)] = self.all_labels_dic[str(x)]
fold_list.append(((training_data_x, training_data_y), (validation_data_x, validation_data_y)))
status_bar = ['_'for x in range(K)]
status_bar[fold] = '#'
logging.info('Generated K-fold metadata for fold: {} {}'.format(str(fold), ''.join(status_bar)))
# import pickle
# with open('tmp/k_fold_metadata_temporary.pickle', 'wb') as fp:
# pickle.dump(fold_list, fp)
return fold_list
def scan_dataset(self):
from pathlib import Path
root_path = Path(self.dataset_dir)
images_path_list = []
for extension in self.image_extensions:
img_list = list(root_path.rglob('*.'+extension))
images_path_list.extend(img_list)
logging.debug('{} images found with {} extension'.format(len(img_list), extension))
logging.info('Found {} images with {} extensions\n'.format(len(images_path_list), self.image_extensions))
label_dic = {}
cls_id_counter = -1
cls_id_map = {}
reverse_cls_id_map = {}
for index, path in enumerate(images_path_list):
cls_name = path.parent.name
if cls_name not in cls_id_map:
cls_id_counter += 1
cls_id_map[cls_name] = cls_id_counter
reverse_cls_id_map[cls_id_counter] = cls_name
label_dic[str(path)] = cls_id_counter
# save mapping between original class name and new assigned index
# it may be useful during testing/prediction
import json
with open('Reverse_class_index_mapping.txt', 'w') as file:
file.write(json.dumps(reverse_cls_id_map))
logging.info('Detected {} correctly labeled images inside {}'.format(len(images_path_list), self.dataset_dir))
logging.info('Total {} class found. {}\n'.format(len(cls_id_map), cls_id_map))
return images_path_list, label_dic
def build_mobilenetv2_model(config):
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.optimizers import Adam
input_shape = (config.input_size, config.input_size, config.n_channels)
# create MobileNetv2 model with ImageNet weights and with no prediction layer
base_model=MobileNetV2(weights='imagenet',input_shape=input_shape,include_top=False)
base_model.trainable=False
base_model.summary()
prediction_layer = tf.keras.layers.Dense(config.n_classes, activation= 'softmax')
# Add prediction layers
inputs = tf.keras.Input(shape=input_shape)
x = base_model(inputs, training=False)
x = layers.GlobalMaxPooling2D()(x)
x = layers.Dense(1024,activation='relu')(x)
x = layers.Dense(1024,activation='relu')(x)
x = layers.Dense(512,activation='relu')(x)
x = layers.Dense(256,activation='relu')(x)
x = layers.Dense(128,activation='relu')(x)
outputs = prediction_layer(x)
model = tf.keras.Model(inputs, outputs)
adam = Adam(lr=config.learning_rate)
model.compile(optimizer= adam, loss='categorical_crossentropy', metrics=[
tf.keras.metrics.BinaryAccuracy(name='accuracy'),
tf.keras.metrics.TruePositives(name='true_pos'),
tf.keras.metrics.FalsePositives(name='false_pos'),
tf.keras.metrics.TrueNegatives(name='true_neg'),
tf.keras.metrics.FalseNegatives(name='false_neg'),
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall'),
tf.keras.metrics.AUC(name='auc'),
tf.keras.metrics.AUC(name='prc', curve='PR')])
model.summary()
return model
def build_resnet_model(config):
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.optimizers import Adam
# create model
base_model=ResNet50(weights='imagenet',input_shape=(config.input_size, config.input_size, config.n_channels),include_top=False)
base_model.trainable=False
x = base_model.output
x = layers.GlobalMaxPooling2D()(x)
x = layers.Dense(1024,activation='relu')(x)
x = layers.Dense(1024,activation='relu')(x)
x = layers.Dense(512,activation='relu')(x)
x = layers.Dense(config.n_classes, activation= 'softmax')(x)
model = Model(inputs = base_model.input, outputs = x)
adam = Adam(lr=config.learning_rate)
model.compile(optimizer= adam, loss='categorical_crossentropy', metrics=[
tf.keras.metrics.BinaryAccuracy(name='accuracy'),
tf.keras.metrics.TruePositives(name='true_pos'),
tf.keras.metrics.FalsePositives(name='false_pos'),
tf.keras.metrics.TrueNegatives(name='true_neg'),
tf.keras.metrics.FalseNegatives(name='false_neg'),
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall'),
tf.keras.metrics.AUC(name='auc'),
tf.keras.metrics.AUC(name='prc', curve='PR')])
return model
from DatasetTools import SmartDataGenerator, KfoldMaker
from PIL import Image
from albumentations import (
Compose, HorizontalFlip,
RandomBrightness, RandomContrast,
)
from model import build_simple_model,build_resnet_model,build_resnet_model2,build_mobilenetv2_model,build_mobilenetv2_model2
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint
from tensorflow.keras.models import load_model
import wandb
from wandb.keras import WandbCallback
# 1. Start a new run
wandb.init(project='projectName', entity='userName')
2. Save model inputs and hyperparameters
config = wandb.config
config.epochs = 30
config.num_validation=100
config.batch_size=16
config.n_channels=3
config.n_classes=2
# config.k_fold=5
config.cleaned=True
config.learning_rate=1e-06
config.model_type='mobilenet' # 'baseModel', 'resnet', 'mobilenet'
if(config.model_type=='mobilenet'):
config.input_size=224 #For mobilenet
elif(config.model_type=='baseModel'):
config.input_size=256 #For basic_model
config.dataset_dir = 'data/train_clean1'
config.model_save_path=f"saved_model/{config.model_type}/e{config.epochs}_es_s{config.input_size}_cleanData"
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
# Parameters
params = {'batch_size': config.batch_size,
'target_resolution': (config.input_size, config.input_size),
'n_channels': config.n_channels,
'n_classes': config.n_classes,
'balancing_augmentation': Compose([RandomContrast(limit=0.2, p=0.5), RandomBrightness(limit=0.2, p=0.5)] ),
# 'balancing_augmentation': Compose([] ),
'shuffle': True,
'resize_technique': Image.BICUBIC}
# NOTE: here KfoldMaker is only used to scan files and generate file list
k_obj = KfoldMaker(dataset_dir=config.dataset_dir, image_extensions=['jpeg'])
num_validation = config.num_validation # number of validation set you want
X = k_obj.all_path_list
Y = k_obj.all_labels_dic
# print(Y)
train_x = X[ : -num_validation]
val_x = X[ num_validation : ]
training_gen = SmartDataGenerator(data=(train_x, Y), **params)
validation_gen = SmartDataGenerator(data=(val_x, Y), **params)
# simple early stopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1,patience=5)
mc = ModelCheckpoint('best_model.h5', monitor='val_accuracy', mode='max', verbose=1, save_best_only=True)
if(config.model_type=='mobilenet'):
model = build_mobilenetv2_model2(config)
elif(config.model_type=='baseModel'):
model = build_simple_model(config)
log = model.fit(x=training_gen,
validation_data=validation_gen,
workers=9, # realtime loading with parallel processing
epochs=config.epochs, verbose=1,
callbacks=[
es,
mc,
WandbCallback(
save_model=True
)])
# load the Best saved model
saved_model = load_model('best_model.h5')
saved_model.save(config.model_save_path)
model_json = saved_model.to_json()
with open(f"{config.model_save_path}/arch.json", "w") as json_file:
json_file.write(model_json)
saved_model.save_weights(f"{config.model_save_path}/weights.h5")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment