Skip to content

Instantly share code, notes, and snippets.

@surajRathi
Created July 12, 2022 12:54
Show Gist options
  • Save surajRathi/94f1fa6eb328acfa600d6fde1230217c to your computer and use it in GitHub Desktop.
Save surajRathi/94f1fa6eb328acfa600d6fde1230217c to your computer and use it in GitHub Desktop.
Subscribes to the ROS 'costmap', i.e. `nav_msgs/OccupancyGrid` and the linked `map_msgs/OccupancyGridUpdate`
#! /usr/bin/python3
from typing import Optional
import numpy as np
import rospy
from map_msgs.msg import OccupancyGridUpdate
from nav_msgs.msg import OccupancyGrid
from rospy.numpy_msg import numpy_msg
class CommonData:
img: Optional[np.ndarray] = None
def update(self):
if self.img is None:
return
pass
def callback(msg: numpy_msg(OccupancyGrid), c: CommonData):
rospy.loginfo("Received a cost map")
c.img = msg.data.reshape(msg.info.height, msg.info.width).astype(np.uint8)
c.update()
def callback_updates(msg: numpy_msg(OccupancyGridUpdate), c: CommonData):
rospy.logdebug("Received a cost map update")
if c.img is None:
rospy.logwarn("Received update before receiving the cost map")
return
# Note: Axes are flipped
c.img[msg.y:msg.y + msg.height, msg.x:msg.x + msg.width] = msg.data.reshape(msg.height, msg.width).astype(np.uint8)
c.update()
def main():
rospy.init_node("costmap_subscriber", anonymous=True)
topic: str = '/map'
data = CommonData()
sub = rospy.Subscriber(topic, numpy_msg(OccupancyGrid), queue_size=1, callback=callback, callback_args=data)
sub_updates = rospy.Subscriber(topic + "_updates", numpy_msg(OccupancyGridUpdate), queue_size=1,
callback=callback_updates,
callback_args=data)
rospy.spin()
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment