DepthAI vehicle speed estimation on OAK - Luxonis
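The script below replays a recorded OAK dataset with Luxonis' Replay helper, detects cars with the vehicle-detection-adas-0002 model, tracks them with the on-device object tracker, and estimates each car's speed by running its reported (x, y, z) spatial coordinates through a per-car constant-velocity Kalman filter built with filterpy. The following is a minimal standalone sketch of just that filtering step, not part of the gist; the measurements, time step and units are made up for illustration:

import math
import numpy as np
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
from scipy.linalg import block_diag

kf = KalmanFilter(dim_x=6, dim_z=3)   # state [x, vx, y, vy, z, vz], measurement [x, y, z]
dt = 1.0                              # assumed time step between updates
kf.x = np.zeros((6, 1))               # start at rest at the origin
kf.P = np.eye(6) * 1000.              # large initial uncertainty
kf.F = np.array([[1, dt, 0, 0, 0, 0],
                 [0, 1, 0, 0, 0, 0],
                 [0, 0, 1, dt, 0, 0],
                 [0, 0, 0, 1, 0, 0],
                 [0, 0, 0, 0, 1, dt],
                 [0, 0, 0, 0, 0, 1]]) # constant-velocity motion model
q = Q_discrete_white_noise(dim=2, dt=dt, var=0.05)
kf.Q = block_diag(q, q, q)            # process noise, one position/velocity block per axis
kf.H = np.array([[1, 0, 0, 0, 0, 0],
                 [0, 0, 1, 0, 0, 0],
                 [0, 0, 0, 0, 1, 0]]) # only positions are measured, velocities are inferred
kf.R = np.eye(3) * 5                  # measurement noise

# Fake (x, y, z) measurements in metres for a car approaching the camera
for x_m, y_m, z_m in [(1.0, 0.1, 12.0), (1.4, 0.1, 11.4), (1.8, 0.1, 10.8)]:
    kf.predict()
    kf.update(np.array([[x_m], [y_m], [z_m]]))
    speed = math.sqrt(kf.x[1, 0] ** 2 + kf.x[3, 0] ** 2 + kf.x[5, 0] ** 2)
    print(f"estimated speed: {speed:.2f} (measurement units per dt)")

Measuring only positions and letting the filter infer the velocities is what turns per-frame spatial detections into a speed estimate; the script keeps one such state vector and covariance matrix per tracklet ID and feeds it one new measurement every frame.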
#!/usr/bin/env python3
import argparse
import cv2
import depthai as dai
import blobconverter
import numpy as np
from libraries.depthai_replay import Replay
import time
from time import monotonic
import math
import filterpy
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
from scipy.linalg import block_diag
labelMap = ['background', 'car']

parser = argparse.ArgumentParser()
parser.add_argument('-p', '--path', default="data", type=str, help="Path to the recorded data that will be replayed")
parser.add_argument('-t', '--threshold', default=0.15, type=float,
                    help="Minimum distance the vehicle has to move (across the x/y axis) to be considered a real movement")
args = parser.parse_args()

# Create Replay object
replay = Replay(args.path)

# Minimum distance the vehicle has to move (across the x/y axis) to be considered a real movement
THRESH_DIST_DELTA = args.threshold

# Initialize the pipeline. This will create required XLinkIn's and connect them together
pipeline, nodes = replay.init_pipeline()

# Resize color frames prior to sending them to the device
replay.set_resize_color((672, 384))

# Keep aspect ratio when resizing the color frames. This will crop
# the color frame to the desired aspect ratio (in our case 672x384)
replay.keep_aspect_ratio(True)
# Create and configure the network
nn = pipeline.createMobileNetSpatialDetectionNetwork()
nn.setBoundingBoxScaleFactor(0.3)
nn.setDepthLowerThreshold(5000)   # at least 5m
nn.setDepthUpperThreshold(20000)  # at most 20m
nn.setBlobPath(str(blobconverter.from_zoo(name="vehicle-detection-adas-0002", shaves=6)))
nn.setConfidenceThreshold(0.5)
nn.input.setBlocking(False)

# Create and configure the object tracker
objectTracker = pipeline.createObjectTracker()
objectTracker.setDetectionLabelsToTrack([1])  # Track cars
# Possible tracking types: ZERO_TERM_COLOR_HISTOGRAM, ZERO_TERM_IMAGELESS, SHORT_TERM_IMAGELESS, SHORT_TERM_KCF
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
# Take the smallest ID when a new object is tracked, possible options: SMALLEST_ID, UNIQUE_ID
objectTracker.setTrackerIdAssigmentPolicy(dai.TrackerIdAssigmentPolicy.SMALLEST_ID)

# Link required inputs to the spatial detection network
nodes.color.out.link(nn.input)
nodes.stereo.setSubpixel(True)
nodes.stereo.depth.link(nn.inputDepth)

detOut = pipeline.createXLinkOut()
detOut.setStreamName("det_out")
nn.out.link(detOut.input)

depthOut = pipeline.createXLinkOut()
depthOut.setStreamName("depth_out")
nodes.stereo.disparity.link(depthOut.input)

# Link NN outputs to the object tracker
nn.passthrough.link(objectTracker.inputTrackerFrame)
nn.passthrough.link(objectTracker.inputDetectionFrame)
nn.out.link(objectTracker.inputDetections)

# Send tracklets to the host
trackerOut = pipeline.createXLinkOut()
trackerOut.setStreamName("tracklets")
objectTracker.out.link(trackerOut.input)

# Send RGB preview frames to the host
xlinkOut = pipeline.createXLinkOut()
xlinkOut.setStreamName("preview")
objectTracker.passthroughTrackerFrame.link(xlinkOut.input)
class TextHelper:
    def __init__(self) -> None:
        self.bg_color = (0, 0, 0)
        self.color = (255, 255, 255)
        self.text_type = cv2.FONT_HERSHEY_SIMPLEX
        self.line_type = cv2.LINE_AA

    def putText(self, frame, text, coords):
        # Draw twice (thick dark outline + thin light fill) so the text stays readable on any background
        cv2.putText(frame, text, coords, self.text_type, 1.0, self.bg_color, 3, self.line_type)
        cv2.putText(frame, text, coords, self.text_type, 1.0, self.color, 1, self.line_type)

    def rectangle(self, frame, bbox):
        x1, y1, x2, y2 = bbox
        cv2.rectangle(frame, (x1, y1), (x2, y2), self.bg_color, 3)
        cv2.rectangle(frame, (x1, y1), (x2, y2), self.color, 1)
# Class for tracking cars and estimating the speed of the vehicles
class CarTracker:
    def __init__(self):
        self.startTime = time.monotonic()
        self.counter = 0
        self.fps = 0
        self.frame = None
        self.color = (255, 0, 0)
        self.tracking = {}
        self.car_counter = [0, 0]  # [0] = Y axis (up/down), [1] = X axis (left/right)
        self.statusMap = {
            dai.Tracklet.TrackingStatus.NEW: "NEW",
            dai.Tracklet.TrackingStatus.TRACKED: "TRACKED",
            dai.Tracklet.TrackingStatus.LOST: "LOST",
            dai.Tracklet.TrackingStatus.REMOVED: "REMOVED"}
        self.dictionary = {}        # tracklet ID -> last Kalman filter state vector [x, vx, y, vy, z, vz]
        self.covarianceMatrix = {}  # tracklet ID -> last Kalman filter covariance matrix P
        self.objIDs = []            # IDs of tracklets that currently have a Kalman filter state

    def to_planar(self, arr: np.ndarray, shape: tuple) -> np.ndarray:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

    def tracklet_removed(self, coords1, coords2):
        deltaX = coords2[0] - coords1[0]
        deltaY = coords2[1] - coords1[1]
        if abs(deltaX) > abs(deltaY) and abs(deltaX) > THRESH_DIST_DELTA:
            direction = "left" if 0 > deltaX else "right"
            self.car_counter[1] += 1 if 0 > deltaX else -1
            print(f"Car moved {direction}")
            # print("DeltaX: " + str(abs(deltaX)))
        if abs(deltaY) > abs(deltaX) and abs(deltaY) > THRESH_DIST_DELTA:
            direction = "up" if 0 > deltaY else "down"
            self.car_counter[0] += 1 if 0 > deltaY else -1
            print(f"Car moved {direction}")
            # print("DeltaY: " + str(abs(deltaY)))
        # else: print("Invalid movement")

    def get_centroid(self, roi):
        x1 = roi.topLeft().x
        y1 = roi.topLeft().y
        x2 = roi.bottomRight().x
        y2 = roi.bottomRight().y
        return ((x2 - x1) / 2 + x1, (y2 - y1) / 2 + y1)

    def euclidean_distance(self, x1, y1, z1, x2, y2, z2):
        dx, dy, dz = x1 - x2, y1 - y2, z1 - z2
        distance = math.sqrt(dx ** 2 + dy ** 2 + dz ** 2)
        return distance
    def check_queues(self, preview, tracklets):
        imgFrame = preview.tryGet()
        track = tracklets.tryGet()

        if imgFrame is not None:
            self.counter += 1
            current_time = time.monotonic()
            if (current_time - self.startTime) > 1:
                self.fps = self.counter / (current_time - self.startTime)
                self.counter = 0
                self.startTime = current_time
            self.frame = imgFrame.getCvFrame()

        if track is not None:
            trackletsData = track.tracklets
            for t in trackletsData:
                roi = t.roi.denormalize(self.frame.shape[1], self.frame.shape[0])
                x1 = int(roi.topLeft().x)
                y1 = int(roi.topLeft().y)
                x2 = int(roi.bottomRight().x)
                y2 = int(roi.bottomRight().y)
                try:
                    label = labelMap[t.label]
                except IndexError:
                    label = t.label

                # If this is a new tracklet, save its centroid
                if t.status == dai.Tracklet.TrackingStatus.NEW:
                    self.tracking[str(t.id)] = self.get_centroid(t.roi)
                # If the tracklet was removed, check the "path" of this tracklet
                if t.status == dai.Tracklet.TrackingStatus.REMOVED:
                    self.tracklet_removed(self.tracking[str(t.id)], self.get_centroid(t.roi))

                x_spatial = "{:.1f}".format(t.spatialCoordinates.x / 1000)
                y_spatial = "{:.1f}".format(t.spatialCoordinates.y / 1000)
                z_spatial = "{:.1f}".format(t.spatialCoordinates.z / 1000)
                # This tracklet already has a Kalman filter state -> run one predict/update step with the new measurement
                if (int(t.id) in self.dictionary and int(t.id) in self.objIDs
                        and (t.status == dai.Tracklet.TrackingStatus.TRACKED
                             or t.status == dai.Tracklet.TrackingStatus.NEW
                             or t.status == dai.Tracklet.TrackingStatus.LOST)):
                    # values contains the previous state vector [x, vx, y, vy, z, vz] for this object
                    values = self.dictionary[int(t.id)]
                    values = np.array([values]).T  # column vector of shape (6, 1)

                    KF = KalmanFilter(dim_x=6, dim_z=3)
                    KF.x = values
                    dt = 1.0
                    KF.P = self.covarianceMatrix[int(t.id)]  # covariance matrix carried over from the previous step
                    KF.F = np.array([[1, dt, 0, 0, 0, 0],
                                     [0, 1, 0, 0, 0, 0],
                                     [0, 0, 1, dt, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 1, dt],
                                     [0, 0, 0, 0, 0, 1]])  # constant-velocity state transition matrix
                    q = Q_discrete_white_noise(dim=2, dt=dt, var=0.05)  # process uncertainty with variance 0.05
                    KF.Q = block_diag(q, q, q)  # one position/velocity block per axis
                    KF.H = np.array([[1, 0, 0, 0, 0, 0],
                                     [0, 0, 1, 0, 0, 0],
                                     [0, 0, 0, 0, 1, 0]])  # measurement function maps the state to the measured positions (z = Hx)
                    KF.R = np.array([[5, 0, 0],
                                     [0, 5, 0],
                                     [0, 0, 5]])  # measurement uncertainty
                    z = np.array([[float(t.spatialCoordinates.x / 1000)],
                                  [float(t.spatialCoordinates.y / 1000)],
                                  [float(t.spatialCoordinates.z / 1000)]])  # measured position in metres
                    KF.predict()  # predict the next state (prior) using the state propagation equations
                    KF.update(z)  # correct the prediction with the new measurement z
                    x = KF.x
                    P = KF.P

                    # Speed is the magnitude of the estimated velocity vector
                    speed = math.sqrt(x[1, 0] ** 2 + x[3, 0] ** 2 + x[5, 0] ** 2)
                    speed = "{:.1f}".format(speed)

                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (79, 79, 47), -1)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (250, 206, 135), 2)
                    cv2.putText(self.frame, f"ID: {t.id}", (x1 + 5, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    cv2.putText(self.frame, f"{speed} m/s", (x1 + 5, y1 - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)

                    # Store the updated state and covariance for the next frame
                    self.dictionary[int(t.id)] = [x[0, 0], x[1, 0], x[2, 0], x[3, 0], x[4, 0], x[5, 0]]
                    self.covarianceMatrix[int(t.id)] = P
                # First time we see this tracklet -> initialize a new Kalman filter for it
                elif (int(t.id) not in self.dictionary and int(t.id) not in self.objIDs
                        and t.status == dai.Tracklet.TrackingStatus.NEW):
                    self.objIDs.append(int(t.id))
                    # The state vector holds the spatial coordinates (x, y, z) and their velocities, so dim_x = 6.
                    # The measurement vector holds the spatial coordinates (x, y, z) read from the camera, so dim_z = 3.
                    KF = KalmanFilter(dim_x=6, dim_z=3)
                    KF.x = np.array([[0, 0, 0, 0, 0, 0]]).T  # initial state vector (positions and velocities)
                    dt = 1.0
                    KF.P = np.eye(6) * 1000.  # initial covariance matrix (large uncertainty)
                    KF.F = np.array([[1, dt, 0, 0, 0, 0],
                                     [0, 1, 0, 0, 0, 0],
                                     [0, 0, 1, dt, 0, 0],
                                     [0, 0, 0, 1, 0, 0],
                                     [0, 0, 0, 0, 1, dt],
                                     [0, 0, 0, 0, 0, 1]])  # constant-velocity state transition matrix
                    q = Q_discrete_white_noise(dim=2, dt=dt, var=0.05)  # process uncertainty with variance 0.05
                    KF.Q = block_diag(q, q, q)  # one position/velocity block per axis
                    KF.H = np.array([[1, 0, 0, 0, 0, 0],
                                     [0, 0, 1, 0, 0, 0],
                                     [0, 0, 0, 0, 1, 0]])  # measurement function maps the state to the measured positions (z = Hx)
                    KF.R = np.array([[5, 0, 0],
                                     [0, 5, 0],
                                     [0, 0, 5]])  # measurement uncertainty
                    z = np.array([[float(t.spatialCoordinates.x / 1000)],
                                  [float(t.spatialCoordinates.y / 1000)],
                                  [float(t.spatialCoordinates.z / 1000)]])  # measured position in metres
                    KF.predict()  # predict the next state (prior) using the state propagation equations
                    KF.update(z)  # correct the prediction with the new measurement z
                    x = KF.x      # updated state, used as the starting state in the next step
                    P = KF.P      # updated covariance matrix

                    # Speed is the magnitude of the estimated velocity vector: V = sqrt(Vx*Vx + Vy*Vy + Vz*Vz)
                    speed = math.sqrt(x[1, 0] ** 2 + x[3, 0] ** 2 + x[5, 0] ** 2)
                    speed = "{:.1f}".format(speed)

                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (79, 79, 47), -1)
                    cv2.rectangle(self.frame, (x1 - 10, y1 - 90), (x2 - 30, y2 - 70), (250, 206, 135), 2)
                    cv2.putText(self.frame, f"ID: {t.id}", (x1 + 5, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)
                    cv2.putText(self.frame, f"{speed} m/s", (x1 + 5, y1 - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (250, 206, 135), 2, cv2.LINE_AA)

                    # Store the initial state and covariance for this tracklet ID
                    self.dictionary[int(t.id)] = [x[0, 0], x[1, 0], x[2, 0], x[3, 0], x[4, 0], x[5, 0]]
                    self.covarianceMatrix[int(t.id)] = P

                # Tracklet was removed -> forget its Kalman filter state
                elif int(t.id) in self.dictionary and t.status == dai.Tracklet.TrackingStatus.REMOVED:
                    del self.dictionary[int(t.id)]
                    del self.covarianceMatrix[int(t.id)]
                    self.objIDs.remove(int(t.id))

        if self.frame is not None:
            # cv2.putText(self.frame, "NN fps: {:.2f}".format(self.fps), (2, self.frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, self.color)
            # cv2.putText(self.frame, f"Counter X: {self.car_counter[1]}, Counter Y: {self.car_counter[0]}", (3, 20), cv2.FONT_HERSHEY_TRIPLEX, 0.7, (255,255,0))
            cv2.imshow("tracker", self.frame)
with dai.Device(pipeline) as device:
    replay.create_queues(device)

    carTracker = CarTracker()

    depthQ = device.getOutputQueue(name="depth_out", maxSize=4, blocking=False)
    detQ = device.getOutputQueue(name="det_out", maxSize=4, blocking=False)
    tracklets = device.getOutputQueue(name="tracklets", maxSize=4, blocking=False)
    preview = device.getOutputQueue("preview", maxSize=4, blocking=False)

    disparityMultiplier = 255 / nodes.stereo.initialConfig.getMaxDisparity()
    color = (255, 0, 0)
    queue = []

    # Read rgb/mono frames, send them to the device and wait for the spatial object detection results
    while replay.send_frames():
        # rgbFrame = replay.lastFrame['color']

        # Colorize the disparity output for visualization
        depthFrame = depthQ.get().getFrame()
        depthFrameColor = (depthFrame * disparityMultiplier).astype(np.uint8)
        # depthFrameColor = cv2.normalize(depthFrame, None, 255, 0, cv2.NORM_INF, cv2.CV_8UC1)
        # depthFrameColor = cv2.equalizeHist(depthFrameColor)
        depthFrameColor = cv2.applyColorMap(depthFrameColor, cv2.COLORMAP_JET)
        cv2.imshow("depth", depthFrameColor)

        carTracker.check_queues(preview, tracklets)
        if cv2.waitKey(1) == ord('q'):
            break
    print('End of the recording')
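No run instructions are included with the gist. Assuming the script is saved as main.py (the filename is not shown here) in the same folder as the libraries/depthai_replay helper that the import at the top expects, a typical invocation against a recorded dataset would look roughly like this, with --path pointing at the recording folder and --threshold being the minimum centroid movement used by the direction counter:

python3 main.py --path data --threshold 0.15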