Skip to content

Instantly share code, notes, and snippets.

@iamparthaonline
Created November 5, 2025 13:22
Show Gist options
  • Select an option

  • Save iamparthaonline/7d1b293c82cfc04fc919553e0823b751 to your computer and use it in GitHub Desktop.

Select an option

Save iamparthaonline/7d1b293c82cfc04fc919553e0823b751 to your computer and use it in GitHub Desktop.
import onnxruntime as ort
import numpy as np
import cv2
import os
import time
from datetime import datetime
import requests
import json
# === Configuration ===
# Service credentials (blank in this file).
TELEGRAM_BOT_TOKEN = ""
TELEGRAM_CHAT_ID = ""  # your channel ID
IMGBB_API_KEY = ""

# Filesystem locations.
MODEL_PATH = "/home/roy/yolov8s.onnx"  # path to the YOLO ONNX weights
LOG_FILE = "/home/roy/cat_log.jsonl"  # one JSON object per line

# Detection settings.
CAT_CLASS_ID = 15  # COCO 'cat' (index into COCO_CLASSES below)
CONF_THRESH = 0.25  # detections scoring below this are discarded

# === COCO Class Labels ===
# Position in this list is the class id; order must not change.
COCO_CLASSES = [
    # 0-8: people and vehicles
    "person", "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat",
    # 9-13: street furniture
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    # 14-23: animals
    "bird", "cat", "dog", "horse", "sheep",
    "cow", "elephant", "bear", "zebra", "giraffe",
    # 24-28: accessories
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    # 29-38: sports
    "frisbee", "skis", "snowboard", "sports ball", "kite",
    "baseball bat", "baseball glove", "skateboard", "surfboard",
    "tennis racket",
    # 39-45: kitchenware
    "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl",
    # 46-55: food
    "banana", "apple", "sandwich", "orange", "broccoli",
    "carrot", "hot dog", "pizza", "donut", "cake",
    # 56-61: furniture
    "chair", "couch", "potted plant", "bed", "dining table", "toilet",
    # 62-67: electronics
    "tv", "laptop", "mouse", "remote", "keyboard", "cell phone",
    # 68-72: appliances
    "microwave", "oven", "toaster", "sink", "refrigerator",
    # 73-79: indoor objects
    "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush",
]
# === Initialize ONNX model ===
# The inference session is created once, at import time, and is shared by
# every detection call. Only the CPU execution provider is requested.
print("🧠 Loading YOLO ONNX model...")
session = ort.InferenceSession(MODEL_PATH, providers=["CPUExecutionProvider"])
print("✅ Model loaded successfully!")
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114)):
    """
    Scale an image to fit inside ``new_shape`` and pad the remainder.

    The aspect ratio is preserved: the image is resized by a single factor
    so that it fits within ``new_shape`` (height, width), then constant
    ``color`` borders are added on both sides of the short axis.

    Returns: padded_image, scale_ratio, pad_w (left), pad_h (top)
    """
    src_h, src_w = im.shape[:2]
    dst_h, dst_w = new_shape[0], new_shape[1]

    # One scale factor for both axes: the tighter of the two fits.
    r = min(dst_h / src_h, dst_w / src_w)
    scaled_w = int(round(src_w * r))
    scaled_h = int(round(src_h * r))

    # Total padding is split evenly; the +/-0.1 nudges decide which side
    # receives the extra pixel when the total is odd.
    half_pad_w = (dst_w - scaled_w) / 2
    half_pad_h = (dst_h - scaled_h) / 2
    top = int(round(half_pad_h - 0.1))
    bottom = int(round(half_pad_h + 0.1))
    left = int(round(half_pad_w - 0.1))
    right = int(round(half_pad_w + 0.1))

    resized = cv2.resize(im, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)
    padded = cv2.copyMakeBorder(
        resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color
    )
    return padded, r, left, top
def capture_photo():
    """Capture a 1280x720 still with ``rpicam-still`` and return its path.

    The file name embeds the current local time
    (``/home/roy/photo_YYYYMMDD_HHMMSS.jpg``).

    The command is run without a shell, as an argument list, so the path is
    never re-parsed by ``sh``. A failed capture is reported on stdout but
    does not raise: the path is returned regardless, and callers must
    tolerate the file being absent.

    Returns:
        str: path the photo was (or should have been) written to.
    """
    # Function-scope import: the module's import block does not provide it.
    import subprocess

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    photo_path = f"/home/roy/photo_{timestamp}.jpg"
    command = [
        "rpicam-still",
        "-t", "10",
        "-n",
        "--width", "1280",
        "--height", "720",
        "-o", photo_path,
    ]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as exc:
        # e.g. rpicam-still is not installed or not executable
        print(f"⚠️ Could not run rpicam-still: {exc}")
    else:
        if completed.returncode != 0:
            print(f"⚠️ rpicam-still exited with status {completed.returncode}")
    return photo_path
def _decode_yolo_output(raw, num_classes):
    """Decode a raw YOLO ONNX output tensor.

    Supported layouts (after dropping an optional leading batch axis):
      * (N, 6)          already post-processed rows
                        [x1, y1, x2, y2, conf, cls_id]
      * (4 + C, N)      YOLOv8-style channel-first output:
                        [cx, cy, w, h, class scores...] per column
      * (N, 5 + C)      YOLOv5-style rows:
                        [cx, cy, w, h, objectness, class scores...]
    Any other 2-D layout with at least 5 columns is read as
    [cx, cy, w, h, class scores...].

    Returns:
        (boxes_xyxy, scores, class_ids, needs_nms) where ``boxes_xyxy`` is
        (N, 4) in letterboxed-input pixels and ``needs_nms`` says whether
        the rows are raw candidates that still need suppression.
    """
    empty = (
        np.zeros((0, 4), np.float32),
        np.zeros(0, np.float32),
        np.zeros(0, np.int64),
        False,
    )
    preds = np.asarray(raw, dtype=np.float32)
    if preds.ndim == 3:
        preds = preds[0]  # drop the batch axis
    if preds.ndim != 2 or preds.size == 0:
        return empty

    # Channel-first (C, N) has far fewer rows than columns; flip it so every
    # row is one candidate. (N, 6) is left alone even when N < 6.
    if preds.shape[1] != 6 and preds.shape[0] < preds.shape[1]:
        preds = preds.T

    n_cols = preds.shape[1]
    if n_cols == 6:
        return preds[:, :4], preds[:, 4], preds[:, 5].astype(np.int64), False
    if n_cols < 5:
        return empty

    if n_cols == 5 + num_classes:
        # Objectness column present: final score = objectness * class score.
        class_scores = preds[:, 5:] * preds[:, 4:5]
    else:
        class_scores = preds[:, 4:]
    class_ids = class_scores.argmax(axis=1)
    scores = class_scores.max(axis=1)

    # Centre format -> corner format.
    centres, half_sizes = preds[:, :2], preds[:, 2:4] / 2.0
    boxes = np.concatenate([centres - half_sizes, centres + half_sizes], axis=1)
    return boxes, scores, class_ids, True


def _nms(boxes, scores, class_ids, iou_thresh):
    """Greedy per-class non-maximum suppression.

    Returns the kept row indices, highest score first.
    """
    if len(boxes) == 0:
        return []
    boxes = boxes.astype(np.float64)
    # Shift every class into its own coordinate region so that boxes of
    # different classes can never overlap (and never suppress each other).
    span = float(boxes.max() - boxes.min()) + 1.0
    shifted = boxes + (class_ids.astype(np.float64) * span)[:, None]
    x1, y1, x2, y2 = shifted.T
    areas = np.clip(x2 - x1, 0, None) * np.clip(y2 - y1, 0, None)

    order = np.argsort(scores)[::-1]
    keep = []
    while order.size > 0:
        best, rest = order[0], order[1:]
        keep.append(int(best))
        inter_w = np.clip(
            np.minimum(x2[best], x2[rest]) - np.maximum(x1[best], x1[rest]), 0, None
        )
        inter_h = np.clip(
            np.minimum(y2[best], y2[rest]) - np.maximum(y1[best], y1[rest]), 0, None
        )
        inter = inter_w * inter_h
        union = areas[best] + areas[rest] - inter
        iou = np.where(union > 0, inter / np.maximum(union, 1e-12), 0.0)
        order = rest[iou <= iou_thresh]
    return keep


def _unletterbox(coord, pad, scale, limit):
    """Map one letterboxed coordinate to the source image, clipped to [0, limit - 1]."""
    return max(0, min(int(round((coord - pad) / scale)), limit - 1))


def detect_objects_onnx(image_path, iou_thresh=0.45):
    """Run the YOLO ONNX model on an image file and report what was found.

    The image is rotated by 180 degrees before inference, so every returned
    box is expressed in pixel coordinates of the ROTATED image, not of the
    file on disk.

    Pipeline: read -> rotate 180 -> letterbox to 640x640 -> BGR to RGB ->
    scale to [0, 1] NCHW float32 -> inference -> decode -> confidence filter
    (``CONF_THRESH``) -> per-class NMS -> map boxes back to image pixels.

    Args:
        image_path: path of the image to analyse.
        iou_thresh: IoU above which a lower-scoring box of the same class
            is suppressed. Ignored when the model output is already
            post-processed ((N, 6) rows).

    Returns:
        dict with keys:
          * ``isCat`` (bool): a detection with class ``CAT_CLASS_ID`` survived.
          * ``description`` (str): human-readable summary.
          * ``duration_sec`` (float): wall time of this call, 2 decimals
            (0.0 on the early-exit paths).
          * ``detections`` (list[dict]): ``{"label", "conf", "box"}`` with
            ``box = [x1, y1, x2, y2]`` as ints.
    """
    start = time.time()
    img = cv2.imread(image_path)
    if img is None:
        return {"isCat": False, "description": "Image not found", "duration_sec": 0.0, "detections": []}

    # Rotate 180 degrees (presumably the camera is mounted upside down).
    img = cv2.rotate(img, cv2.ROTATE_180)
    orig_h, orig_w = img.shape[:2]

    # Letterbox (preserve aspect ratio) to 640x640.
    img_letter, r, pad_w, pad_h = letterbox(img, (640, 640))
    # OpenCV decodes to BGR; Ultralytics YOLO models expect RGB input.
    rgb = cv2.cvtColor(img_letter, cv2.COLOR_BGR2RGB)
    chw = rgb.transpose(2, 0, 1).astype(np.float32) / 255.0
    img_input = np.ascontiguousarray(chw[None])

    # Ask the model for its input name instead of assuming "images".
    input_name = session.get_inputs()[0].name
    outputs = session.run(None, {input_name: img_input})
    if not outputs or outputs[0] is None:
        return {"isCat": False, "description": "No output", "duration_sec": 0.0, "detections": []}

    boxes, scores, class_ids, needs_nms = _decode_yolo_output(outputs[0], len(COCO_CLASSES))

    # Threshold first: it shrinks thousands of candidates to a handful,
    # which keeps the Python-level NMS loop cheap.
    confident = scores >= CONF_THRESH
    boxes, scores, class_ids = boxes[confident], scores[confident], class_ids[confident]
    keep = _nms(boxes, scores, class_ids, iou_thresh) if needs_nms else range(len(boxes))

    detections = []
    cat_found = False
    for i in keep:
        x1, y1, x2, y2 = (float(v) for v in boxes[i])
        cls_id = int(class_ids[i])
        label = COCO_CLASSES[cls_id] if 0 <= cls_id < len(COCO_CLASSES) else str(cls_id)
        if cls_id == CAT_CLASS_ID:
            cat_found = True
        detections.append({
            "label": label,
            "conf": float(scores[i]),
            # Remove the padding offset, undo the scale, clip to the image.
            "box": [
                _unletterbox(x1, pad_w, r, orig_w),
                _unletterbox(y1, pad_h, r, orig_h),
                _unletterbox(x2, pad_w, r, orig_w),
                _unletterbox(y2, pad_h, r, orig_h),
            ],
        })

    elapsed = round(time.time() - start, 2)
    return {
        "isCat": cat_found,
        "description": "Cat detected locally via YOLO" if cat_found else "No cat detected",
        "duration_sec": elapsed,
        "detections": detections,
    }
def draw_boxes(image_path, detections):
    """Draw labelled bounding boxes and save the result next to the source.

    ``detect_objects_onnx`` rotates the image by 180 degrees before
    inference, so its boxes are in rotated-image coordinates. The same
    rotation is applied here before drawing; otherwise every box would land
    on the point-mirrored position of the object.

    Args:
        image_path: path of the captured image.
        detections: list of ``{"label", "conf", "box": [x1, y1, x2, y2]}``
            dicts as produced by ``detect_objects_onnx``.

    Returns:
        Path of the annotated image (``<name>_marked<ext>``), or ``None``
        when the source cannot be read or the annotated file cannot be
        written.
    """
    img = cv2.imread(image_path)
    if img is None:
        return None
    img = cv2.rotate(img, cv2.ROTATE_180)
    img_h = img.shape[0]

    font, font_scale, thickness = cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1
    # BGR colours: cats green, persons blue, everything else yellow.
    palette = {"cat": (0, 200, 0), "person": (200, 100, 0)}
    fallback_color = (0, 200, 200)

    for det in detections:
        label = det["label"]
        x1, y1, x2, y2 = det["box"]
        color = palette.get(label, fallback_color)

        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

        text = f"{label} {det['conf'] * 100:.1f}%"
        (text_w, text_h), baseline = cv2.getTextSize(text, font, font_scale, thickness)
        tag_h = text_h + baseline + 4
        # Put the tag above the box when it fits; otherwise tuck it just
        # inside the top edge so it is not drawn off-image.
        if y1 - tag_h >= 0:
            tag_top, tag_bottom = y1 - tag_h, y1
        else:
            tag_top, tag_bottom = y1, min(img_h - 1, y1 + tag_h)
        # Filled rectangle behind the text for legibility.
        cv2.rectangle(img, (x1, tag_top), (x1 + text_w, tag_bottom), color, -1)
        cv2.putText(img, text, (x1, tag_bottom - 4), font, font_scale, (0, 0, 0), thickness, cv2.LINE_AA)

    # splitext only touches the real extension, so the output can never
    # collide with (and overwrite) the source file.
    root, ext = os.path.splitext(image_path)
    marked_path = f"{root}_marked{ext or '.jpg'}"
    if not cv2.imwrite(marked_path, img):
        return None
    return marked_path
def upload_photo(image_path, timeout=30):
    """Upload an image to imgbb and return its public URL.

    This is best-effort: every failure is reported on stdout and turned
    into ``None`` so the caller's pipeline keeps going.

    Args:
        image_path: path of the image file to upload.
        timeout: seconds to wait for imgbb before giving up; without it a
            stalled connection would block the job indefinitely.

    Returns:
        The image URL (str), or ``None`` if the key is missing, the file
        cannot be read, the request fails, or the response is malformed.
    """
    if not IMGBB_API_KEY:
        # Without a key the request can only fail; do not send the photo.
        print("❌ Image upload skipped: IMGBB_API_KEY is not set")
        return None
    try:
        with open(image_path, "rb") as f:
            upload_resp = requests.post(
                "https://api.imgbb.com/1/upload",
                params={"key": IMGBB_API_KEY},  # let requests URL-encode it
                files={"image": f},
                timeout=timeout,
            )
        if upload_resp.status_code != 200:
            print("❌ Image upload failed:", upload_resp.text)
            return None
        image_url = upload_resp.json()["data"]["url"]
    except (OSError, requests.RequestException, ValueError, KeyError, TypeError) as e:
        # OSError: unreadable file / network; ValueError: body is not JSON;
        # KeyError/TypeError: JSON lacks the expected data.url field.
        print("❌ Upload exception:", e)
        return None
    print(f"🌐 Uploaded image to imgbb: {image_url}")
    return image_url
def send_telegram_alert(image_path, description, duration, extra_text=None, timeout=30):
    """Send the image to Telegram with a caption describing the detection.

    This is best-effort: failures are printed, never raised.

    Args:
        image_path: path of the photo to attach.
        description: one-line detection summary.
        duration: processing time in seconds (formatted to 2 decimals).
        extra_text: optional line inserted right after the caption header.
        timeout: seconds to wait for the Telegram API before giving up.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("⚠️ Telegram send skipped: bot token or chat ID is not set")
        return

    try:
        caption_lines = [
            "🐾 Detection Alert!",
            f"📝 {description}",
            f"⏱️ Processing time: {duration:.2f} sec",
        ]
        if extra_text:
            caption_lines.insert(1, extra_text)
        # sendPhoto rejects captions longer than 1024 characters.
        caption = "\n".join(caption_lines)[:1024]

        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
        data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption}
        with open(image_path, "rb") as photo:
            r = requests.post(url, data=data, files={"photo": photo}, timeout=timeout)
    except (OSError, requests.RequestException, ValueError, TypeError) as e:
        print("⚠️ Telegram send failed:", e)
        return

    if r.status_code == 200:
        print("📨 Sent to Telegram successfully!")
    else:
        print("⚠️ Telegram error:", r.status_code, r.text)
def log_result(entry, log_file=None):
    """Append one JSON-encoded result to the JSON-lines log.

    Args:
        entry: JSON-serialisable object; written as a single line.
        log_file: path of the log file. Defaults to the module-level
            ``LOG_FILE`` when omitted.

    The parent directory is created when missing. A bare file name (no
    directory component) is accepted: ``os.makedirs("")`` would raise, so
    directory creation is skipped in that case.
    """
    path = LOG_FILE if log_file is None else log_file
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    print(f"📝 Logged to {path}")
def job():
    """Run one capture -> detect -> annotate -> upload -> log -> alert cycle.

    Steps:
      1. Capture a photo.
      2. Detect objects; if there are any, write an annotated copy.
      3. Upload the annotated copy (if one exists) to imgbb.
      4. Append a summary line to the log, including the uploaded URL
         (``image_url``, ``None`` when nothing was uploaded). The local
         files are deleted below, so the URL is the only lasting reference.
      5. Send a Telegram alert when a cat was detected.
      6. Delete the local photo and annotated copy. This runs in a
         ``finally`` block, so files do not accumulate when a step raises.

    ``duration_sec`` in the log covers capture through detection/annotation
    only; upload and alert time are excluded.
    """
    print("🚀 Running detection job...")
    total_start = time.time()
    photo_path = capture_photo()
    marked_path = None
    try:
        print(f"📸 Captured: {photo_path}")

        result = detect_objects_onnx(photo_path)
        print("🔍 Detection result:", result)
        detections = result["detections"]

        # Annotated copy only when there is something to draw.
        marked_path = draw_boxes(photo_path, detections) if detections else None

        total_time = round(time.time() - total_start, 2)
        print(f"⏱️ Total time (capture → detection): {total_time} sec")

        # Upload before logging so the log line can carry the URL.
        uploaded_url = upload_photo(marked_path) if marked_path else None

        log_result({
            "timestamp": datetime.now().isoformat(timespec="seconds"),
            "image_path": photo_path,
            "image_url": uploaded_url,
            "isCat": result["isCat"],
            "description": result["description"],
            "duration_sec": total_time,
            "detections_count": len(detections),
        })

        # Extra caption line, e.g. "🐱 2 cats (83.2%, 71.5%)".
        extra_text = None
        cat_confs = [d["conf"] for d in detections if d["label"] == "cat"]
        if cat_confs:
            plural = "s" if len(cat_confs) > 1 else ""
            confs_text = ", ".join(f"{c*100:.1f}%" for c in cat_confs)
            extra_text = f"🐱 {len(cat_confs)} cat{plural} ({confs_text})"

        # Alert only for cats, and only if there is an image to attach.
        if result["isCat"] and marked_path:
            send_telegram_alert(marked_path, result["description"], total_time, extra_text=extra_text)
    finally:
        # Cleanup local files, even when a step above failed.
        for f in (photo_path, marked_path):
            if f and os.path.exists(f):
                os.remove(f)
                print(f"🧹 Deleted {f}")
# Script entry point: run a single detection cycle when executed directly
# (nothing runs from here when the module is merely imported).
if __name__ == "__main__":
    job()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment