Created
April 11, 2018 18:55
-
-
Save njoubert/355cd76a6cdbcb4c480d373bd1b5eb77 to your computer and use it in GitHub Desktop.
kleeman sbp analysis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import logging | |
import argparse | |
import itertools | |
import numpy as np | |
from gnss_analysis.io import log_utils | |
from gnss_analysis.io import sbp_utils | |
logger = logging.getLogger(os.path.basename(__file__)) | |
_sbp_message_types = {"piksi_solution_time": 258, | |
"piksi_absolute_ecef": 521} | |
def temporal_slice(log_iterator, start_time, end_time): | |
current_epoch = None | |
for msg, data in log_iterator: | |
if msg.msg_type == _sbp_message_types['piksi_solution_time']: | |
key, solution_time = sbp_utils._processors[msg.msg_type](msg, data) | |
current_epoch = solution_time['solution_time(gpst)'] | |
if current_epoch is not None and current_epoch > end_time: | |
break | |
if current_epoch is not None and current_epoch >= start_time: | |
yield msg, data | |
def filter_message_types(log_iterator, msg_types=None): | |
""" | |
Loops over a log and generates tuples of field names | |
and dictionaries containing the corresponding messages. | |
""" | |
for msg, data in log_iterator: | |
if msg_types is None or msg.msg_type in msg_types: | |
try: | |
processor = sbp_utils._processors.get(msg.msg_type, | |
sbp_utils.flattened_generic_processor) | |
key, obs = processor(msg, data) | |
except BadSBPMessageException, e: | |
logger.warn(e) | |
continue | |
# make sure the Observations contains data. | |
if not len(obs): | |
logger.debug("Found a %s observation, but it didn't contain data" | |
% key) | |
continue | |
yield key, obs | |
if __name__ == "__main__": | |
p = argparse.ArgumentParser() | |
p.add_argument("input") | |
p.add_argument("start", type=np.datetime64) | |
p.add_argument("end", type=np.datetime64) | |
args = p.parse_args() | |
msg_types = [_sbp_message_types['piksi_absolute_ecef']] | |
log_file = args.input | |
log_iterator = log_utils.complete_messages_only(log_file) | |
log_iterator = temporal_slice(log_iterator, args.start, args.end) | |
log_iterator = filter_message_types(log_iterator, msg_types) | |
for i, (key, msg) in itertools.izip(range(100), log_iterator): | |
print msg |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment