Created
December 21, 2017 08:06
-
-
Save crearo/8f82c24e52a0b4d1f6879d9505002748 to your computer and use it in GitHub Desktop.
Simple software stabilization with linear smoothing filter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Performs software stabilization using optical flow + linear smoothing.
Vision algos used:
- KLT for flow tracking
- Harris corner detection to detect points to be tracked in each frame
This is the Python translation I wrote of the original .cpp code written by nghiaho.com/?p=2093
'''
import sys

import cv2
import numpy as np
# Half-width of the moving-average window used to smooth the camera
# trajectory; the full window spans 2 * SMOOTHENING_RADIUS + 1 frames.
SMOOTHENING_RADIUS = 10
class TransformParam:
    """Frame-to-frame camera motion sample.

    Holds the timestamp of a frame together with the x/y translation
    (dx, dy) and rotation angle (da) relative to the previous frame.
    """

    def __init__(self, timestamp, dx, dy, da):
        self.timestamp, self.dx, self.dy, self.da = timestamp, dx, dy, da
class Trajectory:
    """Absolute camera pose at one frame: accumulated x/y translation and angle a."""

    def __init__(self, x, y, a):
        self.x, self.y, self.a = x, y, a
def _estimate_rigid(good_prev, good_cur):
    """Estimate a rigid (translation+rotation+scale) 2x3 transform between matched point sets.

    cv2.estimateRigidTransform was removed in OpenCV 4.x; fall back to
    estimateAffinePartial2D, which computes the same partial-affine model.
    Returns the 2x3 matrix, or None when estimation fails.
    """
    if hasattr(cv2, "estimateRigidTransform"):
        return cv2.estimateRigidTransform(good_prev, good_cur, False)
    matrix, _inliers = cv2.estimateAffinePartial2D(good_prev, good_cur)
    return matrix


def _compute_frame_transforms(cap, out_file):
    """Step 1: track features between consecutive frames and record per-frame (dx, dy, da).

    Writes one "frame dx dy da" line per frame pair to out_file and returns
    the list of TransformParam (one entry per frame after the first).
    """
    ret, prev_frame = cap.read()
    if not ret:
        raise AssertionError("Could not read the first frame")
    prev_grey = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    transforms = []
    frame_num = 1
    while cap.isOpened():
        ret, cur_frame = cap.read()
        if not ret:
            break
        cur_grey = cv2.cvtColor(cur_frame, cv2.COLOR_BGR2GRAY)
        prev_corners = cv2.goodFeaturesToTrack(prev_grey, 200, 0.01, 30)
        cur_corners, status, _err = cv2.calcOpticalFlowPyrLK(prev_grey, cur_grey, prev_corners, None)
        # status == 1 marks points successfully tracked; boolean-masking both
        # arrays with the same mask keeps row i of good_prev paired with
        # row i of good_cur.
        good_cur = cur_corners[status == 1]
        good_prev = prev_corners[status == 1]
        transform = _estimate_rigid(good_prev, good_cur)
        if transform is None:
            # Estimation failed (e.g. too few inliers): assume zero motion.
            # A 2x3 zero matrix matches the shape of a real affine estimate.
            transform = np.zeros(shape=(2, 3))
        dx = transform[0][2]
        dy = transform[1][2]
        da = 0  # rotation is intentionally ignored (translation-only smoothing)
        timestamp = cap.get(cv2.CAP_PROP_POS_MSEC)
        transforms.append(TransformParam(timestamp, dx, dy, da))
        prev_grey = cur_grey
        out_file.write("%d %.5f %.5f %.5f\n" % (frame_num, dx, dy, da))
        print("Frame:%d/%d - good optical flow: %d" % (frame_num, total_frames, len(prev_corners)))
        frame_num += 1
    return transforms


def _accumulate_trajectory(transforms, out_file):
    """Step 2: integrate the per-frame deltas into an absolute camera trajectory."""
    x = y = a = 0
    trajectory = []
    for frame_num, transform in enumerate(transforms, start=1):
        x += transform.dx
        y += transform.dy
        a += transform.da
        trajectory.append(Trajectory(x, y, a))
        out_file.write("%d %.5f %.5f %.5f\n" % (frame_num, x, y, a))
    return trajectory


def _smooth_trajectory(trajectory, out_file, radius=SMOOTHENING_RADIUS):
    """Step 3: moving-average filter over the trajectory (window of 2*radius+1 frames).

    The window is clipped at the sequence boundaries, so each average divides
    by the actual number of samples in the window rather than its nominal size.
    """
    smoothed = []
    n = len(trajectory)
    for i in range(n):
        lo = max(0, i - radius)
        hi = min(n, i + radius + 1)
        window = trajectory[lo:hi]
        count = len(window)
        avgx = sum(t.x for t in window) / count
        avgy = sum(t.y for t in window) / count
        avga = sum(t.a for t in window) / count
        smoothed.append(Trajectory(avgx, avgy, avga))
        out_file.write("%d %.5f %.5f %.5f\n" % (i + 1, avgx, avgy, avga))
    return smoothed


def _retarget_transforms(transforms, trajectory, smoothed, out_file):
    """Step 4: adjust each per-frame delta so the integrated path follows the smoothed trajectory."""
    new_transforms = []
    for i, transform in enumerate(transforms):
        dx = transform.dx + (smoothed[i].x - trajectory[i].x)
        dy = transform.dy + (smoothed[i].y - trajectory[i].y)
        da = transform.da + (smoothed[i].a - trajectory[i].a)
        new_transforms.append(TransformParam(transform.timestamp, dx, dy, da))
        out_file.write("%d %f %f %f\n" % (i + 1, dx, dy, da))
    return new_transforms


def _play_stabilized(cap, new_transforms):
    """Step 5: re-read the video and display each frame warped by its new transform.

    Press ESC in the display window to stop playback early.
    """
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    for transform in new_transforms:
        ret, raw_frame = cap.read()
        if not ret:
            break
        # Identity rotation/scale plus the corrected translation; the rotation
        # part stays identity because da is always 0 upstream.
        affine_mat = np.array([[1.0, 0.0, transform.dx],
                               [0.0, 1.0, transform.dy]])
        rows, cols = raw_frame.shape[:2]
        stab_frame = cv2.warpAffine(raw_frame, affine_mat, (cols, rows))
        cv2.imshow('img', stab_frame)
        if cv2.waitKey(20) & 0xFF == 27:  # ESC
            break


def stabilize(video_path):
    """Software video stabilization: optical-flow motion estimation followed by
    linear (moving-average) smoothing of the camera trajectory.

    Writes the intermediate data of every step to ./data/py_*.txt files and
    shows the stabilized frames in an OpenCV window.

    :param video_path: path of the input video file
    :raises AssertionError: if the video cannot be opened or read
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise AssertionError("Error opening video stream or file")
    try:
        # 'w' mode already truncates, so no explicit truncate() is needed;
        # the context managers guarantee the files are closed even on error.
        with open('./data/py_prev_to_cur_transform.txt', 'w') as out_prev_cur_transform, \
             open('./data/py_trajectory.txt', 'w') as out_trajectory, \
             open('./data/py_smoothed_trajectory.txt', 'w') as out_smoothed_trajectory, \
             open('./data/py_new_prev_to_cur_transformation.txt', 'w') as out_new_transform:
            transforms = _compute_frame_transforms(cap, out_prev_cur_transform)
            trajectory = _accumulate_trajectory(transforms, out_trajectory)
            smoothed = _smooth_trajectory(trajectory, out_smoothed_trajectory)
            new_transforms = _retarget_transforms(transforms, trajectory, smoothed, out_new_transform)
        _play_stabilized(cap, new_transforms)
    finally:
        cv2.destroyAllWindows()
        cap.release()
if __name__ == "__main__":
    # Expect exactly one argument: the path of the video to stabilize.
    # SystemExit with a usage string is clearer than the bare ValueError
    # the original raised, and exits with a non-zero status.
    if len(sys.argv) != 2:
        raise SystemExit("usage: python %s <video_file.mp4>" % sys.argv[0])
    stabilize(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment