Created
June 16, 2015 14:58
-
-
Save awesomebytes/6c3acc945fc839da5ca1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Created on 6/16/15 | |
| @author: sampfeiffer | |
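| test_get_original_topic.py contains a helper that finds which published topic a (possibly nested) message field path such as /posestamped/pose/position/x belongs to, plus a small demo of it. | |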
| """ | |
| __author__ = 'sampfeiffer' | |
| import rosgraph | |
| import socket | |
def get_original_publisher_topic(topic):
    """Return the published topic that ``topic`` (or a field path in it) belongs to.

    ``topic`` may be a topic name or a topic name followed by a
    '/'-separated path into its message fields, e.g.
    '/posestamped/pose/position/x'.

    The ROS master is asked for its system state and every published topic
    name that is either equal to ``topic`` or a whole-segment prefix of it
    is considered a candidate.  The longest candidate wins: with a
    PoseStamped publisher on '/posestamped' and a Pose publisher on
    '/posestamped/pose', a query for '/posestamped/pose/position' resolves
    to '/posestamped/pose', because subscribing to the real topic is
    preferred over reading a subfield of a shorter one.

    :param topic: topic name or topic/field path to resolve.
    :return: the matching published topic name, or None when no publisher
        matches or the master cannot be reached (a message is printed in
        the latter case).
    """
    master = rosgraph.Master('/rostopic')
    try:
        # Only the publishers list of the system state is needed.
        pubs, _, _ = master.getSystemState()
    except socket.error:
        print("Unable to communicate with master!")
        return None

    # Match on whole name segments only, so that '/posestamped' is not
    # treated as the origin of an unrelated topic like '/posestamped_raw'.
    candidates = [
        pub_name for pub_name, _ in pubs
        if topic == pub_name or topic.startswith(pub_name + '/')
    ]
    if not candidates:
        return None
    # All candidates are prefixes of ``topic``; pick the most specific one.
    return max(candidates, key=len)
if __name__ == '__main__':
    # Demo: advertise two nested topics and show which one each query
    # resolves to. Requires a running ROS master.
    import rospy
    from geometry_msgs.msg import PoseStamped, Pose

    rospy.init_node('test_get_original_publisher_topic', anonymous=True)

    # Single-argument print(...) calls behave the same on Python 2 and 3.
    # queue_size is passed explicitly because creating a Publisher without
    # it is deprecated in rospy; nothing is actually published here, the
    # publishers only need to be registered with the master.
    print("Setting up publisher '/posestamped' with type: PoseStamped")
    pub_posestamped = rospy.Publisher('/posestamped', PoseStamped, queue_size=1)
    print("Setting up publisher '/posestamped/pose' of type Pose")
    pub_pose = rospy.Publisher('/posestamped/pose', Pose, queue_size=1)

    print("Try to get the original publisher of stuff published under '/posestamped/pose':\n")
    for query in ('/posestamped/pose/position/x',
                  '/posestamped/pose/position',
                  '/posestamped/pose'):
        print("get_original_publisher_topic('%s'):" % query)
        print(get_original_publisher_topic(query))

    print("\nNow get stuff on the publisher '/posestamped':\n")
    for query in ('/posestamped/header',
                  '/posestamped'):
        print("get_original_publisher_topic('%s'):" % query)
        print(get_original_publisher_topic(query))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment