import os
from ximea import xiapi
import cv2
import argparse
import collections
from functools import partial
import re
import time
import numpy as np
from PIL import Image
import svgwrite
from MouseController import MouseController
from pose_engine import PoseEngine
from pose_engine import KeypointType
from pycoral.adapters.common import input_size
from pycoral.adapters.detect import get_objects
from pycoral.utils.dataset import read_label_file
from pycoral.utils.edgetpu import make_interpreter
from pycoral.utils.edgetpu import run_inference


# Keypoint pairs connected with a line when drawing the detected skeleton.
EDGES = (
    (KeypointType.NOSE, KeypointType.LEFT_EYE),
    (KeypointType.NOSE, KeypointType.RIGHT_EYE),
    (KeypointType.NOSE, KeypointType.LEFT_EAR),
    (KeypointType.NOSE, KeypointType.RIGHT_EAR),
    (KeypointType.LEFT_EAR, KeypointType.LEFT_EYE),
    (KeypointType.RIGHT_EAR, KeypointType.RIGHT_EYE),
    (KeypointType.LEFT_EYE, KeypointType.RIGHT_EYE),
    (KeypointType.LEFT_SHOULDER, KeypointType.RIGHT_SHOULDER),
    (KeypointType.LEFT_SHOULDER, KeypointType.LEFT_ELBOW),
    (KeypointType.LEFT_SHOULDER, KeypointType.LEFT_HIP),
    (KeypointType.RIGHT_SHOULDER, KeypointType.RIGHT_ELBOW),
    (KeypointType.RIGHT_SHOULDER, KeypointType.RIGHT_HIP),
    (KeypointType.LEFT_ELBOW, KeypointType.LEFT_WRIST),
    (KeypointType.RIGHT_ELBOW, KeypointType.RIGHT_WRIST),
    (KeypointType.LEFT_HIP, KeypointType.RIGHT_HIP),
    (KeypointType.LEFT_HIP, KeypointType.LEFT_KNEE),
    (KeypointType.RIGHT_HIP, KeypointType.RIGHT_KNEE),
    (KeypointType.LEFT_KNEE, KeypointType.LEFT_ANKLE),
    (KeypointType.RIGHT_KNEE, KeypointType.RIGHT_ANKLE),
)


def avg_fps_counter(window_size):
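    """Yield the average FPS over the last window_size frame intervals (0.0 on the first call)."""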
    window = collections.deque(maxlen=window_size)
    prev = time.monotonic()
    yield 0.0  # First fps value.
    while True:
        curr = time.monotonic()
        window.append(curr - prev)
        prev = curr
        yield len(window) / sum(window)


def main():
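    """Run PoseNet on frames from the first XIMEA camera, draw the detected
    pose, and feed the left-eye offset from the frame centre to MouseController."""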
    mc = MouseController()

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--mirror', help='flip video horizontally',
                        action='store_true', default=True)
    parser.add_argument('--model', help='.tflite model path.', required=False)
    parser.add_argument('--res', help='Resolution', default='480x360',
                        choices=['480x360', '640x480', '1280x720'])
    parser.add_argument('--videosrc', help='Which video source to use', default='/dev/video0')
    parser.add_argument('--h264', help='Use video/x-h264 input', action='store_true')
    parser.add_argument('--jpeg', help='Use image/jpeg input', action='store_true')
    args = parser.parse_args()
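    # Only --model is used below; the remaining flags are parsed but ignored in this XIMEA capture path.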

    default_model = 'models/mobilenet/posenet_mobilenet_v1_075_%d_%d_quant_decoder_edgetpu.tflite'
    # if args.res == '480x360':
    #     src_size = (640, 480)
    #     appsink_size = (480, 360)
    #     model = args.model or default_model % (353, 481)
    # elif args.res == '640x480':
    #     src_size = (640, 480)
    #     appsink_size = (640, 480)
    #     model = args.model or default_model % (481, 641)
    # elif args.res == '1280x720':
    #     src_size = (1280, 720)
    #     appsink_size = (1280, 720)
    #     model = args.model or default_model % (721, 1281)
    src_size = (640, 480)
    appsink_size = (640, 480)
    model = args.model or default_model % (481, 641)

    print('Loading model: ', model)
    engine = PoseEngine(model)
    input_shape = engine.get_input_tensor_shape()
    inference_size = (input_shape[2], input_shape[1])

    n = 0
    sum_process_time = 0
    sum_inference_time = 0
    ctr = 0
    fps_counter = avg_fps_counter(30)

    # default_model_dir = '/home/bene/coral/examples-camera/all_models'
    # default_model = 'mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite'
    # default_labels = 'coco_labels.txt'
    # threshold = 0.1
    # top_k = 1
    # model = os.path.join(default_model_dir, default_model)
    # labels = os.path.join(default_model_dir, default_labels)
    # print('Loading {} with {} labels.'.format(model, labels))
    # interpreter = make_interpreter(model)
    # interpreter.allocate_tensors()
    # labels = read_label_file(labels)
    # inference_size = input_size(interpreter)

    # create instance for first connected camera
    cam = xiapi.Camera()

    # start communication
    print('Opening first camera...')
    cam.open_device()

    # settings
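    # Exposure is in microseconds (80 ms); 4x4 skipping downsampling reduces the delivered resolution.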
    cam.set_exposure(80000)
    cam.enable_auto_wb()
    cam.set_imgdataformat("XI_RGB24")
    cam.set_downsampling("XI_DWN_4x4")
    cam.set_downsampling_type("XI_SKIPPING")

    img = xiapi.Image()

    # start data acquisition
    print('Starting data acquisition...')
    cam.start_acquisition()

    try:
        print('Starting video. Press CTRL+C to exit.')
        while True:
            # get data and pass them from camera to img
            cam.get_image(img)
            data = img.get_image_data_numpy()

            cv2_im_rgb = cv2.resize(data, inference_size)
            # run_inference(interpreter, cv2_im_rgb.tobytes())
            # objs = get_objects(interpreter, threshold)[:top_k]
            # cv2_im = append_objs_to_img(data, inference_size, objs, labels)
            engine.run_inference(cv2_im_rgb.tobytes())

            # nonlocal n, sum_process_time, sum_inference_time, fps_counter
            # svg_canvas = svgwrite.Drawing('', size=src_size)
            start_time = time.monotonic()
            outputs, inference_time = engine.ParseOutput()
            end_time = time.monotonic()

            n += 1
            sum_process_time += 1000 * (end_time - start_time)
            sum_inference_time += inference_time * 1000
            avg_inference_time = sum_inference_time / n
            # text_line = 'PoseNet: %.1fms (%.2f fps) TrueFPS: %.2f Nposes %d' % (
            #     avg_inference_time, 1000 / avg_inference_time, next(fps_counter), len(outputs))
            # print(text_line)
            text_line = 'FPS: %.2f' % next(fps_counter)
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(data, text_line, (20, 30), font, 1, (255, 0, 0), 2, cv2.LINE_AA)
            # shadow_text(svg_canvas, 10, 20, text_line)

            threshold = 0.2
            for pose in outputs:
                # draw_pose(svg_canvas, pose, src_size, inference_box)
                box_x, box_y, box_w, box_h = (0, 0, 640, 480)
                scale_x, scale_y = src_size[0] / box_w, src_size[1] / box_h
                xys = {}
                for label, keypoint in pose.keypoints.items():
                    if keypoint.score < threshold:
                        continue
                    # Offset and scale to source coordinate space.
                    kp_x = int((keypoint.point[0] - box_x) * scale_x)
                    kp_y = int((keypoint.point[1] - box_y) * scale_y)
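                    # The left-eye keypoint's offset from the frame centre is the tracking error passed to the mouse controller.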
                    if label == KeypointType.LEFT_EYE:
                        # cv2.arrowedLine(data, (int(box_w/2), int(box_h/2)), (int(box_w/2), kp_y), (0, 255, 0), 1)
                        mc.set_error(kp_x - int(box_w / 2), kp_y - int(box_h / 2))
                    xys[label] = (kp_x, kp_y)
                    cv2.circle(data, (kp_x, kp_y), 5, (0, 0, 200), -1)
                    # dwg.add(dwg.circle(center=(int(kp_x), int(kp_y)), r=5,
                    #                    fill='cyan', fill_opacity=keypoint.score, stroke=color))

                for a, b in EDGES:
                    if a not in xys or b not in xys:
                        continue
                    ax, ay = xys[a]
                    bx, by = xys[b]
                    # dwg.add(dwg.line(start=(ax, ay), end=(bx, by), stroke=color, stroke_width=2))
                    cv2.line(data, (ax, ay), (bx, by), (200, 0, 0), 5)

            cv2.imshow('frame', data)
            cv2.waitKey(1)
    except KeyboardInterrupt:
        cv2.destroyAllWindows()

    # stop data acquisition
    print('Stopping acquisition...')
    cam.stop_acquisition()

    # stop communication
    cam.close_device()

    print('Done.')


if __name__ == '__main__':
    main()