DepthAI running age-gender demo from video
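Note: the script below expects a demo.mp4 video in the working directory and a connected OAK/DepthAI device. It needs the depthai, blobconverter, opencv-python and numpy packages, and blobconverter downloads the face-detection-retail-0004 and age-gender-recognition-retail-0013 blobs from the model zoo on first run. It also imports a companion MultiMsgSync.py helper that is not part of this gist (a sketch of it is included after the script).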
from MultiMsgSync import TwoStageHostSeqSync
import blobconverter
import cv2
import depthai as dai
import numpy as np
def frame_norm(frame, bbox):
    # Map normalized [0..1] bounding-box coordinates to pixel coordinates of the frame
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)
def create_pipeline(stereo):
    pipeline = dai.Pipeline()

    cam_xin = pipeline.create(dai.node.XLinkIn)
    cam_xin.setStreamName("frame-in")

    # Loop the frame back to the host so it can be synced with the NN results and displayed
    cam_xout = pipeline.create(dai.node.XLinkOut)
    cam_xout.setStreamName("color")
    cam_xin.out.link(cam_xout.input)

    # ImageManip will resize the frame before sending it to the face detection NN node
    face_det_manip = pipeline.create(dai.node.ImageManip)
    face_det_manip.initialConfig.setResize(300, 300)
    face_det_manip.initialConfig.setFrameType(dai.RawImgFrame.Type.RGB888p)
    cam_xin.out.link(face_det_manip.inputImage)

    face_det_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
    face_det_nn.setConfidenceThreshold(0.5)
    face_det_nn.setBlobPath(blobconverter.from_zoo(name="face-detection-retail-0004", shaves=6))
    face_det_manip.out.link(face_det_nn.input)

    # Send face detections to the host (for bounding boxes)
    face_det_xout = pipeline.create(dai.node.XLinkOut)
    face_det_xout.setStreamName("detection")
    face_det_nn.out.link(face_det_xout.input)

    # The Script node takes the face detection NN output as an input and sends
    # ImageManipConfig messages to the crop ImageManip node to crop the original frame
    image_manip_script = pipeline.create(dai.node.Script)
    face_det_nn.out.link(image_manip_script.inputs['face_det_in'])

    # The passthrough only carries metadata; we are only interested in the sequence number,
    # so we can sync frames with the NN output
    face_det_nn.passthrough.link(image_manip_script.inputs['passthrough'])
    image_manip_script.setScript("""
l = [] # List of images

# get_latest_frame() trims the list so the frame with the matching sequence number is first.
# For this experiment the function is mostly redundant, since everything
# runs in blocking mode, so no frames will get lost
def get_latest_frame(seq):
    global l
    for i, frame in enumerate(l):
        if seq == frame.getSequenceNum():
            # node.warn(f"List len {len(l)} Frame with same seq num: {i},seq {seq}")
            l = l[i:]
            break
    return l[0]

def correct_bb(bb):
    if bb.xmin < 0: bb.xmin = 0.001
    if bb.ymin < 0: bb.ymin = 0.001
    if bb.xmax > 1: bb.xmax = 0.999
    if bb.ymax > 1: bb.ymax = 0.999
    return bb

while True:
    preview = node.io['preview'].tryGet()
    if preview is not None:
        node.warn(f"New frame {preview.getSequenceNum()}, total {len(l)}")
        l.append(preview)

    face_dets = node.io['face_det_in'].tryGet()
    # node.warn(f"Faces detected: {len(face_dets)}")
    if face_dets is not None:
        passthrough = node.io['passthrough'].get()
        seq = passthrough.getSequenceNum()
        # node.warn(f"New detection {seq}")
        if len(l) == 0:
            continue
        img = get_latest_frame(seq)

        for i, det in enumerate(face_dets.detections):
            cfg = ImageManipConfig()
            correct_bb(det)
            cfg.setCropRect(det.xmin, det.ymin, det.xmax, det.ymax)
            # node.warn(f"Sending {i + 1}. det. Seq {seq}. Det {det.xmin}, {det.ymin}, {det.xmax}, {det.ymax}")
            cfg.setResize(62, 62)
            cfg.setKeepAspectRatio(False)
            node.io['manip_cfg'].send(cfg)
            node.io['manip_img'].send(img)
""")
    cam_xin.out.link(image_manip_script.inputs['preview'])

    crop_manip = pipeline.create(dai.node.ImageManip)
    crop_manip.initialConfig.setResize(62, 62)
    crop_manip.setWaitForConfigInput(True)
    image_manip_script.outputs['manip_cfg'].link(crop_manip.inputConfig)
    image_manip_script.outputs['manip_img'].link(crop_manip.inputImage)

    # Age/gender second-stage NN
    print("Creating Age Gender Neural Network...")
    recognition_nn = pipeline.create(dai.node.NeuralNetwork)
    recognition_nn.setBlobPath(blobconverter.from_zoo(name="age-gender-recognition-retail-0013", shaves=6))
    crop_manip.out.link(recognition_nn.input)

    recognition_nn_xout = pipeline.create(dai.node.XLinkOut)
    recognition_nn_xout.setStreamName("recognition")
    recognition_nn.out.link(recognition_nn_xout.input)

    return pipeline
def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
    # Resize to the target shape and convert interleaved (HWC) to planar (CHW) layout
    return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()
with dai.Device(create_pipeline(False)) as device:
    sync = TwoStageHostSeqSync()
    queues = {}
    # Create output queues
    for name in ["color", "detection", "recognition"]:
        queues[name] = device.getOutputQueue(name)
    xin = device.getInputQueue("frame-in")

    cap = cv2.VideoCapture("demo.mp4")
    cnt = 0
    while cap.isOpened():
        read_correctly, frame = cap.read()
        if not read_correctly:
            break

        img = dai.ImgFrame()
        print("Sending frame")
        frame = cv2.resize(frame, (1080, 1080))
        img.setData(to_planar(frame, (1080, 1080)))
        img.setType(dai.ImgFrame.Type.BGR888p)
        img.setWidth(1080)
        img.setHeight(1080)
        img.setSequenceNum(cnt)
        print(img.getSequenceNum(), cnt)
        xin.send(img)
        cnt += 1

        for name, q in queues.items():
            # Add all msgs (object detections and recognitions) to the Sync class.
            if name == "color":
                print("wait for color frame")
                sync.add_msg(q.get(), name)  # block to get the frame
                print("got color frame")
            else:
                if q.has():
                    sync.add_msg(q.get(), name)

        msgs = sync.get_msgs()
        if msgs is not None:
            print("Synced!")
            frame = msgs["color"].getCvFrame()
            detections = msgs["detection"].detections
            recognitions = msgs["recognition"]

            for i, detection in enumerate(detections):
                bbox = frame_norm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))

                rec = recognitions[i]
                age = int(float(np.squeeze(np.array(rec.getLayerFp16('age_conv3')))) * 100)
                gender = np.squeeze(np.array(rec.getLayerFp16('prob')))
                gender_str = "female" if gender[0] > gender[1] else "male"

                cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (10, 245, 10), 2)
                y = (bbox[1] + bbox[3]) // 2
                cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (0, 0, 0), 8)
                cv2.putText(frame, str(age), (bbox[0], y), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 255, 255), 2)
                cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (0, 0, 0), 8)
                cv2.putText(frame, gender_str, (bbox[0], y + 30), cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 255, 255), 2)

            cv2.imshow("Camera", frame)

        if cv2.waitKey(1) == ord('q'):
            break
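For reference, TwoStageHostSeqSync comes from a companion MultiMsgSync.py (shipped with the DepthAI two-stage example projects) that is not included in this gist. Below is a minimal sketch of such a helper, written only against the interface the script above uses (add_msg(msg, name) and get_msgs() returning a dict with a "color" frame, a "detection" message and one "recognition" result per detected face); the actual helper may differ in detail.

class TwoStageHostSeqSync:
    def __init__(self):
        # sequence number -> {"color": ImgFrame, "detection": ImgDetections, "recognition": [NNData, ...]}
        self.msgs = {}

    def add_msg(self, msg, name):
        seq = str(msg.getSequenceNum())
        entry = self.msgs.setdefault(seq, {"recognition": []})
        if name == "recognition":
            entry["recognition"].append(msg)
        else:  # "color" or "detection"
            entry[name] = msg

    def get_msgs(self):
        for seq, entry in list(self.msgs.items()):
            # A sequence number is complete once the frame, the detections and one
            # recognition result per detected face have all arrived
            if "color" in entry and "detection" in entry and \
                    len(entry["recognition"]) == len(entry["detection"].detections):
                # Drop this and all older entries before returning the synced set
                for old in [s for s in self.msgs if int(s) <= int(seq)]:
                    del self.msgs[old]
                return entry
        return None  # Nothing fully synced yet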