Last active
May 18, 2022 12:22
-
-
Save surajRathi/53c29765808b8357227bf15a369883a7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3 | |
import time | |
from math import pi | |
from typing import List, Optional, Tuple | |
import serial | |
# Received garbage | |
class InvalidResponse(Exception): | |
def __init__(self, cmd, response=None): | |
# Call the base class constructor with the parameters it needs | |
super().__init__(f"Invalid response to {cmd}" + f" gave {response}" if response else "") | |
# Received negative response | |
class FailResponse(Exception): | |
def __init__(self, cmd): | |
# Call the base class constructor with the parameters it needs | |
super().__init__(f"Running {cmd} failed") | |
# @dataclasses.dataclass | |
# class LR: | |
# left: int = 0 | |
# right: int = 0 | |
# run_cmd !.. no reply !?! | |
# run_query ?.. ..=.. | |
# query_history | |
# maintainance | |
# read_config ~.. - on fail | |
# write_config ^.. .. + / - | |
class Roboteq: | |
# HEDM => 1024 PPR | |
# Gear Ratio => 66 | |
_pulse_per_rev: int = 1024 * 66 | |
def __init__(self, port="/dev/ttyACM0", baud=115200, timeout=1, print_io=False, speed_scaler=1) -> None: | |
assert (type(speed_scaler) == int) | |
assert (speed_scaler > 0) | |
assert (self._pulse_per_rev % speed_scaler == 0) | |
self._speed_scaler = speed_scaler | |
self._last_rpm: List[int] = [0, 0] | |
self.print_io = print_io | |
self.ser = serial.Serial( | |
port, | |
baud, | |
parity=serial.PARITY_NONE, | |
stopbits=serial.STOPBITS_ONE, | |
bytesize=serial.EIGHTBITS, | |
timeout=timeout, | |
) | |
self.ser.reset_input_buffer() | |
self.ser.read_until(b"\r") | |
# def _get_line(self) -> bytes: | |
# while True: | |
# ret = self.ser.read_until(b"\r") | |
# # print("===", ret) | |
# if ret.decode()[0] != "&": | |
# return ret | |
# IO Related | |
def _clear_input(self): | |
self.ser.reset_input_buffer() | |
if self.print_io: | |
print("===\tCleared Input Buffer") | |
def _write_str(self, line: str): | |
self.ser.write(f"{line}\r".encode("ASCII")) | |
if self.print_io: | |
print(f">>>\t{line}") | |
def _write_bytes(self, line: bytes): | |
self.ser.write(line + b'\r') | |
if self.print_io: | |
print(f">>>\t{line.decode('ASCII')}") | |
def _get_line(self) -> str: | |
ret = self.ser.read_until(b"\r").decode('ASCII')[:-1] | |
if self.print_io: | |
print(f"<<<\t{ret}") | |
return ret | |
# Response Handling Related | |
def _check_success_fail(self, cmd=""): | |
ret = self._get_line() | |
if ret == "+": | |
return | |
if ret == "-": | |
raise FailResponse(cmd) | |
raise InvalidResponse(cmd, ret) | |
def _get_str_response(self, cmd="") -> str: | |
name = cmd.split(" ")[0] | |
ret = self._get_line() | |
try: | |
if not (ret.startswith(name) and ret[len(name)] == "="): | |
raise InvalidResponse(cmd, ret) | |
print(f"{cmd}:\t{ret[len(name) + 1:]}") | |
return ret[len(name) + 1:] | |
except IndexError: | |
raise InvalidResponse(cmd, ret) | |
# Disable Outputs | |
def disable_echo(self) -> None: | |
cmd = b"^ECHOF 1" | |
self._clear_input() | |
self._write_bytes(cmd) | |
# TODO: Why is there no response? | |
# print(cmd) | |
ret = self._get_line() | |
if ret == cmd: | |
ret = self._get_line() | |
if not ret == '+': | |
print("Echo Failed") | |
print(self._get_line(), self._get_line()) | |
raise InvalidResponse(cmd[1:-1], ret) | |
print("Setting Echo Off Success") | |
def disable_other_outputs(self, retrys=20): | |
self._write_str('# ') | |
self._write_str('# C') | |
self._write_str('# ') | |
time.sleep(0.2) | |
self._clear_input() | |
self._write_str('!R 0') | |
for i in range(20): | |
if self._get_line() == '+': | |
return | |
raise InvalidResponse('!R 0') | |
# Call roboteq command/config/etc | |
def run_cmd(self, cmd: str) -> None: | |
self._write_str(f"!{cmd}") | |
self._check_success_fail(cmd) | |
def run_query(self, cmd: str) -> str: | |
self._write_str(f"?{cmd}") | |
return self._get_str_response(cmd) | |
# TODO: query history | |
def set_config(self, cmd: str) -> None: | |
self._write_str(f"^{cmd}") | |
self._check_success_fail(cmd) | |
def get_config(self, cmd: str) -> str: | |
self._write_str(f"~{cmd}") | |
return self._get_str_response(cmd) | |
# Flags Checker | |
def check_flags(self) -> None: | |
self.check_runtime_status_flags() | |
self.check_fault_flags() | |
self.check_status_flags() | |
def check_runtime_status_flags(self) -> None: | |
# Run the required queries | |
# Parse the output | |
# Print which all fail conditions are true | |
cmd = "FM 1" | |
val = self.run_query(cmd) | |
val = int(val) | |
if val & 1: | |
print("Runtime Status flag detected: Motor 1: Amps Limit currently active") | |
if val & 2: | |
print("Runtime Status flag detected: Motor 1: Motor stalled") | |
if val & 4: | |
print("Runtime Status flag detected: Motor 1: Loop Error detected") | |
if val & 8: | |
print("Runtime Status flag detected: Motor 1: Safety Stop active") | |
if val & 16: | |
print("Runtime Status flag detected: Motor 1: Forward Limit triggered") | |
if val & 32: | |
print("Runtime Status flag detected: Motor 1: Reverse Limit triggered") | |
if val & 64: | |
print("Runtime Status flag detected: Motor 1: Amps Trigger activated") | |
if val & 255 == 0: | |
print("No Status flags") | |
cmd = "FM 2" | |
val = self.run_query(cmd) | |
val = int(val) | |
if val & 1: | |
print("Runtime Status flag detected: Motor 2: Amps Limit currently active") | |
if val & 2: | |
print("Runtime Status flag detected: Motor 2: Motor stalled") | |
if val & 4: | |
print("Runtime Status flag detected: Motor 2: Loop Error detected") | |
if val & 8: | |
print("Runtime Status flag detected: Motor 2: Safety Stop active") | |
if val & 16: | |
print("Runtime Status flag detected: Motor 2: Forward Limit triggered") | |
if val & 32: | |
print("Runtime Status flag detected: Motor 2: Reverse Limit triggered") | |
if val & 64: | |
print("Runtime Status flag detected: Motor 2: Amps Trigger activated") | |
if val & 255 == 0: | |
print("No Status flags") | |
def check_status_flags(self) -> None: | |
# Run the required query | |
# Parse the output | |
# Print which all fail conditions are true | |
cmd = "FS" | |
val = self.run_query(cmd) | |
val = int(val) | |
if val & 1: | |
print("Status: Serial mode") | |
if val & 2: | |
print("Status: Pulse mode") | |
if val & 4: | |
print("Status: Analog mode") | |
if val & 8: | |
print("Status: Power stage off") | |
if val & 16: | |
print("Status: Stall detected") | |
if val & 32: | |
print("Status: At limit") | |
if val & 128: | |
print("Status: MicroBasic script running") | |
if val & 255 == 0: | |
print("No Status flags") | |
def check_fault_flags(self) -> None: | |
# Run the required querys | |
# Parse the output | |
# Print which all fail conditions are true | |
cmd = "FF" | |
val = self.run_query(cmd) | |
val = int(val) | |
if val & 1: | |
print("Fault detected: Overheat") | |
if val & 2: | |
print("Fault detected: Overvoltage") | |
if val & 4: | |
print("Fault detected: Undervoltage") | |
if val & 8: | |
print("Fault detected: Short circuit") | |
if val & 16: | |
print("Fault detected: Emergency stop") | |
if val & 32: | |
print("Fault detected: Brushless sensor fault") | |
if val & 64: | |
print("Fault detected: MOSFET failure") | |
if val & 128: | |
print("Fault detected: Default configuration loaded at startup") | |
if val & 255 == 0: | |
print("No faults") | |
# Set Speed | |
def _set_rpm(self, v1: Optional[int] = None, v2: Optional[int] = None, force=False): | |
if v1 is not None: | |
if force or self._last_rpm[0] != v1: | |
self.run_cmd(f"S 1 {v1}") | |
self._last_rpm[0] = v1 | |
if v2 is not None: | |
if force or self._last_rpm[1] != v2: | |
self.run_cmd(f"S 2 {v2}") | |
self._last_rpm[1] = v2 | |
# Setup | |
def setup_closed_loop(self): | |
self.run_cmd("EX") | |
print("Set E-Stop") | |
print("Setting up Configuration") | |
setup_configs = [ | |
"RWD 0", # Disable Watchdog | |
# Voltage Limits | |
"OVL 300", | |
"UVL 200", | |
# Current Limits | |
"ALIM 1 200", # Current Limit 20 A - motor 1 | |
"ALIM 2 200", # Current Limit 20 A - motor 2 | |
"ATRIG 1 180", # Current Trigger 18 A | |
"ATRIG 2 180", # Current Trigger 18 A | |
# Setup encoder counts | |
f"EPPR 1 {int(self._pulse_per_rev / self._speed_scaler)}", | |
f"EPPR 2 {int(self._pulse_per_rev / self._speed_scaler)}", | |
# Setup PID Params | |
"KP 1 10", | |
"KP 2 13", | |
"KI 1 10", | |
"KI 2 10", | |
"KD 1 0", | |
"KD 2 0", | |
# Setup encoder modes | |
"EMOD 1 18", | |
"EMOD 2 34", | |
# Motor Mode | |
# 0: Open-loop | |
# 1: Closed-loop speed | |
# 2: Closed-loop position relative | |
# 3: Closed-loop count position | |
# 4: Closed-loop position tracking | |
# 5: Closed-loop torque | |
# 6: Closed-loop speed position | |
"MMOD 1 1", | |
"MMOD 2 1", | |
# Mixed Mode | |
"MXMD 0", | |
] | |
try: | |
for cmd in setup_configs: | |
self.set_config(cmd) | |
except InvalidResponse: | |
print("Setup Commands Failed") | |
return | |
self._set_rpm(v1=0, v2=0, force=True) | |
self.run_cmd("MG") | |
print("Released E-Stop") | |
def setup(self): | |
print("Disabling other outputs") | |
self.disable_other_outputs() | |
print("Disabling echos") | |
self.disable_echo() | |
self.setup_closed_loop() | |
self.run_query("CR") # Zero the Encoder Ticks | |
# Runtime | |
# returns fraction of | |
def get_radians_turned(self) -> Tuple[float, float]: | |
# TODO: Check for overflow | |
wheel_ticks = self.run_query("CR") # Read Encoder Counter Absolute | |
n1, n2 = map(int, wheel_ticks.split('')) | |
return 2 * pi * n1 / self._pulse_per_rev, 2 * pi * n2 / self._pulse_per_rev | |
def set_rpm(self, v1: Optional[float] = None, v2: Optional[float] = None, force=False): | |
self._set_rpm(v1=None if v1 is None else int(v1 * self._speed_scaler), | |
v2=None if v2 is None else int(v2 * self._speed_scaler), | |
force=force) | |
# HEDM => 1024 PPR | |
# Gear Ratio => 66 | |
# PPR = 67584 | |
"""The formula below gives the pulse frequency at a given RPM and encoder resolution in | |
Pulses per Revolution. | |
Pulse Frequency in counts per second = RPM / 60 * PPR * 4 | |
Connecting Sensors and Actuators to Input/Outputs | |
58 Advanced Digital Motor Controller User Manual V2.0, July 8, 2019 | |
Example: a motor spinning at 10,000 RPM max, with an encoder with 200 Pulses per Revolution would generate: | |
10,000 / 60 * 200 * 4 = 133.3 kHz which is well within the 1MHz maximum supported by | |
the encoder input. | |
An encoder with 200 Pulses per Revolutions is a good choice for most applications.""" | |
def main(): | |
c = Roboteq(port="/dev/tty.usbmodemSDC2XXX1", print_io=False) | |
c.setup() | |
c.print_io = True | |
speeds = [0, 1, 1, 0, -1, -1] | |
i = 0 | |
deg1, deg2 = 0, 0 | |
try: | |
while True: | |
start = time.time() | |
c.set_rpm(speeds[i], speeds[i]) | |
rad1, rad2 = c.get_radians_turned() | |
deg1 += 180 * rad1 / pi | |
deg2 += 180 * rad2 / pi | |
print(deg1, deg2, sep='\t') | |
print() | |
c.check_flags() | |
i += 1 | |
i %= len(speeds) | |
time.sleep(1. - (time.time() - start)) | |
except (KeyboardInterrupt, Exception) as e: | |
c.run_cmd("EX") | |
print("E-Stopped!") | |
if type(e) != KeyboardInterrupt: | |
raise e | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import rospy | |
from geometry_msgs.msg import Twist, Pose, Quaternion, Vector3, Point | |
from nav_msgs.msg import Odometry | |
from tf.transformations import quaternion_from_euler | |
from roboteq_drver import Roboteq | |
class RoboteqROSDriver: | |
wheel_dist: float = 0.67 | |
wheel_radius: float = 0.25 / 2 | |
def __init__(self, odom_topic: str = 'odom', cmd_vel_topic: str = 'cmd_vel', | |
rate: float = 10, | |
port: str = "/dev/ttyACM0", baud: int = 115200): | |
rospy.init_node('roboteq_ros_driver') | |
self.rate = rate | |
self.x, self.y, self.yaw = 0, 0, 0 | |
self.odom_pub = rospy.Publisher(odom_topic, Odometry, queue_size=20) | |
self.cmd_vel: Twist = Twist() | |
rospy.Subscriber(cmd_vel_topic, Twist, self.cmd_vel_callback) | |
# A speed scalar of n provides resolution of 1/n revolutions | |
# n must be a factor of `Roboteq._pulse_per_rev` | |
self.roboteq = Roboteq(port="/dev/ttyACM0", baud=115200, speed_scaler=1) | |
self.roboteq.setup() | |
rospy.loginfo(f"{rospy.get_name()} started") | |
def send_odom(self): | |
odom = Odometry() | |
odom.header.stamp = rospy.Time.now() | |
odom.header.frame_id = "odom" | |
odom.child_frame_id = "base_link" | |
# set the position | |
odom.pose.pose = Pose( | |
Point(self.x, self.y, 0.0), Quaternion(*quaternion_from_euler(0, 0, self.yaw)) | |
) | |
odom.pose.covariance[0] = 0.1 | |
odom.pose.covariance[1 + 6] = 0.1 | |
odom.pose.covariance[2 + 12] = 0.1 | |
odom.pose.covariance[5 + 30] = 0.1 | |
self.odom_pub.publish(odom) | |
def cmd_vel_callback(self, msg): | |
self.cmd_vel = msg | |
def run(self): | |
r = rospy.Rate(self.rate) | |
msg = Odometry() | |
msg.header.frame_id = "map" | |
while not rospy.is_shutdown(): | |
v, w = self.cmd_vel.linear.x, self.cmd_vel.angular.z | |
# Check omega sign | |
self.roboteq.set_rpm((v + w * self.wheel_dist / 2) / self.wheel_radius, | |
(v - w * self.wheel_dist / 2) / self.wheel_radius) | |
rad1, rad2 = self.roboteq.get_radians_turned() | |
s1, s2 = rad1 * self.wheel_radius, rad2 * self.wheel_radius | |
self.x += ((s1 + s2) / 2) * math.cos(self.yaw) | |
self.y += ((s1 + s2) / 2) * math.sin(self.yaw) | |
self.yaw += (s1 - s2) / self.wheel_dist | |
self.send_odom() | |
r.sleep() | |
def main(): | |
driver = RoboteqROSDriver() | |
driver.run() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment