import argparse
import collections
import os
import re
import time
from functools import partial

import cv2
import numpy as np
import svgwrite
from PIL import Image
from ximea import xiapi

from MouseController import MouseController
from pose_engine import PoseEngine
from pose_engine import KeypointType
from pycoral.adapters.common import input_size
from pycoral.adapters.detect import get_objects
from pycoral.utils.dataset import read_label_file
from pycoral.utils.edgetpu import make_interpreter
from pycoral.utils.edgetpu import run_inference
# Keypoint pairs that are connected by a line when drawing the detected skeleton.
EDGES = (
    (KeypointType.NOSE, KeypointType.LEFT_EYE),
    (KeypointType.NOSE, KeypointType.RIGHT_EYE),
    (KeypointType.NOSE, KeypointType.LEFT_EAR),
    (KeypointType.NOSE, KeypointType.RIGHT_EAR),
    (KeypointType.LEFT_EAR, KeypointType.LEFT_EYE),
    (KeypointType.RIGHT_EAR, KeypointType.RIGHT_EYE),
    (KeypointType.LEFT_EYE, KeypointType.RIGHT_EYE),
    (KeypointType.LEFT_SHOULDER, KeypointType.RIGHT_SHOULDER),
    (KeypointType.LEFT_SHOULDER, KeypointType.LEFT_ELBOW),
    (KeypointType.LEFT_SHOULDER, KeypointType.LEFT_HIP),
    (KeypointType.RIGHT_SHOULDER, KeypointType.RIGHT_ELBOW),
    (KeypointType.RIGHT_SHOULDER, KeypointType.RIGHT_HIP),
    (KeypointType.LEFT_ELBOW, KeypointType.LEFT_WRIST),
    (KeypointType.RIGHT_ELBOW, KeypointType.RIGHT_WRIST),
    (KeypointType.LEFT_HIP, KeypointType.RIGHT_HIP),
    (KeypointType.LEFT_HIP, KeypointType.LEFT_KNEE),
    (KeypointType.RIGHT_HIP, KeypointType.RIGHT_KNEE),
    (KeypointType.LEFT_KNEE, KeypointType.LEFT_ANKLE),
    (KeypointType.RIGHT_KNEE, KeypointType.RIGHT_ANKLE),
)
def avg_fps_counter(window_size):
    """Generator that yields the average FPS over a sliding window of frame times."""
    window = collections.deque(maxlen=window_size)
    prev = time.monotonic()
    yield 0.0  # First fps value.

    while True:
        curr = time.monotonic()
        window.append(curr - prev)
        prev = curr
        yield len(window) / sum(window)
def main():
    mc = MouseController()

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--mirror', help='flip video horizontally', action='store_true', default=True)
    parser.add_argument('--model', help='.tflite model path.', required=False)
    parser.add_argument('--res', help='Resolution', default='480x360',
                        choices=['480x360', '640x480', '1280x720'])
    parser.add_argument('--videosrc', help='Which video source to use', default='/dev/video0')
    parser.add_argument('--h264', help='Use video/x-h264 input', action='store_true')
    parser.add_argument('--jpeg', help='Use image/jpeg input', action='store_true')
    args = parser.parse_args()

    default_model = 'models/mobilenet/posenet_mobilenet_v1_075_%d_%d_quant_decoder_edgetpu.tflite'
    #if args.res == '480x360':
    #    src_size = (640, 480)
    #    appsink_size = (480, 360)
    #    model = args.model or default_model % (353, 481)
    #elif args.res == '640x480':
    #    src_size = (640, 480)
    #    appsink_size = (640, 480)
    #    model = args.model or default_model % (481, 641)
    #elif args.res == '1280x720':
    #    src_size = (1280, 720)
    #    appsink_size = (1280, 720)
    #    model = args.model or default_model % (721, 1281)
    src_size = (640, 480)
    appsink_size = (640, 480)
    model = args.model or default_model % (481, 641)

    print('Loading model: ', model)
    engine = PoseEngine(model)
    input_shape = engine.get_input_tensor_shape()
    inference_size = (input_shape[2], input_shape[1])

    n = 0
    sum_process_time = 0
    sum_inference_time = 0
    ctr = 0
    fps_counter = avg_fps_counter(30)

    #default_model_dir = '/home/bene/coral/examples-camera/all_models'
    #default_model = 'mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite'
    #default_labels = 'coco_labels.txt'
    #threshold = 0.1
    #top_k = 1
    #model = os.path.join(default_model_dir, default_model)
    #labels = os.path.join(default_model_dir, default_labels)
    #print('Loading {} with {} labels.'.format(model, labels))
    #interpreter = make_interpreter(model)
    #interpreter.allocate_tensors()
    #labels = read_label_file(labels)
    #inference_size = input_size(interpreter)

    # create instance for first connected camera
    cam = xiapi.Camera()

    # start communication
    print('Opening first camera...')
    cam.open_device()

    # settings
    cam.set_exposure(80000)
    cam.enable_auto_wb()
    cam.set_imgdataformat("XI_RGB24")
    cam.set_downsampling("XI_DWN_4x4")
    cam.set_downsampling_type("XI_SKIPPING")

    img = xiapi.Image()

    # start data acquisition
    print('Starting data acquisition...')
    cam.start_acquisition()
    try:
        print('Starting video. Press CTRL+C to exit.')
        while True:
            # get data and pass them from camera to img
            cam.get_image(img)
            data = img.get_image_data_numpy()
            cv2_im_rgb = cv2.resize(data, inference_size)
            #run_inference(interpreter, cv2_im_rgb.tobytes())
            #objs = get_objects(interpreter, threshold)[:top_k]
            #cv2_im = append_objs_to_img(data, inference_size, objs, labels)
            engine.run_inference(cv2_im_rgb.tobytes())

            #nonlocal n, sum_process_time, sum_inference_time, fps_counter
            #svg_canvas = svgwrite.Drawing('', size=src_size)
            start_time = time.monotonic()
            outputs, inference_time = engine.ParseOutput()
            end_time = time.monotonic()

            n += 1
            sum_process_time += 1000 * (end_time - start_time)
            sum_inference_time += inference_time * 1000
            avg_inference_time = sum_inference_time / n
            #text_line = 'PoseNet: %.1fms (%.2f fps) TrueFPS: %.2f Nposes %d' % (
            #    avg_inference_time, 1000 / avg_inference_time, next(fps_counter), len(outputs)
            #)
            #print(text_line)
            text_line = 'FPS: %.2f' % (next(fps_counter))
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(data, text_line, (20, 30), font, 1, (255, 0, 0), 2, cv2.LINE_AA)
            #shadow_text(svg_canvas, 10, 20, text_line)

            threshold = 0.2
            for pose in outputs:
                #draw_pose(svg_canvas, pose, src_size, inference_box)
                box_x, box_y, box_w, box_h = (0, 0, 640, 480)
                scale_x, scale_y = src_size[0] / box_w, src_size[1] / box_h
                xys = {}
                for label, keypoint in pose.keypoints.items():
                    if keypoint.score < threshold:
                        continue
                    # Offset and scale to source coordinate space.
                    kp_x = int((keypoint.point[0] - box_x) * scale_x)
                    kp_y = int((keypoint.point[1] - box_y) * scale_y)
                    if label == KeypointType.LEFT_EYE:
                        #cv2.arrowedLine(data, (int(box_w/2), int(box_h/2)), (int(box_w/2), kp_y), (0, 255, 0), 1)
                        # Steer the mouse by the eye's pixel offset from the frame center.
                        mc.set_error(kp_x - int(box_w / 2), kp_y - int(box_h / 2))
                    xys[label] = (kp_x, kp_y)
                    cv2.circle(data, (kp_x, kp_y), 5, (0, 0, 200), -1)
                    #dwg.add(dwg.circle(center=(int(kp_x), int(kp_y)), r=5,
                    #        fill='cyan', fill_opacity=keypoint.score, stroke=color))
                for a, b in EDGES:
                    if a not in xys or b not in xys:
                        continue
                    ax, ay = xys[a]
                    bx, by = xys[b]
                    #dwg.add(dwg.line(start=(ax, ay), end=(bx, by), stroke=color, stroke_width=2))
                    cv2.line(data, (ax, ay), (bx, by), (200, 0, 0), 5)

            cv2.imshow('frame', data)
            cv2.waitKey(1)
    except KeyboardInterrupt:
        cv2.destroyAllWindows()

    # stop data acquisition
    print('Stopping acquisition...')
    cam.stop_acquisition()

    # stop communication
    cam.close_device()
    print('Done.')


if __name__ == '__main__':
    main()
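
Note: the MouseController module imported at the top is not included in this gist; the script only calls its set_error(dx, dy) method with the left eye's pixel offset from the frame center. Below is a minimal sketch of what such a module might look like, assuming a pyautogui-based relative mouse move with a proportional gain; the class name and method signature come from the import and call above, while the gain, dead zone, and pyautogui choice are assumptions, not the original implementation.

# MouseController.py -- hypothetical stand-in for the module imported above.
# Assumes pyautogui is installed; gain and dead-zone values are illustrative only.
import pyautogui


class MouseController:
    def __init__(self, gain=0.1, dead_zone=10):
        self.gain = gain            # proportional gain applied to the pixel error
        self.dead_zone = dead_zone  # ignore small errors to avoid cursor jitter

    def set_error(self, dx, dy):
        """Move the cursor proportionally to the (dx, dy) pixel error from the frame center."""
        if abs(dx) < self.dead_zone and abs(dy) < self.dead_zone:
            return
        pyautogui.moveRel(int(dx * self.gain), int(dy * self.gain), duration=0)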