Skip to content

Instantly share code, notes, and snippets.

@kekeblom
Created March 10, 2021 19:18
Show Gist options
  • Save kekeblom/0b8c880bdfea02f5d866336ef379648f to your computer and use it in GitHub Desktop.
Save kekeblom/0b8c880bdfea02f5d866336ef379648f to your computer and use it in GitHub Desktop.
A script to visualize data collected with Stray Scanner http://keke.dev/blog/2021/03/10/Stray-Scanner.html
import os
import open3d as o3d
import numpy as np
from scipy.spatial.transform import Rotation
from argparse import ArgumentParser
from PIL import Image
import skvideo.io
description = """
This script visualizes datasets collected using the Stray Scanner app.
"""
usage = """
Basic usage: python stray_visualize.py <path-to-dataset-folder>
Dependencies:
numpy
open3d==0.12.0
scipy==1.6.0
scikit-video==1.1.11
pillow==8.1.0
"""
DEPTH_WIDTH = 256
DEPTH_HEIGHT = 192
def read_args():
parser = ArgumentParser(description=description, usage=usage)
parser.add_argument('path', type=str, help="Path to StrayScanner dataset to process.")
parser.add_argument('--trajectory', action='store_true', help="Visualize the trajectory of the camera as a line.")
parser.add_argument('--frames', action='store_true', help="Visualize camera coordinate frames from the odometry file.")
parser.add_argument('--point-clouds', action='store_true', help="Show concatenated point clouds.")
parser.add_argument('--integrate', action='store_true', help="Integrate point clouds using the Open3D RGB-D integration pipeline, and visualize it.")
parser.add_argument('--every', type=int, default=60, help="Show only every nth point cloud and coordinate frames. Only used for point cloud and odometry visualization.")
parser.add_argument('--voxel-size', type=float, default=0.015, help="Voxel size in meters to use in RGB-D integration.")
return parser.parse_args()
def _resize_camera_matrix(camera_matrix, scale_x, scale_y):
fx = camera_matrix[0, 0]
fy = camera_matrix[1, 1]
cx = camera_matrix[0, 2]
cy = camera_matrix[1, 2]
return np.array([[fx * scale_x, 0.0, cx * scale_x],
[0., fy * scale_y, cy * scale_y],
[0., 0., 1.0]])
def read_data(flags):
intrinsics = np.loadtxt(os.path.join(flags.path, 'camera_matrix.csv'), delimiter=',')
odometry = np.loadtxt(os.path.join(flags.path, 'odometry.csv'), delimiter=',', skiprows=1)
poses = []
for line in odometry:
# x, y, z, qx, qy, qz, qw
position = line[:3]
quaternion = line[3:]
T_WC = np.eye(4)
T_WC[:3, :3] = Rotation.from_quat(quaternion).as_matrix()
T_WC[:3, 3] = position
poses.append(T_WC)
return { 'poses': poses, 'intrinsics': intrinsics }
def load_depth(path):
depth_mm = np.load(path)
depth_m = depth_mm.astype(np.float32) / 1000.0
return o3d.geometry.Image(depth_m)
def get_intrinsics(intrinsics):
"""
Scales the intrinsics matrix to be of the appropriate scale for the depth maps.
"""
intrinsics_scaled = _resize_camera_matrix(intrinsics, DEPTH_WIDTH / 1920, DEPTH_HEIGHT / 1280)
return o3d.camera.PinholeCameraIntrinsic(width=DEPTH_WIDTH, height=DEPTH_HEIGHT, fx=intrinsics_scaled[0, 0],
fy=intrinsics_scaled[1, 1], cx=intrinsics_scaled[0, 2], cy=intrinsics_scaled[1, 2])
def trajectory(flags, data):
"""
Returns a set of lines connecting each camera poses world frame position.
returns: [open3d.geometry.LineSet]
"""
line_sets = []
previous_pose = None
for i, T_WC in enumerate(data['poses']):
if previous_pose is not None:
points = o3d.utility.Vector3dVector([previous_pose[:3, 3], T_WC[:3, 3]])
lines = o3d.utility.Vector2iVector([[0, 1]])
line = o3d.geometry.LineSet(points=points, lines=lines)
line_sets.append(line)
previous_pose = T_WC
return line_sets
def show_frames(flags, data):
"""
Returns a list of meshes of coordinate axes that have been transformed to represent the camera matrix
at each --every:th frame.
flags: Command line arguments
data: dict with keys ['poses', 'intrinsics']
returns: [open3d.geometry.TriangleMesh]
"""
frames = [o3d.geometry.TriangleMesh.create_coordinate_frame().scale(0.25, np.zeros(3))]
for i, T_WC in enumerate(data['poses']):
if not i % flags.every == 0:
continue
print(f"Frame {i}", end="\r")
mesh = o3d.geometry.TriangleMesh.create_coordinate_frame().scale(0.1, np.zeros(3))
frames.append(mesh.transform(T_WC))
return frames
def point_clouds(flags, data):
"""
Converts depth maps to point clouds and merges them all into one global point cloud.
flags: command line arguments
data: dict with keys ['intrinsics', 'poses']
returns: [open3d.geometry.PointCloud]
"""
pcs = []
intrinsics = get_intrinsics(data['intrinsics'])
pc = o3d.geometry.PointCloud()
meshes = []
for i, T_WC in enumerate(data['poses']):
if i % flags.every != 0:
continue
print(f"Point cloud {i}", end="\r")
T_CW = np.linalg.inv(T_WC)
depth = load_depth(os.path.join(flags.path, 'depth', f'{i:06}.npy'))
X = o3d.geometry.PointCloud.create_from_depth_image(depth, intrinsics, extrinsic=T_CW, depth_scale=1.0)
pc += X.uniform_down_sample(every_k_points=5)
return [pc]
def integrate(flags, data):
"""
Integrates collected RGB-D maps using the Open3D integration pipeline.
flags: command line arguments
data: dict with keys ['intrinsics', 'poses']
Returns: [open3d.geometry.TriangleMesh]
"""
volume = o3d.pipelines.integration.ScalableTSDFVolume(
voxel_length=flags.voxel_size,
sdf_trunc=0.05,
color_type=o3d.pipelines.integration.TSDFVolumeColorType.RGB8)
intrinsics = get_intrinsics(data['intrinsics'])
rgb_path = os.path.join(flags.path, 'rgb.mp4')
video = skvideo.io.vreader(rgb_path)
for i, (T_WC, rgb) in enumerate(zip(data['poses'], video)):
print(f"Integrating frame {i:06}", end='\r')
depth_path = os.path.join(flags.path, 'depth', f'{i:06}.npy')
depth = load_depth(depth_path)
rgb = Image.fromarray(rgb)
rgb = rgb.resize((DEPTH_WIDTH, DEPTH_HEIGHT))
rgb = np.array(rgb)
rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
o3d.geometry.Image(rgb), depth,
depth_scale=1.0, depth_trunc=4.0, convert_rgb_to_intensity=False)
volume.integrate(rgbd, intrinsics, np.linalg.inv(T_WC))
mesh = volume.extract_triangle_mesh()
mesh.compute_vertex_normals()
return [mesh]
def validate(flags):
if not os.path.exists(os.path.join(flags.path, 'rgb.mp4')):
absolute_path = os.path.abspath(flags.path)
print(f"The directory {absolute_path} does not appear to be a directory created by the Stray Scanner app.")
return False
return True
def main():
flags = read_args()
if not validate(flags):
return
if not flags.frames and not flags.point_clouds and not flags.integrate:
flags.frames = True
flags.point_clouds = True
flags.trajectory = True
data = read_data(flags)
geometries = []
if flags.trajectory:
geometries += trajectory(flags, data)
if flags.frames:
geometries += show_frames(flags, data)
if flags.point_clouds:
geometries += point_clouds(flags, data)
if flags.integrate:
geometries += integrate(flags, data)
o3d.visualization.draw_geometries(geometries)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment