Skip to content

Instantly share code, notes, and snippets.

@jaycosaur
Created October 29, 2019 09:04
Show Gist options
  • Save jaycosaur/333c01d6997c0c36d8e05b9cd96b678e to your computer and use it in GitHub Desktop.
Save jaycosaur/333c01d6997c0c36d8e05b9cd96b678e to your computer and use it in GitHub Desktop.
import cv2
import time
import numpy as np
from threading import Thread
from queue import Queue
# Number of ticks between printed reports for the FPS tickers built with it
# (the 'Camera' and 'Display' tickers below).
log_const = 30
class FPS:
    """Count ticks and periodically print the average rate since creation.

    Every ``length`` ticks one line is printed containing the title, the
    average number of ticks per second and the average time per tick in
    microseconds, both measured from the moment the instance was built.
    """

    def __init__(self, length: int, title: str):
        """Start the clock.

        Args:
            length: Number of ticks between printed reports.
            title: Label printed at the start of every report line.
        """
        self.length = length
        self.counter = 0
        self.start = time.time_ns()
        self.title = title

    def tick(self):
        """Record one event; print a report on every ``length``-th call."""
        self.counter += 1
        if self.counter % self.length != 0:
            return
        elapsed_ns = time.time_ns() - self.start
        if elapsed_ns <= 0:
            # The wall clock has not advanced (coarse timer) or stepped
            # backwards, so no meaningful rate exists; skip this report.
            return
        # Divide by the un-rounded elapsed time. Rounding the elapsed seconds
        # to an integer first made the divisor 0 during the first half second
        # (ZeroDivisionError) and distorted the rate afterwards.
        ticks_per_second = round(self.counter * 1e9 / elapsed_ns)
        micros_per_tick = round(elapsed_ns / (self.counter * 1e3))
        print(self.title, ticks_per_second, "FPS", micros_per_tick, 'us')
def scale_frame(frame, wanted_frame_width: int):
    """Downscale ``frame`` to ``wanted_frame_width``, keeping its aspect ratio.

    A frame that is already at most ``wanted_frame_width`` wide is returned
    unchanged (the very same object); frames are never enlarged.

    Args:
        frame: Image array whose first two ``shape`` entries are
            (height, width). Extra axes such as colour channels are allowed
            but not required.
        wanted_frame_width: Target width in pixels.

    Returns:
        The original frame, or a resized copy produced by ``cv2.resize``
        with ``INTER_AREA`` interpolation.

    Raises:
        ValueError: If a resize is needed but the target width is not
            positive.
    """
    # Read only the first two axes so single-channel (2-D) frames work as
    # well as multi-channel ones.
    raw_frame_height, raw_frame_width = frame.shape[:2]
    if wanted_frame_width >= raw_frame_width:
        return frame

    width = int(wanted_frame_width)
    if width <= 0:
        raise ValueError(
            f"wanted_frame_width must be positive, got {wanted_frame_width!r}")

    # Use the requested width directly and integer arithmetic for the height:
    # going through a float scale factor can truncate to one pixel less than
    # asked for. Clamp the height so very wide frames never collapse to 0 rows.
    height = max(1, raw_frame_height * width // raw_frame_width)
    return cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
def cpu_bound():
    """Build a CPU-heavy per-frame operation based on Haar-cascade detection.

    Returns:
        A callable taking one BGR frame. It converts the frame to grayscale,
        runs ``detectMultiScale`` on it (the detections are discarded) and
        ticks the 'CPU Bound' FPS counter, which reports every 5 frames.

    Raises:
        FileNotFoundError: If the cascade file could not be loaded.
    """
    cascade_file = 'haarcascade_frontalface_default.xml'
    face_cascade = cv2.CascadeClassifier(cascade_file)
    # Fail here with a clear message rather than on the first frame inside
    # the worker thread: a cascade that failed to load reports empty().
    if face_cascade.empty():
        raise FileNotFoundError(
            f"could not load Haar cascade from {cascade_file!r}")
    cpu_bound_ticker = FPS(5, 'CPU Bound')

    def inner(frame):
        """Run one detection pass over ``frame`` and count it."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        face_cascade.detectMultiScale(gray, 1.01, 5)
        cpu_bound_ticker.tick()

    return inner
def camera_producer():
    """Yield frames read from capture device 0, indefinitely.

    Reads that report failure are skipped and retried. Every successfully
    read frame ticks the 'Camera' FPS counter before it is yielded.

    Yields:
        Frames as returned by ``cv2.VideoCapture.read``.

    Raises:
        RuntimeError: On the first iteration, if the device cannot be opened.
    """
    camera_ticker = FPS(log_const, 'Camera')
    cam = cv2.VideoCapture(0)
    try:
        # Without this check an unopened device makes every read fail, and
        # the loop below would spin forever without producing a frame.
        if not cam.isOpened():
            raise RuntimeError('could not open camera device 0')
        while True:
            ok, frame = cam.read()
            if ok:
                camera_ticker.tick()
                yield frame
    finally:
        # Runs when the generator is closed, collected or fails, so the
        # capture device is always handed back.
        cam.release()
# Module-level ticker shared by every display() call; labelled 'Display' and
# configured to report every log_const ticks.
display_ticker = FPS(log_const, 'Display')
def display(image_frame):
    """Show one frame in the 'display' window and count it.

    The frame is first reduced to at most 200 pixels wide via scale_frame,
    then drawn; the module-level display ticker is advanced afterwards.
    """
    preview = scale_frame(image_frame, 200)
    cv2.imshow('display', preview)
    # 1 ms key wait after drawing; the returned key code is not used.
    cv2.waitKey(1)
    display_ticker.tick()
def display_thread(input_queue: Queue):
    """Endlessly take frames off ``input_queue`` and show each one.

    Blocks while the queue is empty and never returns.
    """
    while True:
        display(input_queue.get())
def camera_thread(display_queue: Queue, cpu_bound_queue: Queue):
    """Fan every camera frame out to the display and CPU-bound queues.

    Args:
        display_queue: Receives every frame via a blocking ``put``.
        cpu_bound_queue: Receives frames without blocking; when this queue
            is bounded and currently full, the frame is dropped for the
            CPU-bound consumer.
    """
    from queue import Full  # local import: the file only imports Queue

    for frame in camera_producer():
        display_queue.put(frame)
        try:
            cpu_bound_queue.put_nowait(frame)
        except Full:
            # The non-blocking put exists so a slow consumer cannot stall the
            # camera; skip this frame instead of letting Full end the thread.
            pass
def cpu_bound_thread(input_queue: Queue):
    """Endlessly feed frames from ``input_queue`` to the CPU-bound operation.

    The operation is built once via cpu_bound(); the loop then blocks on the
    queue for each frame and never returns.
    """
    process_frame = cpu_bound()
    while True:
        process_frame(input_queue.get())
def main():
    """Start the camera and CPU-bound workers, then run the display loop.

    The camera worker feeds two unbounded queues: one is consumed by the
    display loop, which runs in the calling thread, and the other by the
    CPU-bound worker. Returns when the display loop is interrupted with
    Ctrl+C.
    """
    display_input_queue = Queue()
    cpu_bound_input_queue = Queue()
    # Both workers loop forever, so they are daemon threads: a non-daemon
    # worker would keep the process alive after the display loop stops, and
    # joining it would block indefinitely.
    workers = [
        Thread(
            target=camera_thread,
            args=(display_input_queue, cpu_bound_input_queue),
            daemon=True,
        ),
        Thread(
            target=cpu_bound_thread,
            args=(cpu_bound_input_queue,),
            daemon=True,
        ),
    ]
    for worker in workers:
        worker.start()
    try:
        display_thread(display_input_queue)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the otherwise endless loop.
        pass
    finally:
        cv2.destroyAllWindows()


# Run the pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment