Skip to content

Instantly share code, notes, and snippets.

@ucalyptus2
Last active June 21, 2020 06:19
Show Gist options
  • Save ucalyptus2/916be325e9ae8c99c01aa79cb4006cd0 to your computer and use it in GitHub Desktop.
Save ucalyptus2/916be325e9ae8c99c01aa79cb4006cd0 to your computer and use it in GitHub Desktop.
import matplotlib.pyplot as plt
import os
import tensorflow as tf
from keras.utils import Sequence
from skimage.io import imread
from sklearn.utils import shuffle
import numpy as np
tf.logging.set_verbosity(tf.logging.ERROR)
from IPython import get_ipython
from albumentations import (Blur, Compose, HorizontalFlip, HueSaturationValue,
IAAEmboss, IAASharpen, JpegCompression, OneOf,
RandomBrightness, RandomBrightnessContrast,
RandomContrast, RandomCrop, RandomGamma,
RandomRotate90, RGBShift, ShiftScaleRotate,
Transpose, VerticalFlip, ElasticTransform, GridDistortion, OpticalDistortion)
import albumentations as albu
from albumentations import Resize
class DataGeneratorFolder(Sequence):
def __init__(self, root_dir='/content/drive/My Drive/road_segmentation/testing', image_folder='input/', mask_folder='output/',
batch_size=1, image_size=768, nb_y_features=1,
augmentation=None,
suffle=True):
self.image_filenames = os.listdir(os.path.join(root_dir, image_folder))
self.image_filenames = [os.path.join(root_dir, image_folder) + img for img in self.image_filenames]
self.mask_names = os.listdir(os.path.join(root_dir, mask_folder))
self.mask_names = [os.path.join(root_dir, mask_folder) + mask for mask in self.mask_names]
self.batch_size = batch_size
self.currentIndex = 0
self.augmentation = augmentation
self.image_size = image_size
self.nb_y_features = nb_y_features
self.indexes = None
self.suffle = suffle
def __len__(self):
"""
Calculates size of batch
"""
return int(np.ceil(len(self.image_filenames) / (self.batch_size)))
def on_epoch_end(self):
"""Updates indexes after each epoch"""
if self.suffle == True:
self.image_filenames, self.mask_names = shuffle(self.image_filenames, self.mask_names)
def read_image_mask(self, image_name, mask_name):
return imread(image_name) / 255, (imread(mask_name, as_gray=True) > 0).astype(np.int8)
def __getitem__(self, index):
"""
Generate one batch of data
"""
# Generate indexes of the batch
data_index_min = int(index * self.batch_size)
data_index_max = int(min((index + 1) * self.batch_size, len(self.image_filenames)))
indexes = self.image_filenames[data_index_min:data_index_max]
this_batch_size = len(indexes) # The last batch can be smaller than the others
# Defining dataset
X = np.empty((this_batch_size, self.image_size, self.image_size, 3), dtype=np.float32)
y = np.empty((this_batch_size, self.image_size, self.image_size, self.nb_y_features), dtype=np.uint8)
for i, sample_index in enumerate(indexes):
X_sample, y_sample = self.read_image_mask(self.image_filenames[index * self.batch_size + i],
self.mask_names[index * self.batch_size + i])
# if augmentation is defined, we assume its a train set
if self.augmentation is not None:
# Augmentation code
augmented = self.augmentation(self.image_size)(image=X_sample, mask=y_sample)
image_augm = augmented['image']
mask_augm = augmented['mask'].reshape(self.image_size, self.image_size, self.nb_y_features)
X[i, ...] = np.clip(image_augm, a_min=0, a_max=1)
y[i, ...] = mask_augm
# if augmentation isnt defined, we assume its a test set.
# Because test images can have different sizes we resize it to be divisable by 32
elif self.augmentation is None and self.batch_size == 1:
X_sample, y_sample = self.read_image_mask(self.image_filenames[index * 1 + i],
self.mask_names[index * 1 + i])
augmented = Resize(height=(X_sample.shape[0] // 32) * 32, width=(X_sample.shape[1] // 32) * 32)(
image=X_sample, mask=y_sample)
X_sample, y_sample = augmented['image'], augmented['mask']
return X_sample.reshape(1, X_sample.shape[0], X_sample.shape[1], 3).astype(
np.float32), y_sample.reshape(1, X_sample.shape[0], X_sample.shape[1], self.nb_y_features).astype(
np.uint8)
return X, y
# ## Data augmentation - albumentations
def aug_with_crop(image_size=256, crop_prob=1):
return Compose([
RandomCrop(width=image_size, height=image_size, p=crop_prob),
HorizontalFlip(p=0.5),
VerticalFlip(p=0.5),
RandomRotate90(p=0.5),
Transpose(p=0.5),
ShiftScaleRotate(shift_limit=0.01, scale_limit=0.04, rotate_limit=0, p=0.25),
RandomBrightnessContrast(p=0.5),
RandomGamma(p=0.25),
IAAEmboss(p=0.25),
Blur(p=0.01, blur_limit=3),
OneOf([
ElasticTransform(p=0.5, alpha=120, sigma=120 * 0.05, alpha_affine=120 * 0.03),
GridDistortion(p=0.5),
OpticalDistortion(p=1, distort_limit=2, shift_limit=0.5)
], p=0.8)
], p=1)
test_generator = DataGeneratorFolder(root_dir='/content/drive/My Drive/road_segmentation/testing',
image_folder='input/',
mask_folder='output/',
batch_size=1,
nb_y_features=1, augmentation=aug_with_crop)
Xtest, ytest = test_generator.__getitem__(0)
plt.imshow(Xtest[0])
plt.show()
plt.imshow(ytest[0, :, :, 0],cmap='gray')
plt.show()
# setting generators
test_generator = DataGeneratorFolder(root_dir='/content/drive/My Drive/road_segmentation/testing',
image_folder='input/',
mask_folder='output/',
batch_size=1, augmentation=aug_with_crop,
nb_y_features=1)
train_generator = DataGeneratorFolder(root_dir='/content/drive/My Drive/road_segmentation/training',
image_folder='input/',
mask_folder='output/',
augmentation=aug_with_crop,
batch_size=4,
image_size=512,
nb_y_features=1)
# ## Callbacks
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TensorBoard
# reduces learning rate on plateau
lr_reducer = ReduceLROnPlateau(factor=0.1,
cooldown=10,
patience=10, verbose=1,
min_lr=0.1e-5)
mode_autosave = ModelCheckpoint("./weights/road_crop.efficientnetb0imgsize.h5", monitor='val_iou_score',
mode='max', save_best_only=True, verbose=1, period=10)
# stop learining as metric on validatopn stop increasing
early_stopping = EarlyStopping(patience=10, verbose=1, mode='auto')
# tensorboard for monitoring logs
tensorboard = TensorBoard(log_dir='./logs/tenboard', histogram_freq=0,
write_graph=True, write_images=False)
callbacks = [mode_autosave, lr_reducer, tensorboard, early_stopping]
from segmentation_models import Unet
from keras.optimizers import Adam
from segmentation_models.losses import bce_jaccard_loss, bce_dice_loss
from segmentation_models.metrics import iou_score
def plot_training_history(history):
"""
Plots model training history
"""
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(15, 5))
ax_loss.plot(history.epoch, history.history["loss"], label="Train loss")
ax_loss.plot(history.epoch, history.history["val_loss"], label="Validation loss")
ax_loss.legend()
ax_acc.plot(history.epoch, history.history["iou_score"], label="Train iou")
ax_acc.plot(history.epoch, history.history["val_iou_score"], label="Validation iou")
ax_acc.legend()
plt.savefig('PlotsModelTrainingHistory.pdf',format='pdf')
plt.show()
model = Unet(backbone_name='efficientnetb0', encoder_weights='imagenet', encoder_freeze=False)
model.compile(optimizer=Adam(),
loss=bce_jaccard_loss, metrics=[iou_score])
history = model.fit_generator(train_generator, shuffle=True,
epochs=50, use_multiprocessing=False,
validation_data=test_generator,
verbose=1, callbacks=callbacks)
# plotting history
plot_training_history(history)
# ## Inference and model quality check
# #### Single image
def iou_metric(y_true_in, y_pred_in):
labels = y_true_in
y_pred = y_pred_in
temp1 = np.histogram2d(labels.flatten(), y_pred.flatten(), bins=([0, 0.5, 1], [0, 0.5, 1]))
intersection = temp1[0]
area_true = np.histogram(labels, bins=[0, 0.5, 1])[0]
area_pred = np.histogram(y_pred, bins=[0, 0.5, 1])[0]
area_true = np.expand_dims(area_true, -1)
area_pred = np.expand_dims(area_pred, 0)
# Compute union
union = area_true + area_pred - intersection
# Exclude background from the analysis
intersection = intersection[1:, 1:]
intersection[intersection == 0] = 1e-9
union = union[1:, 1:]
union[union == 0] = 1e-9
iou = intersection / union
return iou
def plot_mask_gt_image(mask, groud_truth, img):
fig, axs = plt.subplots(1, 3, figsize=(20, 10))
axs[0].imshow(mask, cmap="Blues")
axs[1].imshow(groud_truth, cmap="Blues")
axs[2].imshow(img)
plt.show()
def iou_metric_batch(y_true_in, y_pred_in):
y_pred_in = y_pred_in
batch_size = y_true_in.shape[0]
metric = []
for batch in range(batch_size):
value = iou_metric(y_true_in[batch], y_pred_in[batch])
metric.append(value)
return np.mean(metric)
# to get single image and prediction quality
Xtest, y_test = test_generator.__getitem__(1)
predicted = model.predict(np.expand_dims(Xtest[0], axis=0)).reshape(1472, 1472)
print('IOU', iou_metric(y_test[0].reshape(1472, 1472), predicted))
plot_mask_gt_image(predicted, y_test.squeeze(0).squeeze(-1), Xtest.squeeze(0))
# ## Overall quality
scores = model.evaluate_generator(test_generator)
metrics = [iou_score]
print("Loss: {:.5}".format(scores[0]))
for metric, value in zip(metrics, scores[1:]):
print("mean {}: {:.5}".format(metric.__name__, value))
# ## Adding TTA (test time augmentation)
from tta_wrapper import tta_segmentation
from keras.models import load_model
model = load_model('./weights/road_crop.efficientnetb0imgsize.h5',
custom_objects={'binary_crossentropy + jaccard_loss': bce_jaccard_loss,
'iou_score': iou_score})
tta_model = tta_segmentation(model, h_flip=True, merge='mean').compile(optimizer=Adam(), loss=bce_jaccard_loss,
metrics=[iou_score])
scores = tta_model.evaluate_generator(test_generator)
metrics = [iou_score]
print("Loss: {:.5}".format(scores[0]))
for metric, value in zip(metrics, scores[1:]):
print("mean {}: {:.5}".format(metric.__name__, value))
# ### By adjusting threshold you can further improve your score
def draw_get_best_threshold(ious, thresholds):
"""
Returns threshold_best, iou_best
"""
threshold_best_index = np.argmax(ious)
iou_best = ious[threshold_best_index]
threshold_best = thresholds[threshold_best_index]
plt.plot(thresholds, ious)
plt.plot(threshold_best, iou_best, "xr", label="Best threshold")
plt.xlabel("Threshold")
plt.ylabel("IoU")
plt.title("Threshold vs IoU ({}, {})".format(threshold_best, iou_best))
plt.legend()
return threshold_best, iou_best
preds = []
y_val = []
for i in (range(0, test_generator.__len__())):
Xtest, y_test = test_generator.__getitem__(i)
preds.append(tta_model.predict(Xtest).reshape(1472, 1472))
y_val.append(y_test)
preds = np.stack(preds, axis=0)
y_val = np.stack(y_val, axis=0)
thresholds = list(np.linspace(0.1, 0.9, 10))
ious = np.array([iou_metric_batch(y_val, np.int32(preds > threshold)) for threshold in (thresholds)])
best_threshold, best_iou = draw_get_best_threshold(ious, thresholds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment