Skip to content

Instantly share code, notes, and snippets.

@lizTheDeveloper
Created December 17, 2024 06:04
Show Gist options
  • Save lizTheDeveloper/c81eafff3017d27a155cf65a1b61d9c4 to your computer and use it in GitHub Desktop.
Save lizTheDeveloper/c81eafff3017d27a155cf65a1b61d9c4 to your computer and use it in GitHub Desktop.
import time
import os
import cv2
from pathlib import Path
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor
import torch
from lerobot import make_robot, init_hydra_config
from lerobot.utils import log_say, log_control_info, busy_wait
class DualCameraCapture:
def __init__(self, stream_url1, stream_url2):
self.stream_url1 = stream_url1
self.stream_url2 = stream_url2
self.cap1 = None
self.cap2 = None
self.most_recent_timestamp = time.time()
def capture_frames(self):
"""
Capture a frame from two streams and save them as 'cam1.jpg' and 'cam2.jpg'.
"""
self.most_recent_timestamp = time.time()
try:
# Initialize connections if needed
if self.cap1 is None or not self.cap1.isOpened():
self.cap1 = cv2.VideoCapture(self.stream_url1)
if self.cap2 is None or not self.cap2.isOpened():
self.cap2 = cv2.VideoCapture(self.stream_url2)
# Capture frames from both cameras
ret1, frame1 = self.cap1.read()
ret2, frame2 = self.cap2.read()
if not ret1 or not ret2:
print("Error: Unable to fetch frames from one or both cameras.")
return False
# Save frames
cv2.imwrite("cam1.jpg", frame1)
cv2.imwrite("cam2.jpg", frame2)
print("Captured frames from both cameras.")
return True
except Exception as e:
print(f"Error: {e}")
return False
def release(self):
if self.cap1:
self.cap1.release()
if self.cap2:
self.cap2.release()
cv2.destroyAllWindows()
def load_openvla():
"""
Load the OpenVLA model and processor.
"""
print("Loading OpenVLA model...")
processor = AutoProcessor.from_pretrained("openvla/openvla-7b", trust_remote_code=True)
vla = AutoModelForVision2Seq.from_pretrained(
"openvla/openvla-7b",
attn_implementation="flash_attention_2",
torch_dtype=torch.bfloat16,
low_cpu_mem_usage=True,
trust_remote_code=True
).to("cuda:0")
print("OpenVLA model loaded successfully.")
return processor, vla
def control_so100_with_openvla(robot, fps, stream_url1, stream_url2):
"""
Capture frames from two cameras, predict actions with OpenVLA, and send actions to the SO-100 robot.
"""
processor, vla = load_openvla()
camera_capture = DualCameraCapture(stream_url1, stream_url2)
if not robot.is_connected:
robot.connect()
log_say("Running OpenVLA action predictions on SO-100 with dual cameras", play_sounds=True, blocking=True)
try:
while True:
start_time = time.perf_counter()
# Capture frames from both cameras
if not camera_capture.capture_frames():
print("Frame capture failed. Retrying...")
continue
# Combine frames (e.g., concatenate horizontally for OpenVLA input)
frame1 = Image.open("cam1.jpg")
frame2 = Image.open("cam2.jpg")
combined_image = Image.new("RGB", (frame1.width + frame2.width, frame1.height))
combined_image.paste(frame1, (0, 0))
combined_image.paste(frame2, (frame1.width, 0))
combined_image.save("combined.jpg")
# Process combined frame
prompt = "What action should the robot take?"
inputs = processor(prompt, combined_image).to("cuda:0", dtype=torch.bfloat16)
# Predict action and send it to the robot
action = vla.predict_action(**inputs, do_sample=False)
robot.send_action(action)
print(f"Action sent to SO-100: {action}")
# Maintain FPS control
elapsed_time = time.perf_counter() - start_time
if fps:
busy_wait(max(0, 1 / fps - elapsed_time))
except KeyboardInterrupt:
print("Stopping SO-100 OpenVLA control.")
finally:
camera_capture.release()
if robot.is_connected:
robot.disconnect()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--robot-path", type=str, default="lerobot/configs/robot/so100.yaml",
help="Path to SO-100 YAML config file.")
parser.add_argument("--fps", type=int, default=30, help="Frames per second.")
parser.add_argument("--stream-url1", type=str, required=True,
help="URL of the first video stream.")
parser.add_argument("--stream-url2", type=str, required=True,
help="URL of the second video stream.")
args = parser.parse_args()
# Initialize SO-100 robot
robot_cfg = init_hydra_config(args.robot_path, [])
robot = make_robot(robot_cfg)
# Control SO-100 with OpenVLA and dual cameras
try:
control_so100_with_openvla(robot, fps=args.fps, stream_url1=args.stream_url1, stream_url2=args.stream_url2)
except Exception as e:
print(f"Error: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment