Skip to content

Instantly share code, notes, and snippets.

@anug7
Last active April 26, 2024 13:30
Show Gist options
  • Save anug7/a0cc86c4517066e47f125426704a259e to your computer and use it in GitHub Desktop.
Save anug7/a0cc86c4517066e47f125426704a259e to your computer and use it in GitHub Desktop.
Implementation of wait_for_message in ROS2
from typing import Any, Tuple
import rclpy
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.node import Node
from rclpy.signals import SignalHandlerGuardCondition
from rclpy.utilities import timeout_sec_to_nsec
def wait_for_message(msg_type: Any, node: Node, topic: str,
time_to_wait: int = 1) -> Tuple[bool, Any]:
"""
Waiting for a particular topic
@param: msg_type: message type
@param: node: node using which subscriber is created
@param: topic: topic to be waited
@param: time_to_wait: timeout in s
@return: bool: true if message received otherwise False
@return: received message if success otherwise None
"""
context = node.context
wait_set = _rclpy.WaitSet(1, 1, 0, 0, 0, 0, context.handle)
wait_set.clear_entities()
sub = node.create_subscription(msg_type, topic, lambda _: None, 1)
wait_set.add_subscription(sub.handle)
sigint_gc = SignalHandlerGuardCondition(context=context)
wait_set.add_guard_condition(sigint_gc.handle)
timeout_nsec = timeout_sec_to_nsec(time_to_wait)
wait_set.wait(timeout_nsec)
subs_ready = wait_set.get_ready_entities('subscription')
guards_ready = wait_set.get_ready_entities('guard_condition')
if guards_ready:
if sigint_gc.handle.pointer in guards_ready:
return (False, None)
if subs_ready:
if sub.handle.pointer in subs_ready:
msg_info = sub.handle.take_message(sub.msg_type, sub.raw)
return (True, msg_info[0])
return (False, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment