@jameselsey
Created March 8, 2025 09:11
YouTube - cvml
gst-launch-1.0 shmsrc socket-path=/tmp/infered.feed do-timestamp=1 ! \
video/x-raw,format=RGB,width=1920,height=1080,framerate=30/1 ! \
identity sync=true ! \
queue max-size-buffers=2 leaky=downstream ! \
videoconvert ! \
fpsdisplaysink video-sink=autovideosink text-overlay=true sync=false
gst-launch-1.0 shmsrc socket-path=/tmp/feed.raw do-timestamp=1 ! \
video/x-raw,format=NV12,width=1920,height=1080,framerate=30/1 ! \
identity sync=true ! \
queue max-size-buffers=2 leaky=downstream ! \
videoconvert ! \
fpsdisplaysink video-sink=autovideosink text-overlay=true sync=false
source setup_env.sh
python detection.py -i rpi -f --hef-path yolov8m.hef --labels-json yolov8.json
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import pprint
import supervision as sv
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
import boto3
import json
tracker = sv.ByteTrack()
label_annotator = sv.LabelAnnotator()
# Set to keep track of emitted tracking IDs
emitted_ids = set()
# Define the target classes for detection
# this should align to the COCO list of objects
# https://github.com/ultralytics/ultralytics/blob/main/ultralytics/cfg/datasets/coco.yaml
# note that this only filters which detections trigger an MQTT publish; they'll still appear as
# bounding boxes because we haven't told Hailo to exclude them.
target_classes = {'person', 'car', 'bird'}
# Set up the IoT client for publishing MQTT messages
accessKey = os.getenv("AWS_ACCESS_KEY")
secretKey = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_REGION")
topic = "detections"
client = boto3.client(
    "iot-data",
    region_name=region,
    aws_access_key_id=accessKey,
    aws_secret_access_key=secretKey
)
# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inherits from app_callback_class
class user_app_callback_class(app_callback_class):
    def __init__(self):
        super().__init__()
        self.new_variable = 42  # New variable example

    def new_function(self):  # New function example
        return "The meaning of life is: "
# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Increment frame count
    user_data.increment()

    # Get the caps from the pad
    format, width, height = get_caps_from_pad(pad)

    # Retrieve the video frame if required
    frame = None
    if user_data.use_frame and format and width and height:
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Extract detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Filter detections to include only target classes
    filtered_detections = [
        detection for detection in hailo_detections
        if detection.get_label() in target_classes
    ]

    # Prepare detection data for Supervision
    boxes = []
    confidences = []
    class_ids = []
    for detection in filtered_detections:
        tracking_id = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)[0].get_id()
        # Emit an event only if the tracking ID hasn't been emitted before
        if tracking_id not in emitted_ids:
            print(f"Detection!: {tracking_id} {detection.get_label()} {detection.get_confidence():.2f}\n")
            emitted_ids.add(tracking_id)
            payload = {
                "tracking_id": tracking_id,
                "label": detection.get_label()
            }
            response = client.publish(topic=topic, qos=1, payload=json.dumps(payload))
            print(f"Message published: {response}")

        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        boxes.append([bbox.xmin() * width, bbox.ymin() * height, bbox.xmax() * width, bbox.ymax() * height])
        confidences.append(confidence)
        class_ids.append(label)  # Note: this is the string label; convert to an integer class ID before feeding Supervision

    return Gst.PadProbeReturn.OK
if __name__ == "__main__":
    # Create an instance of the user app callback class
    user_data = user_app_callback_class()
    app = GStreamerDetectionApp(app_callback, user_data)
    app.run()
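The callback above publishes each new tracking ID to AWS IoT Core over MQTT using boto3's iot-data client. A minimal standalone sketch, assuming the same AWS_ACCESS_KEY / AWS_SECRET_ACCESS_KEY / AWS_REGION environment variables and the same "detections" topic as the script above, can be used to confirm credentials and IoT permissions before running the full pipeline:

# Hedged sketch: verify the AWS IoT publish path used by detection.py in isolation.
# Assumes the same env vars and topic name as detection.py; adjust as needed.
import json
import os

import boto3

test_client = boto3.client(
    "iot-data",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

# Publish a dummy payload matching the structure emitted by app_callback
test_payload = {"tracking_id": 0, "label": "person"}
response = test_client.publish(topic="detections", qos=1, payload=json.dumps(test_payload))
print("Publish response:", response)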
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
from hailo_rpi_common import (
    get_default_parser,
    QUEUE,
    SOURCE_PIPELINE,
    INFERENCE_PIPELINE,
    INFERENCE_PIPELINE_WRAPPER,
    USER_CALLBACK_PIPELINE,
    DISPLAY_PIPELINE,
    GStreamerApp,
    app_callback_class,
    dummy_callback,
    detect_hailo_arch,
)
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
# This class inherits from the hailo_rpi_common.GStreamerApp class
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, app_callback, user_data):
        parser = get_default_parser()
        parser.add_argument(
            "--labels-json",
            default=None,
            help="Path to custom labels JSON file",
        )
        args = parser.parse_args()

        # Call the parent class constructor
        super().__init__(args, user_data)

        # Additional initialization code can be added here
        # Set Hailo parameters; these should be set based on the model used
        self.batch_size = 2
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        nms_score_threshold = 0.3
        nms_iou_threshold = 0.45

        # Determine the architecture if not specified
        if args.arch is None:
            detected_arch = detect_hailo_arch()
            if detected_arch is None:
                raise ValueError("Could not auto-detect Hailo architecture. Please specify --arch manually.")
            self.arch = detected_arch
            print(f"Auto-detected Hailo architecture: {self.arch}")
        else:
            self.arch = args.arch

        if args.hef_path is not None:
            self.hef_path = args.hef_path
        # Set the HEF file path based on the arch
        elif self.arch == "hailo8":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8m.hef')
        else:  # hailo8l
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')

        # Set the post-processing shared object file
        self.post_process_so = os.path.join(self.current_path, 'libyolo_hailortpp_postprocess.so')
        self.cropper_process_so = os.path.join(self.current_path, 'libwhole_buffer.so')

        # User-defined label JSON file
        self.labels_json = args.labels_json

        self.app_callback = app_callback

        self.thresholds_str = (
            f"nms-score-threshold={nms_score_threshold} "
            f"nms-iou-threshold={nms_iou_threshold} "
            f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )

        # Set the process title
        setproctitle.setproctitle("Hailo Detection App")

        self.create_pipeline()

    def get_pipeline_string(self):
        pipeline = (
            "shmsrc socket-path=/tmp/feed.raw do-timestamp=true is-live=true ! "
            + "video/x-raw, format=NV12, width=1920, height=1080, framerate=30/1 ! "
            + "videoconvert ! "
            + "video/x-raw, format=RGB, width=1920, height=1080, framerate=30/1 ! "
            + f"hailocropper so-path={self.cropper_process_so} function-name=create_crops use-letterbox=true resize-method=inter-area internal-offset=true name=cropper1 "
            + "hailoaggregator name=agg1 "
            + "cropper1. ! queue name=bypass1_q leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! agg1. "
            + "cropper1. ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "videoscale qos=false n-threads=2 ! "
            + "video/x-raw, pixel-aspect-ratio=1/1 ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "hailonet hef-path=yolov8m.hef batch-size=1 ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + f"hailofilter so-path={self.post_process_so} qos=false ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! agg1. "
            + "agg1. ! hailotracker name=hailo_tracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! "
            + "queue name=queue_user_callback leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "identity name=identity_callback signal-handoffs=true ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "hailooverlay qos=false line-thickness=4 font-thickness=4 ! "
            + "queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! "
            + "videoconvert n-threads=2 qos=false ! "
            # fps display sink
            # + "fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false"
            # OR try the shared memory sink
            + "shmsink socket-path=/tmp/infered.feed sync=false wait-for-connection=false"
        )
        print(pipeline)
        return pipeline
if __name__ == "__main__":
    # Create an instance of the user app callback class
    user_data = app_callback_class()
    app_callback = dummy_callback
    app = GStreamerDetectionApp(app_callback, user_data)
    app.run()
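get_pipeline_string() depends on the Hailo GStreamer elements (hailocropper, hailonet, hailofilter, hailotracker, hailooverlay) being registered in the current environment. A quick sanity-check sketch, assuming the TAPPAS environment from setup_env.sh has already been sourced, can confirm they are visible to GStreamer before launching the app:

# Hedged sketch: check that the GStreamer elements used in get_pipeline_string() are registered.
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
for name in ("shmsrc", "hailocropper", "hailoaggregator", "hailonet",
             "hailofilter", "hailotracker", "hailooverlay", "shmsink"):
    factory = Gst.ElementFactory.find(name)
    print(f"{name}: {'found' if factory else 'MISSING'}")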
gst-launch-1.0 libcamerasrc ! \
video/x-raw,width=1920,height=1080,framerate=30/1,format=NV12 ! \
queue max-size-buffers=2 leaky=downstream ! \
videorate ! \
video/x-raw,framerate=30/1 ! \
videoconvert ! \
shmsink socket-path=/tmp/feed.raw sync=false wait-for-connection=false
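Before pointing the detection pipeline at /tmp/feed.raw, it can help to confirm the camera pipeline above is actually writing frames to the shared-memory socket. A minimal sketch, assuming the same NV12 1920x1080 @ 30 fps caps, pulls a single buffer via an appsink:

# Hedged sketch: pull one frame from the /tmp/feed.raw shared-memory socket to confirm frames are flowing.
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
pipeline = Gst.parse_launch(
    "shmsrc socket-path=/tmp/feed.raw do-timestamp=true ! "
    "video/x-raw,format=NV12,width=1920,height=1080,framerate=30/1 ! "
    "appsink name=sink max-buffers=1 drop=true"
)
sink = pipeline.get_by_name("sink")
pipeline.set_state(Gst.State.PLAYING)
sample = sink.emit("pull-sample")  # blocks until the first buffer arrives
if sample is not None:
    print(f"Got a {sample.get_buffer().get_size()}-byte frame from /tmp/feed.raw")
pipeline.set_state(Gst.State.NULL)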
#!/bin/bash
# TAPPAS CORE Definitions
CORE_VENV_NAME="venv_hailo_rpi5_examples"
CORE_REQUIRED_VERSION=("3.29.1" "3.30.0")
# TAPPAS Definitions
TAPPAS_VENV_NAME="hailo_tappas_venv"
TAPPAS_REQUIRED_VERSION=("3.29.0" "3.29.1" "3.30.0")
# Function to check if the script is being sourced
is_sourced() {
    if [ -n "$ZSH_VERSION" ]; then
        [[ -o sourced ]]
    elif [ -n "$BASH_VERSION" ]; then
        [[ "${BASH_SOURCE[0]}" != "$0" ]]
    else
        echo "Unsupported shell. Please use bash or zsh."
        return 1
    fi
}
# Only proceed if the script is being sourced
if is_sourced; then
    echo "Setting up the environment..."

    # Check if we are working with hailo_tappas or hailo-tappas-core
    if pkg-config --exists hailo_tappas; then
        TAPPAS_CORE=0
        REQUIRED_VERSION=("${TAPPAS_REQUIRED_VERSION[@]}")
        echo "Setting up the environment for hailo_tappas..."
        TAPPAS_VERSION=$(pkg-config --modversion hailo_tappas)
        TAPPAS_WORKSPACE=$(pkg-config --variable=tappas_workspace hailo_tappas)
        export TAPPAS_WORKSPACE
        echo "TAPPAS_WORKSPACE set to $TAPPAS_WORKSPACE"
        if [[ "$TAPPAS_WORKSPACE" == "/local/workspace/tappas" ]]; then
            VENV_NAME="DOCKER"
        else
            VENV_NAME=$TAPPAS_VENV_NAME
        fi
    else
        TAPPAS_CORE=1
        VENV_NAME=$CORE_VENV_NAME
        REQUIRED_VERSION=("${CORE_REQUIRED_VERSION[@]}")
        echo "Setting up the environment for hailo-tappas-core..."
        TAPPAS_VERSION=$(pkg-config --modversion hailo-tappas-core)
    fi

    # Check if TAPPAS_VERSION is in REQUIRED_VERSION
    version_match=0
    for version in "${REQUIRED_VERSION[@]}"; do
        if [ "$TAPPAS_VERSION" = "$version" ]; then
            version_match=1
            break
        fi
    done

    if [ "$version_match" -eq 1 ]; then
        echo "TAPPAS_VERSION is ${TAPPAS_VERSION}. Proceeding..."
    else
        echo "TAPPAS_VERSION is ${TAPPAS_VERSION}, which is not in the list of required versions: ${REQUIRED_VERSION[*]}."
        return 1
    fi

    if [ $TAPPAS_CORE -eq 1 ]; then
        # Get the directory of the current script
        SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-${(%):-%N}}")" &> /dev/null && pwd)"
        # Check if we are in the defined virtual environment
        if [[ "$VIRTUAL_ENV" == *"$VENV_NAME"* ]]; then
            echo "You are in the $VENV_NAME virtual environment."
        else
            echo "You are not in the $VENV_NAME virtual environment."
            # Check if the virtual environment exists in the same directory as the script
            if [ -d "$SCRIPT_DIR/$VENV_NAME" ]; then
                echo "Virtual environment exists. Activating..."
                source "$SCRIPT_DIR/$VENV_NAME/bin/activate"
            else
                echo "Virtual environment does not exist. Creating and activating..."
                python3 -m venv --system-site-packages "$SCRIPT_DIR/$VENV_NAME"
                source "$SCRIPT_DIR/$VENV_NAME/bin/activate"
            fi
        fi
        TAPPAS_POST_PROC_DIR=$(pkg-config --variable=tappas_postproc_lib_dir hailo-tappas-core)
    else
        if [[ "$VENV_NAME" == "DOCKER" ]]; then
            echo "Running in DOCKER using default virtualenv"
        else
            # Check if we are in the defined virtual environment
            if [[ "$VIRTUAL_ENV" == *"$VENV_NAME"* ]]; then
                echo "You are in the $VENV_NAME virtual environment."
            else
                echo "You are not in the $VENV_NAME virtual environment."
                # Activate TAPPAS virtual environment
                VENV_PATH="${TAPPAS_WORKSPACE}/hailo_tappas_venv/bin/activate"
                if [ -f "$VENV_PATH" ]; then
                    echo "Activating virtual environment..."
                    source "$VENV_PATH"
                else
                    echo "Error: Virtual environment not found at $VENV_PATH."
                    return 1
                fi
            fi
        fi
        TAPPAS_POST_PROC_DIR="${TAPPAS_WORKSPACE}/apps/h8/gstreamer/libs/post_processes/"
    fi
    export TAPPAS_POST_PROC_DIR
    echo "TAPPAS_POST_PROC_DIR set to $TAPPAS_POST_PROC_DIR"

    # Get the Device Architecture
    output=$(hailortcli fw-control identify | tr -d '\0')
    # Extract the Device Architecture from the output
    device_arch=$(echo "$output" | grep "Device Architecture" | awk -F": " '{print $2}')
    # If the device architecture is not found, output the error message and return
    if [ -z "$device_arch" ]; then
        echo "Error: Device Architecture not found. Please check the connection to the device."
        return 1
    fi
    # Export the Device Architecture to an environment variable
    export DEVICE_ARCHITECTURE="$device_arch"
    # Print the environment variable to verify
    echo "DEVICE_ARCHITECTURE is set to: $DEVICE_ARCHITECTURE"
else
    echo "This script needs to be sourced to correctly set up the environment. Please run '. $(basename "$0")' instead of executing it."
fi