Created
January 10, 2024 06:28
-
-
Save WangYixuan12/01a50a8776d2bd2e7c92ee0823fa2e5c to your computer and use it in GitHub Desktop.
multiple femto processes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import List, Optional, Union, Dict, Callable | |
import numbers | |
import time | |
import pathlib | |
from multiprocessing.managers import SharedMemoryManager | |
from pyorbbecsdk import Context, DeviceList, \ | |
OBPropertyID, OBSensorType, OBFormat, OBAlignMode, \ | |
Pipeline, Config, StreamProfileList, VideoFrame | |
import numpy as np | |
import open3d as o3d | |
from diffusion_policy.real_world.single_femto import SingleFemto | |
from diffusion_policy.real_world.aloha_bimanual_puppet import AlohaBimanualPuppet | |
from diffusion_policy.real_world.video_recorder import VideoRecorder | |
from d3fields.utils.draw_utils import aggr_point_cloud_from_data | |
class MultiFemto:
    """Drive several Orbbec Femto cameras behind one synchronous interface.

    Each camera runs in its own SingleFemto child process; all of them share
    a single SharedMemoryManager for frame exchange.  Most methods simply
    fan out the call to every SingleFemto in ``self.cameras`` (a dict keyed
    by device serial number, in discovery/constructor order).
    """
    def __init__(self,
            serial_numbers: Optional[List[str]]=None,
            shm_manager: Optional[SharedMemoryManager]=None,
            resolution=(1280,960),
            capture_fps=30,
            put_fps=None,
            put_downsample=True,
            record_fps=None,
            enable_color=True,
            enable_depth=False,
            enable_infrared=False,
            get_max_k=30,
            advanced_mode_config: Optional[Union[dict, List[dict]]]=None,
            transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]]=None,
            vis_transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]]=None,
            recording_transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]]=None,
            video_recorder: Optional[Union[VideoRecorder, List[VideoRecorder]]]=None,
            verbose=False
            ):
        # Own (and start) a shared-memory manager only when the caller
        # did not supply one.
        if shm_manager is None:
            shm_manager = SharedMemoryManager()
            shm_manager.start()
        # Auto-discover cameras when no explicit serial list is given.
        # NOTE(review): the print is assumed to belong to this discovery
        # branch — confirm against the original (pre-extraction) file.
        if serial_numbers is None:
            serial_numbers = SingleFemto.get_connected_devices_serial()
            print(f'Found {len(serial_numbers)} connected devices: {serial_numbers}')
        n_cameras = len(serial_numbers)
        # Scalar per-camera options are broadcast to one entry per camera;
        # lists must already have exactly n_cameras entries.
        advanced_mode_config = repeat_to_list(
            advanced_mode_config, n_cameras, dict)
        transform = repeat_to_list(
            transform, n_cameras, Callable)
        vis_transform = repeat_to_list(
            vis_transform, n_cameras, Callable)
        recording_transform = repeat_to_list(
            recording_transform, n_cameras, Callable)
        video_recorder = repeat_to_list(
            video_recorder, n_cameras, VideoRecorder)
        # One SingleFemto worker per serial, all sharing shm_manager.
        cameras = dict()
        for i, serial in enumerate(serial_numbers):
            cameras[serial] = SingleFemto(
                shm_manager=shm_manager,
                serial_number=serial,
                resolution=resolution,
                capture_fps=capture_fps,
                put_fps=put_fps,
                put_downsample=put_downsample,
                record_fps=record_fps,
                enable_color=enable_color,
                enable_depth=enable_depth,
                enable_infrared=enable_infrared,
                get_max_k=get_max_k,
                advanced_mode_config=advanced_mode_config[i],
                transform=transform[i],
                vis_transform=vis_transform[i],
                recording_transform=recording_transform[i],
                video_recorder=video_recorder[i],
                verbose=verbose
            )
        self.cameras = cameras
        self.shm_manager = shm_manager

    def __enter__(self):
        """Start all camera processes; returns self for `with ... as`."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop all camera processes when leaving the `with` block."""
        self.stop()

    @property
    def n_cameras(self):
        # Number of managed cameras.
        return len(self.cameras)

    @property
    def is_ready(self):
        # True only when every camera process reports ready.
        is_ready = True
        for camera in self.cameras.values():
            if not camera.is_ready:
                is_ready = False
        return is_ready

    def start(self, wait=True, put_start_time=None):
        """Start every camera with a common put_start_time (defaults to now)."""
        if put_start_time is None:
            put_start_time = time.time()
        for camera in self.cameras.values():
            # Launch all cameras first, then optionally block until ready,
            # so startup happens in parallel rather than serially.
            camera.start(wait=False, put_start_time=put_start_time)
        if wait:
            self.start_wait()

    def stop(self, wait=True):
        """Signal every camera to stop; optionally block until joined."""
        for camera in self.cameras.values():
            camera.stop(wait=False)
        if wait:
            self.stop_wait()

    def start_wait(self):
        """Block until every camera reports started."""
        for camera in self.cameras.values():
            camera.start_wait()

    def stop_wait(self):
        """Block until every camera process has exited."""
        for camera in self.cameras.values():
            camera.join()

    def get(self, k=None, out=None) -> Dict[int, Dict[str, np.ndarray]]:
        """
        Return order T,H,W,C
        {
            0: {
                'rgb': (T,H,W,C),
                'timestamp': (T,)
            },
            1: ...
        }
        """
        # Fan out to each camera; reuse any caller-provided per-camera
        # output buffers keyed by camera index.
        if out is None:
            out = dict()
        for i, camera in enumerate(self.cameras.values()):
            this_out = None
            if i in out:
                this_out = out[i]
            this_out = camera.get(k=k, out=this_out)
            out[i] = this_out
        return out

    def get_vis(self, out=None):
        """Fetch one visualization frame per camera, stacked along axis 0.

        When ``out`` is provided it must map keys to arrays of shape
        (n_cameras, ...); each camera writes into its own slice in place.
        """
        results = list()
        for i, camera in enumerate(self.cameras.values()):
            this_out = None
            if out is not None:
                this_out = dict()
                for key, v in out.items():
                    # use the slicing trick to maintain the array
                    # when v is 1D
                    this_out[key] = v[i:i+1].reshape(v.shape[1:])
            this_out = camera.get(out=this_out)
            if out is None:
                results.append(this_out)
        if out is None:
            # No caller buffer: stack per-camera results into fresh arrays.
            out = dict()
            for key in results[0].keys():
                out[key] = np.stack([x[key] for x in results])
        return out

    def set_color_option(self, option, value):
        """Set a color-sensor option; scalar value is broadcast per camera."""
        n_camera = len(self.cameras)
        value = repeat_to_list(value, n_camera, numbers.Number)
        for i, camera in enumerate(self.cameras.values()):
            camera.set_color_option(option, value[i])

    def set_depth_option(self, option, value):
        """Set a depth-sensor option; scalar value is broadcast per camera."""
        n_camera = len(self.cameras)
        value = repeat_to_list(value, n_camera, numbers.Number)
        for i, camera in enumerate(self.cameras.values()):
            camera.set_depth_option(option, value[i])

    def set_depth_preset(self, preset):
        """Set a depth preset (by name); scalar preset is broadcast."""
        n_camera = len(self.cameras)
        preset = repeat_to_list(preset, n_camera, str)
        for i, camera in enumerate(self.cameras.values()):
            camera.set_depth_preset(preset[i])

    def set_exposure(self, exposure=None, gain=None):
        """
        exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s)
        gain: (0, 128)
        """
        if exposure is None and gain is None:
            # auto exposure
            self.set_color_option(OBPropertyID.OB_PROP_COLOR_AUTO_EXPOSURE_BOOL, 1.0)
        else:
            # manual exposure
            self.set_color_option(OBPropertyID.OB_PROP_COLOR_AUTO_EXPOSURE_BOOL, 0.0)
            if exposure is not None:
                self.set_color_option(OBPropertyID.OB_PROP_COLOR_EXPOSURE_INT, exposure)
            if gain is not None:
                self.set_color_option(OBPropertyID.OB_PROP_COLOR_GAIN_INT, gain)

    def set_depth_exposure(self, exposure=None, gain=None):
        """
        exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s)
        gain: (0, 128)
        """
        if exposure is None and gain is None:
            # auto exposure
            self.set_depth_option(OBPropertyID.OB_PROP_DEPTH_AUTO_EXPOSURE_BOOL, 1.0)
        else:
            # manual exposure
            self.set_depth_option(OBPropertyID.OB_PROP_DEPTH_AUTO_EXPOSURE_BOOL, 0.0)
            if exposure is not None:
                self.set_depth_option(OBPropertyID.OB_PROP_DEPTH_EXPOSURE_INT, exposure)
            if gain is not None:
                self.set_depth_option(OBPropertyID.OB_PROP_DEPTH_GAIN_INT, gain)

    def set_white_balance(self, white_balance=None):
        """Set manual white balance; None re-enables auto white balance."""
        if white_balance is None:
            self.set_color_option(OBPropertyID.OB_PROP_COLOR_AUTO_WHITE_BALANCE_BOOL, 1.0)
        else:
            self.set_color_option(OBPropertyID.OB_PROP_COLOR_AUTO_WHITE_BALANCE_BOOL, 0.0)
            self.set_color_option(OBPropertyID.OB_PROP_COLOR_WHITE_BALANCE_INT, white_balance)

    def get_intrinsics(self):
        # Stacked per-camera intrinsics, shape (n_cameras, ...).
        return np.array([c.get_intrinsics() for c in self.cameras.values()])

    def get_depth_scale(self):
        # Stacked per-camera depth scales, shape (n_cameras,).
        return np.array([c.get_depth_scale() for c in self.cameras.values()])

    def start_recording(self, video_path: Union[str, List[str]], start_time: float):
        """Start recording on every camera.

        ``video_path`` may be a directory (files named ``{i}.mp4`` are created
        inside it, one per camera) or an explicit list of n_cameras paths.
        """
        if isinstance(video_path, str):
            # directory
            video_dir = pathlib.Path(video_path)
            assert video_dir.parent.is_dir()
            video_dir.mkdir(parents=True, exist_ok=True)
            video_path = list()
            for i in range(self.n_cameras):
                video_path.append(
                    str(video_dir.joinpath(f'{i}.mp4').absolute()))
        assert len(video_path) == self.n_cameras
        for i, camera in enumerate(self.cameras.values()):
            camera.start_recording(video_path[i], start_time)

    def stop_recording(self):
        """Stop recording on every camera."""
        for i, camera in enumerate(self.cameras.values()):
            camera.stop_recording()

    def restart_put(self, start_time):
        """Restart the put pipeline on every camera from start_time."""
        for camera in self.cameras.values():
            camera.restart_put(start_time)

    def calibrate_extrinsics(self, visualize=True, board_size=(6, 9), squareLength=0.03, markerLength=0.022):
        """Run marker-board extrinsic calibration on every camera in turn."""
        for camera in self.cameras.values():
            camera.calibrate_extrinsics(visualize=visualize, board_size=board_size, squareLength=squareLength, markerLength=markerLength)
def visualize_calibration_result():
    """Fuse the point clouds of all connected Femto cameras and display them.

    Streams color + depth for 200 frames; each iteration back-projects every
    camera's frame with its intrinsics/extrinsics and shows the aggregated
    cloud together with the world origin frame (close the Open3D window to
    advance to the next frame).
    """
    with MultiFemto(
        resolution=(1280, 960),
        put_downsample=False,
        enable_color=True,
        enable_depth=True,
        enable_infrared=False,
        verbose=False
    ) as multi_cam:
        for _ in range(200):
            out = multi_cam.get()
            # np.stack requires a sequence of arrays; passing a generator
            # raises TypeError on modern NumPy, so build explicit lists.
            # [..., ::-1] flips BGR -> RGB; depth is stored in millimeters.
            colors = np.stack([value['color'] for value in out.values()])[..., ::-1]
            depths = np.stack([value['depth'] for value in out.values()]) / 1000.
            intrinsics = np.stack([value['intrinsics'] for value in out.values()])
            extrinsics = np.stack([value['extrinsics'] for value in out.values()])
            pcd = aggr_point_cloud_from_data(colors=colors, depths=depths, Ks=intrinsics, poses=extrinsics, downsample=False)
            origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1)
            o3d.visualization.draw_geometries([pcd, origin])
def test_aloha_calibration(sides=None):
    """Visually check camera extrinsics against the Aloha robot model.

    Renders the robot-arm (and tool) point clouds at hand-specified base
    poses in the world frame, then overlays the fused camera point clouds so
    a human can judge alignment — first per camera, then all cameras at once.
    Also saves each side's base pose to
    ``aloha_extrinsics/<side>_base_pose_in_world.npy`` next to this file.

    sides: robot sides to include; defaults to ['right', 'left'].
    """
    from diffusion_policy.common.kinematics_utils import KinHelper
    from d3fields.utils.draw_utils import np2o3d

    # None sentinel instead of a mutable default argument.
    if sides is None:
        sides = ['right', 'left']

    # Hand-tuned guess of each robot base pose in the world frame
    # (one 4x4 homogeneous transform per side, in `sides` order).
    robot_base_in_world = np.array([[[0, -1, 0, -0.13],
                                     [1, 0, 0, -0.51],
                                     [0, 0, 1, 0.02],
                                     [0, 0, 0, 1]],
                                    [[0, 1, 0, -0.13],
                                     [-1, 0, 0, 0.27],
                                     [0, 0, 1, 0.02],
                                     [0, 0, 0, 1]],])

    # Save current robot base pose in world.
    curr_path = os.path.dirname(os.path.abspath(__file__))
    save_dir = os.path.join(curr_path, 'aloha_extrinsics')
    # os.makedirs replaces the former `os.system('mkdir -p ...')`: portable
    # and immune to shell interpretation of the path.
    os.makedirs(save_dir, exist_ok=True)
    for rob_i, side in enumerate(sides):
        np.save(os.path.join(save_dir, f'{side}_base_pose_in_world.npy'), robot_base_in_world[rob_i])

    # Build robot + tool point clouds from the current joint positions.
    kin_helper = KinHelper(robot_name='trossen_vx300s_v3')
    shm_manager = SharedMemoryManager()
    shm_manager.start()
    init_qpos = np.array([0.0, -0.865, 0.6, 0.0, 1.35, 0.0, -0.5, 0.0, -0.94, 0.72, 0.0, 1.31, 0.0, -0.5])
    tool_names = ['pusher', 'dustpan']
    puppet_robot = AlohaBimanualPuppet(shm_manager=shm_manager, verbose=False, frequency=50, robot_sides=['right', 'left'], init_qpos=init_qpos)
    puppet_robot.start()
    time.sleep(10)  # give the robot time to settle at init_qpos
    curr_robot_joint_pos = puppet_robot.get_state()['curr_full_joint_pos']
    base_pcd_o3d_ls = []
    for rob_i, side in enumerate(sides):
        # Each arm occupies 8 consecutive entries of the full joint vector.
        arm_qpos = curr_robot_joint_pos[rob_i*8:(rob_i+1)*8]
        base_pcd = kin_helper.compute_robot_pcd(arm_qpos)
        tool_pcd = kin_helper.compute_tool_pcd(arm_qpos, tool_name=tool_names[rob_i])
        base_pcd = np.concatenate([base_pcd, tool_pcd], axis=0)
        # Transform robot-frame points into the world frame via homogeneous
        # coordinates: (4x4) @ (4xN) -> take xyz rows.
        base_pcd_h = np.concatenate([base_pcd, np.ones((base_pcd.shape[0], 1))], axis=-1).T
        base_pcd_world = (robot_base_in_world[rob_i] @ base_pcd_h)[:3].T
        base_pcd_o3d_ls.append(np2o3d(base_pcd_world))

    # Overlay the camera point clouds on the robot model.
    with MultiFemto(
        resolution=(640, 360),
        put_downsample=False,
        enable_color=True,
        enable_depth=True,
        enable_infrared=False,
        verbose=False
    ) as multi_cam:
        multi_cam.set_exposure(exposure=200, gain=64)
        multi_cam.set_white_balance(white_balance=2800)
        multi_cam.set_depth_preset('High Density')
        multi_cam.set_depth_exposure(7000, 16)
        # Warm up so the fixed settings take effect before the capture we use.
        for _ in range(30):
            out = multi_cam.get()
            time.sleep(0.1)
        # np.stack requires a sequence of arrays; a generator raises
        # TypeError on modern NumPy, so build explicit lists.
        # [..., ::-1] flips BGR -> RGB; depth is stored in millimeters.
        colors = np.stack([value['color'] for value in out.values()])[..., ::-1]
        depths = np.stack([value['depth'] for value in out.values()]) / 1000.
        intrinsics = np.stack([value['intrinsics'] for value in out.values()])
        extrinsics = np.stack([value['extrinsics'] for value in out.values()])
        # Per-camera overlay: one blocking Open3D window per camera.
        for i in range(colors.shape[0]):
            pcd_i = aggr_point_cloud_from_data(colors=colors[i:i+1],
                                               depths=depths[i:i+1],
                                               Ks=intrinsics[i:i+1],
                                               poses=extrinsics[i:i+1],
                                               downsample=False)
            origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1)
            o3d.visualization.draw_geometries([pcd_i, origin] + base_pcd_o3d_ls)
        # All-camera overlay.
        pcd = aggr_point_cloud_from_data(colors=colors,
                                         depths=depths,
                                         Ks=intrinsics,
                                         poses=extrinsics,
                                         downsample=False)
        origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1)
        o3d.visualization.draw_geometries([pcd, origin] + base_pcd_o3d_ls)
def calibrate_all():
    """Repeatedly run marker-board extrinsic calibration on every camera.

    Opens all connected Femto cameras with depth enabled, then loops forever:
    fix the color exposure, run one (visualized) calibration pass across all
    cameras, and pause briefly before the next pass.
    """
    cam_group = MultiFemto(
        put_downsample=False,
        enable_color=True,
        enable_depth=True,
        enable_infrared=False,
        verbose=False
    )
    with cam_group as cams:
        while True:
            cams.set_exposure(exposure=200, gain=16)
            cams.calibrate_extrinsics(
                visualize=True,
                board_size=(6,5),
                squareLength=0.04,
                markerLength=0.03,
            )
            time.sleep(0.1)
def repeat_to_list(x, n: int, cls):
    """Broadcast a per-camera option to a list of length ``n``.

    None becomes a list of ``n`` Nones; a bare instance of ``cls`` is
    repeated ``n`` times; anything else is assumed to already be a sequence
    and must have exactly ``n`` entries.
    """
    result = [None] * n if x is None else x
    if isinstance(result, cls):
        result = [result] * n
    assert len(result) == n
    return result
if __name__ == '__main__':
    # Entry-point selector: uncomment exactly one of the routines below.
    # Each opens the cameras itself, so they must not run concurrently.
    # calibrate_all()
    visualize_calibration_result()
    # test_aloha_calibration()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment