Created
February 11, 2019 09:07
-
-
Save tim-fan/ab6c12afa89bb24da5f150ade3f65537 to your computer and use it in GitHub Desktop.
Ros node for running Rovio from output of realsense driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#ROS node to adapt output of realsense camera driver (testing with D435i) | |
#to be used for input to Rovio | |
# | |
# Specifically, the node resolves two issues: | |
# | |
# 1) The realsense driver publishes images with encoding set as "8UC1" | |
# This leads to the following error in Rovio: | |
# "CvBridgeError: [8UC1] is not a color format. but [mono8] is" | |
# This node works around the issue by republishing incoming images | |
# with encoding set to "mono8" | |
# | |
# 2) Rovio expects imu timestamps to be strictly increasing, but at times | |
# the Realsense driver publishes consecutive imu messages with the | |
# same timestamp (expected to be related to the way the driver merges | |
# separate accelerometer and gyro streams). | |
# This node works around the issue by discarding IMU messages with the | |
# same timestamp as the previous message | |
# | |
# See: | |
# http://wiki.ros.org/RealSense | |
# https://github.com/ethz-asl/maplab | |
import rospy | |
from sensor_msgs.msg import Image, Imu | |
class ImuFilter: | |
def __init__(self): | |
self.lastImuTime = rospy.Time(0) | |
def filterImu(self, imuMsg): | |
if imuMsg.header.stamp == self.lastImuTime: | |
rospy.logwarn("Dropping IMU message with duplicate timestamp") | |
return None | |
else: | |
self.lastImuTime = imuMsg.header.stamp | |
return imuMsg | |
def updateImgEncoding(imgMsg): | |
imgMsg.encoding = "mono8" | |
return imgMsg | |
def adaptor(): | |
rospy.init_node('realsense_rovio_adaptor') | |
imgPub = rospy.Publisher('image_mono8', Image, queue_size=10) | |
def imgCallback(msg): | |
imgPub.publish(updateImgEncoding(msg)) | |
rospy.Subscriber("image", Image, imgCallback) | |
imuFilter = ImuFilter() | |
imuPub = rospy.Publisher('imu_filtered', Imu, queue_size=10) | |
def imuCallback(msg): | |
filteredMsg = imuFilter.filterImu(msg) | |
if filteredMsg is not None: | |
imuPub.publish(filteredMsg) | |
rospy.Subscriber("imu", Imu, imuCallback) | |
rospy.spin() | |
if __name__ == '__main__': | |
adaptor() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment