Created
September 30, 2024 16:30
-
-
Save chrismatthieu/677c1a5505f57bd508aea0c22453cc15 to your computer and use it in GitHub Desktop.
realsense person detection and follow me
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import rclpy | |
from rclpy.node import Node | |
from geometry_msgs.msg import Twist | |
import cv2 | |
import numpy as np | |
import pyrealsense2 as rs | |
from ultralytics import YOLO | |
def check_dependencies(): | |
global missing_modules | |
missing_modules = [] | |
try: | |
import cv2 | |
except ImportError: | |
missing_modules.append("opencv-python") | |
try: | |
import numpy as np | |
except ImportError: | |
missing_modules.append("numpy") | |
try: | |
import pyrealsense2 as rs | |
except ImportError: | |
missing_modules.append("pyrealsense2") | |
try: | |
from ultralytics import YOLO | |
except ImportError: | |
missing_modules.append("ultralytics") | |
try: | |
import rclpy | |
from geometry_msgs.msg import Twist | |
except ImportError: | |
missing_modules.append("rclpy") | |
if missing_modules: | |
print("Error: The following required modules are missing:") | |
for module in missing_modules: | |
print(f" - {module}") | |
print("\nPlease install the missing modules using:") | |
print(f"pip install {' '.join(missing_modules)}") | |
print("\nNote: For pyrealsense2 and rclpy, you may need to follow specific installation instructions for your system.") | |
sys.exit(1) | |
class PersonDetectionNode(Node): | |
def __init__(self): | |
super().__init__('person_detection_node') | |
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10) | |
self.timer = self.create_timer(0.1, self.timer_callback) | |
# Initialize RealSense pipeline | |
self.pipeline = rs.pipeline() | |
self.config = rs.config() | |
self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30) | |
self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30) | |
# Create an align object | |
self.align = rs.align(rs.stream.color) | |
self.pipeline.start(self.config) | |
# Initialize YOLO model | |
self.model = YOLO('yolov8n.pt') # Load the smallest YOLOv8 model | |
def timer_callback(self): | |
# Wait for a coherent pair of frames: depth and color | |
frames = self.pipeline.wait_for_frames() | |
aligned_frames = self.align.process(frames) | |
color_frame = aligned_frames.get_color_frame() | |
depth_frame = aligned_frames.get_depth_frame() | |
if not color_frame or not depth_frame: | |
return | |
# Convert images to numpy arrays | |
color_image = np.asanyarray(color_frame.get_data()) | |
depth_image = np.asanyarray(depth_frame.get_data()) | |
# Run YOLO detection | |
results = self.model(color_image) | |
closest_person_distance = float('inf') | |
# Process results | |
for result in results: | |
boxes = result.boxes.cpu().numpy() | |
for box in boxes: | |
if box.cls == 0: # Class 0 is person in COCO dataset | |
x1, y1, x2, y2 = box.xyxy[0].astype(int) | |
# Calculate the center of the bounding box | |
center_x = (x1 + x2) // 2 | |
center_y = (y1 + y2) // 2 | |
# Get the depth value at the center of the bounding box | |
depth_value = depth_frame.get_distance(center_x, center_y) | |
if depth_value < closest_person_distance: | |
closest_person_distance = depth_value | |
cv2.rectangle(color_image, (x1, y1), (x2, y2), (0, 255, 0), 2) | |
cv2.putText(color_image, f"Person {box.conf[0]:.2f} - {depth_value:.2f}m", (x1, y1 - 10), | |
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) | |
# Control the robot based on the closest person's distance | |
self.control_robot(closest_person_distance) | |
# Display the image | |
cv2.imshow('RealSense', color_image) | |
cv2.waitKey(1) | |
def control_robot(self, distance): | |
cmd_vel = Twist() | |
if distance > 1.0: | |
# Move forward if the person is more than 1 meter away | |
cmd_vel.linear.x = 0.5 # Adjust this value for desired speed | |
else: | |
# Stop if the person is 1 meter away or closer | |
cmd_vel.linear.x = 0.0 | |
self.cmd_vel_publisher.publish(cmd_vel) | |
def main(args=None): | |
check_dependencies() | |
rclpy.init(args=args) | |
node = PersonDetectionNode() | |
try: | |
rclpy.spin(node) | |
except KeyboardInterrupt: | |
pass | |
finally: | |
node.pipeline.stop() | |
cv2.destroyAllWindows() | |
node.destroy_node() | |
rclpy.shutdown() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment