Created
January 5, 2022 12:59
-
-
Save YCAyca/0d77303f998042d6f7a369de97c1770f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 3 14:34:44 2022
@author: aktas
"""
# import necessary layers | |
from tensorflow.keras.layers import Input, Conv2D , Dropout, MaxPool2D, Flatten, Dense, Activation, BatchNormalization | |
from tensorflow.keras import Model | |
from tensorflow.keras.preprocessing.image import ImageDataGenerator | |
from tensorflow.keras.regularizers import l2 | |
import tensorflow as tf | |
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint | |
import os | |
import matplotlib.pyplot as plt | |
import sys | |
from tensorflow.keras.callbacks import CSVLogger | |
# Path of the best checkpoint. Its presence decides which branch of the script
# runs: training when the file is missing, evaluation when it exists.
MODEL_FNAME = "trained_model.h5"
# Dataset root; the script reads its "train", "validation" and "test" subfolders.
base_dir = "dataset"
# Scratch file the freshly built model is saved to before training starts.
tmp_model_name = "tmp.h5"
# Images are resized to INPUT_SIZE x INPUT_SIZE before being fed to the network.
INPUT_SIZE = 224
BATCH_SIZE = 16

# Report the runtime environment before doing any work.
physical_devices = tf.config.list_physical_devices()
print("DEVICES : \n", physical_devices)
print('Using:')
print('\t\u2022 Python version:', sys.version)
print('\t\u2022 TensorFlow version:', tf.__version__)
print('\t\u2022 tf.keras version:', tf.keras.__version__)
# tf.test.is_gpu_available() is deprecated; listing the physical GPU devices
# is the documented replacement.
gpu_devices = tf.config.list_physical_devices('GPU')
print('\t\u2022 Running on GPU' if gpu_devices else '\t\u2022 GPU device not found. Running on CPU')

# Mutable state shared with learning_rate_scheduler():
#   count        - consecutive epochs without a validation-accuracy improvement
#   previous_acc - best validation accuracy seen so far
count = 0
previous_acc = 0
if not os.path.exists(MODEL_FNAME):
    # ---- Create the shallow model ----

    def _fresh_initializers():
        """Return a new (kernel, bias) initializer pair for a single layer.

        A separate instance is created per layer on purpose: recent Keras
        releases warn that one unseeded initializer instance reused across
        layers can return identical values on every call, which would give
        same-shaped layers identical starting kernels.
        """
        kernel_init = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)
        bias_init = tf.keras.initializers.Zeros()
        return kernel_init, bias_init

    def _conv_bn_relu(tensor, filters):
        """Apply a 3x3 'same' convolution, batch normalization and ReLU."""
        kernel_init, bias_init = _fresh_initializers()
        tensor = Conv2D(filters=filters, kernel_size=3, padding='same',
                        kernel_regularizer=l2(0.001),
                        kernel_initializer=kernel_init,
                        bias_initializer=bias_init)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation('relu')(tensor)

    # `inputs` instead of `input`, so the builtin is not shadowed.
    inputs = Input(shape=(INPUT_SIZE, INPUT_SIZE, 3))

    # Four stages, each ending in a 2x2 max-pool. Layers are created in the
    # same order as before, so auto-generated layer names are unchanged.
    x = inputs
    for stage_filters in ((32,), (64, 64), (128, 128), (128, 128)):
        for filters in stage_filters:
            x = _conv_bn_relu(x, filters)
        x = MaxPool2D(pool_size=2, strides=2, padding='same')(x)

    # Classifier head: flatten, dropout, one hidden dense layer, sigmoid output.
    x = Flatten()(x)
    x = Dropout(0.5)(x)
    dense_kernel_init, dense_bias_init = _fresh_initializers()
    x = Dense(units=64, activation='relu', kernel_regularizer=l2(0.001),
              kernel_initializer=dense_kernel_init,
              bias_initializer=dense_bias_init)(x)
    output = Dense(units=1, activation='sigmoid')(x)

    # creating the model
    model = Model(inputs=inputs, outputs=output)

    # Carried over from the original ("to be sure GPU memory is cleaned after
    # last train"): write a scratch copy of the untrained model, then reset
    # Keras' global state.
    # NOTE(review): `model` stays referenced afterwards, so confirm this step
    # really has the intended memory effect.
    model.save(tmp_model_name)
    tf.keras.backend.clear_session()
    model.summary()

    # ---- Prepare the dataset for training ----
    train_dir = os.path.join(base_dir, 'train')
    val_dir = os.path.join(base_dir, 'validation')

    # Training images are rescaled and augmented (flip, rotation, brightness, zoom).
    train_batches = ImageDataGenerator(
        rescale=1 / 255.,
        horizontal_flip=True,
        rotation_range=90,
        brightness_range=[0.2, 1.2],
        zoom_range=[0.5, 1.5],
    ).flow_from_directory(
        train_dir,
        target_size=(INPUT_SIZE, INPUT_SIZE),
        shuffle=True,
        seed=42,
        class_mode="binary",
        batch_size=BATCH_SIZE,
    )

    # Validation images are only rescaled.
    val_batches = ImageDataGenerator(rescale=1 / 255.).flow_from_directory(
        val_dir,
        target_size=(INPUT_SIZE, INPUT_SIZE),
        shuffle=True,
        seed=42,
        class_mode="binary",
        batch_size=BATCH_SIZE,
    )
class CustomLearningRateScheduler(tf.keras.callbacks.Callback):
    """Adjust the learning rate from validation accuracy and log layer weights.

    At the end of every epoch the current learning rate and the epoch's
    ``val_binary_accuracy`` are passed to ``schedule``; the value it returns
    is written back to the optimizer. At the start of every epoch the weights
    of the selected layers are appended to a text file.

    Args:
        schedule: callable ``schedule(lr, val_acc) -> new_lr``.
        layer_names: names of the layers whose weights are dumped each epoch.
            NOTE(review): the original comment described these as the first
            and last convolutional layers and the dense layer, but the
            default names are kept exactly as they were in the code; confirm
            they select the layers that are actually wanted.
        weights_path: text file the weight dumps are written to.
    """

    def __init__(self, schedule, layer_names=("conv2d", "conv2d_1", "dense_1"),
                 weights_path="weights.txt"):
        super().__init__()
        self.schedule = schedule
        self.layer_names = tuple(layer_names)
        self.weights_path = weights_path
        # Opened in on_train_begin and closed in on_train_end so the handle
        # never outlives a training run.
        self.weights_monitor = None

    def on_train_begin(self, logs=None):
        """Open (and truncate) the weights log for this training run."""
        self.weights_monitor = open(self.weights_path, "w+")

    def on_train_end(self, logs=None):
        """Close the weights log so buffered data is flushed to disk."""
        if self.weights_monitor is not None:
            self.weights_monitor.close()
            self.weights_monitor = None

    # learning rate scheduler is called at the end of each epoch. learning rate decreases if needed
    def on_epoch_end(self, epoch, logs=None):
        """Ask ``schedule`` for a new learning rate and apply it.

        Raises:
            ValueError: if the optimizer exposes no ``learning_rate`` attribute.
        """
        optimizer = self.model.optimizer
        if not hasattr(optimizer, "learning_rate"):
            raise ValueError('Optimizer must have a "learning_rate" attribute.')
        # Get the current learning rate from model's optimizer.
        lr = float(tf.keras.backend.get_value(optimizer.learning_rate))
        val_acc = (logs or {}).get("val_binary_accuracy")
        if val_acc is None:
            # No validation accuracy reported for this epoch: nothing to
            # schedule on, so leave the learning rate untouched.
            return
        # Call schedule function to get the scheduled learning rate.
        scheduled_lr = self.schedule(lr, val_acc)
        # Set the value back to the optimizer before the next epoch starts.
        tf.keras.backend.set_value(optimizer.learning_rate, scheduled_lr)

    # the selected layers' weights are saved at the beginning of each epoch
    def on_epoch_begin(self, epoch, logs=None):
        """Append the selected layers' weights to the weights log."""
        if self.weights_monitor is None:
            return
        epoch_str = "beginning of epoch : " + str(epoch)
        self.weights_monitor.write(epoch_str + "\n")
        for layer_name in self.layer_names:
            # Use the model Keras attached to this callback, not a global.
            layer_weights = self.model.get_layer(layer_name).weights
            self.weights_monitor.write(str(layer_weights) + "\n")
        self.weights_monitor.flush()
def learning_rate_scheduler(lr, val_acc):
    """Return the learning rate to use after an epoch.

    Tracks, through the module-level ``count`` and ``previous_acc``, how many
    consecutive epochs the validation accuracy has failed to exceed the best
    value seen so far. After 10 such epochs the learning rate is divided by
    10 and the counter is reset.

    Args:
        lr: current learning rate.
        val_acc: validation accuracy of the epoch that just ended, or
            ``None`` when no value was reported.

    Returns:
        The (possibly reduced) learning rate.
    """
    global count
    global previous_acc
    if val_acc is None:
        # Nothing to compare against; comparing None with a number would
        # raise TypeError, so keep the rate and the counters as they are.
        return lr
    if val_acc <= previous_acc:
        count += 1
    else:
        previous_acc = val_acc
        count = 0
    if count >= 10:
        print("acc doesnt improve for 10 epoch, learnin rate decreased by /10")
        count = 0
        lr /= 10
        print("new learning rate:", lr)
    return lr
# Compile the model with binary cross-entropy loss and the Adam optimizer.
# The legacy `sample_weight_mode` argument is no longer passed: it is only
# kept for backwards compatibility in TF 2.x.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.00001),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=[tf.keras.metrics.BinaryAccuracy()])

# if validation accuracy doesnt improve for 15 epoch, stop training
early_stopping = EarlyStopping(monitor='val_binary_accuracy', patience=15)

# Save the model whenever validation accuracy beats the best value so far.
# The monitor is named explicitly because ModelCheckpoint defaults to
# 'val_loss', which did not match the stated intent.
checkpointer = ModelCheckpoint(filepath=MODEL_FNAME,
                               monitor='val_binary_accuracy',
                               mode='max',
                               verbose=1,
                               save_best_only=True)

# write accuracy and loss history to the log.csv
csv_logger = CSVLogger('log.csv', append=True, separator=' ')

history = model.fit(train_batches,
                    validation_data=val_batches,
                    epochs=200,
                    verbose=1,
                    shuffle=True,
                    callbacks=[checkpointer,
                               early_stopping,
                               CustomLearningRateScheduler(learning_rate_scheduler),
                               csv_logger])

# Plot train vs. validation curves: first the loss, then the accuracy.
for title, ylabel, train_key, val_key in (
        ('model loss', 'loss', 'loss', 'val_loss'),
        ('model accuracy', 'accuracy', 'binary_accuracy', 'val_binary_accuracy'),
):
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

print("End of Training")
else: | |
""" Test """ | |
test_dir = os.path.join(base_dir, 'test') | |
test_batches = ImageDataGenerator(rescale = 1 / 255.).flow_from_directory(test_dir, | |
target_size=(INPUT_SIZE,INPUT_SIZE), | |
class_mode='binary', | |
shuffle=False, | |
seed=42, | |
batch_size=1) | |
model = tf.keras.models.load_model(MODEL_FNAME) | |
model.summary() | |
# Evaluate on test data | |
scores = model.evaluate(test_batches) | |
print("metric names",model.metrics_names) | |
print(model.metrics_names[0], scores[0]) | |
print(model.metrics_names[1], scores[1]) | |
tf.keras.backend.clear_session() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment