Created
June 15, 2017 12:20
-
-
Save SamuelDudley/a016f6796f2fb448f183301e530ee4b9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
APM DataFlash log file reader | |
Copyright Andrew Tridgell 2011 | |
Released under GNU GPL version 3 or later | |
Partly based on SDLog2Parser by Anton Babushkin | |
''' | |
from __future__ import print_function | |
from builtins import range | |
from builtins import object | |
import struct | |
from . import mavutil | |
try: | |
long # Python 2 has long | |
except NameError: | |
long = int # But Python 3 does not | |
FORMAT_TO_STRUCT = { | |
"b": ("b", None, int), | |
"B": ("B", None, int), | |
"h": ("h", None, int), | |
"H": ("H", None, int), | |
"i": ("i", None, int), | |
"I": ("I", None, int), | |
"f": ("f", None, float), | |
"n": ("4s", None, str), | |
"N": ("16s", None, str), | |
"Z": ("64s", None, str), | |
"c": ("h", 0.01, float), | |
"C": ("H", 0.01, float), | |
"e": ("i", 0.01, float), | |
"E": ("I", 0.01, float), | |
"L": ("i", 1.0e-7, float), | |
"d": ("d", None, float), | |
"M": ("b", None, int), | |
"q": ("q", None, long), # Backward compat | |
"Q": ("Q", None, long), # Backward compat | |
} | |
class DFFormat(object): | |
def __init__(self, type, name, flen, format, columns): | |
self.type = type | |
self.name = name | |
self.len = flen | |
self.format = format | |
try: | |
self.columns = columns.decode().rstrip('\x00').split(',') | |
except Exception as e: | |
self.columns = columns.rstrip('\x00').split(',') | |
if self.columns == ['']: | |
self.columns = [] | |
msg_struct = "<" | |
msg_mults = [] | |
msg_types = [] | |
for c in format: | |
if c == 0: | |
break | |
try: | |
(s, mul, type) = FORMAT_TO_STRUCT[c] | |
msg_struct += s | |
msg_mults.append(mul) | |
msg_types.append(type) | |
except KeyError as e: | |
print("DFFormat: Unsupported format char: '%s' in message %s" % (c, name)) | |
raise Exception("Unsupported format char: '%s' in message %s" % (c, name)) | |
self.msg_struct = msg_struct | |
self.msg_types = msg_types | |
self.msg_mults = msg_mults | |
self.colhash = {} | |
for i in range(len(self.columns)): | |
self.colhash[self.columns[i]] = i | |
def __str__(self): | |
return "DFFormat(%s,%s,%s,%s)" % (self.type, self.name, self.format, self.columns) | |
def null_term(_str): | |
'''null terminate a string''' | |
try: | |
_str = _str.decode() | |
except Exception as e: | |
pass | |
return _str.rstrip('\x00') | |
class DFMessage(object): | |
def __init__(self, fmt, elements, apply_multiplier): | |
self.fmt = fmt | |
self._elements = elements | |
self._apply_multiplier = apply_multiplier | |
self._fieldnames = fmt.columns | |
def to_dict(self): | |
d = {'mavpackettype': self.fmt.name} | |
for field in self._fieldnames: | |
d[field] = self.__getattr__(field) | |
return d | |
def __getattr__(self, field): | |
'''override field getter''' | |
try: | |
i = self.fmt.colhash[field] | |
except Exception: | |
raise AttributeError(field) | |
v = self._elements[i] | |
if self.fmt.format[i] != 'M' or self._apply_multiplier: | |
v = self.fmt.msg_types[i](v) | |
if self.fmt.msg_types[i] == str: | |
v = null_term(v) | |
if self.fmt.msg_mults[i] is not None and self._apply_multiplier: | |
v *= self.fmt.msg_mults[i] | |
return v | |
def get_type(self): | |
return self.fmt.name | |
def __str__(self): | |
ret = "%s {" % self.fmt.name | |
col_count = 0 | |
for c in self.fmt.columns: | |
ret += "%s : %s, " % (c, self.__getattr__(c)) | |
col_count += 1 | |
if col_count != 0: | |
ret = ret[:-2] | |
return ret + '}' | |
def get_msgbuf(self): | |
'''create a binary message buffer for a message''' | |
values = [] | |
for i in range(len(self.fmt.columns)): | |
if i >= len(self.fmt.msg_mults): | |
continue | |
mul = self.fmt.msg_mults[i] | |
name = self.fmt.columns[i] | |
if name == 'Mode' and 'ModeNum' in self.fmt.columns: | |
name = 'ModeNum' | |
v = self.__getattr__(name) | |
if mul is not None: | |
v /= mul | |
values.append(v) | |
return struct.pack("BBB", 0xA3, 0x95, self.fmt.type) + struct.pack(self.fmt.msg_struct, *values) | |
def get_fieldnames(self): | |
return self._fieldnames | |
class DFReaderClock(object): | |
'''base class for all the different ways we count time in logs''' | |
def __init__(self): | |
self.set_timebase(0) | |
self.timestamp = 0 | |
def _gpsTimeToTime(self, week, msec): | |
'''convert GPS week and TOW to a time in seconds since 1970''' | |
epoch = 86400*(10*365 + (1980-1969)/4 + 1 + 6 - 2) | |
return epoch + 86400*7*week + msec*0.001 - 15 | |
def set_timebase(self, base): | |
self.timebase = base | |
def message_arrived(self, m): | |
pass | |
def rewind_event(self): | |
pass | |
class DFReaderClock_usec(DFReaderClock): | |
'''DFReaderClock_usec - use microsecond timestamps from messages''' | |
def __init__(self): | |
DFReaderClock.__init__(self) | |
def find_time_base(self, gps, first_us_stamp): | |
'''work out time basis for the log - even newer style''' | |
t = self._gpsTimeToTime(gps.GWk, gps.GMS) | |
self.set_timebase(t - gps.TimeUS*0.000001) | |
# this ensures FMT messages get appropriate timestamp: | |
self.timestamp = self.timebase + first_us_stamp*0.000001 | |
def type_has_good_TimeMS(self, type): | |
'''The TimeMS in some messages is not from *our* clock!''' | |
if type.startswith('ACC'): | |
return False; | |
if type.startswith('GYR'): | |
return False; | |
return True | |
def should_use_msec_field0(self, m): | |
if not self.type_has_good_TimeMS(m.get_type()): | |
return False | |
if 'TimeMS' != m._fieldnames[0]: | |
return False | |
if self.timebase + m.TimeMS*0.001 < self.timestamp: | |
return False | |
return True; | |
def set_message_timestamp(self, m): | |
if 'TimeUS' == m._fieldnames[0]: | |
# only format messages don't have a TimeUS in them... | |
m._timestamp = self.timebase + m.TimeUS*0.000001 | |
elif self.should_use_msec_field0(m): | |
# ... in theory. I expect there to be some logs which are not | |
# "pure": | |
m._timestamp = self.timebase + m.TimeMS*0.001 | |
else: | |
m._timestamp = self.timestamp | |
self.timestamp = m._timestamp | |
class DFReaderClock_msec(DFReaderClock): | |
'''DFReaderClock_msec - a format where many messages have TimeMS in their formats, and GPS messages have a "T" field giving msecs ''' | |
def find_time_base(self, gps, first_ms_stamp): | |
'''work out time basis for the log - new style''' | |
t = self._gpsTimeToTime(gps.Week, gps.TimeMS) | |
self.set_timebase(t - gps.T*0.001) | |
self.timestamp = self.timebase + first_ms_stamp*0.001 | |
def set_message_timestamp(self, m): | |
if 'TimeMS' == m._fieldnames[0]: | |
m._timestamp = self.timebase + m.TimeMS*0.001 | |
elif m.get_type() in ['GPS','GPS2']: | |
m._timestamp = self.timebase + m.T*0.001 | |
else: | |
m._timestamp = self.timestamp | |
self.timestamp = m._timestamp | |
class DFReaderClock_px4(DFReaderClock): | |
'''DFReaderClock_px4 - a format where a starting time is explicitly given in a message''' | |
def __init__(self): | |
DFReaderClock.__init__(self) | |
self.px4_timebase = 0 | |
def find_time_base(self, gps): | |
'''work out time basis for the log - PX4 native''' | |
t = gps.GPSTime * 1.0e-6 | |
self.timebase = t - self.px4_timebase | |
def set_px4_timebase(self, time_msg): | |
self.px4_timebase = time_msg.StartTime * 1.0e-6 | |
def set_message_timestamp(self, m): | |
m._timestamp = self.timebase + self.px4_timebase | |
def message_arrived(self, m): | |
type = m.get_type() | |
if type == 'TIME' and 'StartTime' in m._fieldnames: | |
self.set_px4_timebase(m) | |
class DFReaderClock_gps_interpolated(DFReaderClock): | |
'''DFReaderClock_gps_interpolated - for when the only real references in a message are GPS timestamps ''' | |
def __init__(self): | |
DFReaderClock.__init__(self) | |
self.msg_rate = {} | |
self.counts = {} | |
self.counts_since_gps = {} | |
def rewind_event(self): | |
'''reset counters on rewind''' | |
self.counts = {} | |
self.counts_since_gps = {} | |
def message_arrived(self, m): | |
type = m.get_type() | |
if not type in self.counts: | |
self.counts[type] = 1 | |
else: | |
self.counts[type] += 1 | |
# this preserves existing behaviour - but should we be doing this | |
# if type == 'GPS'? | |
if not type in self.counts_since_gps: | |
self.counts_since_gps[type] = 1 | |
else: | |
self.counts_since_gps[type] += 1 | |
if type == 'GPS' or type == 'GPS2': | |
self.gps_message_arrived(m) | |
def gps_message_arrived(self, m): | |
'''adjust time base from GPS message''' | |
# msec-style GPS message? | |
gps_week = getattr(m, 'Week', None) | |
gps_timems = getattr(m, 'TimeMS', None) | |
if gps_week is None: | |
# usec-style GPS message? | |
gps_week = getattr(m, 'GWk', None) | |
gps_timems = getattr(m, 'GMS', None) | |
if gps_week is None: | |
if getattr(m, 'GPSTime', None) is not None: | |
# PX4-style timestamp; we've only been called | |
# because we were speculatively created in case no | |
# better clock was found. | |
return; | |
t = self._gpsTimeToTime(gps_week, gps_timems) | |
deltat = t - self.timebase | |
if deltat <= 0: | |
return | |
for type in self.counts_since_gps: | |
rate = self.counts_since_gps[type] / deltat | |
if rate > self.msg_rate.get(type, 0): | |
self.msg_rate[type] = rate | |
self.msg_rate['IMU'] = 50.0 | |
self.timebase = t | |
self.counts_since_gps = {} | |
def set_message_timestamp(self, m): | |
rate = self.msg_rate.get(m.fmt.name, 50.0) | |
if int(rate) == 0: | |
rate = 50 | |
count = self.counts_since_gps.get(m.fmt.name, 0) | |
m._timestamp = self.timebase + count/rate | |
class DFReader(object): | |
'''parse a generic dataflash file''' | |
def __init__(self): | |
# read the whole file into memory for simplicity | |
self.clock = None | |
self.timestamp = 0 | |
self.mav_type = mavutil.mavlink.MAV_TYPE_FIXED_WING | |
self.verbose = False | |
self.params = {} | |
def _rewind(self): | |
'''reset state on rewind''' | |
self.messages = { 'MAV' : self } | |
self.flightmode = "UNKNOWN" | |
self.percent = 0 | |
if self.clock: | |
self.clock.rewind_event() | |
def init_clock_px4(self, px4_msg_time, px4_msg_gps): | |
self.clock = DFReaderClock_px4() | |
if not self._zero_time_base: | |
self.clock.set_px4_timebase(px4_msg_time) | |
self.clock.find_time_base(px4_msg_gps) | |
return True | |
def init_clock_msec(self): | |
# it is a new style flash log with full timestamps | |
self.clock = DFReaderClock_msec() | |
def init_clock_usec(self): | |
self.clock = DFReaderClock_usec() | |
def init_clock_gps_interpolated(self, clock): | |
self.clock = clock | |
def init_clock(self): | |
'''work out time basis for the log''' | |
self._rewind() | |
# speculatively create a gps clock in case we don't find anything | |
# better | |
gps_clock = DFReaderClock_gps_interpolated() | |
self.clock = gps_clock | |
px4_msg_time = None | |
px4_msg_gps = None | |
gps_interp_msg_gps1 = None | |
first_us_stamp = None | |
first_ms_stamp = None | |
have_good_clock = False | |
while True: | |
m = self.recv_msg() | |
if m is None: | |
break; | |
type = m.get_type() | |
if first_us_stamp is None: | |
first_us_stamp = getattr(m, "TimeUS", None); | |
if first_ms_stamp is None and (type != 'GPS' and type != 'GPS2'): | |
# Older GPS messages use TimeMS for msecs past start | |
# of gps week | |
first_ms_stamp = getattr(m, "TimeMS", None); | |
if type == 'GPS' or type == 'GPS2': | |
if getattr(m, "TimeUS", 0) != 0 and \ | |
getattr(m, "GWk", 0) != 0: # everything-usec-timestamped | |
self.init_clock_usec() | |
if not self._zero_time_base: | |
self.clock.find_time_base(m, first_us_stamp) | |
have_good_clock = True | |
break | |
if getattr(m, "T", 0) != 0 and \ | |
getattr(m, "Week", 0) != 0: # GPS is msec-timestamped | |
if first_ms_stamp is None: | |
first_ms_stamp = m.T | |
self.init_clock_msec() | |
if not self._zero_time_base: | |
self.clock.find_time_base(m, first_ms_stamp) | |
have_good_clock = True | |
break | |
if getattr(m, "GPSTime", 0) != 0: # px4-style-only | |
px4_msg_gps = m | |
if getattr(m, "Week", 0) != 0: | |
if gps_interp_msg_gps1 is not None and \ | |
(gps_interp_msg_gps1.TimeMS != m.TimeMS or \ | |
gps_interp_msg_gps1.Week != m.Week): | |
# we've received two distinct, non-zero GPS | |
# packets without finding a decent clock to | |
# use; fall back to interpolation. Q: should | |
# we wait a few more messages befoe doing | |
# this? | |
self.init_clock_gps_interpolated(gps_clock) | |
have_good_clock = True | |
break | |
gps_interp_msg_gps1 = m | |
elif type == 'TIME': | |
'''only px4-style logs use TIME''' | |
if getattr(m, "StartTime", None) != None: | |
px4_msg_time = m; | |
if px4_msg_time is not None and px4_msg_gps is not None: | |
self.init_clock_px4(px4_msg_time, px4_msg_gps) | |
have_good_clock = True | |
break | |
# print("clock is " + str(self.clock)) | |
if not have_good_clock: | |
# we failed to find any GPS messages to set a time | |
# base for usec and msec clocks. Also, not a | |
# PX4-style log | |
if first_us_stamp is not None: | |
self.init_clock_usec() | |
elif first_ms_stamp is not None: | |
self.init_clock_msec() | |
self._rewind() | |
return | |
def _set_time(self, m): | |
'''set time for a message''' | |
# really just left here for profiling | |
m._timestamp = self.timestamp | |
if len(m._fieldnames) > 0 and self.clock is not None: | |
self.clock.set_message_timestamp(m) | |
def recv_msg(self): | |
return self._parse_next() | |
def _add_msg(self, m): | |
'''add a new message''' | |
type = m.get_type() | |
self.messages[type] = m | |
if self.clock: | |
self.clock.message_arrived(m) | |
if type == 'MSG': | |
if m.Message.find("Rover") != -1: | |
self.mav_type = mavutil.mavlink.MAV_TYPE_GROUND_ROVER | |
elif m.Message.find("Plane") != -1: | |
self.mav_type = mavutil.mavlink.MAV_TYPE_FIXED_WING | |
elif m.Message.find("Copter") != -1: | |
self.mav_type = mavutil.mavlink.MAV_TYPE_QUADROTOR | |
elif m.Message.startswith("Antenna"): | |
self.mav_type = mavutil.mavlink.MAV_TYPE_ANTENNA_TRACKER | |
if type == 'MODE': | |
if isinstance(m.Mode, str): | |
self.flightmode = m.Mode.upper() | |
elif 'ModeNum' in m._fieldnames: | |
mapping = mavutil.mode_mapping_bynumber(self.mav_type) | |
if mapping is not None and m.ModeNum in mapping: | |
self.flightmode = mapping[m.ModeNum] | |
else: | |
self.flightmode = mavutil.mode_string_acm(m.Mode) | |
if type == 'STAT' and 'MainState' in m._fieldnames: | |
self.flightmode = mavutil.mode_string_px4(m.MainState) | |
if type == 'PARM' and getattr(m, 'Name', None) is not None: | |
self.params[m.Name] = m.Value | |
self._set_time(m) | |
def recv_match(self, condition=None, type=None, blocking=False): | |
'''recv the next message that matches the given condition | |
type can be a string or a list of strings''' | |
if type is not None and not isinstance(type, list): | |
type = [type] | |
while True: | |
m = self.recv_msg() | |
if m is None: | |
return None | |
if type is not None and not m.get_type() in type: | |
continue | |
if not mavutil.evaluate_condition(condition, self.messages): | |
continue | |
return m | |
def check_condition(self, condition): | |
'''check if a condition is true''' | |
return mavutil.evaluate_condition(condition, self.messages) | |
def param(self, name, default=None): | |
'''convenient function for returning an arbitrary MAVLink | |
parameter with a default''' | |
if not name in self.params: | |
return default | |
return self.params[name] | |
class DFReader_binary(DFReader): | |
'''parse a binary dataflash file''' | |
def __init__(self, filename, zero_time_base=False): | |
DFReader.__init__(self) | |
# read the whole file into memory for simplicity | |
f = open(filename, mode='rb') | |
self.data = f.read() | |
self.data_len = len(self.data) | |
f.close() | |
self.HEAD1 = 0xA3 | |
self.HEAD2 = 0x95 | |
self.formats = { | |
0x80 : DFFormat(0x80, 'FMT', 89, 'BBnNZ', "Type,Length,Name,Format,Columns") | |
} | |
self._zero_time_base = zero_time_base | |
self.init_clock() | |
self._rewind() | |
def _rewind(self): | |
'''rewind to start of log''' | |
DFReader._rewind(self) | |
self.offset = 0 | |
self.remaining = self.data_len | |
def _parse_next(self): | |
'''read one message, returning it as an object''' | |
if self.data_len - self.offset < 3: | |
return None | |
hdr = self.data[self.offset:self.offset+3] | |
skip_bytes = 0 | |
skip_type = None | |
# skip over bad messages | |
while (hdr[0] != self.HEAD1 or hdr[1] != self.HEAD2 or hdr[2] not in self.formats): | |
if skip_type is None: | |
skip_type = (hdr[0], hdr[1], hdr[2]) | |
skip_start = self.offset | |
skip_bytes += 1 | |
self.offset += 1 | |
if self.data_len - self.offset < 3: | |
return None | |
hdr = self.data[self.offset:self.offset+3] | |
msg_type = hdr[2] | |
if skip_bytes != 0: | |
if self.remaining < 528: | |
return None | |
print("Skipped %u bad bytes in log at offset %u, type=%s" % (skip_bytes, skip_start, skip_type)) | |
self.remaining -= skip_bytes | |
self.offset += 3 | |
self.remaining -= 3 | |
if not msg_type in self.formats: | |
if self.verbose: | |
print("unknown message type %02x" % msg_type) | |
raise Exception("Unknown message type %02x" % msg_type) | |
fmt = self.formats[msg_type] | |
if self.remaining < fmt.len-3: | |
# out of data - can often happen half way through a message | |
if self.verbose: | |
print("out of data") | |
return None | |
body = self.data[self.offset:self.offset+(fmt.len-3)] | |
elements = None | |
try: | |
elements = list(struct.unpack(fmt.msg_struct, body)) | |
except Exception: | |
if self.remaining < 528: | |
# we can have garbage at the end of an APM2 log | |
return None | |
# we should also cope with other corruption; logs | |
# transfered via DataFlash_MAVLink may have blocks of 0s | |
# in them, for example | |
print("Failed to parse %s/%s with len %u (remaining %u)" % (fmt.name, fmt.msg_struct, len(body), self.remaining)) | |
if elements is None: | |
return self._parse_next() | |
name = null_term(fmt.name) | |
if name == 'FMT': | |
# add to formats | |
# name, len, format, headings | |
try: | |
self.formats[elements[0]] = DFFormat(elements[0], | |
null_term(elements[2]), elements[1], | |
null_term(elements[3]), null_term(elements[4])) | |
except Exception: | |
return self._parse_next() | |
self.offset += fmt.len-3 | |
self.remaining -= fmt.len-3 | |
m = DFMessage(fmt, elements, True) | |
self._add_msg(m) | |
self.percent = 100.0 * (self.offset / float(self.data_len)) | |
return m | |
def DFReader_is_text_log(filename): | |
'''return True if a file appears to be a valid text log''' | |
f = open(filename) | |
ret = (f.read(8000).find('FMT, ') != -1) | |
f.close() | |
return ret | |
class DFReader_text(DFReader): | |
'''parse a text dataflash file''' | |
def __init__(self, filename, zero_time_base=False): | |
DFReader.__init__(self) | |
# read the whole file into memory for simplicity | |
f = open(filename, mode='r') | |
self.lines = f.readlines() | |
f.close() | |
self.formats = { | |
'FMT' : DFFormat(0x80, 'FMT', 89, 'BBnNZ', "Type,Length,Name,Format,Columns") | |
} | |
self._rewind() | |
self._zero_time_base = zero_time_base | |
self.init_clock() | |
self._rewind() | |
def _rewind(self): | |
'''rewind to start of log''' | |
DFReader._rewind(self) | |
self.line = 0 | |
# find the first valid line | |
while self.line < len(self.lines): | |
if self.lines[self.line].startswith("FMT, "): | |
break | |
self.line += 1 | |
def _parse_next(self): | |
'''read one message, returning it as an object''' | |
this_line = self.line | |
while self.line < len(self.lines): | |
s = self.lines[self.line].rstrip() | |
elements = s.split(", ") | |
this_line = self.line | |
# move to next line | |
self.line += 1 | |
if len(elements) >= 2: | |
# this_line is good | |
break | |
if this_line >= len(self.lines): | |
return None | |
# cope with empty structures | |
if len(elements) == 5 and elements[-1] == ',': | |
elements[-1] = '' | |
elements.append('') | |
self.percent = 100.0 * (this_line / float(len(self.lines))) | |
msg_type = elements[0] | |
if not msg_type in self.formats: | |
return self._parse_next() | |
fmt = self.formats[msg_type] | |
if len(elements) < len(fmt.format)+1: | |
# not enough columns | |
return self._parse_next() | |
elements = elements[1:] | |
name = fmt.name.rstrip('\0') | |
if name == 'FMT': | |
# add to formats | |
# name, len, format, headings | |
self.formats[elements[2]] = DFFormat(int(elements[0]), elements[2], int(elements[1]), elements[3], elements[4]) | |
try: | |
m = DFMessage(fmt, elements, False) | |
except ValueError: | |
return self._parse_next() | |
self._add_msg(m) | |
return m | |
if __name__ == "__main__": | |
import sys | |
use_profiler = False | |
if use_profiler: | |
from line_profiler import LineProfiler | |
profiler = LineProfiler() | |
profiler.add_function(DFReader_binary._parse_next) | |
profiler.add_function(DFReader_binary._add_msg) | |
profiler.add_function(DFReader._set_time) | |
profiler.enable_by_count() | |
filename = sys.argv[1] | |
if filename.endswith('.log'): | |
log = DFReader_text(filename) | |
else: | |
log = DFReader_binary(filename) | |
while True: | |
m = log.recv_msg() | |
if m is None: | |
break | |
#print(m) | |
if use_profiler: | |
profiler.print_stats() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment