Created
October 3, 2019 05:11
-
-
Save ligfx/4fc0731ee4989135b611db6765624128 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
FIELD_BASE_TYPES_TO_NAMES = { | |
0x0: 'enum', | |
0x1: 'sint8', | |
0x2: 'uint8', | |
0x7: 'string', | |
0x84: 'uint16', | |
0x85: 'sint32', | |
0x86: 'uint32', | |
0x8c: 'uint32z', | |
} | |
# Size in bytes of each base type. A field whose declared size is a larger
# multiple of its base type size is an array of that type.
# 'sint16' (2 bytes) was missing, inconsistent with parse_field()'s
# support for decoding 'sint16' values.
FIELD_NAME_TO_SIZE = {
    'enum': 1,
    'sint8': 1,
    'uint8': 1,
    'string': 1,
    'uint16': 2,
    'sint16': 2,
    'uint32': 4,
    'uint32z': 4,
}
# FIT global message number -> message name, as used by the definition and
# data messages parsed below. Numbers not in this table are rendered as the
# raw number by global_message_number_to_name().
# NOTE(review): entries appear to follow the FIT profile's mesg_num list —
# confirm against the FIT SDK profile if extending.
GLOBAL_MESSAGE_NUMBER_TO_NAME = {
    0: "file_id",
    1: "capabilities",
    2: "device_settings",
    3: "user_profile",
    4: "hrm_profile",
    5: "sdm_profile",
    6: "bike_profile",
    7: "zones_target",
    8: "hr_zone",
    9: "power_zone",
    10: "met_zone",
    12: "sport",
    15: "goal",
    18: "session",
    19: "lap",
    20: "record",
    21: "event",
    23: "device_info",
    26: "workout",
    27: "workout_step",
    28: "schedule",
    30: "weight_scale",
    31: "course",
    32: "course_point",
    33: "totals",
    34: "activity",
    35: "software",
    37: "file_capabilities",
    38: "mesg_capabilities",
    39: "field_capabilities",
    49: "file_creator",
    51: "blood_pressure",
    53: "speed_zone",
    55: "monitoring",
    72: "training_file",
    78: "hrv",
    80: "ant_rx",
    81: "ant_tx",
    82: "ant_channel_id",
    101: "length",
    103: "monitoring_info",
    105: "pad",
    106: "slave_device",
    127: "connectivity",
    128: "weather_conditions",
    129: "weather_alert",
    131: "cadence_zone",
    132: "hr",
    142: "segment_lap",
    145: "memo_glob",
    148: "segment_id",
    149: "segment_leaderboard_entry",
    150: "segment_point",
    151: "segment_file",
    158: "workout_session",
    159: "watchface_settings",
    160: "gps_metadata",
    161: "camera_event",
    162: "timestamp_correlation",
    164: "gyroscope_data",
    165: "accelerometer_data",
    167: "three_d_sensor_calibration",
    169: "video_frame",
    174: "obdii_data",
    177: "nmea_sentence",
    178: "aviation_attitude",
    184: "video",
    185: "video_title",
    186: "video_description",
    187: "video_clip",
    188: "ohr_settings",
    200: "exd_screen_configuration",
    201: "exd_data_field_configuration",
    202: "exd_data_concept_configuration",
    206: "field_description",
    207: "developer_data_id",
    208: "magnetometer_data",
    209: "barometer_data",
    210: "one_d_sensor_calibration",
    225: "set",
    227: "stress_level",
    258: "dive_settings",
    259: "dive_gas",
    262: "dive_alarm",
    264: "exercise_title",
    268: "dive_summary",
    285: "jump",
}
def global_message_number_to_name(global_message_number):
    """Map a FIT global message number to its name, or the number as a string if unknown."""
    try:
        return GLOBAL_MESSAGE_NUMBER_TO_NAME[global_message_number]
    except KeyError:
        return str(global_message_number)
class Reader:
    """A cursor over a byte buffer plus a registry of FIT message definitions.

    Single-byte reads return an int; multi-byte reads return a bytes slice.
    """

    def __init__(self, data):
        self._data = data
        self._p = 0
        self._definitions = {}

    def pos(self):
        """Current byte offset into the buffer."""
        return self._p

    def remaining(self):
        """Number of unread bytes left in the buffer."""
        return len(self._data) - self._p

    def peek(self, n):
        """Look at the next n bytes without advancing (int when n == 1)."""
        if n == 1:
            return self._data[self._p]
        return self._data[self._p:self._p + n]

    def read(self, n):
        """Consume and return the next n bytes (int when n == 1)."""
        out = self.peek(n)
        self._p += n
        return out

    def add_definition(self, local_message_type, global_message_number, fields):
        """Register the definition message for a local message type."""
        self._definitions[local_message_type] = (global_message_number, fields)

    def get_definition_fields(self, local_message_type):
        """Return the (field_id, size, type) tuples registered for a local message type."""
        return self._definitions[local_message_type][1]

    def get_global_message_number(self, local_message_type):
        """Return the global message number registered for a local message type."""
        return self._definitions[local_message_type][0]
def parse_field(reader, field_type, field_size):
    """Consume field_size bytes from reader and decode them as field_type.

    Byte-sized types ('enum', 'uint8', 'sint8') and 'string' are returned
    raw; multi-byte integers are decoded little-endian. An unrecognized
    field_type raises Exception.
    """
    raw = reader.read(field_size)
    if field_type in ('enum', 'uint8', 'sint8', 'string'):
        return raw
    fmt = {
        'uint16': '<H',
        'sint16': '<h',
        'uint32': '<I',
        'uint32z': '<I',
        'sint32': '<i',
    }.get(field_type)
    if fmt is not None:
        return struct.unpack(fmt, raw)[0]
    raise Exception(field_type)
# Known field names per message type, keyed by message name then by field
# definition number. Hoisted to module level so the table is built once
# instead of being re-created on every get_field_name() call (it was a
# literal inside the function body, rebuilt per lookup).
_FIELD_NAMES_BY_MESSAGE = {
    'event': {
        0: "event",
        1: "event_type",
        2: "data16",
        3: "data",
        4: "event_group",
        7: "score",
        8: "opponent_score",
        9: "front_gear_num",
        10: "front_gear",
        11: "rear_gear_num",
        12: "rear_gear",
        13: "device_index",
    },
    'file_id': {
        0: "type",
        1: "manufacturer",
        2: "product",
        3: "serial_number",
        4: "time_created",
        5: "number",
        8: "product_name",
    },
    'lap': {
        254: "message_index",
        253: "timestamp",
        0: "event",
        1: "event_type",
        2: "start_time",
        3: "start_position_lat",
        4: "start_position_long",
        5: "end_position_lat",
        6: "end_position_long",
        7: "total_elapsed_time",
        8: "total_timer_time",
        9: "total_distance",
        10: "total_cycles,total_strides",
        11: "total_calories",
        12: "total_fat_calories",
        13: "avg_speed",
        14: "max_speed",
        15: "avg_heart_rate",
        16: "max_heart_rate",
        17: "avg_cadence,avg_running_cadence",
        18: "max_cadence,max_running_cadence",
        19: "avg_power",
        20: "max_power",
        21: "total_ascent",
        22: "total_descent",
        23: "intensity",
        24: "lap_trigger",
        25: "sport",
        26: "event_group",
        32: "num_lengths",
        33: "normalized_power",
        34: "left_right_balance",
        35: "first_length_index",
        37: "avg_stroke_distance",
        38: "swim_stroke",
        39: "sub_sport",
        40: "num_active_lengths",
        41: "total_work",
        42: "avg_altitude",
        43: "max_altitude",
        44: "gps_accuracy",
        45: "avg_grade",
        46: "avg_pos_grade",
        47: "avg_neg_grade",
        48: "max_pos_grade",
        49: "max_neg_grade",
        50: "avg_temperature",
        51: "max_temperature",
        52: "total_moving_time",
        53: "avg_pos_vertical_speed",
        54: "avg_neg_vertical_speed",
        55: "max_pos_vertical_speed",
        56: "max_neg_vertical_speed",
        57: "time_in_hr_zone",
        58: "time_in_speed_zone",
        59: "time_in_cadence_zone",
        60: "time_in_power_zone",
        61: "repetition_num",
        62: "min_altitude",
        63: "min_heart_rate",
        71: "wkt_step_index",
        74: "opponent_score",
        75: "stroke_count",
        76: "zone_count",
        77: "avg_vertical_oscillation",
        78: "avg_stance_time_percent",
        79: "avg_stance_time",
        80: "avg_fractional_cadence",
        81: "max_fractional_cadence",
        82: "total_fractional_cycles",
        83: "player_score",
        84: "avg_total_hemoglobin_conc",
        85: "min_total_hemoglobin_conc",
        86: "max_total_hemoglobin_conc",
        87: "avg_saturated_hemoglobin_percent",
        88: "min_saturated_hemoglobin_percent",
        89: "max_saturated_hemoglobin_percent",
        91: "avg_left_torque_effectiveness",
        92: "avg_right_torque_effectiveness",
        93: "avg_left_pedal_smoothness",
        94: "avg_right_pedal_smoothness",
        95: "avg_combined_pedal_smoothness",
        98: "time_standing",
        99: "stand_count",
        100: "avg_left_pco",
        101: "avg_right_pco",
        102: "avg_left_power_phase",
        103: "avg_left_power_phase_peak",
        104: "avg_right_power_phase",
        105: "avg_right_power_phase_peak",
        106: "avg_power_position",
        107: "max_power_position",
        108: "avg_cadence_position",
        109: "max_cadence_position",
        110: "enhanced_avg_speed",
        111: "enhanced_max_speed",
        112: "enhanced_avg_altitude",
        113: "enhanced_min_altitude",
        114: "enhanced_max_altitude",
        115: "avg_lev_motor_power",
        116: "max_lev_motor_power",
        117: "lev_battery_consumption",
        118: "avg_vertical_ratio",
        119: "avg_stance_time_balance",
        120: "avg_step_length",
        121: "avg_vam",
        149: "total_grit",
        150: "total_flow",
        151: "jump_count",
        153: "avg_grit",
        154: "avg_flow",
    },
    'record': {
        253: "timestamp",
        0: "position_lat",
        1: "position_long",
        2: "altitude",
        3: "heart_rate",
        4: "cadence",
        5: "distance",
        6: "speed",
        7: "power",
        8: "compressed_speed_distance",
        9: "grade",
        10: "resistance",
        11: "time_from_course",
        12: "cycle_length",
        13: "temperature",
        17: "speed_1s",
        18: "cycles",
        19: "total_cycles",
        28: "compressed_accumulated_power",
        29: "accumulated_power",
        30: "left_right_balance",
        31: "gps_accuracy",
        32: "vertical_speed",
        33: "calories",
        39: "vertical_oscillation",
        40: "stance_time_percent",
        41: "stance_time",
        42: "activity_type",
        43: "left_torque_effectiveness",
        44: "right_torque_effectiveness",
        45: "left_pedal_smoothness",
        46: "right_pedal_smoothness",
        47: "combined_pedal_smoothness",
        48: "time128",
        49: "stroke_type",
        50: "zone",
        51: "ball_speed",
        52: "cadence256",
        53: "fractional_cadence",
        54: "total_hemoglobin_conc",
        55: "total_hemoglobin_conc_min",
        56: "total_hemoglobin_conc_max",
        57: "saturated_hemoglobin_percent",
        58: "saturated_hemoglobin_percent_min",
        59: "saturated_hemoglobin_percent_max",
        62: "device_index",
        67: "left_pco",
        68: "right_pco",
        69: "left_power_phase",
        70: "left_power_phase_peak",
        71: "right_power_phase",
        72: "right_power_phase_peak",
        73: "enhanced_speed",
        78: "enhanced_altitude",
        81: "battery_soc",
        82: "motor_power",
        83: "vertical_ratio",
        84: "stance_time_balance",
        85: "step_length",
        91: "absolute_pressure",
        92: "depth",
        93: "next_stop_depth",
        94: "next_stop_time",
        95: "time_to_surface",
        96: "ndl_time",
        97: "cns_load",
        98: "n2_load",
        114: "grit",
        115: "flow",
    },
}

def get_field_name(global_message_number, field_id):
    """Return the profile name for a field of a message, or field_id itself if unknown."""
    message_name = global_message_number_to_name(global_message_number)
    return _FIELD_NAMES_BY_MESSAGE.get(message_name, {}).get(field_id, field_id)
def parse_record(reader):
    """Parse one FIT record (definition or data message) and print it.

    Reads the one-byte record header, then either registers a definition
    message on the reader or decodes a data message using the definition
    previously registered for its local message type.
    """
    header = reader.read(1)
    # Bit 7 set => compressed timestamp header: bits 5-6 hold a 2-bit local
    # message type and bits 0-4 hold a 5-bit time offset.
    normal = (header >> 7) & 0b1
    if normal == 1:
        # Compressed timestamp data message.
        print("compressed timestamp header")
        local_message_type = (header >> 5) & 0b11
        print(bin(header))
        print(local_message_type)
        global_message_number = reader.get_global_message_number(local_message_type)
        print(global_message_number_to_name(global_message_number))
        time_offset = header & 0b11111
        print("time_offset", time_offset)
        fields = reader.get_definition_fields(local_message_type)
        # NOTE(review): the first definition field is assumed to be the
        # timestamp (replaced by the compressed offset), so it is skipped
        # along with any explicit timestamp field (id 253) — confirm
        # against real definitions if this skips too much.
        for (field_id, field_size, field_type) in fields[1:]:
            if field_id == 253:  # timestamp
                continue
            print(" %s = %s, # %s" % (get_field_name(global_message_number, field_id), parse_field(reader, field_type, field_size), field_type))
        return
    # Normal header: bit 6 = message type (1 = definition, 0 = data),
    # bit 5 = message-type-specific flag, bit 4 reserved (must be 0),
    # bits 0-3 = local message type.
    message_type = (header >> 6) & 1
    message_type_specific = (header >> 5) & 1
    reserved = (header >> 4) & 1
    if reserved != 0:
        raise Exception()
    local_message_type = header & 0b1111
    if message_type_specific != 0:
        if global_message_number_to_name(reader.get_global_message_number(local_message_type)) == "file_creator":
            # Bug fix: this branch only runs when the flag is set, so the
            # old message saying the flag "is 0" was wrong.
            print("WARNING: message_type_specific field in message header is 1 in a data message of type file_creator, not sure what that means")
        else:
            print("message_type %i local_message_type %i (%s) message_type_specific %i" % (message_type, local_message_type, global_message_number_to_name(reader.get_global_message_number(local_message_type)), message_type_specific))
            raise Exception()
    if message_type == 1:
        # Definition message: describes the field layout of subsequent data
        # messages that use this local message type.
        reserved = reader.read(1)
        if reserved != 0:
            raise Exception()
        architecture = reader.read(1)
        if architecture != 0:
            # A value of 1 would mean big-endian; only little-endian is handled.
            raise Exception()
        global_message_number = struct.unpack('<H', reader.read(2))[0]
        num_fields = reader.read(1)
        fields = []
        for i in range(num_fields):
            definition_number = reader.read(1)
            field_size = reader.read(1)
            # If field_size is a multiple of the base type's size, the field is an array.
            field_base_type = FIELD_BASE_TYPES_TO_NAMES[reader.read(1)]
            fields.append((definition_number, field_size, field_base_type))
        print("definition", local_message_type, "=", global_message_number_to_name(global_message_number), "{", fields, "}")
        reader.add_definition(local_message_type, global_message_number, fields)
    else:
        # Data message: decode each field per the registered definition and
        # print it as pseudo-code.
        global_message_number = reader.get_global_message_number(local_message_type)
        fields = reader.get_definition_fields(local_message_type)
        print(global_message_number_to_name(global_message_number) + "(")
        for (field_id, field_size, field_type) in fields:
            print(" %s = %s, # %s" % (get_field_name(global_message_number, field_id), parse_field(reader, field_type, field_size), field_type))
        print(")")
def main(filename):
    """Parse a FIT file and dump its header and records to stdout.

    FIT layout (as also noted in the original comments): a 12- or 14-byte
    file header, a series of records (definition and data messages), and a
    trailing 2-byte CRC. The 14-byte header ends with an optional header
    CRC (0x0000 when unused).
    """
    with open(filename, 'rb') as f:
        data = f.read()
    header_size = data[0]
    protocol_version = data[1]
    profile_version = struct.unpack('<H', data[2:4])[0]
    data_size = struct.unpack('<I', data[4:8])[0]  # size of the data records only, in bytes
    magic = data[8:12]  # expected to be b'.FIT'
    # Bug fix: the header CRC only exists with a 14-byte header; for a
    # 12-byte header, bytes 12-13 are already the first record.
    header_crc = data[12:14] if header_size >= 14 else None
    print(
        f"""fit_header(
    header_size = {header_size},
    protocol_version = {protocol_version},
    profile_version = {profile_version},
    data_size = {data_size},
    magic = {magic},
    header_crc = {header_crc},
)
"""
    )
    # Bug fix: data_size counts only the data records, so the whole file
    # should be header + records + 2-byte CRC; the old check compared
    # data_size directly against len(data) and always warned.
    if header_size + data_size + 2 != len(data):
        print(f"WARNING: data size recorded in header ({data_size}) does not match actual data size ({len(data) - header_size - 2})")
    reader = Reader(data)
    # Bug fix: skip exactly the declared header size (was hard-coded to 14,
    # which misaligns files with a 12-byte header).
    reader.read(header_size)
    i = 0
    while reader.remaining() > 2:  # stop before the trailing 2-byte CRC
        i += 1
        # HACK: this particular capture has a corrupt region; jump to a
        # known-good offset when message 2367 is reached.
        if i == 2367:
            print("WARNING: Skipping %i bytes at %i" % (55796 - reader.pos(), reader.pos()))
            reader.read(55796 - reader.pos())
        print()
        print("# message", i, "@", reader.pos())
        parse_record(reader)
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main('9A1I0426.FIT')
# with open('9A1I0426.FIT', 'rb') as f: | |
# data = f.read() | |
# | |
# with open('skipped.fit', 'wb') as f: | |
# f.write(data[:55762]) | |
# f.write(data[55796:]) | |
# | |
# ch2 fix-fit skipped.FIT --fix-header --fix-checksum -o fixed.fit |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment