Skip to content

Instantly share code, notes, and snippets.

@ibaiGorordo
Created June 18, 2024 08:07
Show Gist options
  • Save ibaiGorordo/1941ee00754166305f2f56522b985b1a to your computer and use it in GitHub Desktop.
Save ibaiGorordo/1941ee00754166305f2f56522b985b1a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import cv2
import numpy as np
import depthai as dai
from datetime import timedelta
def getMesh(calibData, camSocket, ispSize):
M1 = np.array(calibData.getCameraIntrinsics(camSocket, ispSize[0], ispSize[1]))
d1 = np.array(calibData.getDistortionCoefficients(camSocket))
R1 = np.identity(3)
mapX, mapY = cv2.initUndistortRectifyMap(M1, d1, R1, M1, ispSize, cv2.CV_32FC1)
meshCellSize = 16
mesh0 = []
# Creates subsampled mesh which will be loaded on to device to undistort the image
for y in range(mapX.shape[0] + 1): # iterating over height of the image
if y % meshCellSize == 0:
rowLeft = []
for x in range(mapX.shape[1]): # iterating over width of the image
if x % meshCellSize == 0:
if y == mapX.shape[0] and x == mapX.shape[1]:
rowLeft.append(mapX[y - 1, x - 1])
rowLeft.append(mapY[y - 1, x - 1])
elif y == mapX.shape[0]:
rowLeft.append(mapX[y - 1, x])
rowLeft.append(mapY[y - 1, x])
elif x == mapX.shape[1]:
rowLeft.append(mapX[y, x - 1])
rowLeft.append(mapY[y, x - 1])
else:
rowLeft.append(mapX[y, x])
rowLeft.append(mapY[y, x])
if (mapX.shape[1] % meshCellSize) % 2 != 0:
rowLeft.append(0)
rowLeft.append(0)
mesh0.append(rowLeft)
mesh0 = np.array(mesh0)
meshWidth = mesh0.shape[1] // 2
meshHeight = mesh0.shape[0]
mesh0.resize(meshWidth * meshHeight, 2)
mesh = list(map(tuple, mesh0))
return mesh, meshWidth, meshHeight
def colorize_depth(depth, max_depth):
depth = np.clip(depth, 0, max_depth)
depth = (depth / max_depth * 255).astype(np.uint8)
return cv2.applyColorMap(depth, cv2.COLORMAP_JET)
def create_device():
pipeline = dai.Pipeline()
device = dai.Device()
calibData = device.readCalibration()
# RGB camera
rgb_socket = dai.CameraBoardSocket.CAM_A
camRgb = pipeline.create(dai.node.ColorCamera)
camRgb.setBoardSocket(rgb_socket)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_12_MP)
camRgb.setIspScale(4, 13) # 1248x936 ~720p
# Undistortion
manip = pipeline.create(dai.node.ImageManip)
mesh, meshWidth, meshHeight = getMesh(calibData, rgb_socket, camRgb.getIspSize())
manip.setWarpMesh(mesh, meshWidth, meshHeight)
manip.setMaxOutputFrameSize(camRgb.getIspWidth() * camRgb.getIspHeight() * 3 // 2)
camRgb.isp.link(manip.inputImage)
# Left camera
left = pipeline.create(dai.node.MonoCamera)
left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
left.setCamera("left")
# Right camera
right = pipeline.create(dai.node.MonoCamera)
right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
right.setCamera("right")
# Stereo
stereo = pipeline.create(dai.node.StereoDepth)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
stereo.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
stereo.setLeftRightCheck(True)
# stereo.setDepthAlign(rgb_socket)
# Align
align = pipeline.create(dai.node.ImageAlign)
stereo.depth.link(align.input)
camRgb.isp.link(align.inputAlignTo)
# Synchronization
sync = pipeline.create(dai.node.Sync)
sync.setSyncThreshold(timedelta(milliseconds=50))
# Output
xoutGrp = pipeline.create(dai.node.XLinkOut)
xoutGrp.setStreamName("xout")
# Linking
left.out.link(stereo.left)
right.out.link(stereo.right)
# stereo.depth.link(sync.inputs["depth"])
align.outputAligned.link(sync.inputs["depth"])
manip.out.link(sync.inputs["video"])
sync.out.link(xoutGrp.input)
return device, pipeline
if __name__ == "__main__":
device, pipeline = create_device()
with (device):
device.startPipeline(pipeline)
queue = device.getOutputQueue("xout", 10, False)
cv2.namedWindow("Aligned Depth")
max_distance = 5 # meters
while True:
groupMessage = queue.get()
depth = groupMessage["depth"].getFrame()
rgb = groupMessage["video"].getCvFrame()
colorized = colorize_depth(depth, max_distance * 1000)
combined = cv2.addWeighted(rgb, 0.5, colorized, 0.8, 0)
cv2.imshow("Aligned Depth", combined)
if cv2.waitKey(1) == ord('q'):
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment