Last active
October 25, 2017 19:55
-
-
Save crackwitz/23ed1abe393b61fd106edff5efbb2f10 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
based on the MOSSE tracking example from OpenCV | |
[1] David S. Bolme et al. "Visual Object Tracking using Adaptive Correlation Filters" | |
http://www.cs.colostate.edu/~bolme/publications/Bolme2010Tracking.pdf | |
''' | |
import numpy as np | |
import math | |
from math import log as ln | |
import cv2 | |
def draw_str(dst, pos, s): | |
(x, y) = pos | |
cv2.putText(dst, s, (x+1, y+1), cv2.FONT_HERSHEY_PLAIN, 2.0, (0, 0, 0), thickness = 4, lineType=cv2.LINE_AA) | |
cv2.putText(dst, s, (x, y), cv2.FONT_HERSHEY_PLAIN, 2.0, (255, 255, 255), thickness=2, lineType=cv2.LINE_AA) | |
def rnd_warp(a, magnitude=0.1): | |
h, w = a.shape[:2] | |
T = np.zeros((2, 3)) | |
ang = (np.random.rand()-0.5)*magnitude | |
c, s = np.cos(ang), np.sin(ang) | |
T[:2, :2] = [[c,-s], [s, c]] | |
T[:2, :2] += (np.random.rand(2, 2) - 0.5)*magnitude | |
c = ((w-1)*0.5, (h-1)*0.5) | |
T[:,2] = c - np.dot(T[:2, :2], c) | |
return cv2.warpAffine(a, T, (w, h), flags=cv2.INTER_LANCZOS4, borderMode=cv2.BORDER_REPLICATE) | |
def divSpec(A, B): | |
Ar, Ai = A[...,0], A[...,1] | |
Br, Bi = B[...,0], B[...,1] | |
C = (Ar+1j*Ai)/(Br+1j*Bi) | |
C = np.dstack([np.real(C), np.imag(C)]).copy() | |
return C | |
eps = 1e-5 | |
def vertex_parabola(y1, y2, y3): | |
return 0.5 * (y1 - y3) / (y3 - 2*y2 + y1) | |
def vertex_gaussian(y1, y2, y3): | |
# A Comparison of Algorithms for Subpixel Peak Detection (1996), by R. B. Fisher , D. K. Naidu | |
return 0.5 * (ln(y1) - ln(y3)) / (ln(y1) - 2*ln(y2) + ln(y3)) | |
def fix8(*s): | |
return tuple(int(round(x * 256)) for x in s) | |
def getRectSubPixScaled(src, center, patchSize, scale=None): | |
if scale is None: | |
return cv2.getRectSubPix(src, center=center, patchSize=patchSize) | |
else: | |
(cx,cy) = center | |
(w,h) = patchSize | |
# TODO: confirm this matrix does the same | |
# pixel centers are integer coordinates | |
# pixel borders are halves | |
# only for odd patch size should the center be integral | |
M = np.float32([ | |
[scale, 0.0, cx - (w-1)*0.5 * scale], | |
[ 0.0, scale, cy - (h-1)*0.5 * scale]]) | |
return cv2.warpAffine( | |
src=src, | |
M=M, | |
dsize=patchSize, | |
flags=cv2.WARP_INVERSE_MAP | cv2.INTER_AREA, | |
borderMode=cv2.BORDER_REPLICATE) | |
class MOSSE: | |
def __init__(self, patchSize): | |
w,h = patchSize | |
w,h = self.size = np.int32([cv2.getOptimalDFTSize(int(w)), cv2.getOptimalDFTSize(int(h))]) | |
self.win = cv2.createHanningWindow((w,h), cv2.CV_32F) | |
def init(self, frame, center, warp_iter=128, warp_magnitude=0.1): | |
x,y = self.pos = np.float32(center) | |
w,h = self.size | |
g = np.zeros((h,w), np.float32) | |
g[(h-1)//2:(h+2)//2, (w-1)//2:(w+2)//2] = 1 # right in the center | |
g = cv2.GaussianBlur(g, (-1, -1), 2.0) | |
g /= g.max() | |
patch = cv2.getRectSubPix(frame, center=(x,y), patchSize=(w,h)) | |
self.last_img = patch | |
self.G = cv2.dft(g, flags=cv2.DFT_COMPLEX_OUTPUT) | |
self.H1 = np.zeros_like(self.G) | |
self.H2 = np.zeros_like(self.G) | |
A = cv2.dft(self.preprocess(patch), flags=cv2.DFT_COMPLEX_OUTPUT) | |
self.H1 += cv2.mulSpectrums(self.G, A, 0, conjB=True) | |
self.H2 += cv2.mulSpectrums( A, A, 0, conjB=True) | |
for i in range(warp_iter): | |
a = self.preprocess(rnd_warp(patch, magnitude=warp_magnitude)) | |
A = cv2.dft(a, flags=cv2.DFT_COMPLEX_OUTPUT) | |
self.H1 += cv2.mulSpectrums(self.G, A, 0, conjB=True) | |
self.H2 += cv2.mulSpectrums( A, A, 0, conjB=True) | |
self.update_kernel() | |
self.good = True | |
self.psr = np.inf | |
self.last_resp = None | |
return self | |
def track(self, frame, pos=None): | |
(x,y) = self.pos | |
(w,h) = self.size | |
if pos is not None: # override position? | |
(x,y) = pos | |
patch = cv2.getRectSubPix(frame, center=(x,y), patchSize=(w,h)) | |
if len(patch.shape) > 2: patch = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY) | |
self.last_img = patch | |
patch = self.preprocess(patch) | |
self.last_resp, (dx, dy), self.psr = self.correlate(patch) | |
self.good = (self.psr > 8.0) | |
return (self.psr, np.float32([dx, dy])) | |
def adapt(self, frame, pos=None, rate=0.125): | |
if rate <= 0: return | |
if pos is not None: | |
self.pos = (x,y) = pos | |
else: | |
(x,y) = self.pos | |
(w, h) = self.size | |
patch = cv2.getRectSubPix(frame, center=(x,y), patchSize=(w,h)) | |
if len(patch.shape) > 2: patch = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY) | |
self.last_img = patch | |
patch = self.preprocess(patch) | |
A = cv2.dft(patch, flags=cv2.DFT_COMPLEX_OUTPUT) | |
H1 = cv2.mulSpectrums(self.G, A, 0, conjB=True) | |
H2 = cv2.mulSpectrums( A, A, 0, conjB=True) | |
self.H1 += (H1-self.H1) * rate | |
self.H2 += (H2-self.H2) * rate | |
self.update_kernel() | |
def update(self, frame, rate=0.125): | |
psr,delta = self.track(frame) | |
if not self.good: | |
return | |
self.adapt(frame, pos=self.pos+delta, rate=rate) | |
def preprocess(self, patch): | |
patch = np.log(np.float32(patch)+1.0) | |
patch = (patch-patch.mean()) / (patch.std()+eps) | |
return patch*self.win | |
def correlate(self, img): | |
C = cv2.mulSpectrums(cv2.dft(img, flags=cv2.DFT_COMPLEX_OUTPUT), self.H, 0, conjB=True) | |
resp = cv2.idft(C, flags=cv2.DFT_SCALE | cv2.DFT_REAL_OUTPUT) | |
resp -= resp.mean() | |
h, w = resp.shape | |
_, mval, _, (mx, my) = cv2.minMaxLoc(resp) | |
fmx = mx + vertex_gaussian(*resp[my,mx-1:mx+2]) if (1 <= mx <= w-2) else mx | |
fmy = my + vertex_gaussian(*resp[my-1:my+2,mx]) if (1 <= my <= h-2) else my | |
side_resp = resp.copy() | |
cv2.rectangle(side_resp, (mx-5, my-5), (mx+5, my+5), 0, -1) | |
smean, sstd = side_resp.mean(), side_resp.std() | |
psr = (mval-smean) / (sstd+eps) | |
return resp, (fmx - (w-1)*0.5, fmy - (h-1)*0.5), psr | |
def update_kernel(self): | |
self.H = divSpec(self.H1, self.H2) | |
self.H[...,1] *= -1 | |
@property | |
def state_vis(self): | |
f = cv2.idft(self.H, flags=cv2.DFT_SCALE | cv2.DFT_REAL_OUTPUT ) | |
h, w = f.shape | |
f = np.roll(f, -h//2, 0) | |
f = np.roll(f, -w//2, 1) | |
kernel = np.uint8( (f-f.min()) / f.ptp()*255 ) | |
if self.last_resp is not None: | |
resp = self.last_resp | |
resp = np.uint8(np.clip(resp/resp.max(), 0, 1)*255) | |
vis = np.hstack([self.last_img, kernel, resp]) | |
else: | |
vis = np.hstack([self.last_img, kernel]) | |
return vis | |
def draw_state(self, vis, scale=1): | |
(x, y), (w, h) = self.pos, self.size | |
x *= scale | |
y *= scale | |
w *= scale | |
h *= scale | |
x1, y1, x2, y2 = (x-0.5*w), (y-0.5*h), (x+0.5*w), (y+0.5*h) | |
cv2.rectangle(vis, | |
fix8(x1, y1), | |
fix8(x2, y2), | |
(0, 0, 255), thickness=2, shift=8) | |
if self.good: | |
cv2.circle(vis, | |
fix8(int(x), int(y)), | |
fix8(4)[0], | |
(0, 0, 255), -1, shift=8) | |
else: | |
cv2.line(vis, | |
fix8(x1, y1), | |
fix8(x2, y2), | |
(0, 0, 255), thickness=2, shift=8) | |
cv2.line(vis, | |
fix8(x2, y1), | |
fix8(x1, y2), | |
(0, 0, 255), thickness=2, shift=8) | |
draw_str(vis, (int(x1), int(y2+32)), 'PSR: {:.0f} dB'.format(10*math.log10(self.psr))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment