Skip to content

Instantly share code, notes, and snippets.

@ghamarian
Created March 29, 2021 08:49
Show Gist options
  • Save ghamarian/291af52ea6ffc9a16f69dfbad2789a7e to your computer and use it in GitHub Desktop.
Save ghamarian/291af52ea6ffc9a16f69dfbad2789a7e to your computer and use it in GitHub Desktop.
import itertools
import logging
import os
import os.path
import threading
from functools import partial
from typing import List, Sequence
import cv2
from math import log, pi
from cvxopt import blas, lapack, solvers, matrix, sqrt, mul, cos, sin
import numpy as np
from gs.proto import Configuration, Problem, SubscribeObjectProfilesReply
from gs_vision.scene import GradingImage, Projection
from .base import ProductHandler, ProductHandlerFactory, ProductHandlerSession
logger = logging.getLogger(__name__)
dev_data_dir = os.path.join(
os.path.abspath(
os.path.dirname(
os.path.dirname(
os.path.dirname(os.path.dirname(__file__))))),
'data', 'dev')
# Load an image
# image = cv2.imread("./pear3.png")
SIZE = (512, 384)
RECT = (0, 100, 450, 250)
def grabcut(img):
mask = np.zeros(SIZE[::-1], np.uint8)
bgdModel = np.zeros((1, 65))
fgdModel = np.zeros((1, 65))
orig_size = img.shape[:2][::-1]
img = cv2.resize(img, SIZE)
cv2.grabCut(img, mask, RECT, bgdModel, fgdModel,
2, mode=cv2.GC_INIT_WITH_RECT)
out_mask = np.where((mask == 2) | (mask == 0), 0, 2).astype('uint8')
return cv2.resize(out_mask, orig_size, interpolation=cv2.INTER_NEAREST)
# Loewner-John ellipsoid
#
# minimize log det A^-1
# subject to xk'*A*xk - 2*xk'*b + b'*A^1*b <= 1, k=1,...,m
#
# 5 variables x = (A[0,0], A[1,0], A[1,1], b[0], b[1])
def F(X, m, x=None, z=None):
if x is None:
return m, matrix([1.0, 0.0, 1.0, 0.0, 0.0])
# Factor A as A = L*L'. Compute inverse B = A^-1.
A = matrix([x[0], x[1], x[1], x[2]], (2, 2))
L = +A
try:
lapack.potrf(L)
except:
return None
B = +L
lapack.potri(B)
B[0, 1] = B[1, 0]
# f0 = -log det A
f = matrix(0.0, (m+1, 1))
f[0] = -2.0 * (log(L[0, 0]) + log(L[1, 1]))
# fk = xk'*A*xk - 2*xk'*b + b*A^-1*b - 1
# = (xk - c)' * A * (xk - c) - 1 where c = A^-1*b
c = x[3:]
lapack.potrs(L, c)
for k in range(m):
f[k+1] = (X[k, :].T - c).T * A * (X[k, :].T - c) - 1.0
# gradf0 = (-A^-1, 0) = (-B, 0)
Df = matrix(0.0, (m+1, 5))
Df[0, 0], Df[0, 1], Df[0, 2] = -B[0, 0], -2.0*B[1, 0], -B[1, 1]
# gradfk = (xk*xk' - A^-1*b*b'*A^-1, 2*(-xk + A^-1*b))
# = (xk*xk' - c*c', 2*(-xk+c))
Df[1:, 0] = X[:m, 0]**2 - c[0]**2
Df[1:, 1] = 2.0 * (mul(X[:m, 0], X[:m, 1]) - c[0]*c[1])
Df[1:, 2] = X[:m, 1]**2 - c[1]**2
Df[1:, 3] = 2.0 * (-X[:m, 0] + c[0])
Df[1:, 4] = 2.0 * (-X[:m, 1] + c[1])
if z is None:
return f, Df
# hessf0(Y, y) = (A^-1*Y*A^-1, 0) = (B*YB, 0)
H0 = matrix(0.0, (5, 5))
H0[0, 0] = B[0, 0]**2
H0[1, 0] = 2.0 * B[0, 0] * B[1, 0]
H0[2, 0] = B[1, 0]**2
H0[1, 1] = 2.0 * (B[0, 0] * B[1, 1] + B[1, 0]**2)
H0[2, 1] = 2.0 * B[1, 0] * B[1, 1]
H0[2, 2] = B[1, 1]**2
# hessfi(Y, y)
# = ( A^-1*Y*A^-1*b*b'*A^-1 + A^-1*b*b'*A^-1*Y*A^-1
# - A^-1*y*b'*A^-1 - A^-1*b*y'*A^-1,
# -2*A^-1*Y*A^-1*b + 2*A^-1*y )
# = ( B*Y*c*c' + c*c'*Y*B - B*y*c' - c*y'*B, -2*B*Y*c + 2*B*y )
# = ( B*(Y*c-y)*c' + c*(Y*c-y)'*B, -2*B*(Y*c - y) )
H1 = matrix(0.0, (5, 5))
H1[0, 0] = 2.0 * c[0]**2 * B[0, 0]
H1[1, 0] = 2.0 * (c[0] * c[1] * B[0, 0] + c[0]**2 * B[1, 0])
H1[2, 0] = 2.0 * c[0] * c[1] * B[1, 0]
H1[3:, 0] = -2.0 * c[0] * B[:, 0]
H1[1, 1] = 2.0 * c[0]**2 * B[1, 1] + 4.0 * c[0]*c[1]*B[1, 0] + \
2.0 * c[1]**2 + B[0, 0]
H1[2, 1] = 2.0 * (c[1]**2 * B[1, 0] + c[0]*c[1]*B[1, 1])
H1[3:, 1] = -2.0 * B * c[[1, 0]]
H1[2, 2] = 2.0 * c[1]**2 * B[1, 1]
H1[3:, 2] = -2.0 * c[1] * B[:, 1]
H1[3:, 3:] = 2*B
return f, Df, z[0]*H0 + sum(z[1:])*H1
class EllipsoidProductHandler(ProductHandler):
def on_new_configuration(self, conf: Configuration) -> List[Problem]:
self._counter = [0] * len(self.session.scene.image_sources)
os.makedirs(dev_data_dir, exist_ok=True)
return []
def findContour(self, img):
contours, hierarchy = cv2.findContours(img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
return contours[np.argmax([cv2.contourArea(c) for c in contours])]
def get_cotour(self, image):
# self.mask = grabcut(image)
# contours, hierarchy = cv2.findContours(self.mask, 1, 2)
# contour_idx = np.argmax([con.shape[0] for con in contours])
# my_contour = contours[contour_idx].squeeze()
# self.x_max, self.y_max = np.amax(my_contour, axis=0)
# my_contour = np.array(list(zip(my_contour[:, 0] / self.x_max, -my_contour[:, 1] / self.y_max)))
#
# my_contour = np.append(my_contour, my_contour[0]).reshape(-1, 2)
# self.contour = my_contour
# return my_contour
# original_size = (image.shape[1], image.shape[0])
# new_size = (640, 480)
# image = cv2.resize(image, new_size)
threshold = 50
self.mask = (image > threshold)
self.mask = self.mask.astype(np.uint8) * 255
self.mask = cv2.cvtColor(self.mask, cv2.COLOR_BGR2GRAY).astype(np.uint8)
contours, hierarchy = cv2.findContours(self.mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
self.contour = contours[np.argmax([cv2.contourArea(c) for c in contours])].squeeze()
self.x_max, self.y_max = np.amax(self.contour, axis=0)
# z = np.zeros_like(segmented_img)
# cv2.drawContours(z, [contours], -1, 255, thickness=1)
# return cv2.resize(z, original_size, interpolation=cv2.INTER_NEAREST)
return self.contour
def setup_inequalities(self, contour):
shape = contour.shape
X = matrix(contour, (shape[0], 2))
m = X.size[0] - 1
# Inequality description G*x <= h with h = 1
# G, h = matrix(0.0, (m, 2)), matrix(0.0, (m, 1))
# G = (X[:m, :] - X[1:, :]) * matrix([0., -1., 1., 0.], (2, 2))
# h = (G * X.T)[::m + 1]
# G = mul(h[:, [0, 0]] ** -1, G)
# h = matrix(1.0, (m, 1))
return X, m
def solve(self, X, m):
sol = solvers.cp(partial(F, X, m))
self.A = matrix(sol['x'][[0, 1, 1, 2]], (2, 2))
self.b = sol['x'][3:]
# def annotate(
# self, slot: int, frame_num: int,
# image, camera_projection: Projection,
# thickness=1, with_label=True):
def annotate(
self, canvas_image, slot: int, grading_image: GradingImage,
thickness=1, with_label=True):
# Ellipsoid in the form { x | || L' * (x-c) ||_2 <= 1 }
L = +self.A
lapack.potrf(L)
c = +self.b
lapack.potrs(L, c)
# 1000 points on the unit circle
nopts = 1000
angles = matrix([a * 2.0 * pi / nopts for a in range(nopts)], (1, nopts))
circle = matrix(0.0, (2, nopts))
circle[0, :], circle[1, :] = cos(angles), sin(angles)
# ellipse = L^-T * circle + c
blas.trsm(L, circle, transA='T')
ellipse = circle + c[:, nopts * [0]]
# ellipse2 = 0.5 * circle + c[:, nopts*[0]]
# image = self.mask * 127
x = ellipse[0, :].T
y = ellipse[1, :].T
for i, j in zip(x, y):
i *= self.x_max
j *= self.y_max
# i += self.x_max
# j += self.y_max
# image = cv2.circle(image, (i, j), radius=10, color=(0, 0, 255), thickness=-1)
image = cv2.circle(canvas_image, (int(i), -int(j)), radius=1, color=(0, 255, 0), thickness=-1)
for x, y in self.contour:
# image = cv2.circle(image, (int(x * self.x_max), -int(y * self.y_max)), radius=3, color=(0, 0, 255), thickness=-1)
image = cv2.circle(image, (x, y), radius=3, color=(0, 0, 255), thickness=-1)
if hasattr(self, 'stacked'):
self.stacked = np.hstack((self.stacked, image))
print(self.stacked.shape)
self.i += 1
else:
self.stacked = image
self.i = 1
height, width, _ = image.shape
# self.video = cv2.VideoWriter('./video-4.avi', cv2.VideoWriter_fourcc(*"DIVX"), 1, (width, height))
self.video = cv2.VideoWriter('./video-4-4.mp4', cv2.VideoWriter_fourcc(*'MP4V'), 4, (width, height))
# cv2.imwrite(f"./{self.i}.jpg", self.stacked)
cv2.imshow("Stacked", self.stacked)
self.video.write(image)
return image
def on_new_image(
self,
camera_index: int,
new_grading_image: GradingImage,
image_buffers: Sequence[Sequence[GradingImage]]):
self._counter[camera_index] += 1
# Circle markers on 10th frame
image = new_grading_image.image.copy()
contour = self.get_cotour(image)
print(image.shape)
self.x_max = 1536 // 4
self.y_max = 2048 // 4
# self.x_max = 1500
# self.y_max = 2000
# self.x_max = int(np.average(contour[:, 0]))
# self.y_max = int(np.average(contour[:, 1]))
# print("max stuff", self.x_max, self.y_max)
# self.x_max = 100
# self.y_max = 100
contour = np.array(list(zip(contour[:, 0] / self.x_max, -contour[:, 1] / self.y_max)))
# contour = np.array(list(zip(contour[:, 0] - self.x_max, -(contour[:, 1] - self.y_max))))
# contour = np.array(list(zip(contour[:, 0], -contour[:, 1])))
X, m = self.setup_inequalities(contour)
self.solve(X, m)
if camera_index == 0:
object_profile_reply = SubscribeObjectProfilesReply()
object_profile_reply.object_profile.id = self._counter[0]
object_profile_reply.object_profile.configuration_id = \
self.session.configuration.id
self.session.publish(object_profile_reply)
def on_close(self):
if threading.current_thread() is threading.main_thread():
logger.debug("Waiting for a keypress (while focused on an Open CV"
" window) to exit ...")
cv2.waitKey(0)
cv2.destroyAllWindows()
self.video.release()
class EllipsoidProductHandlerFactory(ProductHandlerFactory):
@staticmethod
def product_code() -> str:
return 'ellipsoid'
@staticmethod
def create(session: ProductHandlerSession) -> 'ProductHandler':
return EllipsoidProductHandler(session)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment