Skip to content

Instantly share code, notes, and snippets.

@venetanji
Last active December 12, 2023 16:54
Show Gist options
  • Save venetanji/3ce49d6e6ccc151a7a813b96be1dd95d to your computer and use it in GitHub Desktop.
Save venetanji/3ce49d6e6ccc151a7a813b96be1dd95d to your computer and use it in GitHub Desktop.
ndi asyncio opencv
import sys
import numpy as np
import cv2 as cv
import NDIlib as ndi
import re
import asyncio
from pythonosc import udp_client
from facenet_pytorch import MTCNN
from hsemotion.facial_emotions import HSEmotionRecognizer
import torch
import concurrent.futures
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print('Running on device: {}'.format(device))
MAX_NDI_SOURCE_NAME = 'max'
model_name = 'enet_b0_8_best_afew'
mtcnn = MTCNN(keep_all=False, post_process=False, min_face_size=40, device=device)
fer = HSEmotionRecognizer(model_name=model_name,device=device)
pool = concurrent.futures.ProcessPoolExecutor()
def detect_face(frame):
bounding_boxes, probs = mtcnn.detect(frame, landmarks=False)
if len(probs) > 0 and probs[0] is not None:
print(probs)
bounding_boxes = bounding_boxes[probs>0.9]
for bbox in bounding_boxes:
box = bbox.astype(int)
x1,y1,x2,y2 = box[0:4]
face_img = frame[y1:y2,x1:x2,:]
emotion, scores = fer.predict_emotions(face_img,logits=True)
print(emotion,scores)
return bounding_boxes
else:
return None
loop = asyncio.get_event_loop()
def main():
if not ndi.initialize():
return 0
ndi_find = ndi.find_create_v2()
if ndi_find is None:
return 0
sources = []
source = False
source_regexp = re.compile('(.*) \((.*)\)')
while not source:
print('Looking for sources ...')
ndi.find_wait_for_sources(ndi_find, 5000)
sources = ndi.find_get_current_sources(ndi_find)
for s in sources:
# match hostname and source name in string format
# example: HOSTNAME (ndi_source_name)
source_host, source_name = source_regexp.match(s.ndi_name).groups()
print('Found source: ' + source_host + ' (' + source_name + ')')
if source_name == MAX_NDI_SOURCE_NAME:
source = s
ndi_recv_create = ndi.RecvCreateV3()
ndi_recv_create.color_format = ndi.RECV_COLOR_FORMAT_BGRX_BGRA
ndi_recv = ndi.recv_create_v3(ndi_recv_create)
if ndi_recv is None:
return 0
ndi.recv_connect(ndi_recv, sources[0])
ndi.find_destroy(ndi_find)
cv.startWindowThread()
global current_frame
current_frame = None
osc_client = udp_client.SimpleUDPClient("127.0.0.1", 3333)
async def predict_emotion():
global current_frame
if current_frame is not None:
cv.imshow('current frame', current_frame)
# discard alpha channel
current_frame = current_frame[:, :, :3]
bboxes = await loop.run_in_executor(pool, detect_face, current_frame)
print(bboxes)
osc_client.send_message("/emotion", "happy")
loop.create_task(predict_emotion())
async def recv_frame():
global current_frame
t, v, _, _ = ndi.recv_capture_v2(ndi_recv, 5000)
if t == ndi.FRAME_TYPE_VIDEO:
frame = np.copy(v.data)
current_frame = np.copy(frame)
cv.imshow('ndi image', frame)
ndi.recv_free_video_v2(ndi_recv, v)
if cv.waitKey(1) & 0xff == 27:
ndi.recv_destroy(ndi_recv)
ndi.destroy()
cv.destroyAllWindows()
loop.stop()
return False
else:
loop.create_task(recv_frame())
loop.create_task(recv_frame())
loop.create_task(predict_emotion())
loop.run_forever()
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment