Darknet-53 and CSPDarknet-53 SCN with different implementations
import math | |
from collections import OrderedDict | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from torchviz import make_dot | |
#-------------------------------------------------# | |
# MISH Activation Function | |
#-------------------------------------------------# | |
# class Mish(nn.Module): # Already implemented in PyTorch | |
# def __init__(self): | |
# super(Mish, self).__init__() | |
# def forward(self, x): | |
# return x * torch.tanh(F.softplus(x)) | |
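# Hedged sanity check (not part of the original gist): PyTorch's built-in nn.Mish matches the
# formula in the commented-out class above, x * tanh(softplus(x)).
_t = torch.linspace(-3.0, 3.0, steps=7)
assert torch.allclose(nn.Mish()(_t), _t * torch.tanh(F.softplus(_t)), atol=1e-6)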
#---------------------------------------------------# | |
# Convolutional Block -> Convolution + Normalization + Activation Function | |
# Conv2d + BatchNormalization + Mish | |
#---------------------------------------------------# | |
class BasicConv(nn.Module): # Basic Convolution | |
def __init__(self, in_channels, out_channels, kernel_size, stride=1): | |
super(BasicConv, self).__init__() | |
self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False) | |
self.bn = nn.BatchNorm2d(out_channels) | |
self.activation = nn.Mish() | |
def forward(self, x): | |
x = self.conv(x) | |
x = self.bn(x) | |
x = self.activation(x) | |
return x | |
#---------------------------------------------------# | |
# CSPDarkNet building block components | |
# Internally stacked residual blocks | |
#---------------------------------------------------# | |
class Resblock(nn.Module): | |
def __init__(self, channels, hidden_channels=None): | |
super(Resblock, self).__init__() | |
if hidden_channels is None: | |
hidden_channels = channels | |
self.block = nn.Sequential( | |
BasicConv(channels, hidden_channels, 1), | |
BasicConv(hidden_channels, channels, 3) | |
) | |
def forward(self, x): | |
return x + self.block(x) | |
#--------------------------------------------------------------------#
#   CSPDarkNet building block
#   First, a stride-2 convolution block compresses height and width
#   (padding is handled inside BasicConv, so no separate ZeroPadding2D is needed).
#   A large residual shortcut (split_conv0) then bypasses the stacked residual blocks,
#   while the main branch runs num_blocks internal residual structures.
#   The whole block is one large residual connection wrapping many small ones.
#--------------------------------------------------------------------#
class Resblock_body(nn.Module): | |
def __init__(self, in_channels, out_channels, num_blocks, first): | |
super(Resblock_body, self).__init__() | |
#----------------------------------------------------------------# | |
# Use a stride of 2x2 convolution block to compress height and width | |
#----------------------------------------------------------------# | |
self.downsample_conv = BasicConv(in_channels, out_channels, 3, stride=2) | |
if first: | |
#--------------------------------------------------------------------------# | |
# Then establish a large residual side self.split_conv0, which bypasses many residual structures | |
#--------------------------------------------------------------------------# | |
self.split_conv0 = BasicConv(out_channels, out_channels, 1) | |
#----------------------------------------------------------------# | |
# The main part will loop through num_blocks, and the internal loop consists of residual structures | |
#----------------------------------------------------------------# | |
self.split_conv1 = BasicConv(out_channels, out_channels, 1) | |
self.blocks_conv = nn.Sequential( | |
Resblock(channels=out_channels, hidden_channels=out_channels // 2), | |
BasicConv(out_channels, out_channels, 1) | |
) | |
self.concat_conv = BasicConv(out_channels * 2, out_channels, 1) | |
else: | |
#--------------------------------------------------------------------------# | |
# Then establish a large residual side self.split_conv0, which bypasses many residual structures | |
#--------------------------------------------------------------------------# | |
self.split_conv0 = BasicConv(out_channels, out_channels // 2, 1) | |
#----------------------------------------------------------------# | |
# The main part will loop through num_blocks, and the internal loop consists of residual structures | |
#----------------------------------------------------------------# | |
self.split_conv1 = BasicConv(out_channels, out_channels // 2, 1) | |
self.blocks_conv = nn.Sequential( | |
*[Resblock(out_channels // 2) for _ in range(num_blocks)], | |
BasicConv(out_channels // 2, out_channels // 2, 1) | |
) | |
self.concat_conv = BasicConv(out_channels, out_channels, 1) | |
def forward(self, x): | |
x = self.downsample_conv(x) | |
x0 = self.split_conv0(x) | |
x1 = self.split_conv1(x) | |
x1 = self.blocks_conv(x1) | |
#------------------------------------# | |
# Stack the large residual side back | |
#------------------------------------# | |
x = torch.cat([x1, x0], dim=1) | |
#------------------------------------# | |
# Finally integrate the number of channels | |
#------------------------------------# | |
x = self.concat_conv(x) | |
return x | |
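# Hedged shape check (not part of the original gist) for the two configurations described above:
# both halve the spatial size and return out_channels maps; only the internal channel split differs.
print(Resblock_body(32, 64, num_blocks=1, first=True)(torch.randn(1, 32, 64, 64)).shape)     # [1, 64, 32, 32]
print(Resblock_body(64, 128, num_blocks=2, first=False)(torch.randn(1, 64, 64, 64)).shape)   # [1, 128, 32, 32]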
#---------------------------------------------------# | |
# CSPDarkNet53 main body | |
# Input is a 416x416x3 image | |
# Outputs are three effective feature layers | |
#---------------------------------------------------# | |
class CSPDarkNet(nn.Module):
def __init__(self, layers): | |
super(CSPDarkNet, self).__init__() | |
self.inplanes = 32 | |
# 416,416,3 -> 416,416,32 | |
self.conv1 = BasicConv(3, self.inplanes, kernel_size=3, stride=1) | |
self.feature_channels = [64, 128, 256, 512, 1024] | |
self.stages = nn.ModuleList([ | |
# 416,416,32 -> 208,208,64 | |
Resblock_body(self.inplanes, self.feature_channels[0], layers[0], first=True), | |
# 208,208,64 -> 104,104,128 | |
Resblock_body(self.feature_channels[0], self.feature_channels[1], layers[1], first=False), | |
# 104,104,128 -> 52,52,256 | |
Resblock_body(self.feature_channels[1], self.feature_channels[2], layers[2], first=False), | |
# 52,52,256 -> 26,26,512 | |
Resblock_body(self.feature_channels[2], self.feature_channels[3], layers[3], first=False), | |
# 26,26,512 -> 13,13,1024 | |
Resblock_body(self.feature_channels[3], self.feature_channels[4], layers[4], first=False) | |
]) | |
self.num_features = 1 | |
for m in self.modules(): | |
if isinstance(m, nn.Conv2d): | |
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels | |
m.weight.data.normal_(0, math.sqrt(2. / n)) | |
elif isinstance(m, nn.BatchNorm2d): | |
m.weight.data.fill_(1) | |
m.bias.data.zero_() | |
def forward(self, x): | |
x = self.conv1(x) | |
x = self.stages[0](x) | |
x = self.stages[1](x) | |
out3 = self.stages[2](x) | |
out4 = self.stages[3](out3) | |
out5 = self.stages[4](out4) | |
return out3, out4, out5 | |
def darknet53(pretrained): | |
    model = CSPDarkNet([1, 2, 8, 8, 4])  # the same block counts per stage as Darknet-53
if pretrained: | |
model.load_state_dict(torch.load("model_data/CSPdarknet53_backbone_weights.pth")) | |
return model | |
model = darknet53(pretrained=False) | |
input_tensor = torch.randn(1, 3, 416, 416) | |
# out3, out4, out5 = model(input_tensor) | |
_, _, out5 = model(input_tensor) | |
make_dot((out5), params=dict(model.named_parameters())).render("cspdarknet53", format="png") |
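# Hedged shape check (not part of the original gist; reuses `model` and `input_tensor` above):
# the three feature levels should come out at strides 8, 16 and 32, matching the
# 52x52x256 / 26x26x512 / 13x13x1024 comments in CSPDarkNet.__init__.
out3, out4, out5 = model(input_tensor)
print(out3.shape, out4.shape, out5.shape)
# expected: torch.Size([1, 256, 52, 52]) torch.Size([1, 512, 26, 26]) torch.Size([1, 1024, 13, 13])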
### SCN functions
### (relies on the torch imports above, plus Parameter and init for the helpers below)
from typing import List, Tuple
from torch.nn.parameter import Parameter
from torch.nn import init
def create_param_combination_conv2d(dimensions: int, in_channels: int, out_channels: int, kernel_size: int = 3) -> nn.ParameterList: | |
""" | |
This function is used to create a weight tensor list for a single conv2d layer without biases. | |
The weight tensors are meant to be used for calculating the final weight of the layer via linear combination. | |
""" | |
weight_list = nn.ParameterList() | |
for _ in range(dimensions): | |
weight = Parameter(torch.empty((out_channels, in_channels, kernel_size, kernel_size))) | |
init.kaiming_uniform_(weight, a=math.sqrt(5)) # to initialize the weights | |
weight_list.append(weight) | |
return weight_list | |
def create_param_combination_linear(dimensions: int, in_features: int, out_features: int) -> Tuple[nn.ParameterList, nn.ParameterList]: | |
""" | |
This function is used to create a weight tensor list for a single linear layer with biases. | |
The weight tensors are meant to be used for calculating the final weight of the layer via linear combination. | |
""" | |
weight_list = nn.ParameterList() | |
bias_list = nn.ParameterList() | |
for _ in range(dimensions): | |
weight = Parameter(torch.empty((out_features, in_features))) | |
init.kaiming_uniform_(weight, a=math.sqrt(5)) | |
weight_list.append(weight) | |
bias = Parameter(torch.empty(out_features)) | |
fan_in, _ = init._calculate_fan_in_and_fan_out(weight) | |
bound = 1 / math.sqrt(fan_in) | |
init.uniform_(bias, -bound, bound) | |
bias_list.append(bias) | |
return weight_list, bias_list | |
def calculate_weighted_sum(param_list: List[Parameter], coefficients: torch.Tensor) -> torch.Tensor: | |
""" | |
Calculate the weighted sum (linear combination) which is the final weight used during inference. | |
""" | |
weighted_list = [a * b for a, b in zip(param_list, coefficients)] | |
return torch.sum(torch.stack(weighted_list), dim=0) | |
def execute_hyper_conv2d(x: torch.Tensor, weight_list: List[Parameter], coefficients: torch.Tensor, stride: int = 1, padding: int = 0) -> torch.Tensor:
""" | |
Execute one hyper-conv2d layer. | |
""" | |
weights = calculate_weighted_sum(weight_list, coefficients) | |
return F.conv2d(x, weight=weights, stride=stride, padding=padding) | |
def execute_hyper_linear(x: torch.Tensor, weight_list: List[Parameter], bias_list: List[Parameter], coefficients: torch.Tensor) -> torch.Tensor: | |
""" | |
Execute one hyper-linear layer. | |
""" | |
weights = calculate_weighted_sum(weight_list, coefficients) | |
biases = calculate_weighted_sum(bias_list, coefficients) | |
return F.linear(x, weight=weights, bias=biases) | |
# TODO: helpers to freeze the shared (one-for-all) weights are not implemented yet
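# Hedged usage sketch for the helpers above (not part of the original gist; the names D,
# _w_list and _beta are illustrative): D conv weight banks are blended into one kernel by a
# coefficient vector, which the hypernet would normally produce.
D = 3
_w_list = create_param_combination_conv2d(D, in_channels=8, out_channels=16, kernel_size=3)
_beta = torch.softmax(torch.randn(D), dim=0)  # stand-in for the hypernet output
_y = execute_hyper_conv2d(torch.randn(1, 8, 32, 32), _w_list, _beta, stride=1, padding=1)
print(_y.shape)  # expected: torch.Size([1, 16, 32, 32])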
### define the CSP darknet 53 with hypernets
import math
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import random
from torchviz import make_dot
#-------------------------------------------------# | |
# MISH Activation Function | |
#-------------------------------------------------# | |
# class Mish(nn.Module): # Already implemented in PyTorch | |
# def __init__(self): | |
# super(Mish, self).__init__() | |
# def forward(self, x): | |
# return x * torch.tanh(F.softplus(x)) | |
#---------------------------------------------------# | |
# Convolutional Block -> Convolution + Normalization + Activation Function | |
# Conv2d + BatchNormalization + Mish | |
#---------------------------------------------------# | |
class BasicConv_SCN(nn.Module):  # basic convolution block (SCN-aware)
    def __init__(self, dimensions, in_channels, out_channels, kernel_size, stride=1, is_scn=False):
        super(BasicConv_SCN, self).__init__()
        # store the settings needed at forward time
        self.is_scn = is_scn
        self.kernel_size = kernel_size  # padding will simply be kernel_size // 2
        self.stride = stride
        if self.is_scn:
            # SCN mode: a list of weight banks that forward() blends with the hypernet coefficients
            self.conv_weight_list = create_param_combination_conv2d(dimensions, in_channels, out_channels, kernel_size=self.kernel_size)
            # a plain conv is still created here, but forward() only uses the weight list above
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False)
            self.bn = nn.BatchNorm2d(out_channels)
            self.activation = nn.Mish()
        else:
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False)
            self.bn = nn.BatchNorm2d(out_channels)
            self.activation = nn.Mish()
    def forward(self, x, hyper_x):  # hyper_x: the coefficient vector (beta) produced by the hypernet
        if self.is_scn:
            x = execute_hyper_conv2d(x, self.conv_weight_list, hyper_x, stride=self.stride, padding=self.kernel_size // 2)
else: | |
x = self.conv(x) | |
x = self.bn(x) | |
x = self.activation(x) | |
return x | |
#---------------------------------------------------#
#   Output head -> AdaptiveAvgPool + Flatten + FullyConnected
#   (SCN-aware: the linear layer can be a blended weight bank)
#---------------------------------------------------#
class OutputLayer_SCN(nn.Module):
    def __init__(self, dimensions, feature_in, num_classes: int, is_scn=False):
        super(OutputLayer_SCN, self).__init__()
        self.is_scn = is_scn
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        if self.is_scn:
            self.fc_weight_list, self.linear_bias_list = create_param_combination_linear(dimensions, feature_in, num_classes)
            # a plain linear layer is still created, but forward() only uses the weight/bias lists above
            self.fc = nn.Linear(feature_in, num_classes)
        else:
            self.fc = nn.Linear(feature_in, num_classes)
def forward(self, x, hyper_x): # | |
x = self.avgpool(x) | |
x = torch.flatten(x, 1) | |
if self.is_scn: | |
            x = execute_hyper_linear(x, self.fc_weight_list, self.linear_bias_list, hyper_x)  # blend the weight banks with the hypernet coefficients
else: | |
x = self.fc(x) | |
return x | |
#---------------------------------------------------# | |
# CSPDarkNet building block components | |
# Internally stacked residual blocks | |
#---------------------------------------------------# | |
class Resblock_SCN(nn.Module):
    def __init__(self, dimensions, channels, hidden_channels=None, is_scn=False):
        super(Resblock_SCN, self).__init__()
        if hidden_channels is None:
            hidden_channels = channels
        # nn.Sequential cannot forward the extra hyper_x argument through its children,
        # so the two SCN-aware convs are kept as separate modules and called explicitly in forward()
        self.conv1 = BasicConv_SCN(dimensions, channels, hidden_channels, 1, is_scn=is_scn)
        self.conv2 = BasicConv_SCN(dimensions, hidden_channels, channels, 3, is_scn=is_scn)
    def forward(self, x, hyper_x):
        out = self.conv1(x, hyper_x)
        out = self.conv2(out, hyper_x)
        return x + out
#--------------------------------------------------------------------#
#   CSPDarkNet building block (SCN-aware version)
#   First, a stride-2 convolution block compresses height and width
#   (padding is handled inside BasicConv_SCN, so no separate ZeroPadding2D is needed).
#   A large residual shortcut (split_conv0) then bypasses the stacked residual blocks,
#   while the main branch runs num_blocks internal residual structures.
#   The whole block is one large residual connection wrapping many small ones.
#--------------------------------------------------------------------#
class Resblock_body_SCN(nn.Module): | |
def __init__(self, dimensions, in_channels, out_channels, num_blocks, first, is_scn=False): | |
super(Resblock_body_SCN, self).__init__() | |
#----------------------------------------------------------------# | |
# Use a stride of 2x2 convolution block to compress height and width | |
#----------------------------------------------------------------# | |
self.downsample_conv = BasicConv_SCN(dimensions, in_channels, out_channels, 3, stride=2, is_scn=is_scn) | |
if first: # the first part | |
#--------------------------------------------------------------------------# | |
# Then establish a large residual side self.split_conv0, which bypasses many residual structures | |
#--------------------------------------------------------------------------# | |
self.split_conv0 = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn) | |
#----------------------------------------------------------------# | |
# The main part will loop through num_blocks, and the internal loop consists of residual structures | |
#----------------------------------------------------------------# | |
self.split_conv1 = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn) | |
self.resblock = Resblock_SCN(dimensions, out_channels, hidden_channels=out_channels // 2, is_scn=is_scn) | |
self.basic_conv = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn) | |
self.concat_conv = BasicConv_SCN(dimensions, out_channels * 2, out_channels, 1, is_scn=is_scn) | |
else: | |
#--------------------------------------------------------------------------# | |
# Then establish a large residual side self.split_conv0, which bypasses many residual structures | |
#--------------------------------------------------------------------------# | |
self.split_conv0 = BasicConv_SCN(dimensions, out_channels, out_channels // 2, 1, is_scn=is_scn) # | |
#----------------------------------------------------------------# | |
# The main part will loop through num_blocks, and the internal loop consists of residual structures | |
#----------------------------------------------------------------# | |
self.split_conv1 = BasicConv_SCN(dimensions, out_channels, out_channels // 2, 1, is_scn=is_scn) | |
            # a ModuleList (rather than Sequential) so each Resblock_SCN can receive hyper_x in forward()
            self.resblocks = nn.ModuleList([Resblock_SCN(dimensions, out_channels // 2, is_scn=is_scn) for _ in range(num_blocks)])
self.basic_conv = BasicConv_SCN(dimensions, out_channels // 2, out_channels // 2, 1, is_scn=is_scn) | |
self.concat_conv = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn) | |
def forward(self, x, hyper_x): | |
x = self.downsample_conv(x, hyper_x) | |
x0 = self.split_conv0(x, hyper_x) | |
x1 = self.split_conv1(x, hyper_x) | |
        if hasattr(self, 'resblock'):  # first stage: a single internal residual block
            x1 = self.resblock(x1, hyper_x)
            x1 = self.basic_conv(x1, hyper_x)
        else:  # later stages: num_blocks internal residual blocks
            for resblock in self.resblocks:
                x1 = resblock(x1, hyper_x)
            x1 = self.basic_conv(x1, hyper_x)
#------------------------------------# | |
# Stack the large residual side back | |
#------------------------------------# | |
x = torch.cat([x1, x0], dim=1) | |
#------------------------------------# | |
# Finally integrate the number of channels | |
#------------------------------------# | |
x = self.concat_conv(x, hyper_x) | |
return x | |
#---------------------------------------------------# | |
# CSPDarkNet53 main body | |
# Input is a 416x416x3 image | |
# Outputs are three effective feature layers | |
#---------------------------------------------------# | |
class CSPDarkNet_SCN(nn.Module): | |
def __init__(self, layers, SCN_layers, num_classes, dimensions=1): | |
super(CSPDarkNet_SCN, self).__init__() | |
self.inplanes = 32 | |
self.hyper_stack = nn.Sequential( # hypernet | |
nn.Linear(2, 64), | |
nn.ReLU(), | |
nn.Linear(64, dimensions), | |
nn.Softmax(dim=0) | |
) | |
# 416,416,3 -> 416,416,32 | |
self.conv1 = BasicConv_SCN(dimensions, 3, self.inplanes, kernel_size=3, stride=1, is_scn=SCN_layers[0]) | |
self.feature_channels = [64, 128, 256, 512, 1024] | |
self.stages = nn.ModuleList([ | |
# 416,416,32 -> 208,208,64 | |
Resblock_body_SCN(dimensions, self.inplanes, self.feature_channels[0], layers[0], first=True, is_scn=SCN_layers[1]), | |
# 208,208,64 -> 104,104,128 | |
Resblock_body_SCN(dimensions, self.feature_channels[0], self.feature_channels[1], layers[1], first=False, is_scn=SCN_layers[2]), | |
# 104,104,128 -> 52,52,256 | |
Resblock_body_SCN(dimensions, self.feature_channels[1], self.feature_channels[2], layers[2], first=False, is_scn=SCN_layers[3]), | |
# 52,52,256 -> 26,26,512 | |
Resblock_body_SCN(dimensions, self.feature_channels[2], self.feature_channels[3], layers[3], first=False, is_scn=SCN_layers[4]), | |
# 26,26,512 -> 13,13,1024 | |
Resblock_body_SCN(dimensions, self.feature_channels[3], self.feature_channels[4], layers[4], first=False, is_scn=SCN_layers[5]) | |
]) | |
        # classification head on top of the last feature level
        self.fc_block = OutputLayer_SCN(dimensions, self.feature_channels[4], num_classes=num_classes, is_scn=SCN_layers[6])
        # NOTE: the init loop below only touches nn.Conv2d / nn.BatchNorm2d modules;
        # the SCN weight lists are initialized inside create_param_combination_conv2d / _linear
self.num_features = 1 | |
for m in self.modules(): | |
if isinstance(m, nn.Conv2d): | |
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels | |
m.weight.data.normal_(0, math.sqrt(2. / n)) | |
elif isinstance(m, nn.BatchNorm2d): | |
m.weight.data.fill_(1) | |
m.bias.data.zero_() | |
def forward(self, x, hyper_x): | |
hyper_output = self.hyper_stack(hyper_x) | |
x = self.conv1(x, hyper_output) | |
x = self.stages[0](x, hyper_output) | |
x = self.stages[1](x, hyper_output) | |
out3 = self.stages[2](x, hyper_output) | |
out4 = self.stages[3](out3, hyper_output) | |
out5 = self.stages[4](out4, hyper_output) | |
out6 = self.fc_block(out5, hyper_output) | |
# return out3, out4, out5, out6 | |
return out6 | |
def cspdarknet53(pretrained, num_classes=1000):  # num_classes default is a placeholder; pass the real class count
    scn_segment = [True, True, True, True, True, True, True]  # 7 flags: conv1, the five stages, and the output head
    # same block counts as before; note dimensions defaults to 1 here, which reduces the SCN to a single weight bank
    model = CSPDarkNet_SCN([1, 2, 8, 8, 4], scn_segment, num_classes=num_classes)
    if pretrained:
        # the plain backbone checkpoint will likely not match the SCN parameter names; key remapping or strict=False may be needed
        model.load_state_dict(torch.load("model_data/CSPdarknet53_backbone_weights.pth"))
    return model
def transform_angle(angle):  # encode an angle in degrees as a (cos, sin) pair for the hypernet input
cos = math.cos(angle / 180 * math.pi) | |
sin = math.sin(angle / 180 * math.pi) | |
return torch.Tensor([cos, sin]) | |
model = cspdarknet53(pretrained=False) | |
input_tensor = torch.randn(1, 3, 416, 416) | |
angle = random.uniform(0, 360)  # sample a random angle in degrees, [0, 360)
hyper_inputs = transform_angle(angle)  # hypernet conditioning input encoding the angle (the image itself is not rotated here)
# out3, out4, out5 = model(input_tensor) | |
# _, _, _, out6 = model(input_tensor, hyper_inputs) | |
out6 = model(input_tensor, hyper_inputs) | |
make_dot((out6), params=dict(model.named_parameters())).render("cspdarknet53_SCN", format="png") |
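# Hedged sketch (not part of the original gist): build a small SCN variant with an explicit
# number of weight banks (dimensions=3, unlike the default of 1 used above, which is degenerate)
# and confirm the classifier output shape.
_demo_scn = CSPDarkNet_SCN([1, 2, 8, 8, 4], [True] * 7, num_classes=10, dimensions=3)
_demo_out = _demo_scn(torch.randn(1, 3, 416, 416), transform_angle(45.0))
print(_demo_out.shape)  # expected: torch.Size([1, 10])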
import math | |
import torch | |
import torchvision | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from typing import Optional, List, Tuple, Union | |
from torch import nn, Tensor | |
from torch.nn.parameter import Parameter, UninitializedParameter | |
from torch.nn import init | |
# DarkNet-53 model | |
class DarkNet53_SCN2(nn.Module): # | |
def __init__(self, scn_list : List , num_classes, dimensions=1): | |
super(DarkNet53_SCN2, self).__init__() | |
self.dimensions = dimensions # set how many dimensions to make | |
self.inplanes = 32 | |
self.num_classes = num_classes # | |
self.hyper_stack = nn.Sequential( # hypernet | |
nn.Linear(2, 64), | |
nn.ReLU(), | |
nn.Linear(64, dimensions), | |
nn.Softmax(dim=0) | |
) | |
self.scn_list = scn_list # set which layers (or blocks) should use SCN architecture | |
# Initial convolution layer | |
self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False) | |
self.conv0_weight_list = self.create_param_combination_conv2d(3, 32, kernel_size=3) | |
self.bn1 = nn.BatchNorm2d(32) | |
self.relu1 = nn.LeakyReLU(0.1) | |
# First layer 2 basic blocks | |
self.ds_conv1 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1, bias=False) | |
self.ds_conv1_weight_list = self.create_param_combination_conv2d(32, 64, kernel_size=3) | |
self.ds_bn1 = nn.BatchNorm2d(64) | |
self.ds_relu1 = nn.LeakyReLU(0.1) | |
self.residual1_0_conv1 = nn.Conv2d(64, 32, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual1_0_conv1_weight_list = self.create_param_combination_conv2d(64, 32, kernel_size=1) | |
self.residual1_0_bn1 = nn.BatchNorm2d(32) | |
self.residual1_0_relu1 = nn.LeakyReLU(0.1) | |
self.residual1_0_conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual1_0_conv2_weight_list = self.create_param_combination_conv2d(32, 64, kernel_size=3) | |
self.residual1_0_bn2 = nn.BatchNorm2d(64) | |
self.residual1_0_relu2 = nn.LeakyReLU(0.1) | |
# Second layer | |
self.ds_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1, bias=False) | |
self.ds_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3) | |
self.ds_bn2 = nn.BatchNorm2d(128) | |
self.ds_relu2 = nn.LeakyReLU(0.1) | |
self.residual2_0_conv1 = nn.Conv2d(128, 64, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual2_0_conv1_weight_list = self.create_param_combination_conv2d(128, 64, kernel_size=1) | |
self.residual2_0_bn1 = nn.BatchNorm2d(64) | |
self.residual2_0_relu1 = nn.LeakyReLU(0.1) | |
self.residual2_0_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual2_0_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3) | |
self.residual2_0_bn2 = nn.BatchNorm2d(128) | |
self.residual2_0_relu2 = nn.LeakyReLU(0.1) | |
self.residual2_1_conv1 = nn.Conv2d(128, 64, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual2_1_conv1_weight_list = self.create_param_combination_conv2d(128, 64, kernel_size=1) | |
self.residual2_1_bn1 = nn.BatchNorm2d(64) | |
self.residual2_1_relu1 = nn.LeakyReLU(0.1) | |
self.residual2_1_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual2_1_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3) | |
self.residual2_1_bn2 = nn.BatchNorm2d(128) | |
self.residual2_1_relu2 = nn.LeakyReLU(0.1) | |
# Third layer | |
self.ds_conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1, bias=False) | |
self.ds_conv3_weight_list = self.create_param_combination_conv2d(128, 256, kernel_size=3) | |
self.ds_bn3 = nn.BatchNorm2d(256) | |
self.ds_relu3 = nn.LeakyReLU(0.1) | |
self.residual3_0_conv1 = nn.Conv2d(256, 128, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual3_0_conv1_weight_list = self.create_param_combination_conv2d(256, 128, kernel_size=1) | |
self.residual3_0_bn1 = nn.BatchNorm2d(128) | |
self.residual3_0_relu1 = nn.LeakyReLU(0.1) | |
self.residual3_0_conv2 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual3_0_conv2_weight_list = self.create_param_combination_conv2d(128, 256, kernel_size=3) | |
self.residual3_0_bn2 = nn.BatchNorm2d(256) | |
self.residual3_0_relu2 = nn.LeakyReLU(0.1) | |
        for i in range(1, 8):  # remaining 7 residual blocks of the third stage (8 in total)
setattr(self, f"residual3_{i}_conv1", nn.Conv2d(256, 128, kernel_size=1, stride=1, padding=0, bias=False)) | |
setattr(self, f"residual3_{i}_conv1_weight_list", self.create_param_combination_conv2d(256, 128, kernel_size=1)) | |
setattr(self, f"residual3_{i}_bn1", nn.BatchNorm2d(128)) | |
setattr(self, f"residual3_{i}_relu1", nn.LeakyReLU(0.1)) | |
setattr(self, f"residual3_{i}_conv2", nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False)) | |
setattr(self, f"residual3_{i}_conv2_weight_list",self.create_param_combination_conv2d(128, 256, kernel_size=3)) | |
setattr(self, f"residual3_{i}_bn2", nn.BatchNorm2d(256)) | |
setattr(self, f"residual3_{i}_relu2", nn.LeakyReLU(0.1)) | |
# Fourth layer | |
self.ds_conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1, bias=False) | |
self.ds_conv4_weight_list = self.create_param_combination_conv2d(256, 512, kernel_size=3) | |
self.ds_bn4 = nn.BatchNorm2d(512) | |
self.ds_relu4 = nn.LeakyReLU(0.1) | |
self.residual4_0_conv1 = nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual4_0_conv1_weight_list = self.create_param_combination_conv2d(512, 256, kernel_size=1) | |
self.residual4_0_bn1 = nn.BatchNorm2d(256) | |
self.residual4_0_relu1 = nn.LeakyReLU(0.1) | |
self.residual4_0_conv2 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual4_0_conv2_weight_list = self.create_param_combination_conv2d(256, 512, kernel_size=3) | |
self.residual4_0_bn2 = nn.BatchNorm2d(512) | |
self.residual4_0_relu2 = nn.LeakyReLU(0.1) | |
        for i in range(1, 8):  # remaining 7 residual blocks of the fourth stage (8 in total)
setattr(self, f"residual4_{i}_conv1", nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=False)) | |
setattr(self, f"residual4_{i}_conv1_weight_list", self.create_param_combination_conv2d(512, 256, kernel_size=1)) | |
setattr(self, f"residual4_{i}_bn1", nn.BatchNorm2d(256)) | |
setattr(self, f"residual4_{i}_relu1", nn.LeakyReLU(0.1)) | |
setattr(self, f"residual4_{i}_conv2", nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=False)) | |
setattr(self, f"residual4_{i}_conv2_weight_list", self.create_param_combination_conv2d(256, 512, kernel_size=3)) | |
setattr(self, f"residual4_{i}_bn2", nn.BatchNorm2d(512)) | |
setattr(self, f"residual4_{i}_relu2", nn.LeakyReLU(0.1)) | |
# Fifth layer | |
self.ds_conv5 = nn.Conv2d(512, 1024, kernel_size=3, stride=2, padding=1, bias=False) | |
self.ds_conv5_weight_list = self.create_param_combination_conv2d(512, 1024, kernel_size=3) | |
self.ds_bn5 = nn.BatchNorm2d(1024) | |
self.ds_relu5 = nn.LeakyReLU(0.1) | |
self.residual5_0_conv1 = nn.Conv2d(1024, 512, kernel_size=1, stride=1, padding=0, bias=False) | |
self.residual5_0_conv1_weight_list = self.create_param_combination_conv2d(1024, 512, kernel_size=1) | |
self.residual5_0_bn1 = nn.BatchNorm2d(512) | |
self.residual5_0_relu1 = nn.LeakyReLU(0.1) | |
self.residual5_0_conv2 = nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1, bias=False) | |
self.residual5_0_conv2_weight_list = self.create_param_combination_conv2d(512, 1024, kernel_size=3) | |
self.residual5_0_bn2 = nn.BatchNorm2d(1024) | |
self.residual5_0_relu2 = nn.LeakyReLU(0.1) | |
for i in range(1, 4): | |
setattr(self, f"residual5_{i}_conv1", nn.Conv2d(1024, 512, kernel_size=1, stride=1, padding=0, bias=False)) | |
setattr(self, f"residual5_{i}_conv1_weight_list", self.create_param_combination_conv2d(1024, 512, kernel_size=1)) | |
setattr(self, f"residual5_{i}_bn1", nn.BatchNorm2d(512)) | |
setattr(self, f"residual5_{i}_relu1", nn.LeakyReLU(0.1)) | |
setattr(self, f"residual5_{i}_conv2", nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1, bias=False)) | |
setattr(self, f"residual5_{i}_conv2_weight_list", self.create_param_combination_conv2d(512, 1024, kernel_size=3)) | |
setattr(self, f"residual5_{i}_bn2", nn.BatchNorm2d(1024)) # | |
setattr(self, f"residual5_{i}_relu2", nn.LeakyReLU(0.1)) | |
self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) | |
self.fc = nn.Linear(1024, self.num_classes) | |
self.fc_weight_list, self.linear_bias_list = self.create_param_combination_linear(1024, self.num_classes) | |
self.smax = nn.Softmax(dim=1) # defined but not used | |
        # NOTE: an explicit parameter_list for choosing which layers get the SCN treatment was
        # considered and dropped; partial SCN is controlled through scn_list instead
def create_param_combination_conv2d(self, in_channels, out_channels, kernel_size=3): | |
""" | |
This function is used to create weight tensor list for a single conv2d layer without biases. | |
The weight tensors are meant to be used for calculate the final weight of the layer via linear combination | |
""" | |
weight_list = nn.ParameterList() | |
bias_list = nn.ParameterList() | |
for _ in range(self.dimensions): | |
weight = Parameter(torch.empty((out_channels, in_channels, kernel_size, kernel_size))) | |
init.kaiming_uniform_(weight, a=math.sqrt(5)) # to initialize the stuff | |
weight_list.append(weight) | |
# bias = Parameter(torch.empty(out_channels)) | |
# fan_in, _ = init._calculate_fan_in_and_fan_out(weight) | |
# bound = 1 / math.sqrt(fan_in) | |
# init.uniform_(bias, -bound, bound) | |
# bias_list.append(bias) | |
return weight_list | |
def create_param_combination_linear(self, in_features, out_features): | |
""" | |
This function is used to create weight tensor list for a single linear layer with biases. | |
The weight tensors are meant to be used for calculate the final weight of the layer via linear combination | |
""" | |
weight_list = nn.ParameterList() | |
bias_list = nn.ParameterList() | |
for _ in range(self.dimensions): | |
weight = Parameter(torch.empty((out_features, in_features))) | |
init.kaiming_uniform_(weight, a=math.sqrt(5)) | |
weight_list.append(weight) | |
bias = Parameter(torch.empty(out_features)) | |
fan_in, _ = init._calculate_fan_in_and_fan_out(weight) | |
bound = 1 / math.sqrt(fan_in) | |
init.uniform_(bias, -bound, bound) | |
bias_list.append(bias) | |
return weight_list, bias_list | |
def calculate_weighted_sum(self, param_list: List, coefficients: Tensor): | |
""" | |
Calculate the weighted sum (linear combination) which is the final weight used during inference | |
""" | |
weighted_list = [a * b for a, b in zip(param_list, coefficients)] | |
return torch.sum(torch.stack(weighted_list), dim=0) | |
    def execute_hyper_conv2d(self, x, weight_list: List, coefficients: Tensor, stride=1, padding=0):
""" | |
Execute one hyper-conv2d layer | |
""" | |
weights = self.calculate_weighted_sum(weight_list, coefficients) | |
return F.conv2d(x, weight=weights, stride=stride, padding=padding) | |
def execute_hyper_linear(self, x, weight_list: List, bias_list: List, coefficients): | |
""" | |
Execute one hyper-linear layer | |
""" | |
weights = self.calculate_weighted_sum(weight_list, coefficients) | |
biases = self.calculate_weighted_sum(bias_list, coefficients) | |
return F.linear(x, weight=weights, bias=biases) | |
    def forward(self, x, hyper_x):  # start with SCN applied everywhere, then experiment with partial SCN
hyper_output = self.hyper_stack(hyper_x) | |
# Initial convolution layer | |
if self.scn_list[0] == 0: # use one4all | |
x = self.conv1(x) # do not use SCN | |
else: # use SCN | |
x = self.execute_hyper_conv2d(x, self.conv0_weight_list, hyper_output, stride=1, padding=1) | |
x = self.bn1(x) | |
x = self.relu1(x) | |
# First layer | |
if self.scn_list[1] == 0: # use one4all | |
x = self.ds_conv1(x) | |
x = self.ds_bn1(x) | |
x = self.ds_relu1(x) | |
residual = x | |
out = self.residual1_0_conv1(x) | |
out = self.residual1_0_bn1(out) | |
out = self.residual1_0_relu1(out) | |
out = self.residual1_0_conv2(out) | |
out = self.residual1_0_bn2(out) | |
out = self.residual1_0_relu2(out) | |
x = out + residual | |
        else:  # use SCN
            # NOTE: unlike the later stages, the stride-2 downsample conv of this stage stays one-for-all;
            # the SCN version is left commented out
            # x = self.execute_hyper_conv2d(x, self.ds_conv1_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_conv1(x)
x = self.ds_bn1(x) | |
x = self.ds_relu1(x) | |
residual = x | |
# out = self.residual1_0_conv1(x) | |
out = self.execute_hyper_conv2d(x, self.residual1_0_conv1_weight_list, hyper_output, stride=1, padding=0) | |
out = self.residual1_0_bn1(out) | |
out = self.residual1_0_relu1(out) | |
# out = self.residual1_0_conv2(out) | |
out = self.execute_hyper_conv2d(out, self.residual1_0_conv2_weight_list, hyper_output, stride=1, padding=1) | |
out = self.residual1_0_bn2(out) | |
out = self.residual1_0_relu2(out) | |
x = out + residual | |
# Second layer | |
if self.scn_list[2] == 0: # use one4all | |
x = self.ds_conv2(x) | |
x = self.ds_bn2(x) | |
x = self.ds_relu2(x) | |
for i in range(2): | |
residual = x | |
out = getattr(self, f"residual2_{i}_conv1")(x) | |
out = getattr(self, f"residual2_{i}_bn1")(out) | |
out = getattr(self, f"residual2_{i}_relu1")(out) | |
out = getattr(self, f"residual2_{i}_conv2")(out) | |
out = getattr(self, f"residual2_{i}_bn2")(out) | |
out = getattr(self, f"residual2_{i}_relu2")(out) | |
x = out + residual | |
else: | |
# x = self.ds_conv2(x) | |
x = self.execute_hyper_conv2d(x, self.ds_conv2_weight_list, hyper_output, stride=2, padding=1) | |
x = self.ds_bn2(x) | |
x = self.ds_relu2(x) | |
for i in range(2): | |
residual = x | |
# out = getattr(self, f"residual2_{i}_conv1")(x) | |
out = self.execute_hyper_conv2d(x, getattr(self, f"residual2_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0) | |
out = getattr(self, f"residual2_{i}_bn1")(out) | |
out = getattr(self, f"residual2_{i}_relu1")(out) | |
# out = getattr(self, f"residual2_{i}_conv2")(out) | |
out = self.execute_hyper_conv2d(out, getattr(self, f"residual2_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1) | |
out = getattr(self, f"residual2_{i}_bn2")(out) | |
out = getattr(self, f"residual2_{i}_relu2")(out) | |
x = out + residual | |
# Third layer | |
if self.scn_list[3] == 0: | |
x = self.ds_conv3(x) | |
x = self.ds_bn3(x) | |
x = self.ds_relu3(x) | |
for i in range(8): | |
residual = x | |
out = getattr(self, f"residual3_{i}_conv1")(x) | |
out = getattr(self, f"residual3_{i}_bn1")(out) | |
out = getattr(self, f"residual3_{i}_relu1")(out) | |
out = getattr(self, f"residual3_{i}_conv2")(out) | |
out = getattr(self, f"residual3_{i}_bn2")(out) | |
out = getattr(self, f"residual3_{i}_relu2")(out) | |
x = out + residual | |
else: | |
# x = self.ds_conv3(x) | |
x = self.execute_hyper_conv2d(x, self.ds_conv3_weight_list, hyper_output, stride=2, padding=1) | |
x = self.ds_bn3(x) | |
x = self.ds_relu3(x) | |
for i in range(8): | |
residual = x | |
# out = getattr(self, f"residual3_{i}_conv1")(x) | |
out = self.execute_hyper_conv2d(x, getattr(self, f"residual3_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0) | |
out = getattr(self, f"residual3_{i}_bn1")(out) | |
out = getattr(self, f"residual3_{i}_relu1")(out) | |
# out = getattr(self, f"residual3_{i}_conv2")(out) | |
out = self.execute_hyper_conv2d(out, getattr(self, f"residual3_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1) | |
out = getattr(self, f"residual3_{i}_bn2")(out) | |
out = getattr(self, f"residual3_{i}_relu2")(out) | |
x = out + residual | |
# Fourth layer | |
if self.scn_list[4] == 0: | |
x = self.ds_conv4(x) | |
x = self.ds_bn4(x) | |
x = self.ds_relu4(x) | |
for i in range(8): | |
residual = x | |
out = getattr(self, f"residual4_{i}_conv1")(x) | |
out = getattr(self, f"residual4_{i}_bn1")(out) | |
out = getattr(self, f"residual4_{i}_relu1")(out) | |
out = getattr(self, f"residual4_{i}_conv2")(out) | |
out = getattr(self, f"residual4_{i}_bn2")(out) | |
out = getattr(self, f"residual4_{i}_relu2")(out) | |
x = out + residual | |
else: | |
# x = self.ds_conv4(x) | |
x = self.execute_hyper_conv2d(x, self.ds_conv4_weight_list, hyper_output, stride=2, padding=1) | |
x = self.ds_bn4(x) | |
x = self.ds_relu4(x) | |
for i in range(8): | |
residual = x | |
# out = getattr(self, f"residual4_{i}_conv1")(x) | |
out = self.execute_hyper_conv2d(x, getattr(self, f"residual4_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0) | |
out = getattr(self, f"residual4_{i}_bn1")(out) | |
out = getattr(self, f"residual4_{i}_relu1")(out) | |
# out = getattr(self, f"residual4_{i}_conv2")(out) | |
out = self.execute_hyper_conv2d(out, getattr(self, f"residual4_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1) | |
out = getattr(self, f"residual4_{i}_bn2")(out) | |
out = getattr(self, f"residual4_{i}_relu2")(out) | |
x = out + residual | |
# Fifth layer | |
if self.scn_list[5] == 0: | |
x = self.ds_conv5(x) | |
x = self.ds_bn5(x) | |
x = self.ds_relu5(x) | |
for i in range(4): | |
residual = x | |
out = getattr(self, f"residual5_{i}_conv1")(x) | |
out = getattr(self, f"residual5_{i}_bn1")(out) | |
out = getattr(self, f"residual5_{i}_relu1")(out) | |
out = getattr(self, f"residual5_{i}_conv2")(out) | |
out = getattr(self, f"residual5_{i}_bn2")(out) | |
out = getattr(self, f"residual5_{i}_relu2")(out) | |
x = out + residual | |
else: | |
# x = self.ds_conv5(x) | |
x = self.execute_hyper_conv2d(x, self.ds_conv5_weight_list, hyper_output, stride=2, padding=1) | |
x = self.ds_bn5(x) | |
x = self.ds_relu5(x) | |
for i in range(4): | |
residual = x | |
# out = getattr(self, f"residual5_{i}_conv1")(x) | |
out = self.execute_hyper_conv2d(x, getattr(self, f"residual5_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0) | |
out = getattr(self, f"residual5_{i}_bn1")(out) | |
out = getattr(self, f"residual5_{i}_relu1")(out) | |
# out = getattr(self, f"residual5_{i}_conv2")(out) | |
out = self.execute_hyper_conv2d(out, getattr(self, f"residual5_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1) | |
out = getattr(self, f"residual5_{i}_bn2")(out) | |
out = getattr(self, f"residual5_{i}_relu2")(out) | |
x = out + residual | |
        if self.scn_list[6] == 0:  # output head (avgpool + flatten + fc), one-for-all variant
# Adjust output layer for training | |
x = self.avgpool(x) | |
x = torch.flatten(x, 1) # Flatten the output of avgpool | |
x = self.fc(x) # Pass through the final fully connected layer | |
# x = self.smax(x) # Defined in loss function of PyTorch | |
else: | |
# Adjust output layer for training | |
x = self.avgpool(x) | |
x = torch.flatten(x, 1) # Flatten the output of avgpool | |
# x = self.fc(x) # Pass through the final fully connected layer | |
            x = self.execute_hyper_linear(x, self.fc_weight_list, self.linear_bias_list, hyper_output)  # SCN variant: blend the weight banks with the hypernet coefficients
# x = self.smax(x) # Defined in loss function of PyTorch | |
return x | |
def darknet53(num_classes):
    scn_list = [1, 1, 0, 0, 0, 0, 0]  # per-segment SCN flags (1 = SCN, 0 = one-for-all); adjust for partial SCN
    model = DarkNet53_SCN2(scn_list, num_classes=num_classes)  # e.g. 100 classes for mini-ImageNet
    print(scn_list)           # debug: which segments use SCN
    print(model.scn_list[1])  # debug
    return model
# # Testing the model | |
# if __name__ == "__main__": | |
# inputs = torch.rand((8, 3, 224, 224)).cuda() | |
# model = darknet53(num_classes=100).cuda().train() | |
# # outputs = model(inputs, ) | |
# # print(outputs.shape) |
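# Hedged CPU variant of the commented-out test above (not part of the original gist):
# the model takes an image batch plus a 2-d conditioning vector, e.g. (cos, sin) of an angle.
if __name__ == "__main__":
    test_model = darknet53(num_classes=100).train()
    test_inputs = torch.rand((2, 3, 224, 224))
    test_hyper = torch.tensor([math.cos(0.5), math.sin(0.5)])
    test_outputs = test_model(test_inputs, test_hyper)
    print(test_outputs.shape)  # expected: torch.Size([2, 100])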