Created
August 15, 2015 18:20
-
-
Save awesomebytes/b3d69422dc845623947d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # Author: Sammy Pfeiffer | |
| import rospy | |
| from std_msgs.msg import String | |
| from pal_detection_msgs.msg import PersonDetections, PersonDetection, FaceDetections, FaceDetection | |
| #from pal_user_io_msgs.msg import Input | |
| from pal_control_msgs.msg import MotionManagerActionGoal, MotionManagerAction, MotionManagerGoal | |
| import actionlib | |
| from pal_interaction_msgs.msg import TtsActionGoal | |
| from std_srvs.srv import Empty, EmptyRequest, EmptyResponse | |
| import random | |
# Seconds to sleep between behaviour executions, and how long to stay
# quiet after the touchscreen was used
DEFAULT_WAIT_TIME = 15
# Seconds without any person/face/tablet detection before the robot
# considers itself alone again
NEAR_TIMER = 30
# TODO: The possible movements and sentences should be set from params to generalize the node
# Also, it probably should be executed as sentences + movements which are funnier
# Also, initiating a full dialogue could be a good idea too, starting with handshake, if we detect
# someone following it, it will be our servant, doing hi im reem, whats ur name, im from bcn, where r u from
# lets take a photo, smile, thank you gets everyone amazed (adding some smartness on timings by detection
# would make it wonderful)
# TODO: Detect people using phone to take photo, depth with person with arm in front may be a hint
# TODO: Detect speech recog "hi" "hand shake". with that you cover sooo many interactions

# Closeness levels, ordered so a numerically higher level means people
# are closer / interacting more (update_near_status relies on this order)
alone = 0
far = 1
close = 2
screentouched = 3
class SimpleYasMallBehaviour(object):
    """Entertainment behaviour node for a mall demo.

    Estimates how close people are from person/face detection topics and
    a tablet topic, then loops executing random TTS sentences and
    pre-recorded movements matched to the current closeness level.
    Exposes ``yasmall_behaviour_start`` / ``yasmall_behaviour_stop``
    Empty services to enable and disable the behaviour at runtime.
    """

    def __init__(self):
        rospy.loginfo("Initializing SimpleYasMallBehaviour")
        # ROS interfaces are created in start_cb and torn down in stop_cb,
        # so they all start as None.
        self.sub_people = None
        self.sub_face = None
        self.sub_screen = None
        self.motion_manager_ac = None
        self.tablet_says_user_near_sub = None
        self.pub_tts = None
        self.close_level = alone  # one of: alone, far, close, screentouched
        self.near_timer = None  # oneshot timer that resets close_level to alone
        self.running = False
        self.start_srv = rospy.Service('yasmall_behaviour_start', Empty, self.start_cb)
        self.stop_srv = rospy.Service('yasmall_behaviour_stop', Empty, self.stop_cb)
        # Start immediately and enter the main loop (blocks until shutdown)
        self.start_cb(EmptyRequest())
        self.run()

    def set_people_near_false(self, timerdata):
        """Timer callback: nobody was perceived for NEAR_TIMER seconds."""
        rospy.loginfo("No people detected in " + str(NEAR_TIMER) + "s")
        self.close_level = alone

    def update_near_status(self, close_level):
        """Raise the closeness level and (re)arm the no-people timer.

        The closest level seen wins: lower-level detections never
        downgrade the current state; only the timer expiring does.

        :param close_level: one of alone/far/close/screentouched
        """
        if close_level >= self.close_level:
            self.close_level = close_level
        # Fixed: the original only created a timer when near_timer was None,
        # so after the first shutdown no timer was ever re-armed and
        # close_level could never return to 'alone'. Restart it every time.
        if self.near_timer is not None:
            self.near_timer.shutdown()
        self.near_timer = rospy.Timer(rospy.Duration(NEAR_TIMER),
                                      self.set_people_near_false,
                                      oneshot=True)

    def start_cb(self, req):
        """Service callback: create subscribers/clients and start running."""
        rospy.loginfo("Got a request to start")
        self.sub_people = rospy.Subscriber('/pal_person/detections', PersonDetections,
                                           self.people_cb, queue_size=1)
        self.sub_face = rospy.Subscriber('/pal_face/faces', FaceDetections,
                                         self.faces_cb, queue_size=1)
        # Fixed: was 'self.self.tablet_says_user_near_sub' and was missing the
        # comma before queue_size (a syntax error).
        self.tablet_says_user_near_sub = rospy.Subscriber('/tablet_people_near', String,
                                                          self.tablet_cb, queue_size=1)
        #self.sub_screen = rospy.Subscriber('/user/input', Input, self.screen_cb, queue_size=1)
        self.motion_manager_ac = actionlib.SimpleActionClient('/motion_manager', MotionManagerAction)
        self.motion_manager_ac.wait_for_server()
        self.pub_tts = rospy.Publisher('/tts/goal', TtsActionGoal, queue_size=1)
        self.running = True
        return EmptyResponse()

    def stop_cb(self, req):
        """Service callback: stop the behaviour and tear down all interfaces."""
        rospy.loginfo("Got a request to stop")
        self.running = False
        if self.sub_people is not None:
            self.sub_people.unregister()
            self.sub_people = None
        if self.sub_face is not None:
            self.sub_face.unregister()
            self.sub_face = None
        if self.sub_screen is not None:
            self.sub_screen.unregister()
            self.sub_screen = None
        if self.motion_manager_ac is not None:
            # There is no unregister for action clients
            self.motion_manager_ac = None
        if self.pub_tts is not None:
            self.pub_tts.unregister()
            # Fixed: was 'self.pun_tts = None' (typo), which left pub_tts
            # pointing at an unregistered publisher.
            self.pub_tts = None
        # Fixed: was 'self.self.tablet_says_user_near_sub' (AttributeError).
        if self.tablet_says_user_near_sub is not None:
            self.tablet_says_user_near_sub.unregister()
            self.tablet_says_user_near_sub = None
        return EmptyResponse()

    def tablet_cb(self, data):
        """Tablet reported a user nearby; counts as 'close'.

        :type data: String
        """
        rospy.loginfo("Tablet says there is someone")
        self.update_near_status(close)

    def people_cb(self, data):
        """Person detection (works at a distance); counts as 'far'.

        :type data: PersonDetections
        """
        rospy.loginfo("peoplecb")
        self.update_near_status(far)

    def faces_cb(self, data):
        """Face detection (only works quite close on REEM); counts as 'close'.

        :type data: FaceDetections
        """
        # faces message is published continuously even without data,
        # so only react when it actually contains faces
        if len(data.faces) > 0:
            rospy.loginfo("facescb")
            self.update_near_status(close)

    def screen_cb(self, data):
        """Touchscreen input; the highest closeness level.

        :type data: (pal_user_io_msgs/Input — subscriber currently disabled)
        """
        rospy.loginfo("screencb")
        self.update_near_status(screentouched)

    def send_movement(self, movement):
        """Execute a pre-recorded motion by name and block until it finishes.

        :param movement: motion filename known to the motion manager
        """
        mag = MotionManagerGoal()
        mag.checkSafety = False
        mag.plan = True
        mag.filename = movement
        mag.priority = 50
        rospy.loginfo(" Executing movement: " + str(movement))
        self.motion_manager_ac.send_goal_and_wait(mag)

    def send_sentence(self, group, key):
        """Publish a TTS goal identified by a (section, key) pair.

        :param group: TTS section name
        :param key: sentence key inside that section
        """
        tag = TtsActionGoal()
        tag.goal.text.section = group
        tag.goal.text.key = key
        rospy.loginfo(" Saying: " + str(group) + " " + str(key))
        self.pub_tts.publish(tag)

    def _say_and_move(self, sentences_list, movements_list):
        """Say a random sentence and execute a random movement.

        Shared implementation of the three behaviours below.

        :param sentences_list: list of [section, key] pairs
        :param movements_list: list of motion names
        """
        section, key = random.choice(sentences_list)
        self.send_sentence(section, key)
        self.send_movement(random.choice(movements_list))

    def far_people_behaviour(self):
        """Try to attract people detected far away."""
        rospy.loginfo("Far people behaviour executing")
        self._say_and_move([["entertainment", "greetings"]],
                           ["taking_picture_mimic", "wave", "bow", "come_here"])

    def close_people_behaviour(self):
        """Entertain people detected close by (smaller, safer movements)."""
        rospy.loginfo("Close people behaviour executing")
        # Do entertaining stuff (consider have dialogue)
        self._say_and_move([["entertainment", "icebreaker"],
                            ["entertainment", "compliments"],
                            ["general", "reemName"],
                            ["general", "iAmFromBarcelona"],
                            ["entertainment", "takephoto"]],
                           ["shake_right", "shake_right", "wave", "taking_picture_mimic"])

    def idle_behaviour(self):
        """Nobody around: do bigger, potentially attracting (and unsafe) moves."""
        rospy.loginfo("Executing idle behaviour")
        self._say_and_move([["entertainment", "greetings"], ["ferrari", "cascade"]],
                           ["taking_picture_mimic", "human_form", "car_drive_ferrari"])

    def run(self):
        """Main loop: execute the behaviour matching the closeness level."""
        rospy.loginfo("Running!")
        while not rospy.is_shutdown():
            if self.running:
                # If no perception of people was received in the last 30s we
                # consider the robot alone, so we can do some bigger movements
                # (ala ferrari driving)
                if self.close_level == alone:
                    self.idle_behaviour()
                # If perception found people close by face detection (works
                # only quite close on REEM) or by touches in the screen, do
                # smaller movements and try to start conversation
                elif self.close_level == close:
                    self.close_people_behaviour()
                # If perception detected people far, try to attract them
                elif self.close_level == far:
                    self.far_people_behaviour()
                elif self.close_level == screentouched:
                    # If the screen was touched, do nothing for a while, then
                    # return to 'close'. This is the only place where we reset
                    # from screentouched.
                    rospy.sleep(DEFAULT_WAIT_TIME)
                    self.close_level = close
                rospy.sleep(DEFAULT_WAIT_TIME)
            else:  # Do nothing while waiting
                rospy.sleep(0.1)
if __name__ == '__main__':
    # Node entry point: the constructor registers the start/stop services,
    # starts the behaviour and blocks inside run() until shutdown.
    rospy.init_node('simpleyasmallbehaviour')
    behaviour = SimpleYasMallBehaviour()
    rospy.spin()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment