Last active
November 2, 2022 11:30
-
-
Save prerakmody/1b6f349db1634846303fac27060805cc to your computer and use it in GitHub Desktop.
Netron.app example to visualize a tensorflow 2.x model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
pip install tensorflow | |
pip install tf2onnx keras2onnx onnxmltools | |
""" | |
import os | |
import pdb | |
import json | |
import traceback | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
import tensorflow as tf # v2.4 | |
if len(tf.config.list_physical_devices('GPU')): | |
tf.config.experimental.set_memory_growth(tf.config.list_physical_devices('GPU')[0], True) | |
############################################################ | |
# FOCUSNET # | |
############################################################ | |
class ConvBlock3D(tf.keras.layers.Layer): | |
def __init__(self, filters, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation='relu' | |
, trainable=False | |
, dropout=None | |
, pool=False | |
, name=''): | |
super(ConvBlock3D, self).__init__(name='{}_ConvBlock3D'.format(name)) | |
self.pool = pool | |
self.conv_layer = tf.keras.Sequential(name='{}_Sequential'.format(name)) | |
for filter_id, filter_count in enumerate(filters): | |
self.conv_layer.add( | |
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rate | |
, activation=activation | |
, kernel_regularizer=tf.keras.regularizers.l2(0.1) | |
, name='Conv_{}'.format(filter_id)) | |
) | |
self.conv_layer.add(tf.keras.layers.BatchNormalization(trainable=trainable, name='BNorm_{}'.format(filter_id))) | |
if filter_id == 0 and dropout is not None: | |
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='DropOut_{}'.format(filter_id))) | |
if self.pool: | |
self.pool_layer = tf.keras.layers.MaxPooling3D((2,2,2), strides=(2,2,2), name='{}_Pool'.format(name)) | |
def call(self, x): | |
x = self.conv_layer(x) | |
if self.pool: | |
return x, self.pool_layer(x) | |
else: | |
return x | |
def get_config(self): | |
if self.pool: | |
return {'conv_layer': self.conv_layer, 'pool_layer': self.pool_layer} | |
else: | |
return {'conv_layer': self.conv_layer} | |
class ConvBlock3DSERes(tf.keras.layers.Layer): | |
""" | |
For channel-wise attention | |
""" | |
def __init__(self, filters, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation='relu' | |
, trainable=False | |
, dropout=None | |
, pool=False | |
, squeeze_ratio=None | |
, init=False | |
, name=''): | |
super(ConvBlock3DSERes, self).__init__(name='{}_ConvBlock3DSERes'.format(name)) | |
self.init = init | |
if self.init: | |
self.convblock_filterequalizer = tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same' | |
, activation='relu', name='{}_ConnvBlockInit'.format(name)) | |
self.convblock_res = ConvBlock3D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rate | |
, activation=activation | |
, trainable=trainable | |
, dropout=dropout | |
, pool=False | |
, name=name | |
) | |
""" | |
Ref: https://github.com/imkhan2/se-resnet/blob/master/se_resnet.py | |
""" | |
self.squeeze_ratio = squeeze_ratio | |
if self.squeeze_ratio is not None: | |
self.seblock = tf.keras.Sequential(name='{}_SqueezeExcitation'.format(name)) | |
self.seblock.add(tf.keras.layers.GlobalAveragePooling3D()) | |
self.seblock.add(tf.keras.layers.Reshape(target_shape=(1,1,1,filters[0]))) | |
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0]//squeeze_ratio, kernel_size=(1,1,1), strides=(1,1,1), padding='same' | |
, activation='relu')) | |
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same' | |
, activation='sigmoid')) | |
self.pool = pool | |
if self.pool: | |
self.pool_layer = tf.keras.layers.MaxPooling3D((2,2,2), strides=(2,2,2), name='{}_Pool'.format(name)) | |
def call(self, x): | |
if self.init: | |
x = self.convblock_filterequalizer(x) | |
x_res = self.convblock_res(x) | |
if self.squeeze_ratio is not None: | |
x_se = self.seblock(x_res) # squeeze and then get excitation factor | |
x_res = tf.math.multiply(x_res, x_se) # excited block | |
y = x + x_res | |
if self.pool: | |
return y, self.pool_layer(y) | |
else: | |
return y | |
def get_config(self): | |
config = { | |
'convblock_res': self.convblock_res | |
, 'seblock': self.seblock | |
} | |
if self.init: | |
config['convblock_filterequalizer'] = self.convblock_filterequalizer | |
if self.pool: | |
config['pool_layer'] = self.pool_layer | |
return config | |
class UpConvBlock3D(tf.keras.layers.Layer): | |
def __init__(self, filters, kernel_size=(2,2,2), strides=(2, 2, 2), padding='same', trainable=False, name=''): | |
super(UpConvBlock3D, self).__init__(name='{}_UpConv3D'.format(name)) | |
self.upconv_layer = tf.keras.Sequential(name='{}_Sequential'.format(name)) | |
self.upconv_layer.add(tf.keras.layers.Conv3DTranspose(filters, kernel_size, strides, padding=padding | |
, activation='relu' | |
, kernel_regularizer=tf.keras.regularizers.l2(0.1) | |
, name='UpConv_{}'.format(self.name)) | |
) | |
# self.upconv_layer.add(tf.keras.layers.BatchNormalization(trainable=trainable)) | |
def call(self, x): | |
return self.upconv_layer(x) | |
def get_config(self): | |
return {'upconv_layer': self.upconv_layer} | |
class ModelFocusNetZDil1(tf.keras.Model): | |
def __init__(self, class_count, activation='softmax', deepsup=False, trainable=False, verbose=False): | |
super(ModelFocusNetZDil1, self).__init__(name='ModelFocusNetZDil1') | |
self.verbose = verbose | |
self.deepsup = deepsup | |
dropout = [None, 0.25, 0.25, 0.25, 0.25, 0.25, None, None] | |
filters = [[10,10], [20,20]] | |
dilation_xy = [1, 2, 3, 6, 12, 18] | |
dilation_z = [1, 1, 1, 1, 1 , 1] | |
# Feat Extraction (SE-Res Blocks) | |
self.convblock1 = ConvBlock3DSERes(filters=filters[0], kernel_size=(3,3,1), dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[0], pool=True , squeeze_ratio=2, name='Block1') # Dim/2 (e.g. 96/2=48, 240/2=120)(rp=(3,5,10),(3,5,10)) | |
self.convblock2 = ConvBlock3DSERes(filters=filters[0], dilation_rate=(dilation_xy[1], dilation_xy[1], dilation_z[1]), trainable=trainable, dropout=dropout[0], pool=False, squeeze_ratio=2, name='Block2') # Dim/2 (e.g. 96/2=48, 240/2=120)(rp=(14,18),(12,14)) | |
# Dense ASPP | |
self.convblock3 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[2], dilation_xy[2], dilation_z[2]), trainable=trainable, dropout=dropout[1], pool=False, name='Block3_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(24,30),(16,18)) | |
self.convblock4 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[3], dilation_xy[3], dilation_z[3]), trainable=trainable, dropout=dropout[2], pool=False, name='Block4_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(42,54),(20,22)) | |
self.convblock5 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[4], dilation_xy[4], dilation_z[4]), trainable=trainable, dropout=dropout[3], pool=False, name='Block5_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(78,102),(24,26)) | |
self.convblock6 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[5], dilation_xy[5], dilation_z[5]), trainable=trainable, dropout=dropout[4], pool=False, name='Block6_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(138,176),(28,30)) | |
self.convblock7 = ConvBlock3DSERes(filters=filters[1], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[5], pool=False, squeeze_ratio=2, init=True, name='Block7') # Dim/2 (e.g. 96/2=48) (rp=(176,180),(32,44)) | |
# Upstream | |
self.convblock8 = ConvBlock3DSERes(filters=filters[1], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[6], pool=False, squeeze_ratio=2, init=True, name='Block8') # Dim/2 (e.g. 96/2=48) | |
if self.deepsup: | |
self.convblock8_1 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation=activation | |
, name='Block8_1') | |
self.upconvblock9 = UpConvBlock3D(filters=filters[0][0], trainable=trainable, name='Block9_1') # Dim/1 (e.g. 96/1 = 96) | |
self.convblock9 = ConvBlock3DSERes(filters=filters[0], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[7], pool=False, squeeze_ratio=2, init=True, name='Block9') # Dim/1 (e.g. 96/1 = 96) | |
# Final | |
self.convblock10 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation=activation | |
, name='Block10') | |
def call(self, x): | |
# Feat Extraction (SE-Res Blocks) | |
conv1, pool1 = self.convblock1(x) | |
conv2 = self.convblock2(pool1) | |
# Dense ASPP | |
conv3 = self.convblock3(conv2) | |
conv3_op = tf.concat([conv2, conv3], axis=-1) | |
conv4 = self.convblock4(conv3_op) | |
conv4_op = tf.concat([conv3_op, conv4], axis=-1) | |
conv5 = self.convblock5(conv4_op) | |
conv5_op = tf.concat([conv4_op, conv5], axis=-1) | |
conv6 = self.convblock6(conv5_op) | |
conv6_op = tf.concat([conv5_op, conv6], axis=-1) | |
conv7 = self.convblock7(conv6_op) | |
# Upstream | |
# Pixel-wise attention can be added here | |
conv8 = self.convblock8(tf.concat([pool1, conv7], axis=-1)) | |
if self.deepsup: | |
conv8_1 = self.convblock8_1(conv8) | |
up9 = self.upconvblock9(conv8) | |
# Pixel-wise attention can be added here | |
conv9 = self.convblock9(tf.concat([conv1, up9], axis=-1)) | |
# Final | |
conv10 = self.convblock10(conv9) | |
if self.verbose: | |
print (' ---------- Model: ', self.name) | |
print (' - x: ', x.shape) | |
print (' - conv1: ', conv1.shape) | |
print (' - conv2: ', conv2.shape) | |
print (' - conv3_op: ', conv3_op.shape) | |
print (' - conv4_op: ', conv4_op.shape) | |
print (' - conv5_op: ', conv5_op.shape) | |
print (' - conv6_op: ', conv6_op.shape) | |
print (' - conv7: ', conv7.shape) | |
print (' - conv8: ', conv8.shape) | |
print (' - conv9: ', conv9.shape) | |
print (' - conv10: ', conv10.shape) | |
if self.deepsup: | |
return conv8_1, conv10 | |
else: | |
return conv10 | |
def build_graph(self, dim): | |
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name)) | |
return tf.keras.Model(inputs=[x], outputs=self.call(x)) | |
def get_config(self): | |
config = { | |
'convblock1': self.convblock1 | |
, 'convblock2': self.convblock2 | |
, 'convblock3': self.convblock3 | |
, 'convblock4': self.convblock4 | |
, 'convblock5': self.convblock5 | |
, 'convblock6': self.convblock6 | |
, 'convblock7': self.convblock7 | |
, 'convblock8': self.convblock8 | |
, 'convblock9': self.convblock9 | |
, 'convblock10': self.convblock10 | |
} | |
if self.deepsup: | |
config['convblock8_1'] = self.convblock8_1 | |
return config | |
############################################################ | |
# UTILS # | |
############################################################ | |
@tf.function | |
def write_model_trace(model, X): | |
return model(X) | |
if __name__ == "__main__": | |
if 1: | |
print (' --------------------- [ModelFocusNetZDil1] --------------------- ') | |
# Using the summary() function | |
raw_shape = (140, 140, 40, 1) | |
model = ModelFocusNetZDil1(class_count=10) | |
model = model.build_graph(raw_shape) | |
model.summary(line_length=150) | |
# Using the save() function | |
_ = model(tf.ones((1,*raw_shape))) | |
model.save('ModelFocusNetZDil1') # Keras ModelSave format; unable to properly visualize .pb file in www.netron.app | |
model.save('ModelFocusNetZDil1.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks | |
# Using the .to_json() and .get_config() function | |
with open('ModelFocusNetZDil1.json', 'w') as fp: | |
json.dump(json.loads(model.to_json()), fp, indent=4) | |
# model.get_config() | |
# Using tf2onnx | |
import onnx | |
import tf2onnx | |
model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model, [tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx | |
model_onnx = onnx.shape_inference.infer_shapes(model_proto) | |
tf2onnx.utils.save_protobuf('ModelFocusNetZDil1_tf2onxx.onnx', model_onnx) | |
# Using tensorboard | |
tf.summary.trace_on(graph=True, profiler=False) | |
_ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32)) | |
writer = tf.summary.create_file_writer(str('ModelFocusNetZDil1_TBoard')) | |
with writer.as_default(): | |
tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None) | |
writer.flush() | |
print (' - Run command --> tensorboard --logdir=ModelFocusNetZDil1_TBoard --port=6100') | |
pdb.set_trace() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
pip install tensorflow | |
pip install tf2onnx | |
""" | |
import os | |
import pdb | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
import tensorflow as tf # v2.4 | |
if len(tf.config.list_physical_devices('GPU')): | |
tf.config.experimental.set_memory_growth(tf.config.list_physical_devices('GPU')[0], True) | |
############################################################ | |
# INCEPTION MODEL # | |
############################################################ | |
# Model Ref: https://towardsdatascience.com/model-sub-classing-and-custom-training-loop-from-scratch-in-tensorflow-2-cc1d4f10fb4e | |
class ConvModule(tf.keras.layers.Layer): | |
def __init__(self, kernel_num, kernel_size, strides, padding='same'): | |
super(ConvModule, self).__init__() | |
self.conv = tf.keras.layers.Conv2D(kernel_num, kernel_size=kernel_size, strides=strides, padding=padding) | |
self.bn = tf.keras.layers.BatchNormalization() | |
def call(self, input_tensor): | |
x = self.conv(input_tensor) | |
x = self.bn(x) | |
x = tf.nn.relu(x) | |
return x | |
def get_config(self): | |
return {'conv': self.conv, 'bn':self.bn} | |
class InceptionModule(tf.keras.layers.Layer): | |
def __init__(self, kernel_size1x1, kernel_size3x3, name=''): | |
super(InceptionModule, self).__init__('InceptionModule_{}'.format(name)) | |
self.conv1 = ConvModule(kernel_size1x1, kernel_size=(1,1), strides=(1,1)) | |
self.conv2 = ConvModule(kernel_size3x3, kernel_size=(3,3), strides=(1,1)) | |
self.cat = tf.keras.layers.Concatenate() | |
def call(self, input_tensor): | |
x_1x1 = self.conv1(input_tensor) | |
x_3x3 = self.conv2(input_tensor) | |
x = self.cat([x_1x1, x_3x3]) | |
return x | |
def get_config(self): | |
return {'conv1':self.conv1, 'conv2':self.conv2, 'cat':self.cat} | |
class DownsampleModule(tf.keras.layers.Layer): | |
def __init__(self, kernel_size, name=''): | |
super(DownsampleModule, self).__init__(name='DownsampleModule_{}'.format(name)) | |
self.conv3 = ConvModule(kernel_size, kernel_size=(3,3), strides=(2,2), padding="valid") | |
self.pool = tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=(2,2)) | |
self.cat = tf.keras.layers.Concatenate() | |
def call(self, input_tensor): | |
conv_x = self.conv3(input_tensor) | |
pool_x = self.pool(input_tensor) | |
return self.cat([conv_x, pool_x]) | |
def get_config(self): | |
return {'conv3':self.conv3, 'pool':self.pool, 'cat':self.cat} | |
class MiniInception(tf.keras.Model): | |
def __init__(self, num_classes=10): | |
super(MiniInception, self).__init__(name='MiniInception') | |
# the first conv module | |
self.conv_block = ConvModule(96, (3,3), (1,1)) | |
# 2 inception module and 1 downsample module | |
self.inception_block1 = InceptionModule(32, 32, name='Block1') | |
self.inception_block2 = InceptionModule(32, 48, name='Block2') | |
self.downsample_block1 = DownsampleModule(80, name='Block1') | |
# 4 inception module and 1 downsample module | |
self.inception_block3 = InceptionModule(112, 48, name='Block3') | |
self.inception_block4 = InceptionModule(96, 64, name='Block4') | |
self.inception_block5 = InceptionModule(80, 80, name='Block5') | |
self.inception_block6 = InceptionModule(48, 96, name='Block6') | |
self.downsample_block2 = DownsampleModule(96, name='Block2') | |
# 2 inception module | |
self.inception_block7 = InceptionModule(176, 160, name='Block7') | |
self.inception_block8 = InceptionModule(176, 160, name='Block8') | |
# average pooling | |
self.avg_pool = tf.keras.layers.AveragePooling2D((7,7)) | |
# model tail | |
self.flat = tf.keras.layers.Flatten() | |
self.classfier = tf.keras.layers.Dense(num_classes, activation='softmax') | |
def call(self, input_tensor): | |
# forward pass | |
x = self.conv_block(input_tensor) | |
x = self.inception_block1(x) | |
x = self.inception_block2(x) | |
x = self.downsample_block1(x) | |
x = self.inception_block3(x) | |
x = self.inception_block4(x) | |
x = self.inception_block5(x) | |
x = self.inception_block6(x) | |
x = self.downsample_block2(x) | |
x = self.inception_block7(x) | |
x = self.inception_block8(x) | |
x = self.avg_pool(x) | |
x = self.flat(x) | |
return self.classfier(x) | |
def build_graph(self, raw_shape): | |
x = tf.keras.layers.Input(shape=raw_shape) | |
return tf.keras.Model(inputs=[x], outputs=self.call(x)) | |
def get_config(self): | |
return { | |
'conv_block':self.conv_block | |
, 'inception_block1': self.inception_block1 | |
, 'inception_block2': self.inception_block2 | |
, 'downsample_block1': self.downsample_block1 | |
, 'inception_block3':self.inception_block3 | |
, 'inception_block4':self.inception_block4 | |
, 'inception_block5':self.inception_block5 | |
, 'inception_block6':self.inception_block6 | |
, 'downsample_block2': self.downsample_block2 | |
, 'inception_block7':self.inception_block7 | |
, 'inception_block8':self.inception_block8 | |
, 'avg_pool': self.avg_pool | |
, 'flat':self.flat | |
, 'classfier':self.classfier | |
} | |
############################################################ | |
# UTILS # | |
############################################################ | |
@tf.function | |
def write_model_trace(model, X): | |
return model(X) | |
if __name__ == "__main__": | |
if 1: | |
print (' --------------------- [MiniInception] --------------------- ') | |
# Using the summary() function | |
raw_shape = (32, 32, 3) | |
model = MiniInception() # tf.keras.Model | |
model = model.build_graph(raw_shape) # <class 'tensorflow.python.keras.engine.functional.Functional'> | |
model.summary(line_length=150) | |
# Using the save() function | |
_ = model(tf.ones((1,*raw_shape))) | |
model.save('MiniInception') # Keras ModelSave format; unable to properly visualize .pb file in www.netron.app | |
model.save('MiniInception.h5') # Loads in www.netron.app for viz purposes, shows the high level blocks | |
# Using the .to_json() and .get_config() function | |
with open('MiniInception.json', 'w') as fp: | |
json.dump(json.loads(model.to_json()), fp, indent=4) | |
# model.get_config() | |
# Using onnxmltools (does not support Conv3D, MaxPool3D, Conv3DTranspose) | |
import onnx | |
import onnxmltools | |
model_onnx = onnxmltools.convert_keras(model, model.name) # this is a ModelProto | |
model_onnx = onnx.shape_inference.infer_shapes(model_onnx) | |
onnxmltools.utils.save_model(model_onnx, 'MiniInception_onnxmltools.onnx') | |
# Using keras2onnx | |
import keras2onnx | |
model_onnx = keras2onnx.convert_keras(model, model.name) | |
model_onnx = onnx.shape_inference.infer_shapes(model_onnx) | |
keras2onnx.save_model(model_onnx, 'MiniInception_keras2onxx.onnx') | |
# Using tf2onnx | |
import tf2onnx | |
model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model, [tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx | |
model_onnx = onnx.shape_inference.infer_shapes(model_proto) | |
tf2onnx.utils.save_protobuf('MiniInception_tf2onxx.onnx', model_onnx) | |
# Using tensorboard | |
tf.summary.trace_on(graph=True, profiler=False) | |
_ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32)) | |
writer = tf.summary.create_file_writer(str('MiniInception_TBoard')) | |
with writer.as_default(): | |
tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None) | |
writer.flush() | |
print (' - Run command --> tensorboard --logdir=MiniInception_TBoard --port=6100') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import internal libraries | |
import src.config as config | |
# Import external libraries | |
import pdb | |
import time | |
import traceback | |
import numpy as np | |
import tensorflow as tf | |
import tensorflow_addons as tfa | |
import tensorflow_probability as tfp | |
print (' - tf : ', tf.__version__) # 2.9.1 | |
print (' - tfa: ', tfa.__version__) # 0.17.1 | |
print (' - tfp: ', tfp.__version__) # 0.17.0 | |
############################################################ | |
# 3D MODEL BLOCKS # | |
############################################################ | |
class ConvBlock3DTrial(tf.keras.layers.Layer): | |
""" | |
Performs a series of 3D convolutions which are residual in nature | |
""" | |
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1,1,1), padding='same' | |
, activation=tf.nn.relu | |
, group_factor=4 | |
, trainable=False | |
, dropout=None | |
, residual=True | |
, init_filters=False | |
, bayesian=False | |
, spectral=False | |
, pool=None | |
, name=''): | |
super(ConvBlock3D, self).__init__(name='{}_ConvBlock3DTrial'.format(name)) | |
# Step 0 - Init | |
self.init_filters = init_filters | |
self.pool = pool | |
self.residual = residual | |
if type(filters) == int: | |
filters = [filters] | |
if spectral: | |
f = lambda x: tfa.layers.SpectralNormalization(x) | |
else: | |
f = lambda x: x | |
# Step 1 - Set the filters right so that the residual can be done | |
if self.init_filters: | |
self.init_layer = tf.keras.Sequential(name='{}_Conv1x1Seq'.format(self.name)) | |
if bayesian: | |
self.init_layer.add( | |
f( | |
tfp.layers.Convolution3DFlipout(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding | |
, dilation_rate=1 | |
, activation=None | |
, name='{}_Conv1x1Flip'.format(self.name)) | |
) | |
) | |
else: | |
self.init_layer.add( | |
f( | |
tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding | |
, dilation_rate=1 | |
, activation=None | |
, name='{}_Conv1x1'.format(self.name)) | |
) | |
) | |
self.init_layer.add(tfa.layers.GroupNormalization(groups=filters[0]//group_factor, trainable=trainable)) | |
self.init_layer.add(tf.keras.layers.Activation(activation)) | |
# Step 2 - Create residual block | |
self.conv_layer = tf.keras.Sequential(name='{}_ConvSeq'.format(self.name)) | |
for filter_id, filter_count in enumerate(filters): | |
# Step 2.0 - DropOut or not | |
# print (' - name: {} || dropout: {}'.format(self.name, dropout) ) | |
if dropout is not None: | |
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='{}_DropOut_{}'.format(self.name, filter_id))) # before every conv layer (could also be after every layer?) | |
spectral = True | |
if spectral: | |
f = lambda x: tfa.layers.SpectralNormalization(x) | |
else: | |
f = lambda x: x | |
# Step 2.1 - Bayesian or not | |
if bayesian: | |
self.conv_layer.add( | |
f( | |
tfp.layers.Convolution3DFlipout(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rates[filter_id] | |
, activation=None | |
, name='{}_Conv3DFlip_{}'.format(self.name, filter_id)) | |
) | |
) | |
else: | |
self.conv_layer.add( | |
f( | |
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rates[filter_id] | |
, activation=None | |
, name='{}_Conv3D_{}'.format(self.name, filter_id)) | |
) | |
) | |
self.conv_layer.add(tfa.layers.GroupNormalization(groups=filter_count//group_factor, trainable=trainable)) | |
# Step 2.2 - Residual or not | |
if self.residual: | |
if filter_id != len(filters) -1: # dont add activation in the last conv of the block | |
self.conv_layer.add(tf.keras.layers.Activation(activation)) | |
else: | |
self.conv_layer.add(tf.keras.layers.Activation(activation)) | |
# Step 2.1 - Finish residual block | |
if self.residual: | |
self.activation_layer = tf.keras.layers.Activation(activation) | |
# Step 3 - Learnt Pooling | |
if self.pool is not None: | |
self.pool_layer = f(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding | |
, dilation_rate=(1,1,1) | |
, activation=None | |
, name='{}_Conv3DPooling'.format(self.name))) | |
def call(self, x): | |
# Step 1 | |
if self.init_filters: | |
x = self.init_layer(x) | |
# Step 2 | |
if self.residual: | |
x_ = self.conv_layer(x) # Conv-GN-ReLU -- Conv-GN | |
x = self.activation_layer(x_ + x) # RelU | |
else: | |
x = self.conv_layer(x) | |
# Step 3 | |
if self.pool: | |
x_pool = self.pool_layer(x) | |
return x, x_pool | |
else: | |
return x | |
class ConvBlock3D(tf.keras.layers.Layer): | |
""" | |
Performs a series of 3D convolutions which are residual in nature | |
""" | |
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1,1,1), padding='same' | |
, activation=tf.nn.relu | |
, group_factor=4 | |
, trainable=False | |
, dropout=None | |
, residual=True | |
, init_filters=False | |
, bayesian=False | |
, spectral=False | |
, pool=None | |
, name=''): | |
super(ConvBlock3D, self).__init__(name='{}_ConvBlock3D'.format(name)) | |
# Step 0 - Init | |
self.init_filters = init_filters | |
self.pool = pool | |
self.residual = residual | |
if type(filters) == int: | |
filters = [filters] | |
# Step 1 - Set the filters right so that the residual can be done | |
if self.init_filters: | |
self.init_layer = tf.keras.Sequential(name='{}_Conv1x1Seq'.format(self.name)) | |
if bayesian: | |
self.init_layer.add( | |
# | |
tfp.layers.Convolution3DFlipout(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding | |
, dilation_rate=1 | |
, activation=None | |
, name='{}_Conv1x1Flip'.format(self.name)) | |
) | |
else: | |
self.init_layer.add( | |
tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding | |
, dilation_rate=1 | |
, activation=None | |
, name='{}_Conv1x1'.format(self.name)) | |
) | |
self.init_layer.add(tfa.layers.GroupNormalization(groups=filters[0]//group_factor, trainable=trainable)) | |
self.init_layer.add(tf.keras.layers.Activation(activation)) | |
# Step 2 - Create residual block | |
self.conv_layer = tf.keras.Sequential(name='{}_ConvSeq'.format(self.name)) | |
for filter_id, filter_count in enumerate(filters): | |
# Step 2.0 - DropOut or not | |
# print (' - name: {} || dropout: {}'.format(self.name, dropout) ) | |
if dropout is not None: | |
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='{}_DropOut_{}'.format(self.name, filter_id))) # before every conv layer (could also be after every layer?) | |
# Step 2.1 - Bayesian or not | |
if bayesian: | |
self.conv_layer.add( | |
tfp.layers.Convolution3DFlipout(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rates[filter_id] | |
, activation=None | |
, name='{}_Conv3DFlip_{}'.format(self.name, filter_id)) | |
) | |
else: | |
self.conv_layer.add( | |
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding | |
, dilation_rate=dilation_rates[filter_id] | |
, activation=None | |
, name='{}_Conv3D_{}'.format(self.name, filter_id)) | |
) | |
self.conv_layer.add(tfa.layers.GroupNormalization(groups=filter_count//group_factor, trainable=trainable)) | |
# Step 2.2 - Residual or not | |
if self.residual: | |
if filter_id != len(filters) -1: # dont add activation in the last conv of the block | |
self.conv_layer.add(tf.keras.layers.Activation(activation)) | |
else: | |
self.conv_layer.add(tf.keras.layers.Activation(activation)) | |
# Step 2.1 - Finish residual block | |
if self.residual: | |
self.activation_layer = tf.keras.layers.Activation(activation) | |
# Step 3 - Learnt Pooling | |
if self.pool is not None: | |
self.pool_layer = tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding | |
, dilation_rate=(1,1,1) | |
, activation=None | |
, name='{}_Conv3DPooling'.format(self.name)) | |
def call(self, x): | |
# Step 1 | |
if self.init_filters: | |
x = self.init_layer(x) | |
# Step 2 | |
if self.residual: | |
x_ = self.conv_layer(x) # Conv-GN-ReLU -- Conv-GN | |
x = self.activation_layer(x_ + x) # RelU | |
else: | |
x = self.conv_layer(x) | |
# Step 3 | |
if self.pool: | |
x_pool = self.pool_layer(x) | |
return x, x_pool | |
else: | |
return x | |
def get_config(self): | |
config = {} | |
if self.init_filters: | |
config['init_filters'] = self.init_layer | |
config['conv_layer'] = self.conv_layer | |
if self.residual: | |
config['residual'] = self.activation_layer | |
if self.pool: | |
config['pool'] = self.pool_layer | |
return config | |
class UpConvBlock(tf.keras.layers.Layer): | |
def __init__(self, filters, kernel_size=(2,2,2), strides=(2, 2, 2), padding='same', spectral=False, trainable=False, name=''): | |
super(UpConvBlock, self).__init__(name='{}_UpConvBlock'.format(name)) | |
if spectral: | |
f = lambda x: tfa.layers.SpectralNormalization(x) | |
else: | |
f = lambda x: x | |
self.upconv_layer = tf.keras.Sequential() | |
self.upconv_layer.add( | |
f( | |
tf.keras.layers.Conv3DTranspose(filters=filters, kernel_size=kernel_size, strides=kernel_size, padding=padding | |
, activation=None | |
, name='{}__ConvTranspose'.format(name)) | |
) | |
) | |
def call(self, x): | |
return self.upconv_layer(x) | |
def get_config(self): | |
return {'upconv_layer': self.upconv_layer} | |
class ConvBlockSERes(tf.keras.layers.Layer): | |
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same' | |
, activation=tf.nn.relu | |
, group_factor=4 | |
, trainable=False | |
, dropout=None | |
, init_filters=False | |
, bayesian=False | |
, spectral=False | |
, pool=None | |
, squeeze_ratio=2 | |
, name=''): | |
super(ConvBlockSERes, self).__init__(name='{}_ConvSERes'.format(name)) | |
# Step 0 - Init | |
self.pool = pool | |
if spectral: | |
f = lambda x: tfa.layers.SpectralNormalization(x) | |
else: | |
f = lambda x: x | |
# Step 1 - ConvBlock | |
assert len(filters) == len(dilation_rates) # eg. len([32,32,32]) = len([(1,1,1), (3,3,1), (5,5,1)]) | |
self.convblock_res = ConvBlock3D(filters=filters, dilation_rates=dilation_rates, kernel_size=kernel_size, strides=strides, padding=padding | |
, activation=activation | |
, group_factor=group_factor | |
, trainable=trainable | |
, dropout=dropout | |
, init_filters=init_filters | |
, bayesian=bayesian | |
, pool=None | |
, name='{}'.format(self.name) | |
) | |
# Step 2 - Squeeze and Excitation | |
## Ref: https://github.com/imkhan2/se-resnet/blob/master/se_resnet.py | |
self.seblock = tf.keras.Sequential(name='{}_SERes'.format(name)) | |
self.seblock.add(tf.keras.layers.GlobalAveragePooling3D()) | |
self.seblock.add(tf.keras.layers.Reshape(target_shape=(1,1,1,filters[0]))) | |
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0]//squeeze_ratio, kernel_size=(1,1,1), strides=(1,1,1), padding='same' | |
, activation=tf.nn.relu)) | |
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same' | |
, activation=tf.nn.sigmoid)) | |
# Step 3 - Pooling | |
# Step 3 - Learnt Pooling | |
if self.pool is not None: | |
self.pool_layer = f(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding | |
, dilation_rate=(1,1,1) | |
, activation=None | |
, name='{}_Conv3DPooling'.format(self.name))) | |
def call(self, x): | |
# Step 1 - Conv Block | |
x_res = self.convblock_res(x) | |
# Step 2.1 - Squeeze and Excitation | |
x_se = self.seblock(x_res) # squeeze and then get excitation factor | |
# Step 2.2 | |
y = x_res + tf.math.multiply(x_res, x_se) # excited block | |
# Step 3 - Pooling | |
if self.pool is not None: | |
return y, self.pool_layer(y) | |
else: | |
return y | |
def get_config(self): | |
config = {'convblock_res': self.convblock_res, 'seblock': self.seblock} | |
if self.pool: | |
config['pool_layer'] = self.pool_layer | |
return config | |
############################################################ | |
# 3D BACKENDS # | |
############################################################ | |
class FocusNetBackend(tf.keras.layers.Layer): | |
""" | |
Folows "FocusNet: Imbalanced Large and Small Organ Segmentation with an End-to-End Deep Neural Network for Head and Neck CT Images" | |
""" | |
def __init__(self, filters, dil_rates, trainable=False, verbose=False): | |
super(FocusNetBackend, self).__init__(name='FocusNetBackend') | |
self.convblock1 = ConvBlock3D(filters=filters[0] , dilation_rates=dil_rates[0], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,2), name='Block1') | |
self.convblock2 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True , bayesian=False, trainable=trainable, pool=None , name='Block2') | |
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False, bayesian=False, trainable=trainable, pool=None , name='Block3') | |
def call(self, x): | |
conv1, pool1 = self.convblock1(x) | |
conv2 = pool2 = self.convblock2(pool1) | |
conv3 = self.convblock3(pool2) | |
return conv1, pool1, conv2, pool2, conv3 | |
class OrganNetBackend(tf.keras.layers.Layer): | |
""" | |
Folows "A Novel Hybrid Convolutional Neural Network for Accurate Organ Segmentation in 3D Head and Neck CT Images" | |
""" | |
def __init__(self, filters, dil_rates, pooling='double', trainable=False, verbose=True): | |
super(OrganNetBackend, self).__init__(name='OrganNetBackend') | |
self.verbose = verbose | |
self.pooling = pooling | |
self.convblock1 = ConvBlock3D(filters=filters[0] , dilation_rates=dil_rates[0], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,1), name='Block1') | |
self.convblock2 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,2), name='Block2') | |
if pooling == 'double': | |
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False , bayesian=False, trainable=trainable, pool=None , name='Block3NoPool') | |
elif pooling == 'triple': | |
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False , bayesian=False, trainable=trainable, pool=(2,2,2) , name='Block3YesPool') | |
def call(self, x): | |
conv1, pool1 = self.convblock1(x) | |
conv2, pool2 = self.convblock2(pool1) | |
if self.pooling == 'double': | |
conv3 = self.convblock3(pool2) | |
pool3 = None | |
elif self.pooling == 'triple': | |
conv3, pool3 = self.convblock3(pool2) | |
if self.verbose: | |
print (' - [OrganNetBackend] x: ', x.shape) | |
print (' - [OrganNetBackend] conv1: {} | pool1: {}'.format(conv1.shape, pool1.shape)) | |
print (' - [OrganNetBackend] conv2: {} | pool2: {}'.format(conv2.shape, pool2.shape)) | |
if self.pooling == 'double': | |
print (' - [OrganNetBackend] conv3: {} | pool3: {}'.format(conv3.shape, pool3)) | |
elif self.pooling == 'triple': | |
print (' - [OrganNetBackend] conv3: {} | pool3: {}'.format(conv3.shape, pool3.shape)) | |
return conv1, pool1, conv2, pool2, conv3, pool3 | |
def get_config(self): | |
config = {'convblock1': self.convblock1, 'convblock2': self.convblock2, 'convblock3': self.convblock3} | |
return config | |
def build_graph(self, dim): | |
x = tf.keras.Input(shape=(None,), name='{}-Input'.format(self.name)) | |
return tf.keras.Model(inputs=[x], outputs=self.call(x)) | |
class HDC(tf.keras.layers.Layer): | |
""" | |
Ref: Understanding Convolutions for Semantic Segmentation (https://arxiv.org/abs/1702.08502) | |
: https://gist.github.com/prerakmody/ac04e3ee4ee67cf66a4e6251d673993c | |
""" | |
def __init__(self, filters, dil_rates, dropout=None, bayesian=False, trainable=False, verbose=False): | |
super(HDC, self).__init__(name='HDC') | |
self.verbose = verbose | |
self.convblock4 = ConvBlockSERes(filters=filters[0], init_filters=False, dilation_rates=dil_rates[0], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block4') | |
self.convblock5 = ConvBlockSERes(filters=filters[1], init_filters=False, dilation_rates=dil_rates[1], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block5') | |
self.convblock6 = ConvBlockSERes(filters=filters[2], init_filters=False, dilation_rates=dil_rates[2], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block6') | |
self.convblock7 = lambda x: x | |
def call(self, x): | |
conv4 = self.convblock4(x) | |
conv5 = self.convblock5(conv4) | |
conv6 = self.convblock6(conv5) | |
conv7 = self.convblock7(conv6) | |
if self.verbose: | |
print (' - [HDC] conv4: ', conv4.shape) | |
print (' - [HDC] conv5: ', conv5.shape) | |
print (' - [HDC] conv6: ', conv6.shape) | |
print (' - [HDC] conv7: ', conv7.shape) | |
return conv4, conv5, conv6, conv7 | |
def get_config(self): | |
config = {'convblock4': self.convblock4, 'convblock5': self.convblock5, 'convblock6': self.convblock6, 'convblock7': self.convblock7} | |
return config | |
class nonHDC(tf.keras.layers.Layer): | |
""" | |
Ref: Understanding Convolutions for Semantic Segmentation (https://arxiv.org/abs/1702.08502) | |
: https://gist.github.com/prerakmody/ac04e3ee4ee67cf66a4e6251d673993c | |
""" | |
def __init__(self, filters, dil_rates, dropout=None, bayesian=False, trainable=False, verbose=False): | |
super(nonHDC, self).__init__(name='nonHDC') | |
self.convblock4 = ConvBlockSERes(filters=filters[0], dilation_rates=dil_rates[0], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block4') | |
self.convblock5 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block5') | |
self.convblock6 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block6') | |
self.convblock7 = ConvBlockSERes(filters=filters[3], dilation_rates=dil_rates[3], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block7') | |
def call(self, x): | |
conv4 = self.convblock4(x) | |
conv5 = self.convblock5(conv4) | |
conv6 = self.convblock6(conv5) | |
conv7 = self.convblock7(conv6) | |
return conv4, conv5, conv6, conv7 | |
class commonHead(tf.keras.layers.Layer): | |
def __init__(self, filters, pooling, dil_rates, filters_upsample, ksize_upsample, class_count, deepsup, dropout=None, bayesian=False, activation=tf.nn.softmax, trainable=False, verbose=False): | |
super(commonHead, self).__init__(name='commonHead') | |
# Step 0 - Init | |
self.deepsup = deepsup | |
self.pooling = pooling | |
if self.pooling == 'triple': | |
self.upconvblock8 = UpConvBlock(filters=filters_upsample[0], kernel_size=ksize_upsample[0], name='UpBlock8') | |
self.convblock8 = ConvBlockSERes(filters=filters[0], dilation_rates=dil_rates[0], init_filters=True, trainable=trainable, pool=None, name='Block8') | |
if filters_upsample[0] is not None: | |
self.upconvblock9 = UpConvBlock(filters=filters_upsample[0], kernel_size=ksize_upsample[0], name='UpBlock9') | |
else: | |
self.upconvblock9 = lambda x: x | |
if filters_upsample[1] is not None: | |
self.upconvblock10 = UpConvBlock(filters=filters_upsample[1], kernel_size=ksize_upsample[1], name='UpBlock10') | |
else: | |
self.upconvblock10 = lambda x: x | |
if not bayesian and dropout is None: | |
print (' - [models2.py][commonHead] bayesian: ', bayesian, ' || dropout: ', dropout) | |
print (' - [models2.py][commonHead] filters: ', filters) | |
self.convblock9 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True, trainable=trainable, pool=None, name='Block9') | |
self.convblock10 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=True, trainable=trainable, pool=None, name='Block10') | |
self.convblock11 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation=activation | |
, name='Block11' | |
) | |
else: | |
self.convblock9 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block9-Flip') | |
self.convblock10 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=True, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block10-Flip') | |
self.convblock11 = tfp.layers.Convolution3DFlipout(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same' | |
, dilation_rate=(1,1,1) | |
, activation=activation | |
, name='Block11-Flip' | |
) | |
def call(self, conv7, conv3, conv2, conv1): | |
if self.pooling == 'triple': | |
upconv8 = self.upconvblock8(conv7) | |
conv8 = self.convblock8(tf.concat([conv3, upconv8], axis=-1)) | |
else: | |
conv8 = self.convblock8(tf.concat([conv3, conv7], axis=-1)) | |
upconv9 = self.upconvblock9(conv8) | |
conv9 = self.convblock9(tf.concat([conv2, upconv9], axis=-1)) | |
upconv10 = self.upconvblock10(conv9) | |
conv10 = self.convblock10(tf.concat([conv1, upconv10], axis=-1)) | |
conv11 = self.convblock11(conv10) | |
return conv8, conv9, conv10, conv11 | |
def get_config(self): | |
config = {} | |
if self.pooling == 'triple': | |
config['upconvblock8'] = self.upconvblock8 | |
config['convblock8'] = self.convblock8 | |
config['upconvblock9'] = self.upconvblock9 | |
config['convblock9'] = self.convblock9 | |
config['upconvblock10'] = self.upconvblock10 | |
config['convblock10'] = self.convblock10 | |
config['convblock11'] = self.convblock11 | |
return config | |
############################################################ | |
# 3D MODELS # | |
############################################################ | |
class OrganNet(tf.keras.Model): | |
def __init__(self, class_count, pooling='double', hdc=True, dropout=None, bayesian=True, deepsup=False, bayesianhead=False, activation=tf.nn.softmax, trainable=False, verbose=False): | |
super(OrganNet, self).__init__(name='OrganNet') | |
self.pooling = pooling | |
if 0: | |
backend_filters = [[16,16], [32,32]] | |
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]] | |
hdc_core_filters = [[32,32,32,32], [32,32,32,32], [32,32,32,32]] | |
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]] | |
nonhdc_core_filters = [[32,32,32], [32,32,32], [32,32,32], [32,32,32]] | |
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]] | |
if class_count != 1: | |
head_filters = [[32,32], [16,16], [class_count, class_count]] | |
else: | |
head_filters = [[32,32], [16,16], [8, 8]] | |
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]] | |
head_filters_upsample = [32,16] | |
head_filters_ksize = [(2,2,2), (2,2,1)] | |
elif 1: | |
backend_filters = [[32,32], [48,48]] | |
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]] | |
hdc_core_filters = [[48,48,48,48], [48,48,48,48], [48,48,48,48]] | |
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]] | |
nonhdc_core_filters = [[48,48,48], [48,48,48], [48,48,48], [48,48,48]] | |
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]] | |
if class_count != 1: | |
head_filters = [[48,48], [32,32], [class_count, class_count]] | |
else: | |
head_filters = [[48,48], [32,32], [8, 8]] | |
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]] | |
head_filters_upsample = [48,48] | |
head_filters_ksize = [(2,2,2), (2,2,1)] | |
print (' - [models2.py][OrganNet] pooling : ', pooling) | |
print (' - [models2.py][OrganNet] bayesian : ', bayesian) | |
print (' - [models2.py][OrganNet] dropout : ', dropout) | |
print (' - [models2.py][OrganNet] bayesianhead : ', bayesianhead) | |
self.backend = OrganNetBackend(filters=backend_filters, dil_rates=backend_dil_rates, pooling=pooling, trainable=trainable, verbose=verbose) | |
if not bayesianhead: | |
if hdc: | |
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose) | |
else: | |
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose) | |
self.head = commonHead(filters=head_filters, pooling=pooling, dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup | |
, activation=activation | |
, trainable=trainable, verbose=verbose) | |
else: | |
if hdc: | |
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=None, bayesian=False, trainable=trainable, verbose=verbose) | |
else: | |
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=None, bayesian=False, trainable=trainable, verbose=verbose) | |
self.head = commonHead(filters=head_filters, pooling=pooling, dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup | |
, dropout=dropout, bayesian=bayesian | |
, activation=activation | |
, trainable=trainable, verbose=verbose) | |
def call(self, x): | |
if 1: | |
# Step 1 - Backend | |
conv1, pool1, conv2, pool2, conv3, pool3 = self.backend(x) | |
# Step 2 - Core | |
if self.pooling == 'double': | |
conv4, conv5, conv6, conv7 = self.core(conv3) | |
elif self.pooling == 'triple': | |
conv4, conv5, conv6, conv7 = self.core(pool3) | |
# Step 3 - Head | |
conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1) | |
return conv11 | |
else: | |
# Step 1 - Backend | |
conv1, pool1 = self.backend.convblock1(x) | |
conv2, pool2 = self.backend.convblock2(pool1) | |
if self.backend.pooling == 'double': | |
conv3 = self.backend.convblock3(pool2) | |
pool3 = None | |
elif self.backend.pooling == 'triple': | |
conv3, pool3 = self.backend.convblock3(pool2) | |
# Step 2 - Core | |
if self.pooling == 'double': | |
conv4 = self.core.convblock4(conv3) | |
conv5 = self.core.convblock5(conv4) | |
conv6 = self.core.convblock6(conv5) | |
conv7 = self.core.convblock7(conv6) | |
# conv4, conv5, conv6, conv7 = self.core(conv3) | |
elif self.pooling == 'triple': | |
conv4 = self.core.convblock4(pool3) | |
conv5 = self.core.convblock5(conv4) | |
conv6 = self.core.convblock6(conv5) | |
conv7 = self.core.convblock7(conv6) | |
# conv4, conv5, conv6, conv7 = self.core(pool3) | |
# Step 3 - Head | |
if self.head.pooling == 'triple': | |
upconv8 = self.head.upconvblock8(conv7) | |
conv8 = self.head.convblock8(tf.concat([conv3, upconv8], axis=-1)) | |
else: | |
conv8 = self.head.convblock8(tf.concat([conv3, conv7], axis=-1)) | |
upconv9 = self.head.upconvblock9(conv8) | |
conv9 = self.head.convblock9(tf.concat([conv2, upconv9], axis=-1)) | |
upconv10 = self.head.upconvblock10(conv9) | |
conv10 = self.head.convblock10(tf.concat([conv1, upconv10], axis=-1)) | |
conv11 = self.head.convblock11(conv10) | |
# conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1) | |
return conv11 | |
def get_config(self): | |
config = {'backend': self.backend, 'core': self.core, 'head': self.head} | |
return config | |
def build_graph(self, dim): | |
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name)) | |
return tf.keras.Model(inputs=[x], outputs=self.call(x)) | |
class FocusNet(tf.keras.Model): | |
def __init__(self, class_count, hdc=False, dropout=None, bayesian=True, deepsup=False, trainable=False, verbose=False): | |
super(FocusNet, self).__init__(name='FocusNet') | |
if 1: | |
backend_filters = [[16,16], [32,32]] | |
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]] | |
hdc_core_filters = [[32,32,32,32], [32,32,32,32], [32,32,32,32]] | |
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]] | |
nonhdc_core_filters = [[32,32,32], [32,32,32], [32,32,32], [32,32,32]] | |
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]] | |
head_filters = [[32,32], [16,16], [class_count, class_count]] | |
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]] | |
head_filters_upsample = [None,16] | |
head_filters_ksize = [None, (2,2,2)] | |
print (' - [models2.py][FocusNet] bayesian: ', bayesian) | |
self.backend = FocusNetBackend(filters=backend_filters, dil_rates=backend_dil_rates, trainable=trainable, verbose=verbose) | |
if hdc: | |
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose) | |
else: | |
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose) | |
self.head = commonHead(filters=head_filters, dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup, trainable=trainable, verbose=verbose) | |
def call(self, x): | |
# Step - Backend | |
conv1, pool1, conv2, pool2, conv3 = self.backend(x) | |
# Step 2 - Core | |
conv4, conv5, conv6, conv7 = self.core(conv3) | |
# Step 3 - Head | |
conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1) | |
return conv11 | |
def get_config(self): | |
config = {'backend': self.backend, 'core': self.core, 'head': self.head} | |
return config | |
def build_graph(self, dim): | |
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name)) | |
return tf.keras.Model(inputs=[x], outputs=self.call(x)) | |
############################################################ | |
# UTILS # | |
############################################################ | |
@tf.function | |
def write_model_trace(model, X): | |
return model(X) | |
############################################################ | |
# MAIN # | |
############################################################ | |
if __name__ == "__main__": | |
X = tf.random.normal((2,140,140,40,1)) | |
if 0: | |
print ('\n ------------------- FocusNet ------------------- ') | |
model = FocusNet(class_count=10) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), Bayes(900K) params | |
print (model.losses) | |
elif 0: | |
print ('\n ------------------- OrganNet ------------------- ') | |
model = OrganNet(class_count=10) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), Bayes(900K) params | |
print (model.losses) | |
elif 0: | |
print ('\n ------------------- OrganNet (bayesian=False, dropout=0.3) ------------------- ') | |
model = OrganNet(class_count=10, bayesian=False, dropout=0.3) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), Bayes(900K) params | |
print (model.losses) | |
# OrganNet (for prostate) | |
elif 0: | |
X = tf.random.normal((2,200,200,28,1)) | |
print ('\n ------------------- OrganNet (class_count=1, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ') | |
model = OrganNet(class_count=1, hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), Bayes(900K) params | |
print (model.losses) | |
# OrganNet(pooling=triple) (for prostate) | |
elif 0: | |
X = tf.random.normal((2,200,200,28,1)) | |
print ('\n ------------------- OrganNet (class_count=1, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ') | |
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), Bayes(900K) params | |
print (model.losses) | |
# OrganNet(pooling=triple) (for prostate) and for visualization in netron.app | |
elif 1: | |
raw_shape = (200,200,28,1) | |
print ('\n ------------------- OrganNet (class_count=1, pooling=triple, hdc=True, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ') | |
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True) | |
model = model.build_graph(raw_shape) | |
model.summary(line_length=150) | |
# Using the save() function | |
_ = model(tf.ones((1,*raw_shape))) | |
model.save('OrganNetPool3.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks | |
# model.save('OrganNetPool3', save_format='tf') | |
# Using the .to_json() function | |
# import json | |
# with open('OrganNetPool3-Functional.json', 'w') as fp: | |
# json.dump(json.loads(model.to_json()), fp, indent=4) | |
# Using the .get_config() function | |
# import json | |
# with open('OrganNetPool3.json', 'w') as fp: | |
# json.dump(json.loads(model.get_config()), fp, indent=4) | |
tf.keras.utils.plot_model(model, 'OrganNetPool3.png', show_shapes=True, expand_nested=True) # pip install pydot graphviz | |
# tf.keras.utils.model_to_dot(model, show_shapes=True, expand_nested=True, subgraph=True) | |
# # Using tf2onnx | |
# import onnx | |
# import tf2onnx | |
# model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model=model, input_signature=[tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx | |
# model_onnx = onnx.shape_inference.infer_shapes(model_proto) | |
# tf2onnx.utils.save_protobuf('OrganNetPool3.onnx', model_onnx) | |
# # Using tensorboard | |
# tf.summary.trace_on(graph=True, profiler=False) | |
# _ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32)) | |
# writer = tf.summary.create_file_writer(str('OrganNetPool3')) | |
# with writer.as_default(): | |
# tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None) | |
# writer.flush() | |
# print (' - Run command --> tensorboard --logdir=OrganNetPool3 --port=6100') | |
pass | |
# OrganNet(pooling=triple) (for prostate) and for visualization in netron.app | |
elif 0: | |
raw_shape = (200,200,28,1) | |
print ('\n ------------------- OrganNet (class_count=1, pooling=triple, hdc=True, bayesian=True, dropout=None, activation=tf.nn.sigmoid) ------------------- ') | |
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=True, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True) | |
model = model.build_graph(raw_shape) | |
model.summary(line_length=150) | |
# Using the save() function | |
_ = model(tf.ones((1,*raw_shape))) | |
model.save('OrganNetBayesPool3.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks | |
# model.save('OrganNetBayesPool3.h5', save_format='tf') | |
# Using tf2onnx | |
# import onnx | |
# import tf2onnx | |
# model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model=model, input_signature=[tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx | |
# model_onnx = onnx.shape_inference.infer_shapes(model_proto) | |
# tf2onnx.utils.save_protobuf('OrganNetBayesPool3.onnx', model_onnx) | |
# Using tensorboard | |
# tf.summary.trace_on(graph=True, profiler=False) | |
# _ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32)) | |
# writer = tf.summary.create_file_writer(str('OrganNetBayesPool3')) | |
# with writer.as_default(): | |
# tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None) | |
# writer.flush() | |
# print (' - Run command --> tensorboard --logdir=OrganNetBayesPool3 --port=6100') | |
pass | |
# OrgaNetBayesianHead | |
elif 0: | |
print ('\n ------------------- OrganNet (bayesian=True, dropout=None, bayesianhead=True) ------------------- ') | |
model = OrganNet(class_count=10, bayesian=True, dropout=None, bayesianhead=True) | |
y_predict = model(X, training=True) | |
print (' - y_predict: ', y_predict.shape) | |
model.summary() # ~ nonBayes(550K), and this is 580K params | |
print (model.losses) | |
pdb.set_trace() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment