Created
March 29, 2021 08:49
-
-
Save ghamarian/291af52ea6ffc9a16f69dfbad2789a7e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import logging | |
import os | |
import os.path | |
import threading | |
from functools import partial | |
from typing import List, Sequence | |
import cv2 | |
from math import log, pi | |
from cvxopt import blas, lapack, solvers, matrix, sqrt, mul, cos, sin | |
import numpy as np | |
from gs.proto import Configuration, Problem, SubscribeObjectProfilesReply | |
from gs_vision.scene import GradingImage, Projection | |
from .base import ProductHandler, ProductHandlerFactory, ProductHandlerSession | |
logger = logging.getLogger(__name__) | |
dev_data_dir = os.path.join( | |
os.path.abspath( | |
os.path.dirname( | |
os.path.dirname( | |
os.path.dirname(os.path.dirname(__file__))))), | |
'data', 'dev') | |
# Load an image | |
# image = cv2.imread("./pear3.png") | |
SIZE = (512, 384) | |
RECT = (0, 100, 450, 250) | |
def grabcut(img): | |
mask = np.zeros(SIZE[::-1], np.uint8) | |
bgdModel = np.zeros((1, 65)) | |
fgdModel = np.zeros((1, 65)) | |
orig_size = img.shape[:2][::-1] | |
img = cv2.resize(img, SIZE) | |
cv2.grabCut(img, mask, RECT, bgdModel, fgdModel, | |
2, mode=cv2.GC_INIT_WITH_RECT) | |
out_mask = np.where((mask == 2) | (mask == 0), 0, 2).astype('uint8') | |
return cv2.resize(out_mask, orig_size, interpolation=cv2.INTER_NEAREST) | |
# Loewner-John ellipsoid | |
# | |
# minimize log det A^-1 | |
# subject to xk'*A*xk - 2*xk'*b + b'*A^1*b <= 1, k=1,...,m | |
# | |
# 5 variables x = (A[0,0], A[1,0], A[1,1], b[0], b[1]) | |
def F(X, m, x=None, z=None): | |
if x is None: | |
return m, matrix([1.0, 0.0, 1.0, 0.0, 0.0]) | |
# Factor A as A = L*L'. Compute inverse B = A^-1. | |
A = matrix([x[0], x[1], x[1], x[2]], (2, 2)) | |
L = +A | |
try: | |
lapack.potrf(L) | |
except: | |
return None | |
B = +L | |
lapack.potri(B) | |
B[0, 1] = B[1, 0] | |
# f0 = -log det A | |
f = matrix(0.0, (m+1, 1)) | |
f[0] = -2.0 * (log(L[0, 0]) + log(L[1, 1])) | |
# fk = xk'*A*xk - 2*xk'*b + b*A^-1*b - 1 | |
# = (xk - c)' * A * (xk - c) - 1 where c = A^-1*b | |
c = x[3:] | |
lapack.potrs(L, c) | |
for k in range(m): | |
f[k+1] = (X[k, :].T - c).T * A * (X[k, :].T - c) - 1.0 | |
# gradf0 = (-A^-1, 0) = (-B, 0) | |
Df = matrix(0.0, (m+1, 5)) | |
Df[0, 0], Df[0, 1], Df[0, 2] = -B[0, 0], -2.0*B[1, 0], -B[1, 1] | |
# gradfk = (xk*xk' - A^-1*b*b'*A^-1, 2*(-xk + A^-1*b)) | |
# = (xk*xk' - c*c', 2*(-xk+c)) | |
Df[1:, 0] = X[:m, 0]**2 - c[0]**2 | |
Df[1:, 1] = 2.0 * (mul(X[:m, 0], X[:m, 1]) - c[0]*c[1]) | |
Df[1:, 2] = X[:m, 1]**2 - c[1]**2 | |
Df[1:, 3] = 2.0 * (-X[:m, 0] + c[0]) | |
Df[1:, 4] = 2.0 * (-X[:m, 1] + c[1]) | |
if z is None: | |
return f, Df | |
# hessf0(Y, y) = (A^-1*Y*A^-1, 0) = (B*YB, 0) | |
H0 = matrix(0.0, (5, 5)) | |
H0[0, 0] = B[0, 0]**2 | |
H0[1, 0] = 2.0 * B[0, 0] * B[1, 0] | |
H0[2, 0] = B[1, 0]**2 | |
H0[1, 1] = 2.0 * (B[0, 0] * B[1, 1] + B[1, 0]**2) | |
H0[2, 1] = 2.0 * B[1, 0] * B[1, 1] | |
H0[2, 2] = B[1, 1]**2 | |
# hessfi(Y, y) | |
# = ( A^-1*Y*A^-1*b*b'*A^-1 + A^-1*b*b'*A^-1*Y*A^-1 | |
# - A^-1*y*b'*A^-1 - A^-1*b*y'*A^-1, | |
# -2*A^-1*Y*A^-1*b + 2*A^-1*y ) | |
# = ( B*Y*c*c' + c*c'*Y*B - B*y*c' - c*y'*B, -2*B*Y*c + 2*B*y ) | |
# = ( B*(Y*c-y)*c' + c*(Y*c-y)'*B, -2*B*(Y*c - y) ) | |
H1 = matrix(0.0, (5, 5)) | |
H1[0, 0] = 2.0 * c[0]**2 * B[0, 0] | |
H1[1, 0] = 2.0 * (c[0] * c[1] * B[0, 0] + c[0]**2 * B[1, 0]) | |
H1[2, 0] = 2.0 * c[0] * c[1] * B[1, 0] | |
H1[3:, 0] = -2.0 * c[0] * B[:, 0] | |
H1[1, 1] = 2.0 * c[0]**2 * B[1, 1] + 4.0 * c[0]*c[1]*B[1, 0] + \ | |
2.0 * c[1]**2 + B[0, 0] | |
H1[2, 1] = 2.0 * (c[1]**2 * B[1, 0] + c[0]*c[1]*B[1, 1]) | |
H1[3:, 1] = -2.0 * B * c[[1, 0]] | |
H1[2, 2] = 2.0 * c[1]**2 * B[1, 1] | |
H1[3:, 2] = -2.0 * c[1] * B[:, 1] | |
H1[3:, 3:] = 2*B | |
return f, Df, z[0]*H0 + sum(z[1:])*H1 | |
class EllipsoidProductHandler(ProductHandler): | |
def on_new_configuration(self, conf: Configuration) -> List[Problem]: | |
self._counter = [0] * len(self.session.scene.image_sources) | |
os.makedirs(dev_data_dir, exist_ok=True) | |
return [] | |
def findContour(self, img): | |
contours, hierarchy = cv2.findContours(img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) | |
return contours[np.argmax([cv2.contourArea(c) for c in contours])] | |
def get_cotour(self, image): | |
# self.mask = grabcut(image) | |
# contours, hierarchy = cv2.findContours(self.mask, 1, 2) | |
# contour_idx = np.argmax([con.shape[0] for con in contours]) | |
# my_contour = contours[contour_idx].squeeze() | |
# self.x_max, self.y_max = np.amax(my_contour, axis=0) | |
# my_contour = np.array(list(zip(my_contour[:, 0] / self.x_max, -my_contour[:, 1] / self.y_max))) | |
# | |
# my_contour = np.append(my_contour, my_contour[0]).reshape(-1, 2) | |
# self.contour = my_contour | |
# return my_contour | |
# original_size = (image.shape[1], image.shape[0]) | |
# new_size = (640, 480) | |
# image = cv2.resize(image, new_size) | |
threshold = 50 | |
self.mask = (image > threshold) | |
self.mask = self.mask.astype(np.uint8) * 255 | |
self.mask = cv2.cvtColor(self.mask, cv2.COLOR_BGR2GRAY).astype(np.uint8) | |
contours, hierarchy = cv2.findContours(self.mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) | |
self.contour = contours[np.argmax([cv2.contourArea(c) for c in contours])].squeeze() | |
self.x_max, self.y_max = np.amax(self.contour, axis=0) | |
# z = np.zeros_like(segmented_img) | |
# cv2.drawContours(z, [contours], -1, 255, thickness=1) | |
# return cv2.resize(z, original_size, interpolation=cv2.INTER_NEAREST) | |
return self.contour | |
def setup_inequalities(self, contour): | |
shape = contour.shape | |
X = matrix(contour, (shape[0], 2)) | |
m = X.size[0] - 1 | |
# Inequality description G*x <= h with h = 1 | |
# G, h = matrix(0.0, (m, 2)), matrix(0.0, (m, 1)) | |
# G = (X[:m, :] - X[1:, :]) * matrix([0., -1., 1., 0.], (2, 2)) | |
# h = (G * X.T)[::m + 1] | |
# G = mul(h[:, [0, 0]] ** -1, G) | |
# h = matrix(1.0, (m, 1)) | |
return X, m | |
def solve(self, X, m): | |
sol = solvers.cp(partial(F, X, m)) | |
self.A = matrix(sol['x'][[0, 1, 1, 2]], (2, 2)) | |
self.b = sol['x'][3:] | |
# def annotate( | |
# self, slot: int, frame_num: int, | |
# image, camera_projection: Projection, | |
# thickness=1, with_label=True): | |
def annotate( | |
self, canvas_image, slot: int, grading_image: GradingImage, | |
thickness=1, with_label=True): | |
# Ellipsoid in the form { x | || L' * (x-c) ||_2 <= 1 } | |
L = +self.A | |
lapack.potrf(L) | |
c = +self.b | |
lapack.potrs(L, c) | |
# 1000 points on the unit circle | |
nopts = 1000 | |
angles = matrix([a * 2.0 * pi / nopts for a in range(nopts)], (1, nopts)) | |
circle = matrix(0.0, (2, nopts)) | |
circle[0, :], circle[1, :] = cos(angles), sin(angles) | |
# ellipse = L^-T * circle + c | |
blas.trsm(L, circle, transA='T') | |
ellipse = circle + c[:, nopts * [0]] | |
# ellipse2 = 0.5 * circle + c[:, nopts*[0]] | |
# image = self.mask * 127 | |
x = ellipse[0, :].T | |
y = ellipse[1, :].T | |
for i, j in zip(x, y): | |
i *= self.x_max | |
j *= self.y_max | |
# i += self.x_max | |
# j += self.y_max | |
# image = cv2.circle(image, (i, j), radius=10, color=(0, 0, 255), thickness=-1) | |
image = cv2.circle(canvas_image, (int(i), -int(j)), radius=1, color=(0, 255, 0), thickness=-1) | |
for x, y in self.contour: | |
# image = cv2.circle(image, (int(x * self.x_max), -int(y * self.y_max)), radius=3, color=(0, 0, 255), thickness=-1) | |
image = cv2.circle(image, (x, y), radius=3, color=(0, 0, 255), thickness=-1) | |
if hasattr(self, 'stacked'): | |
self.stacked = np.hstack((self.stacked, image)) | |
print(self.stacked.shape) | |
self.i += 1 | |
else: | |
self.stacked = image | |
self.i = 1 | |
height, width, _ = image.shape | |
# self.video = cv2.VideoWriter('./video-4.avi', cv2.VideoWriter_fourcc(*"DIVX"), 1, (width, height)) | |
self.video = cv2.VideoWriter('./video-4-4.mp4', cv2.VideoWriter_fourcc(*'MP4V'), 4, (width, height)) | |
# cv2.imwrite(f"./{self.i}.jpg", self.stacked) | |
cv2.imshow("Stacked", self.stacked) | |
self.video.write(image) | |
return image | |
def on_new_image( | |
self, | |
camera_index: int, | |
new_grading_image: GradingImage, | |
image_buffers: Sequence[Sequence[GradingImage]]): | |
self._counter[camera_index] += 1 | |
# Circle markers on 10th frame | |
image = new_grading_image.image.copy() | |
contour = self.get_cotour(image) | |
print(image.shape) | |
self.x_max = 1536 // 4 | |
self.y_max = 2048 // 4 | |
# self.x_max = 1500 | |
# self.y_max = 2000 | |
# self.x_max = int(np.average(contour[:, 0])) | |
# self.y_max = int(np.average(contour[:, 1])) | |
# print("max stuff", self.x_max, self.y_max) | |
# self.x_max = 100 | |
# self.y_max = 100 | |
contour = np.array(list(zip(contour[:, 0] / self.x_max, -contour[:, 1] / self.y_max))) | |
# contour = np.array(list(zip(contour[:, 0] - self.x_max, -(contour[:, 1] - self.y_max)))) | |
# contour = np.array(list(zip(contour[:, 0], -contour[:, 1]))) | |
X, m = self.setup_inequalities(contour) | |
self.solve(X, m) | |
if camera_index == 0: | |
object_profile_reply = SubscribeObjectProfilesReply() | |
object_profile_reply.object_profile.id = self._counter[0] | |
object_profile_reply.object_profile.configuration_id = \ | |
self.session.configuration.id | |
self.session.publish(object_profile_reply) | |
def on_close(self): | |
if threading.current_thread() is threading.main_thread(): | |
logger.debug("Waiting for a keypress (while focused on an Open CV" | |
" window) to exit ...") | |
cv2.waitKey(0) | |
cv2.destroyAllWindows() | |
self.video.release() | |
class EllipsoidProductHandlerFactory(ProductHandlerFactory): | |
@staticmethod | |
def product_code() -> str: | |
return 'ellipsoid' | |
@staticmethod | |
def create(session: ProductHandlerSession) -> 'ProductHandler': | |
return EllipsoidProductHandler(session) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment