Created
March 8, 2025 09:11
-
-
Save jameselsey/25016415575675b89e11d6784e52dff6 to your computer and use it in GitHub Desktop.
YouTube - cvml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Display the RGB 1920x1080 @ 30 fps stream read from the /tmp/infered.feed
# shared-memory socket, with an FPS text overlay.
gst-launch-1.0 shmsrc socket-path=/tmp/infered.feed do-timestamp=1 ! \
    video/x-raw,format=RGB,width=1920,height=1080,framerate=30/1 ! \
    identity sync=true ! \
    queue max-size-buffers=2 leaky=downstream ! \
    videoconvert ! \
    fpsdisplaysink video-sink=autovideosink text-overlay=true sync=false
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Display the NV12 1920x1080 @ 30 fps stream read from the /tmp/feed.raw
# shared-memory socket, with an FPS text overlay.
gst-launch-1.0 shmsrc socket-path=/tmp/feed.raw do-timestamp=1 ! \
    video/x-raw,format=NV12,width=1920,height=1080,framerate=30/1 ! \
    identity sync=true ! \
    queue max-size-buffers=2 leaky=downstream ! \
    videoconvert ! \
    fpsdisplaysink video-sink=autovideosink text-overlay=true sync=false
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load the environment into the current shell, then start the detector
# with the given HEF model and labels file.
source setup_env.sh
python detection.py -i rpi -f --hef-path yolov8m.hef --labels-json yolov8.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst, GLib | |
import os | |
import numpy as np | |
import cv2 | |
import hailo | |
import pprint | |
import supervision as sv | |
from hailo_rpi_common import ( | |
get_caps_from_pad, | |
get_numpy_from_buffer, | |
app_callback_class, | |
) | |
from detection_pipeline import GStreamerDetectionApp | |
import boto3 | |
import json | |
# Supervision helpers created once at import time.
tracker = sv.ByteTrack()
label_annotator = sv.LabelAnnotator()

# Tracking IDs that have already been published, so every tracked object
# produces at most one MQTT message.
emitted_ids = set()

# Labels that trigger an MQTT message. They should match entries of the COCO
# class list:
# https://github.com/ultralytics/ultralytics/blob/main/ultralytics/cfg/datasets/coco.yaml
# This only filters what gets published; other classes still appear as
# bounding boxes because hailo has not been told to exclude them.
target_classes = {'person', 'car', 'bird'}

# AWS IoT data client used for sending MQTT messages.
# The legacy AWS_ACCESS_KEY name is honoured first for backward compatibility;
# the standard AWS_ACCESS_KEY_ID is used as a fallback. Without the fallback,
# a standard environment yields a None key together with a real secret, which
# is rejected as partial explicit credentials.
accessKey = os.getenv("AWS_ACCESS_KEY") or os.getenv("AWS_ACCESS_KEY_ID")
secretKey = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_REGION")
topic = "detections"
client = boto3.client(
    "iot-data",
    region_name=region,
    aws_access_key_id=accessKey,
    aws_secret_access_key=secretKey,
)
# ----------------------------------------------------------------------------------------------- | |
# User-defined class to be used in the callback function | |
# ----------------------------------------------------------------------------------------------- | |
# Inheritance from the app_callback_class | |
class user_app_callback_class(app_callback_class):
    """Callback state object; adds one example attribute and one example method."""

    def __init__(self):
        super().__init__()
        # Example of extra state kept next to the base-class state.
        self.new_variable = 42

    def new_function(self):
        """Return the example message prefix."""
        return "The meaning of life is: "
# ----------------------------------------------------------------------------------------------- | |
# User-defined callback function | |
# ----------------------------------------------------------------------------------------------- | |
# This is the callback function that will be called when data is available from the pipeline | |
def app_callback(pad, info, user_data):
    """Publish one MQTT message for each newly tracked target-class detection.

    Args:
        pad: The pad the probe fired on (not used here).
        info: Probe info whose buffer carries the Hailo detection metadata.
        user_data: Callback state object; its frame counter is incremented.

    Returns:
        Gst.PadProbeReturn.OK in every case.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Increment frame count
    user_data.increment()

    # Extract detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    for detection in roi.get_objects_typed(hailo.HAILO_DETECTION):
        label = detection.get_label()
        if label not in target_classes:
            continue

        # A detection may carry no unique-ID object; indexing [0] blindly
        # would raise IndexError and abort the callback for this frame.
        unique_ids = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)
        if not unique_ids:
            continue
        tracking_id = unique_ids[0].get_id()

        # Emit an event only the first time a tracking ID is seen.
        if tracking_id in emitted_ids:
            continue

        print(f"Detection!: {tracking_id} {label} {detection.get_confidence():.2f}\n")
        emitted_ids.add(tracking_id)
        payload = {
            "tracking_id": tracking_id,
            "label": label,
        }
        response = client.publish(topic=topic, qos=1, payload=json.dumps(payload))
        print(f"Message published: {response}")

    return Gst.PadProbeReturn.OK
if __name__ == "__main__":
    # Build the callback state object, hand it and the callback to the
    # detection app, then run the app.
    user_data = user_app_callback_class()
    GStreamerDetectionApp(app_callback, user_data).run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst, GLib | |
import os | |
import argparse | |
import multiprocessing | |
import numpy as np | |
import setproctitle | |
import cv2 | |
import time | |
import hailo | |
from hailo_rpi_common import ( | |
get_default_parser, | |
QUEUE, | |
SOURCE_PIPELINE, | |
INFERENCE_PIPELINE, | |
INFERENCE_PIPELINE_WRAPPER, | |
USER_CALLBACK_PIPELINE, | |
DISPLAY_PIPELINE, | |
GStreamerApp, | |
app_callback_class, | |
dummy_callback, | |
detect_hailo_arch, | |
) | |
# ----------------------------------------------------------------------------------------------- | |
# User Gstreamer Application | |
# ----------------------------------------------------------------------------------------------- | |
# This class inherits from the hailo_rpi_common.GStreamerApp class | |
class GStreamerDetectionApp(GStreamerApp):
    """Detection app that reads frames from shared memory and republishes them.

    The pipeline reads NV12 frames from the /tmp/feed.raw shared-memory
    socket, runs them through hailonet / hailofilter / hailotracker /
    hailooverlay, and writes the result to the /tmp/infered.feed
    shared-memory socket.
    """

    def __init__(self, app_callback, user_data):
        """Parse the command line, choose the model files and build the pipeline.

        Args:
            app_callback: Callback stored on the instance as ``self.app_callback``.
            user_data: State object forwarded to the parent constructor.

        Raises:
            ValueError: If ``--arch`` is not given and the architecture cannot
                be auto-detected.
        """
        parser = get_default_parser()
        parser.add_argument(
            "--labels-json",
            default=None,
            help="Path to custom labels JSON file",
        )
        args = parser.parse_args()

        # Call the parent class constructor
        super().__init__(args, user_data)

        # Hailo parameters; these should be set based on the model used.
        self.batch_size = 2
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        nms_score_threshold = 0.3
        nms_iou_threshold = 0.45

        # Determine the architecture if not specified
        if args.arch is None:
            detected_arch = detect_hailo_arch()
            if detected_arch is None:
                raise ValueError("Could not auto-detect Hailo architecture. Please specify --arch manually.")
            self.arch = detected_arch
            print(f"Auto-detected Hailo architecture: {self.arch}")
        else:
            self.arch = args.arch

        # An explicit --hef-path wins; otherwise choose a default by architecture.
        if args.hef_path is not None:
            self.hef_path = args.hef_path
        elif self.arch == "hailo8":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8m.hef')
        else:  # hailo8l
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')

        # Post-processing and cropper shared objects
        self.post_process_so = os.path.join(self.current_path, 'libyolo_hailortpp_postprocess.so')
        self.cropper_process_so = os.path.join(self.current_path, 'libwhole_buffer.so')

        # User-defined label JSON file
        self.labels_json = args.labels_json

        self.app_callback = app_callback

        # NOTE(review): thresholds_str is built here but get_pipeline_string
        # does not reference it; confirm whether it should be passed to hailonet.
        self.thresholds_str = (
            f"nms-score-threshold={nms_score_threshold} "
            f"nms-iou-threshold={nms_iou_threshold} "
            f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )

        # Set the process title
        setproctitle.setproctitle("Hailo Detection App")

        self.create_pipeline()

    def get_pipeline_string(self):
        """Return the pipeline description string, printing it first.

        The HEF path comes from ``self.hef_path`` so ``--hef-path`` and the
        per-architecture default take effect. When ``self.labels_json`` is
        set, it is passed to hailofilter as ``config-path``.
        """
        # Queue element repeated between the processing stages.
        queue = "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "

        # Only mention the labels file when the user supplied one.
        labels_config = ""
        if self.labels_json is not None:
            labels_config = f"config-path={self.labels_json} "

        pipeline = (
            "shmsrc socket-path=/tmp/feed.raw do-timestamp=true is-live=true ! "
            + "video/x-raw, format=NV12, width=1920, height=1080, framerate=30/1 ! "
            + "videoconvert ! "
            + "video/x-raw, format=RGB, width=1920, height=1080, framerate=30/1 ! "
            + f"hailocropper so-path={self.cropper_process_so} function-name=create_crops use-letterbox=true resize-method=inter-area internal-offset=true name=cropper1 "
            + "hailoaggregator name=agg1 "
            + "cropper1. ! queue name=bypess1_q leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! agg1. "
            + "cropper1. ! "
            + queue
            + "videoscale qos=false n-threads=2 ! "
            + "video/x-raw, pixel-aspect-ratio=1/1 ! "
            + queue
            # NOTE(review): batch-size stays at the literal 1 although
            # self.batch_size is 2; confirm which value is intended.
            + f"hailonet hef-path={self.hef_path} batch-size=1 ! "
            + queue
            + f"hailofilter so-path={self.post_process_so} {labels_config}qos=false ! "
            + queue
            + "agg1. "
            + "agg1. ! hailotracker name=hailo_tracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! "
            + "queue name=queue_user_callback leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "identity name=identity_callback signal-handoffs=true ! "
            + queue
            + "hailooverlay qos=false line-thickness=4 font-thickness=4 ! "
            + queue
            + "videoconvert n-threads=2 qos=false ! "
            # Alternative on-screen sink:
            #   fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false
            # The shared-memory sink is used instead.
            + "shmsink socket-path=/tmp/infered.feed sync=false wait-for-connection=false"
        )
        print(pipeline)
        return pipeline
if __name__ == "__main__":
    # Stand-alone run: use the library's dummy callback with a plain
    # app_callback_class instance as its state object.
    app_callback = dummy_callback
    user_data = app_callback_class()
    GStreamerDetectionApp(app_callback, user_data).run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Capture NV12 1920x1080 @ 30 fps from libcamerasrc and write it to the
# /tmp/feed.raw shared-memory socket without waiting for a reader.
gst-launch-1.0 libcamerasrc ! \
    video/x-raw,width=1920,height=1080,framerate=30/1,format=NV12 ! \
    queue max-size-buffers=2 leaky=downstream ! \
    videorate ! \
    video/x-raw,framerate=30/1 ! \
    videoconvert ! \
    shmsink socket-path=/tmp/feed.raw sync=false wait-for-connection=false
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
# hailo-tappas-core: virtual environment name and accepted versions
CORE_VENV_NAME="venv_hailo_rpi5_examples"
CORE_REQUIRED_VERSION=("3.29.1" "3.30.0")

# hailo_tappas: virtual environment name and accepted versions
TAPPAS_VENV_NAME="hailo_tappas_venv"
TAPPAS_REQUIRED_VERSION=("3.29.0" "3.29.1" "3.30.0")
# Return the result of the shell-specific "am I being sourced?" test.
# zsh and bash are handled; any other shell prints a message and returns 1.
is_sourced() {
    if [ -n "$ZSH_VERSION" ]; then
        [[ -o sourced ]]
        return
    fi
    if [ -n "$BASH_VERSION" ]; then
        [[ "${BASH_SOURCE[0]}" != "$0" ]]
        return
    fi
    echo "Unsupported shell. Please use bash or zsh."
    return 1
}
# Everything below runs only when the file is sourced; otherwise a hint is printed.
if is_sourced; then
    echo "Setting up the environment..."

    # Pick the installation flavour: hailo_tappas if pkg-config knows it,
    # hailo-tappas-core otherwise.
    if pkg-config --exists hailo_tappas; then
        TAPPAS_CORE=0
        REQUIRED_VERSION=("${TAPPAS_REQUIRED_VERSION[@]}")
        echo "Setting up the environment for hailo_tappas..."
        TAPPAS_VERSION=$(pkg-config --modversion hailo_tappas)
        TAPPAS_WORKSPACE=$(pkg-config --variable=tappas_workspace hailo_tappas)
        export TAPPAS_WORKSPACE
        echo "TAPPAS_WORKSPACE set to $TAPPAS_WORKSPACE"
        # This workspace path is treated as the Docker setup, which skips
        # virtual-environment activation further down.
        if [[ "$TAPPAS_WORKSPACE" == "/local/workspace/tappas" ]]; then
            VENV_NAME="DOCKER"
        else
            VENV_NAME=$TAPPAS_VENV_NAME
        fi
    else
        TAPPAS_CORE=1
        VENV_NAME=$CORE_VENV_NAME
        REQUIRED_VERSION=("${CORE_REQUIRED_VERSION[@]}")
        echo "Setting up the environment for hailo-tappas-core..."
        TAPPAS_VERSION=$(pkg-config --modversion hailo-tappas-core)
    fi

    # The installed version must be one of the accepted versions.
    version_match=0
    for version in "${REQUIRED_VERSION[@]}"; do
        if [ "$TAPPAS_VERSION" = "$version" ]; then
            version_match=1
            break
        fi
    done

    if [ "$version_match" -ne 1 ]; then
        echo "TAPPAS_VERSION is ${TAPPAS_VERSION} not in the list of required versions ${REQUIRED_VERSION[*]}."
        return 1
    fi
    echo "TAPPAS_VERSION is ${TAPPAS_VERSION}. Proceeding..."

    if [ $TAPPAS_CORE -eq 1 ]; then
        # Directory containing this script (bash form first, zsh form as fallback)
        SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-${(%):-%N}}")" &> /dev/null && pwd)"

        if [[ "$VIRTUAL_ENV" == *"$VENV_NAME"* ]]; then
            echo "You are in the $VENV_NAME virtual environment."
        else
            echo "You are not in the $VENV_NAME virtual environment."

            # Create the virtual environment next to the script when missing;
            # it is activated in both cases.
            if [ -d "$SCRIPT_DIR/$VENV_NAME" ]; then
                echo "Virtual environment exists. Activating..."
            else
                echo "Virtual environment does not exist. Creating and activating..."
                python3 -m venv --system-site-packages "$SCRIPT_DIR/$VENV_NAME"
            fi
            source "$SCRIPT_DIR/$VENV_NAME/bin/activate"
        fi
        TAPPAS_POST_PROC_DIR=$(pkg-config --variable=tappas_postproc_lib_dir hailo-tappas-core)
    else
        if [[ "$VENV_NAME" == "DOCKER" ]]; then
            echo "Running in DOCKER using default virtualenv"
        elif [[ "$VIRTUAL_ENV" == *"$VENV_NAME"* ]]; then
            echo "You are in the $VENV_NAME virtual environment."
        else
            echo "You are not in the $VENV_NAME virtual environment."

            # Activate the virtual environment located inside the TAPPAS workspace
            VENV_PATH="${TAPPAS_WORKSPACE}/hailo_tappas_venv/bin/activate"
            if [ -f "$VENV_PATH" ]; then
                echo "Activating virtual environment..."
                source "$VENV_PATH"
            else
                echo "Error: Virtual environment not found at $VENV_PATH."
                return 1
            fi
        fi
        TAPPAS_POST_PROC_DIR="${TAPPAS_WORKSPACE}/apps/h8/gstreamer/libs/post_processes/"
    fi
    export TAPPAS_POST_PROC_DIR
    echo "TAPPAS_POST_PROC_DIR set to $TAPPAS_POST_PROC_DIR"

    # Query the device; NUL bytes are stripped from the tool's output.
    output=$(hailortcli fw-control identify | tr -d '\0')

    # Take the text after ": " on the "Device Architecture" line.
    device_arch=$(echo "$output" | grep "Device Architecture" | awk -F": " '{print $2}')

    # Stop when no architecture could be read.
    if [ -z "$device_arch" ]; then
        echo "Error: Device Architecture not found. Please check the connection to the device."
        return 1
    fi

    # Export the architecture and print it for verification.
    export DEVICE_ARCHITECTURE="$device_arch"
    echo "DEVICE_ARCHITECTURE is set to: $DEVICE_ARCHITECTURE"
else
    echo "This script needs to be sourced to correctly set up the environment. Please run '. $(basename "$0")' instead of executing it."
fi
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment