DepthAI feature tracking from video file
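
A standalone DepthAI script that replays a local video file into a FeatureTracker node over XLink and draws the tracked feature trails with OpenCV, so feature tracking can be tried out without a live camera feed.
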
#!/usr/bin/env python3
import cv2
import depthai as dai
from collections import deque
import numpy as np

# Add path here!
cap = cv2.VideoCapture("/path/to/video.mp4")

# Keeps a short history of each tracked feature's positions and draws them as trails
class FeatureTrackerDrawer:
    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    # for how many frames the feature is tracked
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    def onTrackBar(self, val):
        FeatureTrackerDrawer.trackedFeaturesPathLength = val

    def trackFeaturePath(self, features):
        newTrackedIDs = set()
        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            if currentID not in self.trackedFeaturesPath:
                self.trackedFeaturesPath[currentID] = deque()

            path = self.trackedFeaturesPath[currentID]
            path.append(currentFeature.position)
            # Trim the trail to the configured length
            while len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
                path.popleft()
            self.trackedFeaturesPath[currentID] = path

        # Drop features that are no longer being tracked
        featuresToRemove = set()
        for oldId in self.trackedIDs:
            if oldId not in newTrackedIDs:
                featuresToRemove.add(oldId)
        for featureId in featuresToRemove:
            self.trackedFeaturesPath.pop(featureId)

        self.trackedIDs = newTrackedIDs

    def drawFeatures(self, img):
        cv2.setTrackbarPos(self.trackbarName, self.windowName, FeatureTrackerDrawer.trackedFeaturesPathLength)

        for path in self.trackedFeaturesPath.values():
            # Draw the trail as connected line segments...
            for j in range(len(path) - 1):
                src = (int(path[j].x), int(path[j].y))
                dst = (int(path[j + 1].x), int(path[j + 1].y))
                cv2.line(img, src, dst, self.lineColor, 1, cv2.LINE_AA, 0)
            # ...and mark the latest position with a filled circle
            j = len(path) - 1
            cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, self.pointColor, -1, cv2.LINE_AA, 0)

    def __init__(self, trackbarName, windowName):
        self.trackbarName = trackbarName
        self.windowName = windowName
        cv2.namedWindow(windowName)
        cv2.createTrackbar(trackbarName, windowName, FeatureTrackerDrawer.trackedFeaturesPathLength, FeatureTrackerDrawer.maxTrackedFeaturesPathLength, self.onTrackBar)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs
xinFrame = pipeline.create(dai.node.XLinkIn)
xinFrame.setStreamName("inFrame")

xinTrackedFeaturesConfig = pipeline.create(dai.node.XLinkIn)
xinTrackedFeaturesConfig.setStreamName("trackedFeaturesConfig")

featureTrackerColor = pipeline.create(dai.node.FeatureTracker)
xinFrame.out.link(featureTrackerColor.inputImage)
xinTrackedFeaturesConfig.out.link(featureTrackerColor.inputConfig)

xoutTrackedFeaturesColor = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesColor.setStreamName("trackedFeaturesColor")
featureTrackerColor.outputFeatures.link(xoutTrackedFeaturesColor.input)

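# Pipeline topology: host frames -> XLinkIn -> FeatureTracker -> XLinkOut -> host,
# with a second XLinkIn so the tracker's config can be updated at runtime
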
# By default the least amount of resources is allocated;
# increasing it improves performance
numShaves = 2
numMemorySlices = 2
featureTrackerColor.setHardwareResources(numShaves, numMemorySlices)

featureTrackerConfig = featureTrackerColor.initialConfig.get()

print("Press 's' to switch between Lucas-Kanade optical flow and hardware accelerated motion estimation!")

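# Helper that resizes an interleaved HxWxC frame and flattens it to planar
# (CxHxW) order; kept from the original gist but unused below, since the main
# loop sends YUV data instead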
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

# Connect to device and start pipeline
with dai.Device(pipeline) as device:

    # Output queues used to receive the results
    outputFeaturesColorQueue = device.getOutputQueue("trackedFeaturesColor", 8, False)
    inputFeatureTrackerConfigQueue = device.getInputQueue("trackedFeaturesConfig")
    inFrameQ = device.getInputQueue("inFrame")

    colorWindowName = "color"
    colorFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", colorWindowName)
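
    # Main host loop: decode a frame, ship it to the device, and overlay the
    # tracked features that come back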
    while True:
        if not cap.isOpened(): break
        ok, frame = cap.read()
        if not ok: break
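
        # Wrap the decoded BGR frame in a dai.ImgFrame for the device. The frame
        # is tagged NV12 but packed with OpenCV's YV12 conversion; the two formats
        # share the same luma plane (only the chroma layout differs), which is
        # presumably sufficient here since feature tracking runs on luma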
        img = dai.ImgFrame()
        img.setType(dai.ImgFrame.Type.NV12)
        frame720p = cv2.resize(frame, (1280, 720))
        img.setData(cv2.cvtColor(frame720p, cv2.COLOR_BGR2YUV_YV12))
        img.setWidth(1280)
        img.setHeight(720)
        inFrameQ.send(img)
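
        # Blocking get() keeps the host loop in lockstep with the device output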
        trackedFeaturesColor = outputFeaturesColorQueue.get().trackedFeatures
        colorFeatureDrawer.trackFeaturePath(trackedFeaturesColor)
        colorFeatureDrawer.drawFeatures(frame720p)

        # Show the frame
        cv2.imshow(colorWindowName, frame720p)

        key = cv2.waitKey(1)
        if key == ord('q'):
            break
        elif key == ord('s'):
            # Toggle the motion estimator and push the updated config to the device
            if featureTrackerConfig.motionEstimator.type == dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW:
                featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.HW_MOTION_ESTIMATION
                print("Switching to hardware accelerated motion estimation")
            else:
                featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW
                print("Switching to Lucas-Kanade optical flow")
            cfg = dai.FeatureTrackerConfig()
            cfg.set(featureTrackerConfig)
            inputFeatureTrackerConfigQueue.send(cfg)
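
The same get/set round-trip bound to the 's' key can adjust other tracker settings too. As a minimal sketch (the `cornerDetector.numTargetFeatures` field is taken from DepthAI's FeatureTrackerConfig documentation and is an assumption here, not something this gist exercises; verify it against your depthai version), the number of tracked corners could be raised before the device is started:

# Hypothetical tweak, placed before `with dai.Device(pipeline)`:
# ask the corner detector for more features (field name assumed from the
# DepthAI FeatureTrackerConfig docs, not used in the gist above)
cfg = featureTrackerColor.initialConfig.get()
cfg.cornerDetector.numTargetFeatures = 512  # denser trails, at some device cost
featureTrackerColor.initialConfig.set(cfg)

Since frames are pushed from the host, any video OpenCV can decode works: set the path at the top of the script and run it with the depthai and opencv-python packages installed and an OAK device attached.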