Created
August 5, 2020 06:57
-
-
Save jinyup100/488ff49932672ce7291717033eb60d73 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import os | |
import onnx | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import cv2 | |
from torch.autograd import Variable | |
# Region Proposal Network (RPN) classes
class RPN(nn.Module):
    """Abstract base class for Region Proposal Network heads.

    The class holds no parameters of its own. ``forward`` is a placeholder
    that subclasses are expected to override.
    """

    def __init__(self):
        super().__init__()

    def forward(self, z_f, x_f):
        """Placeholder taking two inputs; always raises NotImplementedError."""
        raise NotImplementedError
class DepthwiseXCorr(nn.Module):
    """Depthwise cross-correlation layer.

    Both inputs pass through their own conv/batch-norm/ReLU branch, the two
    results are combined by ``xcorr_depthwise`` and a 1x1 convolutional head
    maps the response to ``out_channels`` channels.

    Args:
        in_channels: Channel count of both inputs.
        hidden: Channel count produced by each branch and kept inside the head.
        out_channels: Channel count of the final output.
        kernel_size: Kernel size of the convolution in each input branch.
        hidden_kernel_size: Accepted but not used by this implementation.
    """

    def __init__(self, in_channels, hidden, out_channels, kernel_size=3, hidden_kernel_size=5):
        super().__init__()

        def make_branch():
            # Bias-free conv -> batch norm -> in-place ReLU.
            return nn.Sequential(
                nn.Conv2d(in_channels, hidden, kernel_size=kernel_size, bias=False),
                nn.BatchNorm2d(hidden),
                nn.ReLU(inplace=True),
            )

        # Attribute names and creation order are kept stable: they determine
        # the state_dict keys that weights are matched against by name.
        self.conv_kernel = make_branch()
        self.conv_search = make_branch()
        self.head = nn.Sequential(
            nn.Conv2d(hidden, hidden, kernel_size=1, bias=False),
            nn.BatchNorm2d(hidden),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, out_channels, kernel_size=1),
        )

    def forward(self, kernel, search):
        """Correlate the processed ``search`` input with the processed ``kernel`` input."""
        template = self.conv_kernel(kernel)
        region = self.conv_search(search)
        response = xcorr_depthwise(region, template)
        return self.head(response)
class DepthwiseRPN(RPN):
    """RPN head made of two ``DepthwiseXCorr`` branches: ``cls`` and ``loc``.

    Args:
        anchor_num: The ``cls`` branch outputs ``2 * anchor_num`` channels and
            the ``loc`` branch ``4 * anchor_num`` channels.
        in_channels: Channel count of the input features.
        out_channels: Hidden channel count used inside each branch.
    """

    def __init__(self, anchor_num=5, in_channels=256, out_channels=256):
        super().__init__()
        cls_channels = 2 * anchor_num
        loc_channels = 4 * anchor_num
        self.cls = DepthwiseXCorr(in_channels, out_channels, cls_channels)
        self.loc = DepthwiseXCorr(in_channels, out_channels, loc_channels)

    def forward(self, z_f, x_f):
        """Return the ``(cls, loc)`` outputs for the feature pair ``z_f``, ``x_f``."""
        cls_out = self.cls(z_f, x_f)
        loc_out = self.loc(z_f, x_f)
        return cls_out, loc_out
class MultiRPN(RPN):
    """Combine the outputs of several ``DepthwiseRPN`` branches.

    One ``DepthwiseRPN`` is registered per entry of ``in_channels`` under the
    names ``rpn2``, ``rpn3``, ... The branch outputs are fused either by a
    learned, softmax-normalised weighted sum (``weighted=True``) or by a
    plain mean (``weighted=False``).

    Args:
        anchor_num: Forwarded to every ``DepthwiseRPN`` branch.
        in_channels: Sequence with one channel count per branch.
        weighted: Whether to fuse branches with learned weights.

    Raises:
        ValueError: If ``in_channels`` is empty.
    """

    def __init__(self, anchor_num, in_channels, weighted=True):
        super(MultiRPN, self).__init__()
        if len(in_channels) == 0:
            raise ValueError("in_channels must contain at least one entry")
        self.weighted = weighted
        self.num_branches = len(in_channels)
        # Branch names start at 'rpn2'; they must stay stable because weights
        # are matched to modules by state_dict key.
        for i, channels in enumerate(in_channels):
            self.add_module('rpn' + str(i + 2),
                            DepthwiseRPN(anchor_num, channels, channels))
        if self.weighted:
            self.cls_weight = nn.Parameter(torch.ones(self.num_branches))
            self.loc_weight = nn.Parameter(torch.ones(self.num_branches))

    @staticmethod
    def _softmax(a):
        """Return the softmax over all elements of ``a``.

        The maximum is subtracted before exponentiating to avoid overflow.
        NOTE(review): this hand-written form replaced ``F.softmax`` in the
        original code, presumably for ONNX export compatibility -- confirm
        before switching back.
        """
        exp_a = torch.exp(a - torch.max(a))
        return exp_a / torch.sum(exp_a)

    @staticmethod
    def _weighted_sum(tensors, weights):
        """Return ``sum(tensors[i] * weights[i])`` over all branches."""
        return sum(t * weights[i] for i, t in enumerate(tensors))

    def forward(self, z_fs, x_fs):
        """Run every branch on its feature pair and fuse the results.

        Args:
            z_fs: Indexable collection with one first-input feature per branch.
            x_fs: Indexable collection with one second-input feature per branch.

        Returns:
            Tuple ``(cls, loc)`` of the fused branch outputs.
        """
        cls = []
        loc = []
        # A plain Python loop is unrolled when the model is traced, so the
        # exported graph matches the previously hand-unrolled three-branch
        # version when three branches are configured.
        for i in range(self.num_branches):
            branch = getattr(self, 'rpn' + str(i + 2))
            c, l = branch(z_fs[i], x_fs[i])
            cls.append(c)
            loc.append(l)

        if self.weighted:
            cls_weight = self._softmax(self.cls_weight)
            loc_weight = self._softmax(self.loc_weight)
            return (self._weighted_sum(cls, cls_weight),
                    self._weighted_sum(loc, loc_weight))

        # Unweighted fusion: cls_weight / loc_weight do not exist in this
        # mode, so fall back to the arithmetic mean of the branch outputs.
        return sum(cls) / len(cls), sum(loc) / len(loc)
# End of the RPN classes
def conv3x3(in_planes, out_planes, stride=1, dilation=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` whose padding equals ``dilation``."""
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        bias=False,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)
def xcorr_depthwise(x, kernel):
    """Correlate each channel of ``x`` with the matching channel of ``kernel``.

    ``x`` is viewed as one sample with ``batch * channel`` channels and
    ``kernel`` as ``batch * channel`` single-channel filters. A grouped
    convolution with ``groups = batch * channel`` then slides each filter
    over its own channel only. The response is viewed back to
    ``(batch, channel, H_out, W_out)`` and detached from the autograd graph.

    NOTE(review): a fresh ``nn.Conv2d`` is built on every call and its weight
    is overwritten with ``kernel``; presumably this shape of code is needed
    for the ONNX export -- confirm before replacing it with ``F.conv2d``.
    """
    n_batch = kernel.size(0)
    n_channel = kernel.size(1)
    n_groups = n_batch * n_channel

    flat_x = x.view(1, n_groups, x.size(2), x.size(3))
    filters = kernel.view(n_groups, 1, kernel.size(2), kernel.size(3))

    correlate = nn.Conv2d(
        n_groups,
        n_groups,
        kernel_size=(filters.size(2), filters.size(3)),
        bias=False,
        groups=n_groups,
    )
    # Replace the randomly initialised weight with the supplied filters.
    correlate.weight = nn.Parameter(filters)

    response = correlate(flat_x)
    response = response.view(n_batch, n_channel, response.size(2), response.size(3))
    return response.detach()
class RPNBuilder(nn.Module):
    """Wrapper module exposing a three-branch, weighted ``MultiRPN`` as ``rpn_head``.

    The attribute name ``rpn_head`` is part of the state_dict key prefix and
    must not change.
    """

    def __init__(self):
        super().__init__()
        # Five anchors, three 256-channel branches, learned fusion weights.
        self.rpn_head = MultiRPN(
            anchor_num=5,
            in_channels=[256, 256, 256],
            weighted=True,
        )

    def forward(self, zf, xf):
        """Forward ``zf`` and ``xf`` to the RPN head and return ``(cls, loc)``."""
        scores, deltas = self.rpn_head(zf, xf)
        return scores, deltas
# Pre-trained weights for the tracker model.
# The checkpoint siamrpn_r50_l234_dwxcorr.pth must be located in the current
# working directory; its download link is given in the gist description.
current_path = os.getcwd()
load_path = os.path.join(current_path, "siamrpn_r50_l234_dwxcorr.pth")
# NOTE(review): torch.load unpickles the file -- only load checkpoints that
# come from a trusted source.
pretrained_dict = torch.load(load_path, map_location=torch.device('cpu'))
pretrained_dict_head = pretrained_dict

# Build the torch RPN head that will be exported to ONNX.
rpn_head = RPNBuilder()
rpn_head.eval()
rpn_head_dict = rpn_head.state_dict()

# Load the pre-trained weights: keep only the checkpoint entries whose keys
# exist in the head, merge them over the freshly initialised values and load.
pretrained_dict_head = {k: v for k, v in pretrained_dict_head.items() if k in rpn_head_dict}
rpn_head_dict.update(pretrained_dict_head)
rpn_head.load_state_dict(rpn_head_dict)
rpn_head.eval()

# Only the shapes of the stored arrays are used below, to size the random
# inputs for the export. The second array must be bound to `xfs`; binding it
# to `zfs` again left `xfs` undefined and made the export raise NameError.
zfs = np.load('zfs.npy')
xfs = np.load('xfs.npy')

# Export the torch head model to an ONNX model.
batch_size = 1  # not used by the export; kept as a module-level name
dummy_inputs = (
    torch.Tensor(np.random.rand(*zfs.shape)),
    torch.Tensor(np.random.rand(*xfs.shape)),
)
torch.onnx.export(rpn_head, dummy_inputs, "rpn_head.onnx", export_params=True, opset_version=11,
                  do_constant_folding=True, input_names=['input_1', 'input_2'],
                  output_names=['output_1', 'output_2'])

# Load the saved rpn_head model using ONNX.
onnx_rpn_head_model = onnx.load("rpn_head.onnx")

# Validate the exported model (check_model raises on failure and returns
# nothing, so there is no result to print), then print its graph once.
onnx.checker.check_model(onnx_rpn_head_model)
print(onnx.helper.printable_graph(onnx_rpn_head_model.graph))

# Finally make sure OpenCV's DNN module can read the exported file.
rpn_head = cv2.dnn.readNetFromONNX('rpn_head.onnx')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment