ndi asyncio opencv
import sys
import numpy as np
import cv2 as cv
import NDIlib as ndi
import re
import asyncio
from pythonosc import udp_client
from facenet_pytorch import MTCNN
from hsemotion.facial_emotions import HSEmotionRecognizer
import torch
import concurrent.futures

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print('Running on device: {}'.format(device))
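
# NDI source name to look for, and the HSEmotion model checkpoint to load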
MAX_NDI_SOURCE_NAME = 'max'
model_name = 'enet_b0_8_best_afew'

mtcnn = MTCNN(keep_all=False, post_process=False, min_face_size=40, device=device)
fer = HSEmotionRecognizer(model_name=model_name, device=device)
pool = concurrent.futures.ProcessPoolExecutor()
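

# Runs in a process pool worker: detect faces with MTCNN, then classify the
# emotion of each confident face crop with HSEmotion.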
def detect_face(frame):
    bounding_boxes, probs = mtcnn.detect(frame, landmarks=False)
    # probs is None when nothing is detected at all
    if probs is not None and len(probs) > 0 and probs[0] is not None:
        print(probs)
        # keep only confident detections
        bounding_boxes = bounding_boxes[probs > 0.9]
        for bbox in bounding_boxes:
            box = bbox.astype(int)
            x1, y1, x2, y2 = box[0:4]
            face_img = frame[y1:y2, x1:x2, :]
            emotion, scores = fer.predict_emotions(face_img, logits=True)
            print(emotion, scores)
        return bounding_boxes
    else:
        return None
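
# a single asyncio event loop drives both the receive and prediction coroutines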
loop = asyncio.get_event_loop()


def main():
    if not ndi.initialize():
        return 0

    ndi_find = ndi.find_create_v2()
    if ndi_find is None:
        return 0

    sources = []
    source = False
    source_regexp = re.compile(r'(.*) \((.*)\)')
    while not source:
        print('Looking for sources ...')
        ndi.find_wait_for_sources(ndi_find, 5000)
        sources = ndi.find_get_current_sources(ndi_find)
        for s in sources:
            # source names have the format: HOSTNAME (ndi_source_name)
            match = source_regexp.match(s.ndi_name)
            if match is None:
                continue
            source_host, source_name = match.groups()
            print('Found source: ' + source_host + ' (' + source_name + ')')
            if source_name == MAX_NDI_SOURCE_NAME:
                source = s
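
    # create a receiver that hands frames over as BGRX/BGRA, which maps
    # directly onto the arrays OpenCV works with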
    ndi_recv_create = ndi.RecvCreateV3()
    ndi_recv_create.color_format = ndi.RECV_COLOR_FORMAT_BGRX_BGRA
    ndi_recv = ndi.recv_create_v3(ndi_recv_create)
    if ndi_recv is None:
        return 0
    # connect to the source matched above by name
    ndi.recv_connect(ndi_recv, source)
    ndi.find_destroy(ndi_find)
    cv.startWindowThread()

    global current_frame
    current_frame = None
    osc_client = udp_client.SimpleUDPClient("127.0.0.1", 3333)
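
    # Self-rescheduling coroutine: show the latest frame, run face and emotion
    # detection in the process pool so the event loop is never blocked, then
    # report over OSC (the message payload is hardcoded to "happy" here).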
    async def predict_emotion():
        global current_frame
        if current_frame is not None:
            cv.imshow('current frame', current_frame)
            # discard alpha channel
            current_frame = current_frame[:, :, :3]
            bboxes = await loop.run_in_executor(pool, detect_face, current_frame)
            print(bboxes)
            osc_client.send_message("/emotion", "happy")
        loop.create_task(predict_emotion())
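
    # Self-rescheduling coroutine: grab the next NDI video frame (5 s timeout),
    # publish it for the predictor, and tear everything down when ESC is pressed.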
    async def recv_frame():
        global current_frame
        t, v, _, _ = ndi.recv_capture_v2(ndi_recv, 5000)
        if t == ndi.FRAME_TYPE_VIDEO:
            frame = np.copy(v.data)
            current_frame = np.copy(frame)
            cv.imshow('ndi image', frame)
            ndi.recv_free_video_v2(ndi_recv, v)
        if cv.waitKey(1) & 0xff == 27:
            ndi.recv_destroy(ndi_recv)
            ndi.destroy()
            cv.destroyAllWindows()
            loop.stop()
            return False
        else:
            loop.create_task(recv_frame())
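
    # seed the loop with one instance of each coroutine; they keep
    # rescheduling themselves from then on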
    loop.create_task(recv_frame())
    loop.create_task(predict_emotion())
    loop.run_forever()


if __name__ == "__main__":
    sys.exit(main())