Skip to content

Instantly share code, notes, and snippets.

@AndBondStyle
Last active April 5, 2024 07:57
Show Gist options
  • Save AndBondStyle/c171952328fec807a0615bb76a69bc16 to your computer and use it in GitHub Desktop.
Save AndBondStyle/c171952328fec807a0615bb76a69bc16 to your computer and use it in GitHub Desktop.
import re
from dataclasses import dataclass, field
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
from heapq import heappush, heappop
from pathlib import Path
PATH = "stereo_calib.mcap"
OUTPUT = "images"
TOPIC = "/camera_(\d+)/bgr/image"
SYNC_EPS = 0.1
CAMERAS = 2
@dataclass(order=True)
class Frame:
time: float
cam: int = field(compare=False)
image: bytes = field(compare=False)
def __repr__(self):
return f"Frame(cam={self.cam}, time={self.time:.5f})"
def dump_images(output_dir, index, frames):
for frame in frames:
path = output_dir / f"cam_{frame.cam}" / f"{index:04d}.png"
with open(path, "wb") as file:
file.write(frame.image)
def process(reader, output_dir):
frames = [[] for _ in range(CAMERAS)]
ignored_topics = set()
min_time = None
sync_index = 0
for _, channel, _, ros_msg in reader.iter_decoded_messages():
match = re.match(TOPIC, channel.topic)
if match is None:
if channel.topic not in ignored_topics:
print(f"Ignored topic: {channel.topic}")
ignored_topics.add(channel.topic)
continue
abs_time = ros_msg.header.stamp.sec + ros_msg.header.stamp.nanosec / 1e9
if min_time is None:
min_time = abs_time - 1 # Some error margin for first messages
frame = Frame(
cam=int(match.group(1)) - 1,
time=abs_time - min_time,
image=ros_msg.data,
)
cam_frames = frames[frame.cam]
if cam_frames and cam_frames[0] > frame:
print(f"! wrong order: {cam_frames[0]} > {frame}")
print(f"+ read: {frame}")
heappush(cam_frames, frame)
if all(len(x) >= 2 for x in frames):
while all(frames):
selected_frames = [x[0] for x in frames]
latest_frame = max(selected_frames)
bad_cams = [
frame.cam
for frame in selected_frames
if abs(frame.time - latest_frame.time) > SYNC_EPS
]
for index in bad_cams:
frame = heappop(frames[index])
print(f"! out of sync: {frame}")
if not bad_cams:
selected_frames = [heappop(x) for x in frames]
diff = max(selected_frames).time - min(selected_frames).time
print(f"--> sync: {selected_frames}, diff: {diff:.3f}s")
dump_images(output_dir, sync_index, selected_frames)
sync_index += 1
def main():
output_dir = Path(OUTPUT)
output_dir.mkdir(parents=True, exist_ok=False)
for index in range(CAMERAS):
(output_dir / f"cam_{index}").mkdir()
with open(PATH, "rb") as input_file:
reader = make_reader(input_file, decoder_factories=[DecoderFactory()])
process(reader, output_dir)
if __name__ == "__main__":
main()
ffmpeg -framerate 2 -pattern_type glob -i "images/cam_0/*.png" -framerate 2 -pattern_type glob -i "images/cam_1/*.png" -filter_complex vstack=inputs=2 -c:v libx264 -movflags faststart -y out.mp4
from pathlib import Path
from scipy.spatial.transform import Rotation
import numpy as np
import pickle
import yaml
import cv2
PATH = Path("images")
CAMERAS = 2
BOARD_DICT = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_50)
BOARD_SIZE = (7, 10)
BOARD_CELL_LENGTH = 0.04
BOARD_MARKER_LENGTH = 0.02
BOARD_LEGACY = True
BOARD_DET_PARAMS = cv2.aruco.DetectorParameters()
def debug_text(img, msg):
cv2.putText(
img,
msg,
(20, 80),
cv2.FONT_HERSHEY_SIMPLEX,
fontScale=2,
color=(0, 0, 255),
thickness=3,
)
def process(img, index, board, calib):
matrix, dist = calib[index]
size = tuple(img.shape[:2][::-1])
new_matrix, _ = cv2.getOptimalNewCameraMatrix(matrix, dist, size, 1, size)
img = cv2.undistort(img, matrix, dist, None, new_matrix)
corners, ids, _ = cv2.aruco.detectMarkers(
img, BOARD_DICT, parameters=BOARD_DET_PARAMS
)
if ids is None or not len(ids):
debug_text(img, "No aruco detected")
return img, None
cv2.aruco.drawDetectedMarkers(img, corners, ids)
ok, board_corners, board_ids = cv2.aruco.interpolateCornersCharuco(
corners, ids, img, board
)
if not ok:
debug_text(img, "No corners detected")
return img, None
cv2.aruco.drawDetectedCornersCharuco(img, board_corners, board_ids, (255, 0, 255))
world_points, img_points = cv2.aruco.getBoardObjectAndImagePoints(
board, board_corners, board_ids
)
return img, (world_points, img_points, new_matrix, list(board_ids.flatten()))
def calc_rms(world_points, img_points, rvec, tvec, matrix):
actual_img_points, _ = cv2.projectPoints(world_points, rvec, tvec, matrix, None)
rms = np.sqrt(((img_points - actual_img_points) ** 2).sum() / len(img_points))
return float(rms)
def fmt_arr(arr):
return " ".join(f"{float(x):+.3f}" for x in np.array(arr).flatten())
def quickshow(img):
cv2.namedWindow("frame", cv2.WINDOW_NORMAL)
cv2.imshow("frame", img)
cv2.waitKey(0)
cv2.destroyAllWindows()
def main():
board = cv2.aruco.CharucoBoard(
BOARD_SIZE, BOARD_CELL_LENGTH, BOARD_MARKER_LENGTH, BOARD_DICT
)
board.setLegacyPattern(BOARD_LEGACY)
calib = [None] * CAMERAS
with open("calibration.yaml") as file:
data = yaml.safe_load(file)
for i in range(CAMERAS):
params = data[f"cam_{i}"]
matrix = np.array(params["camera_matrix"]).reshape((3, 3))
dist = np.array(params["distortion_coefs"])
calib[i] = (matrix, dist)
# img = board.generateImage([x * 100 for x in BOARD_SIZE])
# cv2.imshow("board", img)
# cv2.imwrite("board.png", img)
width, height = data["cam_0"]["image_size"]
height *= CAMERAS
# writer = cv2.VideoWriter(
# "output.mp4",
# cv2.VideoWriter_fourcc(*"mp4v"),
# fps=5,
# frameSize=(width, height),
# )
globs = [sorted((PATH / f"cam_{i}").glob("*.png")) for i in range(CAMERAS)]
all_results = []
for image_paths in zip(*globs):
images = [cv2.imread(str(x)) for x in image_paths]
sync_imgs, sync_results = [], []
for i, img in enumerate(images):
img, result = process(img, i, board, calib)
sync_imgs.append(img)
sync_results.append(result)
# winname = f"cam {i}"
# cv2.namedWindow(winname, cv2.WINDOW_NORMAL)
# cv2.imshow(winname, img)
# combined_img = np.vstack(sync_imgs)
# writer.write(combined_img)
if all(x is not None and len(x[0]) >= 6 for x in sync_results):
common_ids = None
for i in range(CAMERAS):
*_, board_ids = sync_results[i]
curr_ids = set(board_ids)
common_ids = curr_ids if common_ids is None else (curr_ids & common_ids)
if len(common_ids) >= 4:
cam_results = []
for i in range(CAMERAS):
world_points, img_points, matrix, board_ids = sync_results[i]
indices = [i for i, x in enumerate(board_ids) if x in common_ids]
world_points = np.take(world_points, indices, axis=0)
img_points = np.take(img_points, indices, axis=0)
cam_results.append((img_points, matrix))
all_results.append((world_points, cam_results))
# if cv2.waitKey(0) == ord("q"):
# cv2.destroyAllWindows()
# break
print(".", end="", flush=True)
print()
#######################
# QUICKSAVE / QUICKLOAD
#######################
# with open("results.pkl", "wb") as file:
# pickle.dump(all_results, file)
# with open("results.pkl", "rb") as file:
# all_results = pickle.load(file)
print("Total results:", len(all_results))
# writer.release()
# def concat(arrs):
# result = []
# for item in arrs:
# result.append(item)
# return item
all_world_points = list([x[0] for x in all_results])
img_points0 = list([x[1][0][0] for x in all_results])
img_points1 = list([x[1][1][0] for x in all_results])
# print(all_world_points.shape, img_points0.shape, img_points1.shape)
matrix0 = all_results[0][1][0][1]
matrix1 = all_results[0][1][1][1]
retval, _, _, _, _, rvec, tvec, _, _ = cv2.stereoCalibrate(
all_world_points,
img_points0,
img_points1,
matrix0,
None,
matrix1,
None,
(width, height),
flags=cv2.CALIB_FIX_INTRINSIC,
)
euler = Rotation.from_matrix(rvec).as_euler("xyz", degrees=True)
print(f"rms: {retval:.3f}")
print(f"rvec: {fmt_arr(euler)} deg")
print(f"tvec: {fmt_arr(tvec)} m")
# test_img = cv2.imread("images/cam_0/0000.png")
# for world_points, cam_results in all_results:
# new_results = []
# for i, (img_points, matrix) in enumerate(cam_results):
# retval, rvec, tvec, _ = cv2.solvePnPRansac(
# world_points, img_points, matrix, None, flags=cv2.SOLVEPNP_ITERATIVE
# )
# if retval:
# rms = calc_rms(world_points, img_points, rvec, tvec, matrix)
# # print(f"cam {i} rvec: {fmt_arr(rvec)} ?")
# # print(f"cam {i} tvec: {fmt_arr(rvec)} m")
# # print(f"cam {i} rms: {rms:.2f} px")
# new_results.append((rvec, tvec, matrix) if retval else None)
# else:
# new_results.append(None)
# if None in new_results:
# print("failed")
# continue
# parts = []
# for i, (rvec, tvec, _) in enumerate(new_results):
# msg = f"| cam{i} | rvec: {fmt_arr(rvec)}, tvec: {fmt_arr(tvec)}"
# parts.append(msg)
# print(" ".join(parts))
# for i in range(1, CAMERAS):
# rvec0, tvec0, matrix0 = new_results[0]
# rvec, tvec, matrix = new_results[i]
# rvec0, _ = cv2.Rodrigues(rvec0)
# rvec, _ = cv2.Rodrigues(rvec)
# rvec = np.linalg.inv(rvec)
# tvec = -rvec.dot(tvec)
# rvec_res = rvec0.dot(rvec)
# tvec_res = tvec - tvec0
# rvec_res, _ = cv2.Rodrigues(rvec_res)
# # print("cam {i} relative to 0")
# print(f"cam {i} -> 0 rvec: {fmt_arr(rvec_res)}, tvec: {fmt_arr(tvec_res)}")
# img = test_img.copy()
# cv2.drawFrameAxes(img, matrix0, None, rvec_res, tvec_res, 1)
# # quickshow(img)
if __name__ == "__main__":
main()
# python 3.11
cffi==1.16.0
lz4==4.3.3
mcap-ros2-support==0.5.3
mcap==1.1.1
numpy==1.26.4
opencv-contrib-python==4.8.1.78
pycparser==2.22
zstandard==0.22.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment