Created
August 4, 2025 10:45
-
-
Save PeterMitrano/44f7530e7a5f0e22082f2310a1162681 to your computer and use it in GitHub Desktop.
How to cause the entire ros2 launch process to die when a specific node exits with an error code.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # This is useful for testing buggy ros launch setups, | |
| # because you can run it like `while ros2 launch pkg debug.launch.py; do sleep 0.5; done;` | |
| # and it will keep restarting util the test node returns an error. | |
| # this was really difficult to figure out how to do.... :( | |
| import os | |
| import sys | |
| from launch import LaunchDescription | |
| from launch.actions import DeclareLaunchArgument, RegisterEventHandler, EmitEvent | |
| from launch.event_handlers import OnProcessExit | |
| from launch.events import Shutdown | |
| from launch.events.process import ProcessExited | |
| from launch.substitutions import LaunchConfiguration, Command | |
| from launch_ros.actions import Node | |
| from launch_ros.parameter_descriptions import ParameterValue | |
| from ament_index_python.packages import get_package_share_directory | |
| def generate_launch_description(): | |
| # Declare launch arguments | |
| fake_hardware_arg = DeclareLaunchArgument("fake_hardware", default_value="false") | |
| start_hands_arg = DeclareLaunchArgument("start_hands", default_value="false") | |
| ip_left_arm_arg = DeclareLaunchArgument( | |
| "ip_left_arm", default_value="192.168.100.85" | |
| ) | |
| ip_right_arm_arg = DeclareLaunchArgument( | |
| "ip_right_arm", default_value="192.168.100.63" | |
| ) | |
| p2_config_file_arg = DeclareLaunchArgument( | |
| "p2_config_file", | |
| default_value=os.path.join( | |
| get_package_share_directory("p2_4_hardware"), "config/p2_4_hardware.yaml" | |
| ), | |
| ) | |
| # Robot description using xacro | |
| robot_description = Command( | |
| [ | |
| "xacro ", | |
| os.path.join( | |
| get_package_share_directory("p2_4_description"), "robot/p2_4.urdf.xacro" | |
| ), | |
| " p2_config_file:=", | |
| LaunchConfiguration("p2_config_file"), | |
| " fake_hardware:=", | |
| LaunchConfiguration("fake_hardware"), | |
| " ip_left_arm:=", | |
| LaunchConfiguration("ip_left_arm"), | |
| " ip_right_arm:=", | |
| LaunchConfiguration("ip_right_arm"), | |
| " start_hands:=", | |
| LaunchConfiguration("start_hands"), | |
| ] | |
| ) | |
| # Nodes | |
| controller_manager_node = Node( | |
| package="controller_manager", | |
| executable="ros2_control_node", | |
| arguments=["--ros-args", "--log-level", "controller_manager:=INFO"], | |
| parameters=[LaunchConfiguration("p2_config_file")], | |
| remappings=[("~/robot_description", "/robot_description")], | |
| ) | |
| spawner_node = Node( | |
| package="controller_manager", | |
| executable="spawner", | |
| name="broadcaster_spawner", | |
| arguments=[ | |
| "--controller-manager-timeout", | |
| "50", | |
| "joint_state_broadcaster", | |
| "joint_trajectory_controller_head", | |
| "joint_trajectory_controller_torso", | |
| "joint_trajectory_controller_left", | |
| "joint_trajectory_controller_right", | |
| ], | |
| ) | |
| robot_state_publisher_node = Node( | |
| package="robot_state_publisher", | |
| executable="robot_state_publisher", | |
| parameters=[ | |
| {"robot_description": ParameterValue(robot_description, value_type=str)} | |
| ], | |
| remappings=[("joint_states", "/joint_states")], | |
| ) | |
| test_jtc_topics_node = Node( | |
| package="p2_4_hardware", | |
| executable="test_jtc_topics.py", | |
| name="test_jtc_topics", | |
| output="screen", | |
| ) | |
| def on_node_exit(event: ProcessExited, *args): | |
| return_code = event.returncode | |
| if return_code == 1: | |
| # This allows us to tell easily from the result of the full `ros2 launch` command | |
| # whether the test failed or not. | |
| sys.exit(2) | |
| return EmitEvent(event=Shutdown()) | |
| # Event handler: when test_jtc_topics_node exits, shutdown all | |
| shutdown_on_test_exit = RegisterEventHandler( | |
| OnProcessExit(target_action=test_jtc_topics_node, on_exit=on_node_exit), | |
| ) | |
| # import pdb; pdb.set_trace() | |
| return LaunchDescription( | |
| [ | |
| fake_hardware_arg, | |
| ip_left_arm_arg, | |
| ip_right_arm_arg, | |
| p2_config_file_arg, | |
| start_hands_arg, | |
| controller_manager_node, | |
| robot_state_publisher_node, | |
| spawner_node, | |
| test_jtc_topics_node, | |
| shutdown_on_test_exit, | |
| ] | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment