import re
from dataclasses import dataclass, field
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
from heapq import heappush, heappop
from pathlib import Path

PATH = "stereo_calib.mcap"
OUTPUT = "images"
TOPIC = r"/camera_(\d+)/bgr/image"  # group 1 is the camera number
SYNC_EPS = 0.1  # max timestamp spread within a synchronized frame set, seconds
CAMERAS = 2


@dataclass(order=True)
class Frame:
    time: float
    cam: int = field(compare=False)
    image: bytes = field(compare=False)

    def __repr__(self):
        return f"Frame(cam={self.cam}, time={self.time:.5f})"


def dump_images(output_dir, index, frames):
    for frame in frames:
        path = output_dir / f"cam_{frame.cam}" / f"{index:04d}.png"
        with open(path, "wb") as file:
            file.write(frame.image)


def process(reader, output_dir):
    # One min-heap of pending frames per camera, ordered by timestamp
    frames = [[] for _ in range(CAMERAS)]
    ignored_topics = set()
    min_time = None
    sync_index = 0
    for _, channel, _, ros_msg in reader.iter_decoded_messages():
        match = re.match(TOPIC, channel.topic)
        if match is None:
            if channel.topic not in ignored_topics:
                print(f"Ignored topic: {channel.topic}")
                ignored_topics.add(channel.topic)
            continue
        abs_time = ros_msg.header.stamp.sec + ros_msg.header.stamp.nanosec / 1e9
        if min_time is None:
            min_time = abs_time - 1  # Some error margin for first messages
        frame = Frame(
            cam=int(match.group(1)) - 1,
            time=abs_time - min_time,
            image=ros_msg.data,
        )
        cam_frames = frames[frame.cam]
        if cam_frames and cam_frames[0] > frame:
            print(f"! wrong order: {cam_frames[0]} > {frame}")
        print(f"+ read: {frame}")
        heappush(cam_frames, frame)
        if all(len(x) >= 2 for x in frames):
            while all(frames):
                # Look at the oldest pending frame of each camera
                selected_frames = [x[0] for x in frames]
                latest_frame = max(selected_frames)
                # Cameras whose oldest frame lags too far behind the newest candidate
                bad_cams = [
                    frame.cam
                    for frame in selected_frames
                    if abs(frame.time - latest_frame.time) > SYNC_EPS
                ]
                for index in bad_cams:
                    frame = heappop(frames[index])
                    print(f"! out of sync: {frame}")
                if not bad_cams:
                    # All candidates are within SYNC_EPS: emit them as one synchronized set
                    selected_frames = [heappop(x) for x in frames]
                    diff = max(selected_frames).time - min(selected_frames).time
                    print(f"--> sync: {selected_frames}, diff: {diff:.3f}s")
                    dump_images(output_dir, sync_index, selected_frames)
                    sync_index += 1


def main():
    output_dir = Path(OUTPUT)
    output_dir.mkdir(parents=True, exist_ok=False)
    for index in range(CAMERAS):
        (output_dir / f"cam_{index}").mkdir()
    with open(PATH, "rb") as input_file:
        reader = make_reader(input_file, decoder_factories=[DecoderFactory()])
        process(reader, output_dir)


if __name__ == "__main__":
    main()
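
# The script above writes ros_msg.data to .png files verbatim, which assumes the
# /camera_*/bgr/image topics already carry PNG-encoded bytes. If they instead carry
# raw sensor_msgs/Image pixels, a helper like the hypothetical sketch below could
# encode them first (assuming a tightly packed bgr8 encoding with no row padding):
import cv2
import numpy as np


def encode_bgr8_to_png(ros_msg) -> bytes:
    # Reinterpret the flat byte buffer as an HxWx3 BGR array, then PNG-encode it
    pixels = np.frombuffer(bytes(ros_msg.data), dtype=np.uint8)
    pixels = pixels.reshape((ros_msg.height, ros_msg.width, 3))
    ok, buffer = cv2.imencode(".png", pixels)
    if not ok:
        raise RuntimeError("PNG encoding failed")
    return buffer.tobytes()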
# Stack the two synchronized image sequences vertically (vstack) into a single 2 fps H.264 preview video
ffmpeg -framerate 2 -pattern_type glob -i "images/cam_0/*.png" -framerate 2 -pattern_type glob -i "images/cam_1/*.png" -filter_complex vstack=inputs=2 -c:v libx264 -movflags faststart -y out.mp4
from pathlib import Path
from scipy.spatial.transform import Rotation
import numpy as np
import pickle
import yaml
import cv2

PATH = Path("images")
CAMERAS = 2
BOARD_DICT = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_50)
BOARD_SIZE = (7, 10)
BOARD_CELL_LENGTH = 0.04
BOARD_MARKER_LENGTH = 0.02
BOARD_LEGACY = True
BOARD_DET_PARAMS = cv2.aruco.DetectorParameters()


def debug_text(img, msg):
    cv2.putText(
        img,
        msg,
        (20, 80),
        cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=2,
        color=(0, 0, 255),
        thickness=3,
    )


def process(img, index, board, calib):
    # Undistort using the intrinsics loaded from calibration.yaml
    matrix, dist = calib[index]
    size = tuple(img.shape[:2][::-1])
    new_matrix, _ = cv2.getOptimalNewCameraMatrix(matrix, dist, size, 1, size)
    img = cv2.undistort(img, matrix, dist, None, new_matrix)
    # Detect ArUco markers, then interpolate ChArUco chessboard corners from them
    corners, ids, _ = cv2.aruco.detectMarkers(
        img, BOARD_DICT, parameters=BOARD_DET_PARAMS
    )
    if ids is None or not len(ids):
        debug_text(img, "No aruco detected")
        return img, None
    cv2.aruco.drawDetectedMarkers(img, corners, ids)
    ok, board_corners, board_ids = cv2.aruco.interpolateCornersCharuco(
        corners, ids, img, board
    )
    if not ok:
        debug_text(img, "No corners detected")
        return img, None
    cv2.aruco.drawDetectedCornersCharuco(img, board_corners, board_ids, (255, 0, 255))
    world_points, img_points = cv2.aruco.getBoardObjectAndImagePoints(
        board, board_corners, board_ids
    )
    return img, (world_points, img_points, new_matrix, list(board_ids.flatten()))


def calc_rms(world_points, img_points, rvec, tvec, matrix):
    # Root-mean-square reprojection error in pixels
    actual_img_points, _ = cv2.projectPoints(world_points, rvec, tvec, matrix, None)
    rms = np.sqrt(((img_points - actual_img_points) ** 2).sum() / len(img_points))
    return float(rms)


def fmt_arr(arr):
    return " ".join(f"{float(x):+.3f}" for x in np.array(arr).flatten())


def quickshow(img):
    cv2.namedWindow("frame", cv2.WINDOW_NORMAL)
    cv2.imshow("frame", img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()


def main():
    board = cv2.aruco.CharucoBoard(
        BOARD_SIZE, BOARD_CELL_LENGTH, BOARD_MARKER_LENGTH, BOARD_DICT
    )
    board.setLegacyPattern(BOARD_LEGACY)

    # Load per-camera intrinsics: 3x3 camera matrix and distortion coefficients
    calib = [None] * CAMERAS
    with open("calibration.yaml") as file:
        data = yaml.safe_load(file)
    for i in range(CAMERAS):
        params = data[f"cam_{i}"]
        matrix = np.array(params["camera_matrix"]).reshape((3, 3))
        dist = np.array(params["distortion_coefs"])
        calib[i] = (matrix, dist)

    # img = board.generateImage([x * 100 for x in BOARD_SIZE])
    # cv2.imshow("board", img)
    # cv2.imwrite("board.png", img)

    width, height = data["cam_0"]["image_size"]
    height *= CAMERAS  # combined height of the vertically stacked preview frames
    # writer = cv2.VideoWriter(
    #     "output.mp4",
    #     cv2.VideoWriter_fourcc(*"mp4v"),
    #     fps=5,
    #     frameSize=(width, height),
    # )

    globs = [sorted((PATH / f"cam_{i}").glob("*.png")) for i in range(CAMERAS)]
    all_results = []
    for image_paths in zip(*globs):
        images = [cv2.imread(str(x)) for x in image_paths]
        sync_imgs, sync_results = [], []
        for i, img in enumerate(images):
            img, result = process(img, i, board, calib)
            sync_imgs.append(img)
            sync_results.append(result)
            # winname = f"cam {i}"
            # cv2.namedWindow(winname, cv2.WINDOW_NORMAL)
            # cv2.imshow(winname, img)
        # combined_img = np.vstack(sync_imgs)
        # writer.write(combined_img)
        if all(x is not None and len(x[0]) >= 6 for x in sync_results):
            # Keep only board corners that every camera saw in this synchronized set
            common_ids = None
            for i in range(CAMERAS):
                *_, board_ids = sync_results[i]
                curr_ids = set(board_ids)
                common_ids = curr_ids if common_ids is None else (curr_ids & common_ids)
            if len(common_ids) >= 4:
                cam_results = []
                for i in range(CAMERAS):
                    world_points, img_points, matrix, board_ids = sync_results[i]
                    indices = [i for i, x in enumerate(board_ids) if x in common_ids]
                    world_points = np.take(world_points, indices, axis=0)
                    img_points = np.take(img_points, indices, axis=0)
                    cam_results.append((img_points, matrix))
                all_results.append((world_points, cam_results))
        # if cv2.waitKey(0) == ord("q"):
        #     cv2.destroyAllWindows()
        #     break
        print(".", end="", flush=True)
    print()

    #######################
    # QUICKSAVE / QUICKLOAD
    #######################
    # with open("results.pkl", "wb") as file:
    #     pickle.dump(all_results, file)
    # with open("results.pkl", "rb") as file:
    #     all_results = pickle.load(file)

    print("Total results:", len(all_results))
    # writer.release()

    # def concat(arrs):
    #     result = []
    #     for item in arrs:
    #         result.append(item)
    #     return item

    all_world_points = [x[0] for x in all_results]
    img_points0 = [x[1][0][0] for x in all_results]
    img_points1 = [x[1][1][0] for x in all_results]
    # print(all_world_points.shape, img_points0.shape, img_points1.shape)
    matrix0 = all_results[0][1][0][1]
    matrix1 = all_results[0][1][1][1]
    retval, _, _, _, _, rvec, tvec, _, _ = cv2.stereoCalibrate(
        all_world_points,
        img_points0,
        img_points1,
        matrix0,
        None,
        matrix1,
        None,
        (width, height),  # only used to initialize intrinsics, which are fixed here
        flags=cv2.CALIB_FIX_INTRINSIC,
    )
    # With this call signature, "rvec" is actually the 3x3 rotation matrix R
    euler = Rotation.from_matrix(rvec).as_euler("xyz", degrees=True)
    print(f"rms: {retval:.3f}")
    print(f"rvec: {fmt_arr(euler)} deg")
    print(f"tvec: {fmt_arr(tvec)} m")

    # test_img = cv2.imread("images/cam_0/0000.png")
    # for world_points, cam_results in all_results:
    #     new_results = []
    #     for i, (img_points, matrix) in enumerate(cam_results):
    #         retval, rvec, tvec, _ = cv2.solvePnPRansac(
    #             world_points, img_points, matrix, None, flags=cv2.SOLVEPNP_ITERATIVE
    #         )
    #         if retval:
    #             rms = calc_rms(world_points, img_points, rvec, tvec, matrix)
    #             # print(f"cam {i} rvec: {fmt_arr(rvec)} ?")
    #             # print(f"cam {i} tvec: {fmt_arr(tvec)} m")
    #             # print(f"cam {i} rms: {rms:.2f} px")
    #             new_results.append((rvec, tvec, matrix) if retval else None)
    #         else:
    #             new_results.append(None)
    #     if None in new_results:
    #         print("failed")
    #         continue
    #     parts = []
    #     for i, (rvec, tvec, _) in enumerate(new_results):
    #         msg = f"| cam{i} | rvec: {fmt_arr(rvec)}, tvec: {fmt_arr(tvec)}"
    #         parts.append(msg)
    #     print(" ".join(parts))
    #     for i in range(1, CAMERAS):
    #         rvec0, tvec0, matrix0 = new_results[0]
    #         rvec, tvec, matrix = new_results[i]
    #         rvec0, _ = cv2.Rodrigues(rvec0)
    #         rvec, _ = cv2.Rodrigues(rvec)
    #         rvec = np.linalg.inv(rvec)
    #         tvec = -rvec.dot(tvec)
    #         rvec_res = rvec0.dot(rvec)
    #         tvec_res = tvec - tvec0
    #         rvec_res, _ = cv2.Rodrigues(rvec_res)
    #         # print("cam {i} relative to 0")
    #         print(f"cam {i} -> 0 rvec: {fmt_arr(rvec_res)}, tvec: {fmt_arr(tvec_res)}")
    #         img = test_img.copy()
    #         cv2.drawFrameAxes(img, matrix0, None, rvec_res, tvec_res, 1)
    #         # quickshow(img)


if __name__ == "__main__":
    main()
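
# The calibration.yaml consumed above is expected to contain, per camera, a 3x3
# camera matrix (flattened to 9 values), distortion coefficients, and the image
# size as [width, height]. A hypothetical example of the structure, as it would
# look after yaml.safe_load (all numbers below are placeholders, not real values):
EXAMPLE_CALIBRATION = {
    "cam_0": {
        "camera_matrix": [1000.0, 0.0, 960.0, 0.0, 1000.0, 540.0, 0.0, 0.0, 1.0],
        "distortion_coefs": [0.1, -0.05, 0.0, 0.0, 0.0],
        "image_size": [1920, 1080],
    },
    "cam_1": {
        "camera_matrix": [1000.0, 0.0, 960.0, 0.0, 1000.0, 540.0, 0.0, 0.0, 1.0],
        "distortion_coefs": [0.1, -0.05, 0.0, 0.0, 0.0],
        "image_size": [1920, 1080],
    },
}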
# python 3.11
cffi==1.16.0
lz4==4.3.3
mcap-ros2-support==0.5.3
mcap==1.1.1
numpy==1.26.4
opencv-contrib-python==4.8.1.78
pycparser==2.22
zstandard==0.22.0
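# Note: the calibration script also imports scipy and PyYAML, which are not pinned
# above and need to be installed separately.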