Created
February 19, 2025 18:19
-
-
Save lizTheDeveloper/fb4d92d2ce5fa91b21e742a51c07f4fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import json | |
import time | |
import cv2 | |
import numpy as np | |
from PIL import Image | |
# Define connection to ABB RobotStudio | |
HOST = "127.0.0.1" | |
PORT = 5000 # Must match RAPID socket port | |
# Create a list to store the dataset | |
dataset = [] | |
# Open a connection to ABB | |
def receive_robot_data(): | |
""" Receive real-time ABB robot joint position data & save it. """ | |
try: | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect((HOST, PORT)) # Connect to RAPID | |
while True: | |
# Receive and parse joint positions | |
data = s.recv(1024).decode().strip() | |
if not data: | |
break | |
# Parse joint angles from string | |
joint_positions = [float(x) for x in data.split(",")] | |
# Capture an image from the robot camera | |
frame = capture_image() | |
timestamp = time.time() | |
# Create an RLDS-compatible data entry | |
data_entry = { | |
"timestamp": timestamp, | |
"image": encode_image(frame), # Convert image to base64 for storage | |
"joint_positions": joint_positions, | |
"action": None, # Placeholder (e.g., "move_left") | |
"success": None, # Optional success label | |
"instruction": "Pick up the object" # Example instruction | |
} | |
dataset.append(data_entry) | |
# Save every 100 samples | |
if len(dataset) % 100 == 0: | |
save_dataset(dataset) | |
except Exception as e: | |
print(f"Socket error: {e}") | |
def capture_image(): | |
""" Capture an image from the robot's camera (Modify this for your setup) """ | |
# Assuming a virtual webcam or RobotStudio camera source | |
cap = cv2.VideoCapture(0) # Change to correct camera index | |
ret, frame = cap.read() | |
cap.release() | |
if ret: | |
return frame | |
return None | |
def encode_image(image): | |
""" Convert image to base64 for RLDS storage. """ | |
_, buffer = cv2.imencode('.jpg', image) | |
return buffer.tobytes() | |
def save_dataset(dataset): | |
""" Save collected data in RLDS JSON format. """ | |
with open("robot_data.json", "w") as f: | |
json.dump(dataset, f, indent=4) | |
print("Dataset saved.") | |
# Start data collection | |
receive_robot_data() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment