Created
August 27, 2024 20:24
-
-
Save chrismatthieu/067359bc2fedd0e37ad6342bf85e3c0e to your computer and use it in GitHub Desktop.
realsense-sms-youtube
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyrealsense2 as rs | |
import numpy as np | |
import cv2 | |
import requests | |
import http.client | |
import json | |
# Configure depth and color streams | |
pipeline = rs.pipeline() | |
config = rs.config() | |
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) | |
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) | |
# Start streaming | |
pipeline.start(config) | |
# Load YOLOv3 model | |
net = cv2.dnn.readNetFromDarknet( | |
'C:/Users/cmatthie/Projects/robotchat-tests/yolov3.cfg', | |
'C:/Users/cmatthie/Projects/robotchat-tests/yolov3.weights' | |
) | |
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV) | |
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) | |
# Load class labels | |
with open('C:/Users/cmatthie/Projects/robotchat-tests/coco.names', 'r') as f: | |
classes = f.read().strip().split('\n') | |
person_detected = False | |
try: | |
while True: | |
# Wait for a coherent pair of frames: depth and color | |
frames = pipeline.wait_for_frames() | |
color_frame = frames.get_color_frame() | |
depth_frame = frames.get_depth_frame() | |
if not color_frame or not depth_frame: | |
continue | |
# Convert images to numpy arrays | |
color_image = np.asanyarray(color_frame.get_data()) | |
# Prepare the image for person detection | |
(h, w) = color_image.shape[:2] | |
blob = cv2.dnn.blobFromImage(color_image, 1 / 255.0, (416, 416), swapRB=True, crop=False) | |
net.setInput(blob) | |
layer_names = net.getLayerNames() | |
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()] | |
detections = net.forward(output_layers) | |
# Loop over the detections and show only one bounding box | |
for output in detections: | |
for detection in output: | |
scores = detection[5:] | |
class_id = np.argmax(scores) | |
confidence = scores[class_id] | |
# Filter out weak detections and ensure the detected object is a person | |
if confidence > 0.5 and classes[class_id] == 'person': | |
box = detection[0:4] * np.array([w, h, w, h]) | |
(centerX, centerY, width, height) = box.astype("int") | |
startX = int(centerX - (width / 2)) | |
startY = int(centerY - (height / 2)) | |
# Calculate the distance of the person from the camera | |
distance = depth_frame.get_distance(centerX, centerY) | |
# Draw the bounding box of the person along with the associated probability and distance | |
text = "{:.2f}%".format(confidence * 100) | |
y = startY - 10 if startY - 10 > 10 else startY + 10 | |
cv2.rectangle(color_image, (startX, startY), (startX + width, startY + height), | |
(0, 0, 255), 2) | |
cv2.putText(color_image, f"{text}, {distance:.2f}m", (startX, y), | |
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) | |
print(distance) | |
if not person_detected and distance < 1.0: | |
conn = http.client.HTTPSConnection("x.api.infobip.com") | |
payload = json.dumps({ | |
"messages": [ | |
{ | |
"destinations": [{"to":"14803813574"}], | |
"from": "447491163443", | |
"text": "a person was detected within 1 meter" | |
} | |
] | |
}) | |
headers = { | |
'Authorization': 'App 0fe535ba12a2e76df32d-x-2471fbb80a38', | |
'Content-Type': 'application/json', | |
'Accept': 'application/json' | |
} | |
conn.request("POST", "/sms/2/text/advanced", payload, headers) | |
res = conn.getresponse() | |
data = res.read() | |
print(data.decode("utf-8")) | |
person_detected = True | |
break | |
break | |
# Reset person_detected flag if no person is detected | |
if not any(classes[np.argmax(detection[5:])] == 'person' and detection[5 + np.argmax(detection[5:])] > 0.5 for output in detections for detection in output): | |
person_detected = False | |
if distance > 1.0: | |
person_detected = False | |
# Show the output frame | |
cv2.imshow("Frame", color_image) | |
key = cv2.waitKey(1) & 0xFF | |
# If the `q` key was pressed, break from the loop | |
if key == ord("q"): | |
break | |
finally: | |
# Stop streaming | |
pipeline.stop() | |
cv2.destroyAllWindows() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment