Skip to content

Instantly share code, notes, and snippets.

@surajRathi
Last active May 18, 2022 12:22
Show Gist options
  • Save surajRathi/53c29765808b8357227bf15a369883a7 to your computer and use it in GitHub Desktop.
Save surajRathi/53c29765808b8357227bf15a369883a7 to your computer and use it in GitHub Desktop.
#! /usr/bin/python3
import time
from math import pi
from typing import List, Optional, Tuple
import serial
# Received garbage
class InvalidResponse(Exception):
def __init__(self, cmd, response=None):
# Call the base class constructor with the parameters it needs
super().__init__(f"Invalid response to {cmd}" + f" gave {response}" if response else "")
# Received negative response
class FailResponse(Exception):
def __init__(self, cmd):
# Call the base class constructor with the parameters it needs
super().__init__(f"Running {cmd} failed")
# @dataclasses.dataclass
# class LR:
# left: int = 0
# right: int = 0
# run_cmd !.. no reply !?!
# run_query ?.. ..=..
# query_history
# maintainance
# read_config ~.. - on fail
# write_config ^.. .. + / -
class Roboteq:
# HEDM => 1024 PPR
# Gear Ratio => 66
_pulse_per_rev: int = 1024 * 66
def __init__(self, port="/dev/ttyACM0", baud=115200, timeout=1, print_io=False, speed_scaler=1) -> None:
assert (type(speed_scaler) == int)
assert (speed_scaler > 0)
assert (self._pulse_per_rev % speed_scaler == 0)
self._speed_scaler = speed_scaler
self._last_rpm: List[int] = [0, 0]
self.print_io = print_io
self.ser = serial.Serial(
port,
baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=timeout,
)
self.ser.reset_input_buffer()
self.ser.read_until(b"\r")
# def _get_line(self) -> bytes:
# while True:
# ret = self.ser.read_until(b"\r")
# # print("===", ret)
# if ret.decode()[0] != "&":
# return ret
# IO Related
def _clear_input(self):
self.ser.reset_input_buffer()
if self.print_io:
print("===\tCleared Input Buffer")
def _write_str(self, line: str):
self.ser.write(f"{line}\r".encode("ASCII"))
if self.print_io:
print(f">>>\t{line}")
def _write_bytes(self, line: bytes):
self.ser.write(line + b'\r')
if self.print_io:
print(f">>>\t{line.decode('ASCII')}")
def _get_line(self) -> str:
ret = self.ser.read_until(b"\r").decode('ASCII')[:-1]
if self.print_io:
print(f"<<<\t{ret}")
return ret
# Response Handling Related
def _check_success_fail(self, cmd=""):
ret = self._get_line()
if ret == "+":
return
if ret == "-":
raise FailResponse(cmd)
raise InvalidResponse(cmd, ret)
def _get_str_response(self, cmd="") -> str:
name = cmd.split(" ")[0]
ret = self._get_line()
try:
if not (ret.startswith(name) and ret[len(name)] == "="):
raise InvalidResponse(cmd, ret)
print(f"{cmd}:\t{ret[len(name) + 1:]}")
return ret[len(name) + 1:]
except IndexError:
raise InvalidResponse(cmd, ret)
# Disable Outputs
def disable_echo(self) -> None:
cmd = b"^ECHOF 1"
self._clear_input()
self._write_bytes(cmd)
# TODO: Why is there no response?
# print(cmd)
ret = self._get_line()
if ret == cmd:
ret = self._get_line()
if not ret == '+':
print("Echo Failed")
print(self._get_line(), self._get_line())
raise InvalidResponse(cmd[1:-1], ret)
print("Setting Echo Off Success")
def disable_other_outputs(self, retrys=20):
self._write_str('# ')
self._write_str('# C')
self._write_str('# ')
time.sleep(0.2)
self._clear_input()
self._write_str('!R 0')
for i in range(20):
if self._get_line() == '+':
return
raise InvalidResponse('!R 0')
# Call roboteq command/config/etc
def run_cmd(self, cmd: str) -> None:
self._write_str(f"!{cmd}")
self._check_success_fail(cmd)
def run_query(self, cmd: str) -> str:
self._write_str(f"?{cmd}")
return self._get_str_response(cmd)
# TODO: query history
def set_config(self, cmd: str) -> None:
self._write_str(f"^{cmd}")
self._check_success_fail(cmd)
def get_config(self, cmd: str) -> str:
self._write_str(f"~{cmd}")
return self._get_str_response(cmd)
# Flags Checker
def check_flags(self) -> None:
self.check_runtime_status_flags()
self.check_fault_flags()
self.check_status_flags()
def check_runtime_status_flags(self) -> None:
# Run the required queries
# Parse the output
# Print which all fail conditions are true
cmd = "FM 1"
val = self.run_query(cmd)
val = int(val)
if val & 1:
print("Runtime Status flag detected: Motor 1: Amps Limit currently active")
if val & 2:
print("Runtime Status flag detected: Motor 1: Motor stalled")
if val & 4:
print("Runtime Status flag detected: Motor 1: Loop Error detected")
if val & 8:
print("Runtime Status flag detected: Motor 1: Safety Stop active")
if val & 16:
print("Runtime Status flag detected: Motor 1: Forward Limit triggered")
if val & 32:
print("Runtime Status flag detected: Motor 1: Reverse Limit triggered")
if val & 64:
print("Runtime Status flag detected: Motor 1: Amps Trigger activated")
if val & 255 == 0:
print("No Status flags")
cmd = "FM 2"
val = self.run_query(cmd)
val = int(val)
if val & 1:
print("Runtime Status flag detected: Motor 2: Amps Limit currently active")
if val & 2:
print("Runtime Status flag detected: Motor 2: Motor stalled")
if val & 4:
print("Runtime Status flag detected: Motor 2: Loop Error detected")
if val & 8:
print("Runtime Status flag detected: Motor 2: Safety Stop active")
if val & 16:
print("Runtime Status flag detected: Motor 2: Forward Limit triggered")
if val & 32:
print("Runtime Status flag detected: Motor 2: Reverse Limit triggered")
if val & 64:
print("Runtime Status flag detected: Motor 2: Amps Trigger activated")
if val & 255 == 0:
print("No Status flags")
def check_status_flags(self) -> None:
# Run the required query
# Parse the output
# Print which all fail conditions are true
cmd = "FS"
val = self.run_query(cmd)
val = int(val)
if val & 1:
print("Status: Serial mode")
if val & 2:
print("Status: Pulse mode")
if val & 4:
print("Status: Analog mode")
if val & 8:
print("Status: Power stage off")
if val & 16:
print("Status: Stall detected")
if val & 32:
print("Status: At limit")
if val & 128:
print("Status: MicroBasic script running")
if val & 255 == 0:
print("No Status flags")
def check_fault_flags(self) -> None:
# Run the required querys
# Parse the output
# Print which all fail conditions are true
cmd = "FF"
val = self.run_query(cmd)
val = int(val)
if val & 1:
print("Fault detected: Overheat")
if val & 2:
print("Fault detected: Overvoltage")
if val & 4:
print("Fault detected: Undervoltage")
if val & 8:
print("Fault detected: Short circuit")
if val & 16:
print("Fault detected: Emergency stop")
if val & 32:
print("Fault detected: Brushless sensor fault")
if val & 64:
print("Fault detected: MOSFET failure")
if val & 128:
print("Fault detected: Default configuration loaded at startup")
if val & 255 == 0:
print("No faults")
# Set Speed
def _set_rpm(self, v1: Optional[int] = None, v2: Optional[int] = None, force=False):
if v1 is not None:
if force or self._last_rpm[0] != v1:
self.run_cmd(f"S 1 {v1}")
self._last_rpm[0] = v1
if v2 is not None:
if force or self._last_rpm[1] != v2:
self.run_cmd(f"S 2 {v2}")
self._last_rpm[1] = v2
# Setup
def setup_closed_loop(self):
self.run_cmd("EX")
print("Set E-Stop")
print("Setting up Configuration")
setup_configs = [
"RWD 0", # Disable Watchdog
# Voltage Limits
"OVL 300",
"UVL 200",
# Current Limits
"ALIM 1 200", # Current Limit 20 A - motor 1
"ALIM 2 200", # Current Limit 20 A - motor 2
"ATRIG 1 180", # Current Trigger 18 A
"ATRIG 2 180", # Current Trigger 18 A
# Setup encoder counts
f"EPPR 1 {int(self._pulse_per_rev / self._speed_scaler)}",
f"EPPR 2 {int(self._pulse_per_rev / self._speed_scaler)}",
# Setup PID Params
"KP 1 10",
"KP 2 13",
"KI 1 10",
"KI 2 10",
"KD 1 0",
"KD 2 0",
# Setup encoder modes
"EMOD 1 18",
"EMOD 2 34",
# Motor Mode
# 0: Open-loop
# 1: Closed-loop speed
# 2: Closed-loop position relative
# 3: Closed-loop count position
# 4: Closed-loop position tracking
# 5: Closed-loop torque
# 6: Closed-loop speed position
"MMOD 1 1",
"MMOD 2 1",
# Mixed Mode
"MXMD 0",
]
try:
for cmd in setup_configs:
self.set_config(cmd)
except InvalidResponse:
print("Setup Commands Failed")
return
self._set_rpm(v1=0, v2=0, force=True)
self.run_cmd("MG")
print("Released E-Stop")
def setup(self):
print("Disabling other outputs")
self.disable_other_outputs()
print("Disabling echos")
self.disable_echo()
self.setup_closed_loop()
self.run_query("CR") # Zero the Encoder Ticks
# Runtime
# returns fraction of
def get_radians_turned(self) -> Tuple[float, float]:
# TODO: Check for overflow
wheel_ticks = self.run_query("CR") # Read Encoder Counter Absolute
n1, n2 = map(int, wheel_ticks.split(''))
return 2 * pi * n1 / self._pulse_per_rev, 2 * pi * n2 / self._pulse_per_rev
def set_rpm(self, v1: Optional[float] = None, v2: Optional[float] = None, force=False):
self._set_rpm(v1=None if v1 is None else int(v1 * self._speed_scaler),
v2=None if v2 is None else int(v2 * self._speed_scaler),
force=force)
# HEDM => 1024 PPR
# Gear Ratio => 66
# PPR = 67584
"""The formula below gives the pulse frequency at a given RPM and encoder resolution in
Pulses per Revolution.
Pulse Frequency in counts per second = RPM / 60 * PPR * 4
Connecting Sensors and Actuators to Input/Outputs
58 Advanced Digital Motor Controller User Manual V2.0, July 8, 2019
Example: a motor spinning at 10,000 RPM max, with an encoder with 200 Pulses per Revolution would generate:
10,000 / 60 * 200 * 4 = 133.3 kHz which is well within the 1MHz maximum supported by
the encoder input.
An encoder with 200 Pulses per Revolutions is a good choice for most applications."""
def main():
c = Roboteq(port="/dev/tty.usbmodemSDC2XXX1", print_io=False)
c.setup()
c.print_io = True
speeds = [0, 1, 1, 0, -1, -1]
i = 0
deg1, deg2 = 0, 0
try:
while True:
start = time.time()
c.set_rpm(speeds[i], speeds[i])
rad1, rad2 = c.get_radians_turned()
deg1 += 180 * rad1 / pi
deg2 += 180 * rad2 / pi
print(deg1, deg2, sep='\t')
print()
c.check_flags()
i += 1
i %= len(speeds)
time.sleep(1. - (time.time() - start))
except (KeyboardInterrupt, Exception) as e:
c.run_cmd("EX")
print("E-Stopped!")
if type(e) != KeyboardInterrupt:
raise e
if __name__ == "__main__":
main()
import math
import rospy
from geometry_msgs.msg import Twist, Pose, Quaternion, Vector3, Point
from nav_msgs.msg import Odometry
from tf.transformations import quaternion_from_euler
from roboteq_drver import Roboteq
class RoboteqROSDriver:
wheel_dist: float = 0.67
wheel_radius: float = 0.25 / 2
def __init__(self, odom_topic: str = 'odom', cmd_vel_topic: str = 'cmd_vel',
rate: float = 10,
port: str = "/dev/ttyACM0", baud: int = 115200):
rospy.init_node('roboteq_ros_driver')
self.rate = rate
self.x, self.y, self.yaw = 0, 0, 0
self.odom_pub = rospy.Publisher(odom_topic, Odometry, queue_size=20)
self.cmd_vel: Twist = Twist()
rospy.Subscriber(cmd_vel_topic, Twist, self.cmd_vel_callback)
# A speed scalar of n provides resolution of 1/n revolutions
# n must be a factor of `Roboteq._pulse_per_rev`
self.roboteq = Roboteq(port="/dev/ttyACM0", baud=115200, speed_scaler=1)
self.roboteq.setup()
rospy.loginfo(f"{rospy.get_name()} started")
def send_odom(self):
odom = Odometry()
odom.header.stamp = rospy.Time.now()
odom.header.frame_id = "odom"
odom.child_frame_id = "base_link"
# set the position
odom.pose.pose = Pose(
Point(self.x, self.y, 0.0), Quaternion(*quaternion_from_euler(0, 0, self.yaw))
)
odom.pose.covariance[0] = 0.1
odom.pose.covariance[1 + 6] = 0.1
odom.pose.covariance[2 + 12] = 0.1
odom.pose.covariance[5 + 30] = 0.1
self.odom_pub.publish(odom)
def cmd_vel_callback(self, msg):
self.cmd_vel = msg
def run(self):
r = rospy.Rate(self.rate)
msg = Odometry()
msg.header.frame_id = "map"
while not rospy.is_shutdown():
v, w = self.cmd_vel.linear.x, self.cmd_vel.angular.z
# Check omega sign
self.roboteq.set_rpm((v + w * self.wheel_dist / 2) / self.wheel_radius,
(v - w * self.wheel_dist / 2) / self.wheel_radius)
rad1, rad2 = self.roboteq.get_radians_turned()
s1, s2 = rad1 * self.wheel_radius, rad2 * self.wheel_radius
self.x += ((s1 + s2) / 2) * math.cos(self.yaw)
self.y += ((s1 + s2) / 2) * math.sin(self.yaw)
self.yaw += (s1 - s2) / self.wheel_dist
self.send_odom()
r.sleep()
def main():
driver = RoboteqROSDriver()
driver.run()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment