Created
December 17, 2024 06:04
-
-
Save lizTheDeveloper/c81eafff3017d27a155cf65a1b61d9c4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import os | |
| import cv2 | |
| from pathlib import Path | |
| from PIL import Image | |
| from transformers import AutoModelForVision2Seq, AutoProcessor | |
| import torch | |
| from lerobot import make_robot, init_hydra_config | |
| from lerobot.utils import log_say, log_control_info, busy_wait | |
| class DualCameraCapture: | |
| def __init__(self, stream_url1, stream_url2): | |
| self.stream_url1 = stream_url1 | |
| self.stream_url2 = stream_url2 | |
| self.cap1 = None | |
| self.cap2 = None | |
| self.most_recent_timestamp = time.time() | |
| def capture_frames(self): | |
| """ | |
| Capture a frame from two streams and save them as 'cam1.jpg' and 'cam2.jpg'. | |
| """ | |
| self.most_recent_timestamp = time.time() | |
| try: | |
| # Initialize connections if needed | |
| if self.cap1 is None or not self.cap1.isOpened(): | |
| self.cap1 = cv2.VideoCapture(self.stream_url1) | |
| if self.cap2 is None or not self.cap2.isOpened(): | |
| self.cap2 = cv2.VideoCapture(self.stream_url2) | |
| # Capture frames from both cameras | |
| ret1, frame1 = self.cap1.read() | |
| ret2, frame2 = self.cap2.read() | |
| if not ret1 or not ret2: | |
| print("Error: Unable to fetch frames from one or both cameras.") | |
| return False | |
| # Save frames | |
| cv2.imwrite("cam1.jpg", frame1) | |
| cv2.imwrite("cam2.jpg", frame2) | |
| print("Captured frames from both cameras.") | |
| return True | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| return False | |
| def release(self): | |
| if self.cap1: | |
| self.cap1.release() | |
| if self.cap2: | |
| self.cap2.release() | |
| cv2.destroyAllWindows() | |
| def load_openvla(): | |
| """ | |
| Load the OpenVLA model and processor. | |
| """ | |
| print("Loading OpenVLA model...") | |
| processor = AutoProcessor.from_pretrained("openvla/openvla-7b", trust_remote_code=True) | |
| vla = AutoModelForVision2Seq.from_pretrained( | |
| "openvla/openvla-7b", | |
| attn_implementation="flash_attention_2", | |
| torch_dtype=torch.bfloat16, | |
| low_cpu_mem_usage=True, | |
| trust_remote_code=True | |
| ).to("cuda:0") | |
| print("OpenVLA model loaded successfully.") | |
| return processor, vla | |
| def control_so100_with_openvla(robot, fps, stream_url1, stream_url2): | |
| """ | |
| Capture frames from two cameras, predict actions with OpenVLA, and send actions to the SO-100 robot. | |
| """ | |
| processor, vla = load_openvla() | |
| camera_capture = DualCameraCapture(stream_url1, stream_url2) | |
| if not robot.is_connected: | |
| robot.connect() | |
| log_say("Running OpenVLA action predictions on SO-100 with dual cameras", play_sounds=True, blocking=True) | |
| try: | |
| while True: | |
| start_time = time.perf_counter() | |
| # Capture frames from both cameras | |
| if not camera_capture.capture_frames(): | |
| print("Frame capture failed. Retrying...") | |
| continue | |
| # Combine frames (e.g., concatenate horizontally for OpenVLA input) | |
| frame1 = Image.open("cam1.jpg") | |
| frame2 = Image.open("cam2.jpg") | |
| combined_image = Image.new("RGB", (frame1.width + frame2.width, frame1.height)) | |
| combined_image.paste(frame1, (0, 0)) | |
| combined_image.paste(frame2, (frame1.width, 0)) | |
| combined_image.save("combined.jpg") | |
| # Process combined frame | |
| prompt = "What action should the robot take?" | |
| inputs = processor(prompt, combined_image).to("cuda:0", dtype=torch.bfloat16) | |
| # Predict action and send it to the robot | |
| action = vla.predict_action(**inputs, do_sample=False) | |
| robot.send_action(action) | |
| print(f"Action sent to SO-100: {action}") | |
| # Maintain FPS control | |
| elapsed_time = time.perf_counter() - start_time | |
| if fps: | |
| busy_wait(max(0, 1 / fps - elapsed_time)) | |
| except KeyboardInterrupt: | |
| print("Stopping SO-100 OpenVLA control.") | |
| finally: | |
| camera_capture.release() | |
| if robot.is_connected: | |
| robot.disconnect() | |
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--robot-path", type=str, default="lerobot/configs/robot/so100.yaml", | |
| help="Path to SO-100 YAML config file.") | |
| parser.add_argument("--fps", type=int, default=30, help="Frames per second.") | |
| parser.add_argument("--stream-url1", type=str, required=True, | |
| help="URL of the first video stream.") | |
| parser.add_argument("--stream-url2", type=str, required=True, | |
| help="URL of the second video stream.") | |
| args = parser.parse_args() | |
| # Initialize SO-100 robot | |
| robot_cfg = init_hydra_config(args.robot_path, []) | |
| robot = make_robot(robot_cfg) | |
| # Control SO-100 with OpenVLA and dual cameras | |
| try: | |
| control_so100_with_openvla(robot, fps=args.fps, stream_url1=args.stream_url1, stream_url2=args.stream_url2) | |
| except Exception as e: | |
| print(f"Error: {e}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment