Created
June 21, 2022 07:18
-
-
Save ES-Alexander/d679252763b0b94f23e17a8a17823b5a to your computer and use it in GitHub Desktop.
Minimal example for course tracking of a GPS-enabled MAVLink vehicle, using a low-pass IIR filter for averaging values over time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from pymavlink import mavutil, mavlink | |
class CourseAverager: | |
MSG_TYPE = 'GLOBAL_POSITION_INT' | |
def __init__(self, mavlink_connection, request_rate=1, decay_rate=0.95, initial_course=(0, 0)): | |
self.master = mavlink_connection | |
self.request_message(request_rate) | |
self.decay_rate = decay_rate | |
self._remainder = 1 - decay_rate | |
self.average_course = list(initial_values) | |
def request_message(self, request_rate): | |
if not request_rate: | |
return | |
# convert Hz to microsecond period | |
interval = int(1e6 / request_rate) | |
# send request | |
self.master.command_long_send( | |
self.master.target_system, self.master.target_component, | |
mavlink.MAV_CMD_SET_MESSAGE_INTERVAL, | |
0, # confirmation | |
getattr(mavlink, f'MAVLINK_MSG_ID_{self.MSG_TYPE}'), interval, | |
0, 0, 0, 0, 0) # *unused, response_target | |
# TODO: ensure command was received and accepted (per Command Protocol) | |
# Generally not essential, but doing so makes the program more robust | |
# (e.g. in case the message was lost on the way) | |
def update_course(self, message): | |
for index, axis in enumerate('xy'): | |
new_velocity = getattr(message, f'v{axis}') | |
old_average = self.average_course[index] | |
# Infinite impulse response (IIR) low-pass (averaging) filter | |
# -> every past sample matters, but recent ones matter more than old ones | |
# -> minimal memory and calculation overhead | |
self.average_course[index] = ( | |
old_average * self.decay_rate | |
+ new_velocity * self._remainder | |
) | |
if __name__ == '__main__': | |
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter | |
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) | |
parser.add_argument('-p', '--port', default='14660', | |
help='MAVLink connection specifier (UDP client port)') | |
parser.add_argument('-r', '--request_rate', default=1, type=float, | |
help='GPS message request rate in Hertz. Lower -> less CPU usage but more missing data.') | |
parser.add_argument('--decay_rate', default=0.95, type=float, | |
help='Proportional decay at each timestep. Smaller -> earlier samples matter less.') | |
args = parser.parse_args() | |
master = mavutil.mavlink_connection(f'udpin:0.0.0.0:{args.port}') | |
master.wait_heartbeat() | |
course_averager = CourseAverager(master, args.request_rate, args.decay_rate) | |
output_schedule = mavutil.periodic_event(1/5) # trigger once every 5 seconds | |
output_schedule.force() # make sure it triggers once at the start | |
# TODO: create (and use) schedule for sending HEARTBEAT messages, like in | |
# https://discuss.bluerobotics.com/t/heartbeat/10940/6 | |
# Likely not essential if the only connection to an endpoint, but the MAVLink | |
# protocol technically requires every component to send a regular heartbeat, | |
# and some proxy managers may kick silent connections to reduce CPU usage. | |
while "program running": | |
if output_schedule.trigger(): | |
print('Course:', course_averager.average_course) | |
msg = master.recv_match(type=course_averager.MSG_TYPE, blocking=True) | |
course_averager.update_course(msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment