Created
July 12, 2022 12:54
-
-
Save surajRathi/94f1fa6eb328acfa600d6fde1230217c to your computer and use it in GitHub Desktop.
Subscribes to the ROS 'costmap', i.e. `nav_msgs/OccupancyGrid` and the linked `map_msgs/OccupancyGridUpdate`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3 | |
from typing import Optional | |
import numpy as np | |
import rospy | |
from map_msgs.msg import OccupancyGridUpdate | |
from nav_msgs.msg import OccupancyGrid | |
from rospy.numpy_msg import numpy_msg | |
class CommonData: | |
img: Optional[np.ndarray] = None | |
def update(self): | |
if self.img is None: | |
return | |
pass | |
def callback(msg: numpy_msg(OccupancyGrid), c: CommonData): | |
rospy.loginfo("Received a cost map") | |
c.img = msg.data.reshape(msg.info.height, msg.info.width).astype(np.uint8) | |
c.update() | |
def callback_updates(msg: numpy_msg(OccupancyGridUpdate), c: CommonData): | |
rospy.logdebug("Received a cost map update") | |
if c.img is None: | |
rospy.logwarn("Received update before receiving the cost map") | |
return | |
# Note: Axes are flipped | |
c.img[msg.y:msg.y + msg.height, msg.x:msg.x + msg.width] = msg.data.reshape(msg.height, msg.width).astype(np.uint8) | |
c.update() | |
def main(): | |
rospy.init_node("costmap_subscriber", anonymous=True) | |
topic: str = '/map' | |
data = CommonData() | |
sub = rospy.Subscriber(topic, numpy_msg(OccupancyGrid), queue_size=1, callback=callback, callback_args=data) | |
sub_updates = rospy.Subscriber(topic + "_updates", numpy_msg(OccupancyGridUpdate), queue_size=1, | |
callback=callback_updates, | |
callback_args=data) | |
rospy.spin() | |
if __name__ == '__main__': | |
try: | |
main() | |
except rospy.ROSInterruptException: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment