Last active
September 21, 2021 09:43
-
-
Save rosterloh/7c15359aee3f221c792dada381507546 to your computer and use it in GitHub Desktop.
Cli application to test dynamixel servos
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import atexit | |
import argparse | |
from datetime import datetime, timedelta | |
from dynamixel_sdk import PortHandler | |
from dynamixel_sdk.robotis_def import COMM_SUCCESS | |
from dynamixel_sdk.packet_handler import PacketHandler | |
import logging | |
from logging.handlers import TimedRotatingFileHandler | |
import os | |
import sys | |
from time import sleep, time | |
ADDR_OPERATING_MODE = 11 | |
ADDR_TORQUE_ENABLE = 64 | |
ADDR_PROFILE_VELOCITY = 112 | |
ADDR_GOAL_POSITION = 116 | |
ADDR_MOVING = 122 | |
ADDR_CURRENT = 126 | |
ADDR_PRESENT_POSITION = 132 | |
ADDR_PRESENT_VOLTAGE = 144 | |
ADDR_PRESENT_TEMPERATURE = 146 | |
POSITION_CONTROL_MODE = 3 | |
EXT_POSITION_CONTROL_MODE = 4 | |
TORQUE_ENABLE = 1 | |
TORQUE_DISABLE = 0 | |
MAX_VELOCITY = 534 # 120 rev/min at 0.229 [rev/min] | |
class DynamixelTester: | |
def __init__(self, args): | |
self.init_logging(args) | |
self.devices = [] | |
self.currents = [] | |
self.voltages = [] | |
self.temperatures = [] | |
self.test_number = 0 | |
if self.init_motors(args): | |
atexit.register(self.cleanup) | |
if args.test.upper() == "A": | |
if args.motor1 is None: | |
raise RuntimeError("No motor ID given for motor 1") | |
logging.info("Running test type A") | |
test_start = datetime.now() | |
test_end = test_start + timedelta(days=3) | |
while datetime.now() < test_end: | |
self.test_number += 1 | |
self.full_rotation(self.devices[0]) | |
self.get_test_stats() | |
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}') | |
elif args.test.upper() == "B": | |
if args.motor2 is None: | |
raise RuntimeError("No motor ID given for motor 2") | |
logging.info("Running test type B") | |
test_start = datetime.now() | |
test_end = test_start + timedelta(days=3) | |
if len(self.devices) == 2: | |
mid = self.devices[1] | |
else: | |
mid = args.motor2 | |
while datetime.now() < test_end: | |
self.test_number += 1 | |
self.full_rotation(mid) | |
self.get_test_stats() | |
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}') | |
elif args.test.upper() == "C": | |
if args.motor1 is None: | |
raise RuntimeError("No motor ID given for motor 1") | |
if args.motor2 is None: | |
raise RuntimeError("No motor ID given for motor 2") | |
logging.info("Running test type C") | |
test_start = datetime.now() | |
test_end = test_start + timedelta(days=3) | |
pan_id = args.motor1 | |
tilt_id = args.motor2 | |
pan_pose = self.degrees_to_position(10) # 10 <-> 350 degrees | |
tilt_pose = self.degrees_to_position(90) # 90 <-> -90 degrees | |
self.move(pan_id, pan_pose) | |
self.move(tilt_id, tilt_pose) | |
while self.is_moving(pan_id) or self.is_moving(tilt_id): | |
pass | |
while datetime.now() < test_end: | |
self.test_number += 1 | |
start_time = time() | |
for i in range(40): | |
if i < 20: | |
pan_pose += self.degrees_to_position(17) | |
tilt_pose -= self.degrees_to_position(9) | |
else: | |
pan_pose -= self.degrees_to_position(17) | |
tilt_pose += self.degrees_to_position(9) | |
self.move(pan_id, pan_pose) | |
self.move(tilt_id, tilt_pose) | |
while self.is_moving(pan_id) or self.is_moving(tilt_id): | |
self.get_status(pan_id) | |
self.get_status(tilt_id) | |
sleep(0.05) | |
logging.debug(f'Motors at {self.position_to_degrees(pan_pose)} {self.position_to_degrees(tilt_pose)}') | |
logging.info(f'Test {self.test_number} took {time()-start_time:.2f} s') | |
self.get_test_stats() | |
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}') | |
else: | |
logging.error("Unknown test type") | |
def init_logging(self, args): | |
if not os.path.exists('logs'): | |
os.makedirs('logs', exist_ok=True) | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s.%(msecs)03d :: %(levelname)s :: %(message)s', | |
datefmt="%H:%M:%S" | |
) | |
fh = TimedRotatingFileHandler(f'logs/{args.device.replace("/dev/", "")}-Test{args.test}.log', when='D') | |
fh.setLevel(logging.DEBUG) | |
fh_format = logging.Formatter('%(asctime)s.%(msecs)d :: %(levelname)s :: %(name)s :: %(message)s :: %(lineno)d', datefmt="%H:%M:%S") | |
fh.setFormatter(fh_format) | |
logging.getLogger().addHandler(fh) | |
def init_motors(self, args): | |
try: | |
self.portHandler = PortHandler(args.device) | |
self.packetHandler = PacketHandler(2.0) | |
if not self.portHandler.openPort(): | |
raise RuntimeError('Unable to communicate with serial device') | |
if not self.portHandler.setBaudRate(57600): | |
raise RuntimeError('Unable to set baudrate') | |
for m in [args.motor1, args.motor2]: | |
if m is not None: | |
_, result, err = self.packetHandler.ping(self.portHandler, m) | |
if result != COMM_SUCCESS or err != 0: | |
logging.error(f'Failed to find motor with ID {m}. [{result} {err}]') | |
self.scan_for_motors() | |
return False | |
else: | |
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_DISABLE) | |
self.check_packet(result, err) | |
# Enable motor | |
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_OPERATING_MODE, EXT_POSITION_CONTROL_MODE) | |
if not self.check_packet(result, err): | |
raise RuntimeError('Could not set opperating mode') | |
# Set Profile Velocity to 120 rev/min | |
result, err = self.packetHandler.write4ByteTxRx(self.portHandler, m, ADDR_PROFILE_VELOCITY, MAX_VELOCITY) | |
if not self.check_packet(result, err): | |
raise RuntimeError('Could not set opperating mode') | |
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_ENABLE) | |
if not self.check_packet(result, err): | |
raise RuntimeError('Could not enable motor torque') | |
position, result, err = self.packetHandler.read4ByteTxRx(self.portHandler, m, ADDR_PRESENT_POSITION) | |
if not self.check_packet(result, err): | |
raise RuntimeError('Could not read motor position') | |
else: | |
logging.info(f'Motor with ID {m} is ready at position {position}') | |
self.devices.append(m) | |
except Exception as e: | |
logging.error(f'Failed to init motor connection. {e}') | |
return False | |
return True | |
def scan_for_motors(self): | |
logging.info('Scanning bus for motors') | |
device_ids = [] | |
for i in range(0, 252): | |
id, result, err = self.packetHandler.ping(self.portHandler, i) | |
if result == COMM_SUCCESS and err == 0: | |
if id == 1190: | |
mtype = f'XL330-M077 ({id})' | |
elif id == 1200: | |
mtype = f'XL330-M288 ({id})' | |
else: | |
mtype = f'Unknown ({id})' | |
logging.info(f'Motor found at ID {i} of type {mtype}') | |
device_ids.append(i) | |
if len(device_ids) == 0: | |
logging.warning('No devices found') | |
def check_packet(self, dxl_comm_result, dxl_error): | |
if dxl_comm_result != COMM_SUCCESS: | |
logging.error(f'{self.packetHandler.getTxRxResult(dxl_comm_result)}') | |
return False | |
elif dxl_error != 0: | |
logging.error(f'{self.packetHandler.getRxPacketError(dxl_error)}') | |
return False | |
return True | |
def cleanup(self): | |
for m in self.devices: | |
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_DISABLE) | |
self.check_packet(result, err) | |
self.portHandler.closePort() | |
def position_to_degrees(self, position): | |
return round((position / 4096) * 360, 2) | |
def degrees_to_position(self, angle): | |
return round((angle / 360) * 4096) | |
def move(self, id, position): | |
logging.debug(f'Moving {id} to {self.position_to_degrees(position)} degrees') | |
result, err = self.packetHandler.write4ByteTxRx(self.portHandler, id, ADDR_GOAL_POSITION, position) | |
return self.check_packet(result, err) | |
def get_position(self, id): | |
position, result, err = self.packetHandler.read4ByteTxRx(self.portHandler, id, ADDR_PRESENT_POSITION) | |
if not self.check_packet(result, err): | |
return -1 | |
else: | |
return position | |
def is_moving(self, id): | |
moving, result, err = self.packetHandler.read1ByteTxRx(self.portHandler, id, ADDR_MOVING) | |
if not self.check_packet(result, err): | |
return False | |
else: | |
return moving == 1 | |
# TODO: Convert this to a GroupRead | |
def get_status(self, id): | |
# Current value is between +/- Current Limit(38) default 1750 | |
current, result, err = self.packetHandler.read2ByteTxRx(self.portHandler, id, ADDR_CURRENT) | |
if self.check_packet(result, err): | |
if current > 1750: | |
current -= 0xFFFF | |
self.currents.append(current) | |
# Voltage is between Min Voltage Limit(32) and Max Voltage Limit(34) in units of 0.1 [V] | |
volts, result, err = self.packetHandler.read2ByteTxRx(self.portHandler, id, ADDR_PRESENT_VOLTAGE) | |
if self.check_packet(result, err): | |
self.voltages.append(volts) | |
temp, result, err = self.packetHandler.read1ByteTxRx(self.portHandler, id, ADDR_PRESENT_TEMPERATURE) | |
if self.check_packet(result, err): | |
self.temperatures.append(temp) | |
def full_rotation(self, mid): | |
for sp in [4095, 0]: | |
start_time = time() | |
self.move(mid, sp) | |
sleep(0.05) | |
while (self.is_moving(mid)): | |
self.get_status(mid) | |
sleep(0.1) | |
logging.debug(f'Move took {int((time()-start_time)*1e3)} ms') | |
sleep(0.1) | |
def get_test_stats(self): | |
if len(self.currents) > 0: | |
abs_i = [abs(i) for i in self.currents] | |
max_i = max([max(self.currents), min(abs_i)]) | |
mean_i = sum(abs_i) / len(self.currents) | |
logging.info(f'CURRENT: max: {max_i:.2f} mean {mean_i:.2f}') | |
if len(self.voltages) > 0: | |
max_v = max(self.voltages) | |
mean_v = sum(self.voltages) / len(self.voltages) | |
logging.info(f'VOLTAGE: max: {max_v/10:.2f} mean {mean_v/10:.2f}') | |
if len(self.temperatures) > 0: | |
max_t = max(self.temperatures) | |
mean_t = sum(self.temperatures) / len(self.temperatures) | |
logging.info(f'TEMPERATURE: max {max_t:.1f} mean {mean_t:.1f}') | |
self.currents.clear() | |
self.voltages.clear() | |
self.temperatures.clear() | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description="Gardin Dynamixel Tester") | |
parser.add_argument("-D", "--device", help="Serial device", required=True) | |
parser.add_argument("-m1", "--motor1", type=int, help="Pan Motor ID") | |
parser.add_argument("-m2", "--motor2", type=int, help="Tilt Motor ID") | |
parser.add_argument("-t", "--test", help="Test to run", default="A") | |
args = parser.parse_args() | |
try: | |
test = DynamixelTester(args) | |
except Exception as e: | |
logging.error(f'Exception event... error is: {e}') | |
except KeyboardInterrupt: | |
logging.info("Terminating script") | |
finally: | |
logging.info("Script Exit") | |
sys.exit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment