Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created August 15, 2015 18:20
Show Gist options
  • Select an option

  • Save awesomebytes/b3d69422dc845623947d to your computer and use it in GitHub Desktop.

Select an option

Save awesomebytes/b3d69422dc845623947d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Author: Sammy Pfeiffer
import rospy
from std_msgs.msg import String
from pal_detection_msgs.msg import PersonDetections, PersonDetection, FaceDetections, FaceDetection
#from pal_user_io_msgs.msg import Input
from pal_control_msgs.msg import MotionManagerActionGoal, MotionManagerAction, MotionManagerGoal
import actionlib
from pal_interaction_msgs.msg import TtsActionGoal
from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
import random
DEFAULT_WAIT_TIME = 15
NEAR_TIMER = 30
# TODO: The possible movements and sentences should be set from params to generalize the node
# Also, it probably should be executed as sentences + movements which are funnier
# Also, initiating a full dialogue could be a good idea too, starting with handshake, if we detect
# someone following it, it will be our servant, doing hi im reem, whats ur name, im from bcn, where r u from
# lets take a photo, smile, thank you gets everyone amazed (adding some smartness on timings by detection
# would make it wonderful)
# TODO: Detect people using phone to take photo, depth with person with arm in front may be a hint
# TODO: Detect speech recog "hi" "hand shake". with that you cover sooo many interactions
alone = 0
far = 1
close = 2
screentouched = 3
class SimpleYasMallBehaviour(object):
def __init__(self):
rospy.loginfo("Initializing SimpleYasMallBehaviour")
self.sub_people = None #rospy.Subscriber('/pal_person/detections', PersonDetections, self.people_cb, queue_size=1)
self.sub_face = None #rospy.Subscriber('/pal_face/faces', FaceDetections, self.faces_cb, queue_size=1)
self.sub_screen = None #rospy.Subscriber('/user/input', Input, self.screen_cb, queue_size=1)
self.motion_manager_ac = None
self.tablet_says_user_near_sub = None
self.pub_tts = None
self.close_level = alone #alone far close screentouched == 0 1 2 3
self.near_timer = None
self.running = False
self.start_srv = rospy.Service('yasmall_behaviour_start', Empty, self.start_cb)
self.stop_srv = rospy.Service('yasmall_behaviour_stop', Empty, self.stop_cb)
self.start_cb(EmptyRequest())
self.run()
def set_people_near_false(self, timerdata):
rospy.loginfo("No people detected in " + str(NEAR_TIMER) + "s")
self.close_level = alone
def update_near_status(self, close_level):
# the most close we saw someone, the most it counts
if close_level >= self.close_level:
self.close_level = close_level
if self.near_timer is not None:
self.near_timer.shutdown()
else:
self.near_timer = rospy.Timer(rospy.Duration(NEAR_TIMER), self.set_people_near_false, oneshot=True)
def start_cb(self, req):
rospy.loginfo("Got a request to start")
self.sub_people = rospy.Subscriber('/pal_person/detections', PersonDetections, self.people_cb, queue_size=1)
self.sub_face = rospy.Subscriber('/pal_face/faces', FaceDetections, self.faces_cb, queue_size=1)
self.self.tablet_says_user_near_sub = rospy.Subscriber('/tablet_people_near', String, self.tablet_cb queue_size=1)
#self.sub_screen = rospy.Subscriber('/user/input', Input, self.screen_cb, queue_size=1)
#self.pub_mov = rospy.Publisher('/motion_manager/goal', MotionManagerActionGoal, queue_size=1)
self.motion_manager_ac = actionlib.SimpleActionClient('/motion_manager', MotionManagerAction)
self.motion_manager_ac.wait_for_server()
self.pub_tts = rospy.Publisher('/tts/goal', TtsActionGoal, queue_size=1)
self.running = True
return EmptyResponse()
def stop_cb(self, req):
rospy.loginfo("Got a request to stop")
self.running = False
if self.sub_people is not None:
self.sub_people.unregister()
self.sub_people = None
self.sub_people = None #rospy.Subscriber('/pal_person/detections', PersonDetections, self.people_cb, queue_size=1)
if self.sub_face is not None:
self.sub_face.unregister()
self.sub_face = None #rospy.Subscriber('/pal_face/faces', FaceDetections, self.faces_cb, queue_size=1)
if self.sub_screen is not None:
self.sub_screen.unregister()
self.sub_screen = None #rospy.Subscriber('/user/input', Input, self.screen_cb, queue_size=1)
if self.motion_manager_ac is not None:
# There is no unregister for action clients
self.motion_manager_ac = None
if self.pub_tts is not None:
self.pub_tts.unregister()
self.pun_tts = None
if self.self.tablet_says_user_near_sub is not None:
self.self.tablet_says_user_near_sub.unregister()
self.self.tablet_says_user_near_sub = None
return EmptyResponse()
def tablet_cb(self, data):
rospy.loginfo("Tablet says there is someone")
self.update_near_status(close)
def people_cb(self, data):
"""
:type data: PersonDetections
"""
rospy.loginfo("peoplecb")
self.update_near_status(far)
def faces_cb(self, data):
"""
:type data: FaceDetections
"""
# faces message is published continuously even without data, we need to check if there are faces
if len(data.faces) > 0:
rospy.loginfo("facescb")
self.update_near_status(close)
def screen_cb(self, data):
"""
:type data:
"""
rospy.loginfo("screencb")
self.update_near_status(screentouched)
def send_movement(self, movement):
mag = MotionManagerGoal()
mag.checkSafety = False
mag.plan = True
mag.filename = movement
mag.priority = 50
rospy.loginfo(" Executing movement: " + str(movement))
self.motion_manager_ac.send_goal_and_wait(mag)
def send_sentence(self, group, key):
tag = TtsActionGoal()
tag.goal.text.section = group
tag.goal.text.key = key
rospy.loginfo(" Saying: " + str(group) + " " + str(key))
self.pub_tts.publish(tag)
def far_people_behaviour(self):
rospy.loginfo("Far people behaviour executing")
sentences_list = [["entertainment", "greetings"]]
sentence = random.choice(sentences_list)
self.send_sentence(sentence[0], sentence[1])
# do entertaining stuff
movements_list = ["taking_picture_mimic", "wave", "bow", "come_here"]
movement = random.choice(movements_list)
self.send_movement(movement)
def close_people_behaviour(self):
rospy.loginfo("Close people behaviour executing")
sentences_list = [["entertainment", "icebreaker"],
["entertainment", "compliments"],
["general", "reemName"],
["general", "iAmFromBarcelona"],
["entertainment", "takephoto"]]
sentence = random.choice(sentences_list)
self.send_sentence(sentence[0], sentence[1])
# Do entertaining stuff (consider have dialogue)
movements_list = ["shake_right", "shake_right", "wave", "taking_picture_mimic"]
movement = random.choice(movements_list)
self.send_movement(movement)
def idle_behaviour(self):
rospy.loginfo("Executing idle behaviour")
sentences_list = [["entertainment","greetings"], ["ferrari", "cascade"]]
sentence = random.choice(sentences_list)
self.send_sentence(sentence[0], sentence[1])
# do potentially attracting (And unsafe) stuff
movements_list = ["taking_picture_mimic", "human_form", "car_drive_ferrari"]
movement = random.choice(movements_list)
self.send_movement(movement)
def run(self):
rospy.loginfo("Running!")
while not rospy.is_shutdown():
if self.running:
# If no perception of people was received in the last 30s we consider the robot alone
# So we can do some bigger movements (ala ferrari driving)
if self.close_level == alone:
self.idle_behaviour()
# If perception found people close by face detection (works only quite close on REEM)
# or by touches in the screen, we can do smaller movements and try to start conversation
elif self.close_level == close:
self.close_people_behaviour()
# If perception detected people far, we try to attract them with attracting movements
elif self.close_level == far:
self.far_people_behaviour()
elif self.close_level == screentouched:
# If the screen was touched, do nothing for a while, then return to have people close
# This is the only place where we reset from screentouched
rospy.sleep(DEFAULT_WAIT_TIME)
self.close_level = close
rospy.sleep(DEFAULT_WAIT_TIME)
else: # Do nothing while waiting
rospy.sleep(0.1)
if __name__ == '__main__':
rospy.init_node('simpleyasmallbehaviour')
s = SimpleYasMallBehaviour()
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment