Created
April 23, 2020 16:10
-
-
Save mohkale/742de87775e37dad9ee90520b0f68e59 to your computer and use it in GitHub Desktop.
turtlebot3 move square script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import enum | |
import rospy | |
import math | |
import functools | |
import sys | |
from nav_msgs.msg import Odometry | |
from geometry_msgs.msg import Twist | |
from tf.transformations import euler_from_quaternion as efq | |
SQUARE_SIZE = 1 # whatever arbitrary unit /odom uses | |
odom_to_pos = lambda data: (data.pose.pose.position.x, data.pose.pose.position.y, data.pose.pose.position.z) | |
def odom_to_rpy(data): | |
orient = data.pose.pose.orientation | |
roll, pitch, yaw = efq([orient.x, orient.y, orient.z, orient.w], 'sxyz') | |
return roll, pitch, yaw | |
LINEAR_VELOCITY = 0.3 | |
ANGULAR_VELOCITY = 0.3 | |
# yaw' = yaw + pi | |
# 0 | |
# +------------------+ | |
# | | | | |
# | F | | |
# | | | | |
# pi/2 |----R---+---L-----| 3 * pi/2 | |
# | | | | |
# | B | | |
# | | | | |
# +------------------+ | |
# pi | |
# NOTE twist.linear.x is forward (positive), backward (negative) | |
# twist.angular.z is leftward (positive), rightward (negative) | |
# pose.position.x is position in forward direction | |
# pose.position.y is position in sidewards direction | |
PI_4 = math.pi / 4 | |
class Direction(enum.Enum): | |
up = 1 | |
down = 2 | |
left = 3 | |
right = 4 | |
@classmethod | |
def from_orient(cls, orient): | |
"""Get direction from a roll, pitch, yaw array. | |
1 * pi/4 7 * pi/4 | |
+-------+ | |
|\ B /| | |
| \ / | | |
| \ / | | |
|R + L| | |
| / \ | | |
| / \ | | |
|/ F \| | |
+-------+ | |
3 * pi/4 5 * pi/4 | |
""" | |
yaw = orient[2] + math.pi | |
if yaw > PI_4 and yaw <= 3 * PI_4: | |
return cls.right | |
elif yaw > 3 * PI_4 and yaw <= 5 * PI_4: | |
return cls.up | |
elif yaw > 5 * PI_4 and yaw <= 7 * PI_4: | |
return cls.left | |
else: | |
return cls.down | |
def skip_if_unassigned(attrib): | |
def decorator(func): | |
@functools.wraps(func) | |
def wrapped(self, *args, **kwargs): | |
if hasattr(self, attrib) and getattr(self, attrib): | |
return func(self, *args, **kwargs) | |
return wrapped | |
return decorator | |
class State(object): | |
def __init__(self, start_pos=None, start_orient=None, direction=None): | |
self.start_pos = start_pos | |
self.start_orient = start_orient | |
self.pos = start_pos | |
self.orient = start_orient | |
self.start_direction = direction | |
def update(self, data): | |
self.pos = odom_to_pos(data) | |
self.orient = odom_to_rpy(data) | |
if not self.start_pos: | |
self.start_pos = self.pos | |
if not self.start_orient: | |
self.start_orient = self.orient | |
if not self.start_direction: | |
self.start_direction = Direction.from_orient(self.orient) | |
rospy.loginfo('currently facing direction: %s', self.start_direction) | |
class StateChangeSignal(Exception): | |
def __init__(self, new_state): | |
self.new_state = new_state | |
class TurningState(State): | |
@skip_if_unassigned('start_orient') | |
def loop(self, ms): | |
if self._reached_destination(): | |
raise StateChangeSignal(ForwardState( | |
self.pos, self.orient)) | |
data = Twist() | |
data.angular.z = ANGULAR_VELOCITY | |
ms.vel_pub.publish(data) | |
def _reached_destination(self): | |
yaw = self.orient[2] + math.pi | |
if self.start_direction == Direction.right: | |
return yaw > math.pi | |
elif self.start_direction == Direction.up: | |
return yaw > 3 * math.pi / 2 | |
elif self.start_direction == Direction.left: | |
return yaw < PI_4 | |
elif self.start_direction == Direction.down: | |
return yaw > math.pi / 2 | |
else: | |
# print("error(direction) : unknown direction: %s" % self.start_direction, file=sys.stderr) | |
print("error(direction) : unknown direction: %s" % self.start_direction) | |
class ForwardState(State): | |
def __init__(self, *args, **kwargs): | |
self.forwards = None | |
self.sideways = None | |
super(ForwardState, self).__init__(*args, **kwargs) | |
@skip_if_unassigned('start_pos') | |
def loop(self, ms): | |
if self._reached_destination(): | |
raise StateChangeSignal(TurningState( | |
self.pos, self.orient)) | |
data = Twist() | |
data.linear.x = LINEAR_VELOCITY | |
ms.vel_pub.publish(data) | |
def _set_direction(self): | |
if self.start_direction == Direction.up: | |
self.sideways = False | |
self.forwards = True | |
elif self.start_direction == Direction.left: | |
self.sideways = True | |
self.forwards = True | |
elif self.start_direction == Direction.down: | |
self.sideways = False | |
self.forwards = False | |
elif self.start_direction == Direction.right: | |
self.sideways = True | |
self.forwards = False | |
else: | |
# print("error(direction) : unknown direction: %s" % self.start_direction, file=sys.stderr) | |
print("error(direction) : unknown direction: %s" % self.start_direction) | |
def _reached_destination(self): | |
if self.sideways is None or self.forwards is None: | |
self._set_direction() | |
axis = 1 if self.sideways else 0 | |
mult = 1 if self.forwards else -1 | |
if mult > 0: | |
return self.pos[axis] > self.start_pos[axis] + (mult * SQUARE_SIZE) | |
else: | |
return self.pos[axis] < self.start_pos[axis] + (mult * SQUARE_SIZE) | |
class MoveSquare: | |
def __init__(self): | |
self.start_position = None | |
self.start_orientation = None | |
self.state = ForwardState() | |
# self.state = TurningState() | |
self.vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) | |
self.odom_sub = rospy.Subscriber('/odom', Odometry, self.update) | |
rospy.init_node('square-node', anonymous=True) | |
self.rate = rospy.Rate(10) # hz | |
self.reset_delay = rospy.Rate(1000) | |
self.ctrl_c = False | |
rospy.on_shutdown(self.shutdown_hook) | |
rospy.loginfo('square node-is active.') | |
def shutdown_hook(self): | |
self.ctrl_c = True | |
self.reset_topics() | |
rospy.Publisher('/odom', Odometry, queue_size=1).publish(Odometry()) | |
print('stopping square-node at: %s' % rospy.get_time()) | |
def update(self, data): | |
self.state.update(data) | |
self.reset_delay.sleep() | |
# direction = Direction.from_orient(odom_to_rpy(data)) | |
# print(direction) | |
def reset_topics(self): | |
self.vel_pub.publish(Twist()) | |
def main_loop(self): | |
while not self.ctrl_c: | |
try: | |
self.state.loop(self) | |
except StateChangeSignal as e: | |
self.state = e.new_state | |
rospy.loginfo('switching state: %s', type(e.new_state).__name__) | |
self.reset_topics() | |
# self.ctrl_c = True | |
self.rate.sleep() | |
if __name__ == '__main__': | |
instance = MoveSquare() | |
try: instance.main_loop() | |
except rospy.ROSInterruptException: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment