Created
November 5, 2025 13:22
-
-
Save iamparthaonline/7d1b293c82cfc04fc919553e0823b751 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import onnxruntime as ort | |
| import numpy as np | |
| import cv2 | |
| import os | |
| import time | |
| from datetime import datetime | |
| import requests | |
| import json | |
# === Configuration ===
# Secrets are taken from the environment when set, so they do not have to be
# stored in this file; the empty-string fallbacks keep the previous defaults.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")  # your channel ID
IMGBB_API_KEY = os.environ.get("IMGBB_API_KEY", "")
MODEL_PATH = "/home/roy/yolov8s.onnx"  # ONNX model file loaded at startup
LOG_FILE = "/home/roy/cat_log.jsonl"  # one JSON object is appended per run
CAT_CLASS_ID = 15  # COCO 'cat'
CONF_THRESH = 0.25  # detections scoring below this are discarded
# === COCO Class Labels ===
# Index in this list == class id used by the detector (e.g. index 15 is "cat").
COCO_CLASSES = [
    # people and vehicles
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
    "truck", "boat",
    # street objects
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    # animals
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear",
    "zebra", "giraffe",
    # accessories
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    # sports
    "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
    "baseball glove", "skateboard", "surfboard", "tennis racket",
    # kitchenware
    "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl",
    # food
    "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog",
    "pizza", "donut", "cake",
    # furniture
    "chair", "couch", "potted plant", "bed", "dining table", "toilet",
    # electronics and appliances
    "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
    "oven", "toaster", "sink", "refrigerator",
    # household items
    "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush",
]
# === Initialize ONNX model ===
# The session is created once at import time and shared by every detection
# call; only the CPU execution provider is requested.
print("π§ Loading YOLO ONNX model...")
session = ort.InferenceSession(
    MODEL_PATH,
    providers=["CPUExecutionProvider"],
)
print("β Model loaded successfully!")
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114)):
    """Scale an image to fit ``new_shape`` and pad the remainder.

    The aspect ratio is preserved: the image is scaled by the largest factor
    that still fits inside ``new_shape`` (height, width), then a constant
    border of ``color`` is added around it to reach the target size.

    Returns:
        (padded_image, scale, pad_left, pad_top) where ``scale`` is the
        resize factor and the pads are the border sizes, in pixels, added on
        the left and on the top.
    """
    src_h, src_w = im.shape[:2]
    target_h, target_w = new_shape[0], new_shape[1]

    # One common factor for both axes keeps the aspect ratio intact.
    scale = min(target_h / src_h, target_w / src_w)
    scaled_w = int(round(src_w * scale))
    scaled_h = int(round(src_h * scale))

    # Leftover space, split evenly between the two sides of each axis.
    half_pad_w = (target_w - scaled_w) / 2
    half_pad_h = (target_h - scaled_h) / 2

    # The +/-0.1 nudges decide which side receives the extra pixel when the
    # leftover is odd.
    pad_top = int(round(half_pad_h - 0.1))
    pad_bottom = int(round(half_pad_h + 0.1))
    pad_left = int(round(half_pad_w - 0.1))
    pad_right = int(round(half_pad_w + 0.1))

    scaled = cv2.resize(im, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)
    boxed = cv2.copyMakeBorder(
        scaled,
        pad_top,
        pad_bottom,
        pad_left,
        pad_right,
        cv2.BORDER_CONSTANT,
        value=color,
    )
    return boxed, scale, pad_left, pad_top
def capture_photo():
    """Capture a still image with ``rpicam-still`` and return its file path.

    The file name embeds the current local time (``photo_YYYYmmdd_HHMMSS.jpg``).
    The path is returned even when the capture fails, so callers should be
    prepared for the file not to exist; failures are reported on stdout.
    """
    import subprocess  # local import keeps this function self-contained

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    photo_path = f"/home/roy/photo_{timestamp}.jpg"

    # Argument list (no shell): nothing in the path can be interpreted as
    # shell syntax, and the exit status is available for checking.
    command = [
        "rpicam-still",
        "-t", "10",
        "-n",
        "--width", "1280",
        "--height", "720",
        "-o", photo_path,
    ]
    try:
        completed = subprocess.run(command, check=False, timeout=60)
    except (OSError, subprocess.TimeoutExpired) as exc:
        # Missing binary, permission problem, or a hung camera process.
        print(f"Camera capture failed: {exc}")
    else:
        if completed.returncode != 0:
            print(f"Camera capture exited with status {completed.returncode}")
    return photo_path
def _nms_indices(boxes, scores, iou_thresh=0.45):
    """Return indices of the boxes kept by greedy non-maximum suppression."""
    order = np.argsort(scores)[::-1]
    areas = (
        np.maximum(0.0, boxes[:, 2] - boxes[:, 0])
        * np.maximum(0.0, boxes[:, 3] - boxes[:, 1])
    )
    keep = []
    while order.size > 0:
        best = order[0]
        keep.append(int(best))
        rest = order[1:]
        inter_w = np.maximum(
            0.0,
            np.minimum(boxes[best, 2], boxes[rest, 2])
            - np.maximum(boxes[best, 0], boxes[rest, 0]),
        )
        inter_h = np.maximum(
            0.0,
            np.minimum(boxes[best, 3], boxes[rest, 3])
            - np.maximum(boxes[best, 1], boxes[rest, 1]),
        )
        inter = inter_w * inter_h
        union = areas[best] + areas[rest] - inter
        iou = inter / np.maximum(union, 1e-9)  # guard against zero-area boxes
        order = rest[iou <= iou_thresh]
    return keep


def _decode_predictions(outputs, num_classes):
    """Turn a model output tensor into (xyxy boxes, scores, class ids, needs_nms).

    Boxes are in letterboxed-input pixel coordinates. ``needs_nms`` is True
    only for the raw layout, whose candidates have not been de-duplicated.
    """
    preds = np.asarray(outputs, dtype=np.float32)
    if preds.ndim == 3:
        preds = preds[0]  # drop the batch dimension
    if preds.size == 0:
        return (
            np.zeros((0, 4), dtype=np.float32),
            np.zeros(0, dtype=np.float32),
            np.zeros(0, dtype=np.int64),
            False,
        )
    if preds.ndim != 2:
        raise ValueError(f"Unsupported model output shape: {preds.shape}")

    raw_width = 4 + num_classes
    # Raw YOLOv8 exports are channels-first, (4 + classes, candidates);
    # transpose so that every row is one candidate.
    if preds.shape[1] != raw_width and preds.shape[0] == raw_width:
        preds = preds.T

    width = preds.shape[1]
    rows = np.arange(preds.shape[0])
    if width == 6:
        # format B: [x1, y1, x2, y2, conf, cls_id] (already post-processed)
        return preds[:, :4], preds[:, 4], preds[:, 5].astype(np.int64), False
    if width == raw_width:
        # raw YOLOv8: [cx, cy, w, h, cls1, cls2, ...] with no objectness term
        class_scores = preds[:, 4:]
        class_ids = class_scores.argmax(axis=1)
        half_w = preds[:, 2] / 2
        half_h = preds[:, 3] / 2
        boxes = np.stack(
            [
                preds[:, 0] - half_w,
                preds[:, 1] - half_h,
                preds[:, 0] + half_w,
                preds[:, 1] + half_h,
            ],
            axis=1,
        )
        return boxes, class_scores[rows, class_ids], class_ids, True
    if width > 6:
        # format A: [x1, y1, x2, y2, obj_conf, cls1, cls2, ...]
        # NOTE(review): kept exactly as the previous implementation read it;
        # confirm against the model that actually produces this layout.
        class_scores = preds[:, 5:]
        class_ids = class_scores.argmax(axis=1)
        return preds[:, :4], preds[:, 4] * class_scores[rows, class_ids], class_ids, False
    raise ValueError(f"Unsupported model output shape: {preds.shape}")


def detect_objects_onnx(image_path):
    """Run the ONNX detector on an image file and summarise what was found.

    The image is rotated 180 degrees before inference. Resulting boxes are
    mapped back to pixel coordinates of the image as stored on disk
    (un-letterboxed and un-rotated), so they can be drawn directly on the
    file at ``image_path``.

    Returns:
        dict with keys
        ``isCat`` (bool) - True if any kept detection has ``CAT_CLASS_ID``;
        ``description`` (str) - short human-readable summary;
        ``duration_sec`` (float) - processing time, rounded to 2 decimals
        (0.0 on the early-exit paths);
        ``detections`` (list) - dicts with ``label``, ``conf`` and ``box``
        (``[x1, y1, x2, y2]`` integer pixel coordinates).

    Raises:
        ValueError: if the model output has a layout that cannot be decoded.
    """
    start = time.time()
    img = cv2.imread(image_path)
    if img is None:
        return {"isCat": False, "description": "Image not found", "duration_sec": 0.0, "detections": []}

    # Rotate 180 degrees
    img = cv2.rotate(img, cv2.ROTATE_180)
    orig_h, orig_w = img.shape[:2]

    # letterbox (preserve aspect ratio) to 640x640
    img_letter, r, pad_w, pad_h = letterbox(img, (640, 640))
    # OpenCV loads images as BGR; the model is fed RGB.
    img_rgb = cv2.cvtColor(img_letter, cv2.COLOR_BGR2RGB)
    img_input = np.ascontiguousarray(img_rgb.transpose(2, 0, 1)[None]).astype(np.float32) / 255.0

    # Run inference, using the input name the model itself declares.
    input_name = session.get_inputs()[0].name
    outputs = session.run(None, {input_name: img_input})[0]
    if outputs is None:
        return {"isCat": False, "description": "No output", "duration_sec": 0.0, "detections": []}

    boxes, scores, class_ids, needs_nms = _decode_predictions(outputs, len(COCO_CLASSES))

    confident = scores >= CONF_THRESH
    boxes, scores, class_ids = boxes[confident], scores[confident], class_ids[confident]

    if needs_nms and len(boxes) > 0:
        # Shift each class into its own coordinate range so that boxes of
        # different classes never suppress each other.
        keep = _nms_indices(boxes + class_ids[:, None] * 4096.0, scores)
        boxes, scores, class_ids = boxes[keep], scores[keep], class_ids[keep]

    detections = []
    cat_found = False
    for (x1, y1, x2, y2), conf, cls_id in zip(boxes.tolist(), scores.tolist(), class_ids.tolist()):
        # Undo the letterbox: remove the padding offset, then the scale,
        # and clip to the image bounds.
        x1c = max(0, min(int(round((x1 - pad_w) / r)), orig_w - 1))
        y1c = max(0, min(int(round((y1 - pad_h) / r)), orig_h - 1))
        x2c = max(0, min(int(round((x2 - pad_w) / r)), orig_w - 1))
        y2c = max(0, min(int(round((y2 - pad_h) / r)), orig_h - 1))

        # Undo the 180-degree rotation: a pixel (x, y) of the rotated image
        # is pixel (w - 1 - x, h - 1 - y) of the file on disk. The corners
        # swap roles so that x1 <= x2 and y1 <= y2 still hold.
        box = [orig_w - 1 - x2c, orig_h - 1 - y2c, orig_w - 1 - x1c, orig_h - 1 - y1c]

        label = COCO_CLASSES[cls_id] if 0 <= cls_id < len(COCO_CLASSES) else str(cls_id)
        if cls_id == CAT_CLASS_ID:
            cat_found = True
        detections.append({
            "label": label,
            "conf": float(conf),
            "box": box,
        })

    elapsed = round(time.time() - start, 2)
    return {
        "isCat": cat_found,
        "description": "Cat detected locally via YOLO" if cat_found else "No cat detected",
        "duration_sec": elapsed,
        "detections": detections,
    }
def draw_boxes(image_path, detections):
    """Draw labelled bounding boxes on a copy of the image and save it.

    Args:
        image_path: path of the image to annotate; the file itself is not
            modified.
        detections: iterable of dicts with ``label`` (str), ``conf`` (float,
            shown as a percentage) and ``box`` (``[x1, y1, x2, y2]`` integer
            pixel coordinates).

    Returns:
        Path of the annotated copy (``<name>_marked<ext>`` next to the
        input), or None if the image could not be read or written.
    """
    img = cv2.imread(image_path)
    if img is None:
        return None

    font = cv2.FONT_HERSHEY_SIMPLEX
    # BGR colors: cats green, persons blue, others yellow
    colors = {"cat": (0, 200, 0), "person": (200, 100, 0)}
    other_color = (0, 200, 200)

    for det in detections:
        label = det["label"]
        conf = det["conf"]
        x1, y1, x2, y2 = det["box"]
        color = colors.get(label, other_color)

        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

        text = f"{label} {conf*100:.1f}%"
        (text_w, text_h), baseline = cv2.getTextSize(text, font, 0.5, 1)
        tag_h = text_h + baseline + 4

        # Put the tag above the box; when the box touches the top edge there
        # is no room there, so place it just inside the box instead of
        # letting the text fall outside the image.
        if y1 >= tag_h:
            tag_bottom = y1
        else:
            tag_bottom = min(y1 + tag_h, img.shape[0] - 1)
        tag_top = max(0, tag_bottom - tag_h)

        # filled rectangle behind the text for visibility
        cv2.rectangle(img, (x1, tag_top), (x1 + text_w, tag_bottom), color, -1)
        cv2.putText(img, text, (x1, tag_bottom - 4), font, 0.5, (0, 0, 0), 1, cv2.LINE_AA)

    # Build the output name from the real extension so that a path without
    # ".jpg" can never resolve to (and overwrite) the source image.
    root, ext = os.path.splitext(image_path)
    marked_path = f"{root}_marked{ext or '.jpg'}"
    if not cv2.imwrite(marked_path, img):
        return None
    return marked_path
def upload_photo(image_path):
    """Upload an image file to imgbb and return the hosted image URL.

    This is best-effort: any failure (unreadable file, network error,
    timeout, non-200 response, unexpected response body) is printed and
    None is returned instead of raising.
    """
    try:
        with open(image_path, "rb") as f:
            # Without an explicit timeout a stalled connection would block
            # the whole job indefinitely.
            upload_resp = requests.post(
                "https://api.imgbb.com/1/upload",
                params={"key": IMGBB_API_KEY},
                files={"image": f},
                timeout=30,
            )
        if upload_resp.status_code != 200:
            print("β Image upload failed:", upload_resp.text)
            return None
        image_url = upload_resp.json()["data"]["url"]
        print(f"π Uploaded image to imgbb: {image_url}")
        return image_url
    except Exception as e:
        print("β Upload exception:", e)
        return None
def send_telegram_alert(image_path, description, duration, extra_text=None):
    """Send the image to the configured Telegram chat with a caption.

    Args:
        image_path: path of the photo to attach.
        description: text shown on the description line of the caption.
        duration: processing time in seconds, shown with two decimals.
        extra_text: optional line inserted directly below the caption title.

    This is best-effort: failures are printed, never raised. Returns None.
    """
    try:
        with open(image_path, "rb") as photo:
            caption_lines = [
                "πΎ Detection Alert!",
                f"π {description}",
                f"β±οΈ Processing time: {duration:.2f} sec"
            ]
            if extra_text:
                caption_lines.insert(1, extra_text)
            caption = "\n".join(caption_lines)
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
            data = {"chat_id": TELEGRAM_CHAT_ID, "caption": caption}
            # Without an explicit timeout a stalled connection would block
            # the whole job indefinitely.
            r = requests.post(url, data=data, files={"photo": photo}, timeout=30)
        if r.status_code == 200:
            print("π¨ Sent to Telegram successfully!")
        else:
            print("β οΈ Telegram error:", r.status_code, r.text)
    except Exception as e:
        print("β οΈ Telegram send failed:", e)
def log_result(entry):
    """Append ``entry`` to ``LOG_FILE`` as one line of JSON.

    The log's parent directory is created if it does not exist yet.
    ``entry`` must be JSON-serialisable.
    """
    log_dir = os.path.dirname(LOG_FILE)
    # A bare file name has no directory part, and os.makedirs("") raises.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    print(f"π Logged to {LOG_FILE}")
def job():
    """Run one capture -> detect -> log -> notify cycle.

    Steps: capture a photo, run detection, draw boxes when anything was
    detected, append a summary to the log, upload the annotated image, and
    send a Telegram alert when a cat was found. The local photo and its
    annotated copy are deleted afterwards, even if one of the steps raises.
    """
    print("π Running detection job...")
    total_start = time.time()
    photo_path = capture_photo()
    marked_path = None
    try:
        print(f"πΈ Captured: {photo_path}")
        result = detect_objects_onnx(photo_path)
        print("π Detection result:", result)

        # draw boxes (on original) and get marked image path
        marked_path = draw_boxes(photo_path, result["detections"]) if result["detections"] else None
        total_time = round(time.time() - total_start, 2)
        print(f"β±οΈ Total time (capture β detection): {total_time} sec")

        # Log
        log_entry = {
            "timestamp": datetime.now().isoformat(timespec="seconds"),
            "image_path": photo_path,
            "isCat": result["isCat"],
            "description": result["description"],
            "duration_sec": total_time,
            "detections_count": len(result["detections"])
        }
        log_result(log_entry)

        # Upload marked image (if any) for reference; the returned URL is
        # not used here.
        if marked_path:
            upload_photo(marked_path)

        # Compose extra caption line listing the cats and their confidences
        extra_text = None
        cat_confs = [d["conf"] for d in result["detections"] if d["label"] == "cat"]
        if cat_confs:
            # example: "π± 2 cats (83.2%, 71.5%)"
            cat_text = "π± {} cat{}".format(len(cat_confs), "s" if len(cat_confs) > 1 else "")
            confs_text = ", ".join([f"{c*100:.1f}%" for c in cat_confs])
            extra_text = f"{cat_text} ({confs_text})"

        # Telegram alert if cat found
        if result["isCat"] and marked_path:
            send_telegram_alert(marked_path, result["description"], total_time, extra_text=extra_text)
    finally:
        # Cleanup local files; in `finally` so a failed step does not leave
        # photos piling up on disk.
        for path in (photo_path, marked_path):
            if path and os.path.exists(path):
                os.remove(path)
                print(f"π§Ή Deleted {path}")
# Script entry point: run a single detection cycle when executed directly
# (nothing is run when the file is imported as a module).
if __name__ == "__main__":
    job()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment