Skip to content

Instantly share code, notes, and snippets.

@PeterMitrano
Created August 4, 2025 10:45
Show Gist options
  • Save PeterMitrano/44f7530e7a5f0e22082f2310a1162681 to your computer and use it in GitHub Desktop.
Save PeterMitrano/44f7530e7a5f0e22082f2310a1162681 to your computer and use it in GitHub Desktop.
How to cause the entire ros2 launch process to die when a specific node exits with an error code.
# This is useful for testing buggy ros launch setups,
# because you can run it like `while ros2 launch pkg debug.launch.py; do sleep 0.5; done;`
# and it will keep restarting util the test node returns an error.
# this was really difficult to figure out how to do.... :(
import os
import sys
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, RegisterEventHandler, EmitEvent
from launch.event_handlers import OnProcessExit
from launch.events import Shutdown
from launch.events.process import ProcessExited
from launch.substitutions import LaunchConfiguration, Command
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
# Declare launch arguments
fake_hardware_arg = DeclareLaunchArgument("fake_hardware", default_value="false")
start_hands_arg = DeclareLaunchArgument("start_hands", default_value="false")
ip_left_arm_arg = DeclareLaunchArgument(
"ip_left_arm", default_value="192.168.100.85"
)
ip_right_arm_arg = DeclareLaunchArgument(
"ip_right_arm", default_value="192.168.100.63"
)
p2_config_file_arg = DeclareLaunchArgument(
"p2_config_file",
default_value=os.path.join(
get_package_share_directory("p2_4_hardware"), "config/p2_4_hardware.yaml"
),
)
# Robot description using xacro
robot_description = Command(
[
"xacro ",
os.path.join(
get_package_share_directory("p2_4_description"), "robot/p2_4.urdf.xacro"
),
" p2_config_file:=",
LaunchConfiguration("p2_config_file"),
" fake_hardware:=",
LaunchConfiguration("fake_hardware"),
" ip_left_arm:=",
LaunchConfiguration("ip_left_arm"),
" ip_right_arm:=",
LaunchConfiguration("ip_right_arm"),
" start_hands:=",
LaunchConfiguration("start_hands"),
]
)
# Nodes
controller_manager_node = Node(
package="controller_manager",
executable="ros2_control_node",
arguments=["--ros-args", "--log-level", "controller_manager:=INFO"],
parameters=[LaunchConfiguration("p2_config_file")],
remappings=[("~/robot_description", "/robot_description")],
)
spawner_node = Node(
package="controller_manager",
executable="spawner",
name="broadcaster_spawner",
arguments=[
"--controller-manager-timeout",
"50",
"joint_state_broadcaster",
"joint_trajectory_controller_head",
"joint_trajectory_controller_torso",
"joint_trajectory_controller_left",
"joint_trajectory_controller_right",
],
)
robot_state_publisher_node = Node(
package="robot_state_publisher",
executable="robot_state_publisher",
parameters=[
{"robot_description": ParameterValue(robot_description, value_type=str)}
],
remappings=[("joint_states", "/joint_states")],
)
test_jtc_topics_node = Node(
package="p2_4_hardware",
executable="test_jtc_topics.py",
name="test_jtc_topics",
output="screen",
)
def on_node_exit(event: ProcessExited, *args):
return_code = event.returncode
if return_code == 1:
# This allows us to tell easily from the result of the full `ros2 launch` command
# whether the test failed or not.
sys.exit(2)
return EmitEvent(event=Shutdown())
# Event handler: when test_jtc_topics_node exits, shutdown all
shutdown_on_test_exit = RegisterEventHandler(
OnProcessExit(target_action=test_jtc_topics_node, on_exit=on_node_exit),
)
# import pdb; pdb.set_trace()
return LaunchDescription(
[
fake_hardware_arg,
ip_left_arm_arg,
ip_right_arm_arg,
p2_config_file_arg,
start_hands_arg,
controller_manager_node,
robot_state_publisher_node,
spawner_node,
test_jtc_topics_node,
shutdown_on_test_exit,
]
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment