Skip to content

Instantly share code, notes, and snippets.

@tspannhw
Created August 16, 2024 15:09
Show Gist options
  • Select an option

  • Save tspannhw/8cb9379c90d702ddd53be07f908a215e to your computer and use it in GitHub Desktop.

Select an option

Save tspannhw/8cb9379c90d702ddd53be07f908a215e to your computer and use it in GitHub Desktop.
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
from datetime import datetime
import uuid
import glob
import torch
from torchvision import transforms
from PIL import Image
import timm
import requests
import json
import os
import boto3
from botocore.client import Config
from sklearn.preprocessing import normalize
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from pymilvus import connections
from pymilvus import utility
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
from pymilvus import MilvusClient
from pymilvus import MilvusClient
import hailo
from hailo_rpi_common import (
get_default_parser,
QUEUE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
)
# --- Vector store (Milvus / Zilliz Cloud endpoint) ---------------------------
# Dimension of the 'vector' field declared in the collection schema; it is
# expected to match the length of the embedding returned by the feature
# extractor.
DIMENSION = 512
MILVUS_URL = "https://someid.serverless.gcp-us-west1.cloud.zilliz.com"
COLLECTION_NAME = "poseestimation"
# Required: a missing ZILLIZ_TOKEN raises KeyError at import time.
TOKEN = os.environ["ZILLIZ_TOKEN"]

# --- Snapshot handling --------------------------------------------------------
# Directory the callback writes its snapshot JPEG into.
PATH = "/opt/demo/images"
# Seconds-of-the-minute at which app_callback saves and publishes a snapshot.
time_list = [0, 5, 10, 20, 30, 40, 50, 59]

# --- Image hosting (freeimage.host) -------------------------------------------
# Required: a missing FREEIMAGE_KEY raises KeyError at import time.
FREEIMAGE_KEY = os.environ["FREEIMAGE_KEY"]
FREEIMAGE_URL = f"https://freeimage.host/api/1/upload?key={FREEIMAGE_KEY}&action=upload"
# Historical alias created by the original chained assignment; kept so any
# code that still reads `url` keeps working.
url = FREEIMAGE_URL

# --- Object storage -----------------------------------------------------------
# NOTE(review): the host part looks like a placeholder - confirm before deploying.
S3_URL = 'http://123234234:9000'

# --- Service clients (constructed once, at import time) -----------------------
milvus_client = MilvusClient(uri=MILVUS_URL, token=TOKEN)
# Required: a missing SLACK_BOT_TOKEN raises KeyError at import time.
slack_token = os.environ["SLACK_BOT_TOKEN"]
client = WebClient(token=slack_token)
# Text columns holding one formatted "x: .. y: .." string per body keypoint,
# listed in the order they appear in the collection schema.
_POSE_TEXT_COLUMNS = (
    'lefteye', 'righteye', 'nose',
    'leftear', 'rightear',
    'leftshoulder', 'rightshoulder',
    'leftelbow', 'rightelbow',
    'leftwrist', 'rightwrist',
    'lefthip', 'righthip',
    'leftknee', 'rightknee',
    'leftankle', 'rightankle',
)

# (column name, max_length) for the image-host metadata columns, in schema order.
_UPLOAD_TEXT_COLUMNS = (
    ('width', 8),
    ('height', 8),
    ('size', 12),
    ('ogfilename', 200),
    ('sizeformatted', 15),
    ('filename', 200),
    ('url', 200),
    ('mimetype', 100),
)

# Collection layout: auto-generated INT64 primary key, detection label, one
# text column per keypoint, detection confidence, upload metadata and finally
# the image embedding.
fields = [
    FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name='label', dtype=DataType.VARCHAR, max_length=200),
    *[
        FieldSchema(name=column, dtype=DataType.VARCHAR, max_length=200)
        for column in _POSE_TEXT_COLUMNS
    ],
    FieldSchema(name='confidence', dtype=DataType.FLOAT),
    *[
        FieldSchema(name=column, dtype=DataType.VARCHAR, max_length=limit)
        for column, limit in _UPLOAD_TEXT_COLUMNS
    ],
    FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
]
schema = CollectionSchema(fields=fields)
# First-run setup: create the collection from `schema` and build a COSINE
# index on the embedding column. Nothing happens when the collection exists.
# NOTE(review): the original indentation was lost; the index creation is
# assumed to belong to this first-time branch - confirm.
if not milvus_client.has_collection(collection_name=COLLECTION_NAME):
    milvus_client.create_collection(
        collection_name=COLLECTION_NAME,
        dimension=DIMENSION,
        schema=schema,
        metric_type="COSINE",
        auto_id=True,
    )
    index_params = milvus_client.prepare_index_params()
    _vector_index = index_params
    _vector_index.add_index(field_name="vector", metric_type="COSINE")
    milvus_client.create_index(
        collection_name=COLLECTION_NAME,
        index_params=_vector_index,
    )
class FeatureExtractor:
    """Compute an L2-normalised embedding vector for an image file.

    The model is created through ``timm.create_model`` with ``num_classes=0``
    and ``global_pool="avg"`` and is switched to eval mode once, in
    ``__init__``. Instances are callable: ``extractor(path)`` returns the
    embedding as a flat NumPy array.
    """

    def __init__(self, modelname):
        """Build the model and its matching preprocessing pipeline.

        Args:
            modelname: timm model name passed to ``timm.create_model``.
        """
        self.model = timm.create_model(
            modelname, pretrained=True, num_classes=0, global_pool="avg"
        )
        self.model.eval()

        self.input_size = self.model.default_cfg["input_size"]

        # Resolve the data config from the model object itself. The previous
        # code passed the model *name* (a str), which carries no pretrained
        # config, so the transform was built from timm's generic defaults
        # instead of this model's own settings.
        # NOTE(review): embeddings may differ slightly from vectors stored
        # before this fix if the model's config deviates from the defaults.
        config = resolve_data_config({}, model=self.model)
        self.preprocess = create_transform(**config)

    def __call__(self, imagepath):
        """Return the L2-normalised feature vector of the image at *imagepath*.

        Args:
            imagepath: path (or file object) accepted by ``PIL.Image.open``.

        Returns:
            A 1-D NumPy array with unit L2 norm.
        """
        # The context manager releases the file handle; convert() yields an
        # independent RGB image that stays valid after the source is closed.
        with Image.open(imagepath) as source_image:
            input_image = source_image.convert("RGB")

        # Add the batch axis expected by the model.
        input_tensor = self.preprocess(input_image).unsqueeze(0)

        # Inference only - no gradients needed.
        with torch.no_grad():
            output = self.model(input_tensor)

        feature_vector = output.squeeze().numpy()
        # sklearn's normalize works on 2-D input, hence reshape -> flatten.
        return normalize(feature_vector.reshape(1, -1), norm="l2").flatten()
# Module-level extractor shared by app_callback; the "resnet34" timm model is
# constructed once here, at import time, rather than per frame.
extractor = FeatureExtractor("resnet34")
class user_app_callback_class(app_callback_class):
    """Application-specific subclass of ``app_callback_class``.

    Adds no state or behaviour of its own. Presumably an instance is what the
    pipeline hands to ``app_callback`` as ``user_data`` - verify against the
    application entry point.
    """

    def __init__(self):
        """Initialise the base class; no extra attributes are defined."""
        super().__init__()
# Slack channel that receives detection messages and snapshots.
_SLACK_CHANNEL = "C06NE1FU6SE"

# Upper bound (seconds) for the image-host upload so a stalled request cannot
# block the streaming callback forever.
_UPLOAD_TIMEOUT_SECONDS = 30

# Landmark index -> collection field name (index 0 = nose ... 16 = right
# ankle), taken from the index assignments of the original implementation.
_KEYPOINT_FIELDS = (
    "nose",
    "lefteye", "righteye",
    "leftear", "rightear",
    "leftshoulder", "rightshoulder",
    "leftelbow", "rightelbow",
    "leftwrist", "rightwrist",
    "lefthip", "righthip",
    "leftknee", "rightknee",
    "leftankle", "rightankle",
)

# Image-host metadata columns; each is stored as "" when the upload fails.
_UPLOAD_FIELDS = (
    "width", "height", "size", "ogfilename",
    "sizeformatted", "filename", "url", "mimetype",
)


def _to_pixel(point, bbox, width, height):
    """Convert a landmark to integer frame pixel coordinates.

    The landmark is scaled by the bounding box and offset by the box origin,
    then scaled by the frame size.
    """
    x = int((point.x() * bbox.width() + bbox.xmin()) * width)
    y = int((point.y() * bbox.height() + bbox.ymin()) * height)
    return x, y


def _format_xy(x, y):
    """Render a coordinate pair in the text format stored in the collection."""
    return f"x: {x:.2f} y: {y:.2f}"


def _post_slack_message(text):
    """Post *text* to the Slack channel, reporting API errors without raising."""
    try:
        client.chat_postMessage(channel=_SLACK_CHANNEL, text=text)
    except SlackApiError as e:
        print("Slack message error:", e.response["error"])


def _upload_to_image_host(path):
    """Upload the file at *path* to the image host and return its metadata.

    Returns a dict keyed by the metadata column names with string values;
    every value is "" when the upload or the response parsing fails.
    """
    metadata = dict.fromkeys(_UPLOAD_FIELDS, "")
    try:
        # The context manager closes the file even when the request fails.
        with open(path, "rb") as image_file:
            resp = requests.post(
                url=FREEIMAGE_URL,
                files={"source": image_file},
                timeout=_UPLOAD_TIMEOUT_SECONDS,
            )
        image = resp.json()["image"]
        metadata = {
            "width": str(image["width"]),
            "height": str(image["height"]),
            "size": str(image["size"]),
            "ogfilename": str(image["original_filename"]),
            "sizeformatted": str(image["size_formatted"]),
            "filename": str(image["filename"]),
            "url": str(image["url"]),
            "mimetype": str(image["image"]["mime"]),
        }
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError covers file and requests errors, ValueError a non-JSON
        # body, KeyError/TypeError an unexpected response shape.
        print("An error:", e)
    return metadata


def _publish_snapshot(frame_bgr, label, confidence, pixels):
    """Save the frame and push it to Slack, the image host, Milvus and S3.

    Every destination is best effort: a failure is printed and the remaining
    destinations are still attempted.

    Args:
        frame_bgr: BGR image array to write as JPEG.
        label: detection label of the person.
        confidence: detection confidence of the person.
        pixels: dict mapping each keypoint field name to its (x, y) pixels.
    """
    strfilename = PATH + "/personpose.jpg"
    if not cv2.imwrite(strfilename, frame_bgr):
        # Without a file on disk none of the uploads below can succeed.
        print("An error: could not write", strfilename)
        return

    keypoints = {name: _format_xy(x, y) for name, (x, y) in pixels.items()}

    _post_slack_message(f"Detection: {label} {confidence:.2f}")
    _post_slack_message(
        f" Left eye: {keypoints['lefteye']} Right eye: {keypoints['righteye']}\n"
    )

    upload_metadata = _upload_to_image_host(strfilename)

    try:
        client.files_upload_v2(
            channel=_SLACK_CHANNEL,
            file=strfilename,
            title=label,
            initial_comment="Live Camera image ",
        )
    except SlackApiError as e:
        print("Slack upload error:", e.response["error"])

    try:
        imageembedding = extractor(strfilename)
        record = {"label": str(label)}
        record.update(keypoints)
        record["confidence"] = float(confidence)
        record.update(upload_metadata)
        record["vector"] = imageembedding
        milvus_client.insert(COLLECTION_NAME, record)
    except Exception as e:
        # Deliberately broad: model, image and database errors must not
        # escape into the streaming callback.
        print("An error:", e)

    try:
        randomfilename = '{0}{1}.jpg'.format(label, uuid.uuid4())
        # NOTE(review): hard-coded storage credentials - move them to
        # configuration before using this outside a demo.
        s3 = boto3.resource(
            's3',
            endpoint_url=S3_URL,
            aws_access_key_id='minioadmin',
            aws_secret_access_key='minioadmin',
            config=Config(signature_version='s3v4'),
        )
        s3.Bucket('images').upload_file(strfilename, randomfilename)
    except Exception as e:
        # Deliberately broad for the same reason as above.
        print("An error:", e)


def app_callback(pad, info, user_data):
    """Pad-probe callback: inspect pose detections and publish snapshots.

    For every buffer the detections attached to it are read. For each
    "person" detection carrying all expected landmarks the eye positions are
    added to the printed summary and, when a frame is available, drawn on it.
    When the current second is listed in ``time_list`` and such a person was
    seen, the annotated frame is saved and published.

    Args:
        pad: pad the probe is attached to; used to read the caps.
        info: probe info object providing the buffer.
        user_data: state object; ``increment()``, ``use_frame`` and
            ``set_frame()`` are used here.

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    user_data.increment()
    string_to_print = ""

    frame_format, width, height = get_caps_from_pad(pad)
    have_caps = (
        frame_format is not None and width is not None and height is not None
    )

    frame = None
    if user_data.use_frame and have_caps:
        frame = get_numpy_from_buffer(buffer, frame_format, width, height)

    # (label, confidence, pixels) of the last person that had a complete set
    # of landmarks. Keeping these together guarantees the published
    # coordinates, label and confidence all describe the same detection.
    person = None

    roi = hailo.get_roi_from_buffer(buffer)
    for detection in roi.get_objects_typed(hailo.HAILO_DETECTION):
        label = detection.get_label()
        if label != "person":
            continue

        confidence = detection.get_confidence()
        string_to_print += f"Detection: {label} {confidence:.2f} "

        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        # Pixel coordinates need the frame size, so skip when caps are unknown.
        if len(landmarks) == 0 or not have_caps:
            continue

        bbox = detection.get_bbox()
        points = landmarks[0].get_points()
        try:
            pixels = {
                name: _to_pixel(points[index], bbox, width, height)
                for index, name in enumerate(_KEYPOINT_FIELDS)
            }
        except IndexError as e:
            # Fewer landmarks than expected: ignore this detection.
            print("Body points issues error:", e)
            continue

        left_eye_x, left_eye_y = pixels["lefteye"]
        right_eye_x, right_eye_y = pixels["righteye"]
        string_to_print += (
            f" Left eye: {_format_xy(left_eye_x, left_eye_y)}"
            f" Right eye: {_format_xy(right_eye_x, right_eye_y)}"
        )

        if frame is not None:
            cv2.circle(frame, (left_eye_x, left_eye_y), 5, (0, 255, 0), -1)
            cv2.circle(frame, (right_eye_x, right_eye_y), 5, (0, 255, 0), -1)

        person = (label, confidence, pixels)

    if frame is not None:
        framesave = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(framesave)

        # NOTE(review): every frame handled during a listed second publishes
        # again, and the uploads run synchronously inside this callback -
        # consider de-duplicating per second and/or a worker thread.
        if person is not None and datetime.now().second in time_list:
            _publish_snapshot(framesave, *person)

    print(string_to_print)
    return Gst.PadProbeReturn.OK
# ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment