Skip to content

Instantly share code, notes, and snippets.

@lizTheDeveloper
Created February 19, 2025 18:19
Show Gist options
  • Save lizTheDeveloper/fb4d92d2ce5fa91b21e742a51c07f4fa to your computer and use it in GitHub Desktop.
Save lizTheDeveloper/fb4d92d2ce5fa91b21e742a51c07f4fa to your computer and use it in GitHub Desktop.
import base64
import json
import os
import socket
import time

import cv2
import numpy as np
from PIL import Image
# Define connection to ABB RobotStudio
# 127.0.0.1 is the loopback address, i.e. the server is expected on this machine.
HOST = "127.0.0.1"
PORT = 5000 # Must match RAPID socket port
# Create a list to store the dataset
# Module-level accumulator: receive_robot_data() appends one dict per sample
# and passes the whole list to save_dataset().
dataset = []
# Open a connection to ABB
def receive_robot_data():
    """Receive ABB robot joint positions over TCP and record them with images.

    Connects to ``HOST``:``PORT`` and, for every comma-separated joint-position
    message received, captures a camera frame and appends one entry to the
    module-level ``dataset`` list. The dataset is written with
    ``save_dataset`` every 100 samples and once more when collection stops,
    so samples gathered after the last multiple of 100 are not lost.

    Malformed messages and failed camera captures are reported and skipped
    instead of aborting the whole session. Errors are printed rather than
    raised, so the function returns normally when the peer closes the
    connection or when an error ends collection.
    """
    unsaved = 0  # samples appended since the last successful save
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((HOST, PORT))  # Connect to RAPID
            while True:
                chunk = s.recv(1024)
                if not chunk:
                    # recv() returns b"" only when the peer closed the socket.
                    break
                # TCP is a byte stream, so one recv() may carry several
                # messages; handle each line separately.
                # NOTE(review): a message split across two recv() calls is
                # still not reassembled - that needs a delimiter agreed with
                # the RAPID side.
                for line in chunk.decode().splitlines():
                    line = line.strip()
                    if not line:
                        continue
                    # Parse joint angles from string
                    try:
                        joint_positions = [float(x) for x in line.split(",")]
                    except ValueError:
                        print(f"Skipping malformed joint data: {line!r}")
                        continue
                    # Capture an image from the robot camera
                    frame = capture_image()
                    if frame is None:
                        # capture_image() returns None when no frame was read;
                        # encoding None would abort the whole run.
                        print("Camera capture failed; skipping sample.")
                        continue
                    timestamp = time.time()
                    # Create an RLDS-compatible data entry
                    data_entry = {
                        "timestamp": timestamp,
                        "image": encode_image(frame),  # JPEG-encoded frame
                        "joint_positions": joint_positions,
                        "action": None,  # Placeholder (e.g., "move_left")
                        "success": None,  # Optional success label
                        "instruction": "Pick up the object",  # Example instruction
                    }
                    dataset.append(data_entry)
                    unsaved += 1
                    # Save every 100 samples
                    if len(dataset) % 100 == 0:
                        save_dataset(dataset)
                        unsaved = 0
    except OSError as e:
        print(f"Socket error: {e}")
    except Exception as e:
        # Not a socket problem (e.g. decoding, encoding or saving failed).
        print(f"Data collection error: {e}")
    finally:
        # Flush the samples collected since the last periodic save. This also
        # runs on KeyboardInterrupt, the usual way to stop the endless loop.
        if unsaved:
            try:
                save_dataset(dataset)
            except Exception as e:
                print(f"Failed to save dataset: {e}")
def capture_image():
    """Capture a single frame from the robot's camera.

    Opens video device 0, reads one frame and releases the device again.
    Modify the device index for your setup (e.g. a virtual webcam or a
    RobotStudio camera source).

    Returns:
        The frame returned by ``cv2.VideoCapture.read`` when the read
        succeeded, otherwise ``None``.
    """
    # NOTE: the device is opened and released on every call; keep that in
    # mind if per-sample latency matters.
    cap = cv2.VideoCapture(0)  # Change to correct camera index
    try:
        ret, frame = cap.read()
    finally:
        # Release the device even if read() raises, so the camera handle is
        # never leaked.
        cap.release()
    return frame if ret else None
def encode_image(image):
    """Encode an image as JPEG and return the encoded bytes.

    Args:
        image: Image array accepted by ``cv2.imencode``, e.g. a frame
            returned by ``capture_image``.

    Returns:
        bytes: The JPEG-encoded image. These are raw bytes, not base64 text;
        they must be converted (e.g. base64-encoded) before being written
        with the ``json`` module.

    Raises:
        ValueError: If ``image`` is ``None`` or JPEG encoding fails.
    """
    if image is None:
        raise ValueError("encode_image() requires an image, got None")
    ok, buffer = cv2.imencode('.jpg', image)
    if not ok:
        # imencode reports failure through its first return value; do not
        # hand back a buffer that may be empty or invalid.
        raise ValueError("JPEG encoding failed")
    return buffer.tobytes()
def save_dataset(dataset):
    """Save the collected samples to ``robot_data.json``.

    ``bytes``/``bytearray`` values (such as JPEG-encoded images) are stored
    as base64 ASCII strings, because the ``json`` module cannot serialize
    raw bytes. The file is written to a temporary path first and then moved
    into place, so an interrupted save does not corrupt the previous file.

    Args:
        dataset: List of JSON-compatible sample dicts; values may also be
            ``bytes``/``bytearray``.

    Raises:
        TypeError: If a value is neither JSON-serializable nor bytes-like.
        OSError: If the file cannot be written or replaced.
    """
    def _to_jsonable(value):
        # Called by json.dump only for objects it cannot serialize itself.
        if isinstance(value, (bytes, bytearray)):
            return base64.b64encode(value).decode("ascii")
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    target_path = "robot_data.json"
    tmp_path = target_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(dataset, f, indent=4, default=_to_jsonable)
    # Replace the old file only after the new one was written completely.
    os.replace(tmp_path, target_path)
    print("Dataset saved.")
# Start data collection
# This call sits at module top level, so it runs whenever the file is executed
# or imported, and it does not return until receive_robot_data() leaves its
# receive loop.
receive_robot_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment