|
## Lora transmition Packet definition ( TELEMETRY) |
|
|
|
# This document outlines the process of encoding and decoding telemetry data into a compact binary format suitable for LoRa transmission. |
|
# usefull link : https://evanw.github.io/float-toy/ |
|
|
|
# This document has 4 parts: |
|
# * Creating a dict that hold the structure |
|
# * the code that encode a Telemetry data dict |
|
# * the code that decode it . |
|
# * A comparaison between the original data dict and the reconstructed one. |
|
|
|
|
|
# %% import |
|
import struct |
|
from datetime import datetime |
|
import time |
|
import numpy as np |
|
import pandas as pd |
|
|
|
# %% INTRO :: General info on the data type |
|
# get the info on float16 |
|
print(np.finfo(np.float16)) |
|
print("size float16 (bytes): ", np.dtype("float16").itemsize) |
|
print(np.finfo(np.float32)) |
|
print("size float16 (bytes): ", np.dtype("float16").itemsize) |
|
print(np.iinfo(np.int32)) |
|
print("size float16 (bytes): ", np.dtype("int32").itemsize) |
|
|
|
# %% ### PART 1 :: CREATING STRUCTURE DICT |
|
# code to create the dictionarry that hold the structure |
|
# I add some scaling and offset has it might be need later as the maxium range for float16 is 65504.0 |
|
|
|
# Define float fields |
|
float_fields = [ |
|
"P0_(mbar)", |
|
"T0_(C)", |
|
"fish_depth", |
|
"P_dyn_(mbar)", |
|
"T_dyn_(C)", |
|
"fish_speed", |
|
"duration_read_sensors", |
|
"elapsed_time", |
|
"setpoint", |
|
"pid_value", |
|
"vbus_voltage_1", |
|
"bat_level_1", |
|
"ibus_1", |
|
"axis0.pos_estimate_1", |
|
"vbus_voltage_2", |
|
"bat_level_2", |
|
"ibus_2", |
|
"axis0.pos_estimate_2", |
|
"core_frequ", |
|
] |
|
|
|
|
|
# Create a dictionary to hold the format for all fields in the packet with offsets in seconds |
|
|
|
field_format_dict_with_seconds_offset = { |
|
"header_byte": { |
|
"bytes": 1, |
|
"type": "uint8", |
|
"scale": 1, |
|
"offset": 0, |
|
}, |
|
"timestamp": { |
|
"bytes": 4, |
|
"type": "uint32", |
|
"scale": 0.1, |
|
"offset": 1704063600, |
|
}, # Offset for timestamp in seconds |
|
"sensor_used": { |
|
"bytes": 1, |
|
"type": "uint8", |
|
"scale": 1, |
|
"offset": 0, |
|
}, |
|
"axis0.current_state_1": { |
|
"bytes": 1, |
|
"type": "uint8", |
|
"scale": 1, |
|
"offset": 0, |
|
}, |
|
"axis0.current_state_2": { |
|
"bytes": 1, |
|
"type": "uint8", |
|
"scale": 1, |
|
"offset": 0, |
|
}, |
|
} |
|
|
|
|
|
# Add float fields with no offset but with default scale and size |
|
|
|
for field in float_fields: |
|
|
|
field_format_dict_with_seconds_offset[field] = { |
|
"bytes": 2, |
|
"type": "float16", |
|
"scale": 1, |
|
"offset": 0, |
|
} |
|
|
|
# Convert to a DataFrame for better visualization |
|
|
|
field_format_df = pd.DataFrame(field_format_dict_with_seconds_offset).T.reset_index() |
|
|
|
|
|
field_format_df.columns = ["Field Name", "Bytes Used", "Type", "Scale", "Offset"] |
|
print(field_format_df) |
|
|
|
# %% #### PART2 :: ENCODING ( using struct.pack and numpy for use Float16) |
|
|
|
# extract from fish eye log ( I duplicate Odrive data to simulate 2 Odrives) |
|
message_dict = { |
|
"timestamp": "2024-11-10T15:06:05.329", |
|
"P0_(mbar)": -74.1467161179948, |
|
"T0_(C)": 22.406355721950533, |
|
"fish_depth": 0.0030501921191828972, |
|
"sensor_used": 0, |
|
"P_dyn_(mbar)": 2042.9901137867594, |
|
"T_dyn_(C)": 22.69834464788437, |
|
"fish_speed": 0, |
|
"duration_read_sensors": 0.04412102699279785, |
|
"elapsed_time": 5.428764343261719, |
|
"setpoint": 0.3, |
|
"pid_value": -2.071595431578338, |
|
"vbus_voltage_1": 11.929389953613281, |
|
"bat_level_1": 85.09755452473959, |
|
"ibus_1": 0.08959825336933136, |
|
"axis0.pos_estimate_1": -2.080005645751953, |
|
"axis0.current_state_1": 8, |
|
"vbus_voltage_2": 11.929389953613281, |
|
"bat_level_2": 85.09755452473959, |
|
"ibus_2": 0.08959825336933136, |
|
"axis0.pos_estimate_2": -2.080005645751953, |
|
"axis0.current_state_2": 8, |
|
"errors": 0, |
|
"core_frequ": 15.905589685248389, |
|
} |
|
|
|
|
|
# # Define sender, receiver, and message type |
|
sender_id = 1 # as exemple |
|
receiver_id = 3 # as exemple |
|
message_type = 3 # as exemple ( Command :0 , status:1,Telemetry :3 etc.) |
|
|
|
|
|
# Encode the first byte |
|
header_byte = ( |
|
(sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6) |
|
) |
|
|
|
# Convert timestamp |
|
timestamp_epoch = datetime.strptime( |
|
message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f" |
|
).timestamp() |
|
|
|
start2024 = datetime(2024, 1, 1, 0, 0, 0).timestamp() |
|
time_difference = timestamp_epoch - start2024 |
|
|
|
timestamp_int32 = int(time_difference * 10) |
|
|
|
# Fields to convert to float16 |
|
float_fields = [ |
|
"P0_(mbar)", |
|
"T0_(C)", |
|
"fish_depth", |
|
"P_dyn_(mbar)", |
|
"T_dyn_(C)", |
|
"fish_speed", |
|
"duration_read_sensors", |
|
"elapsed_time", |
|
"setpoint", |
|
"pid_value", |
|
"vbus_voltage_1", |
|
"bat_level_1", |
|
"ibus_1", |
|
"axis0.pos_estimate_1", |
|
"vbus_voltage_2", |
|
"bat_level_2", |
|
"ibus_2", |
|
"axis0.pos_estimate_2", |
|
"core_frequ", |
|
] |
|
|
|
# Convert |
|
float16_values = [np.float16(message_dict[field]).tobytes() for field in float_fields] |
|
|
|
# Define the structure format for binary packing |
|
packet_format = ( |
|
"<B I B B B" # Header byte, timestamp (unsigned int32), sensor_used, axis states |
|
) |
|
|
|
# Pack non-float fields |
|
binary_packet = struct.pack( |
|
packet_format, |
|
header_byte, # Header byte |
|
timestamp_int32, # Timestamp |
|
int(message_dict["sensor_used"]), # Sensor used |
|
int(message_dict["axis0.current_state_1"]), # Current state 1 |
|
int(message_dict["axis0.current_state_2"]), # Current state 2 |
|
) |
|
|
|
# Append all float16 fields |
|
for float16 in float16_values: |
|
binary_packet += float16 |
|
|
|
# Output the binary packet size and content |
|
print(f"Binary Packet Size: {len(binary_packet)} bytes") |
|
print(f"Binary Packet: {binary_packet}") |
|
|
|
## Binary Packet Size: 46 bytes = acceptable |
|
|
|
|
|
# %% |
|
|
|
#### PART 3 :: DECODE BINARY |
|
## to check how the value is decoded |
|
# binary_data = b"\xd9\xcd\xbd0g\x1eK\x94\xc27@\xb3A\xbb\xe5G;\x00\xaf_\xffD6\x96\xb5A\x00\x00\x00\x00@\xb84=p\xb8\xad@\x9a\x99\x99>\x05\x95\x04\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0L}~A" |
|
binary_data = binary_packet |
|
|
|
# Field dict format from the code above |
|
field_format = { |
|
"header_byte": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0}, |
|
"timestamp": {"bytes": 4, "type": "uint32", "scale": 0.1, "offset": 1704063600}, |
|
"sensor_used": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0}, |
|
"axis0.current_state_1": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0}, |
|
"axis0.current_state_2": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0}, |
|
"P0_(mbar)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"T0_(C)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"fish_depth": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"P_dyn_(mbar)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"T_dyn_(C)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"fish_speed": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"duration_read_sensors": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"elapsed_time": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"setpoint": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"pid_value": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"vbus_voltage_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"bat_level_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"ibus_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"axis0.pos_estimate_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"vbus_voltage_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"bat_level_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"ibus_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"axis0.pos_estimate_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
"core_frequ": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0}, |
|
} |
|
|
|
|
|
# Decode binary data based on the field format |
|
def decode_binary_data(binary_data, field_format): |
|
|
|
offset = 0 |
|
decoded_data = {} |
|
|
|
for field, specs in field_format.items(): |
|
field_bytes = specs["bytes"] |
|
field_type = specs["type"] |
|
field_scale = specs["scale"] |
|
field_offset = specs["offset"] |
|
|
|
# Extract bytes for the current field |
|
field_data = binary_data[offset : offset + field_bytes] |
|
offset += field_bytes |
|
|
|
# Decode based on type |
|
if field_type == "uint8": |
|
value = struct.unpack("<B", field_data)[0] |
|
elif field_type == "uint32": |
|
value = struct.unpack("<I", field_data)[0] |
|
elif field_type == "float16": |
|
value = np.frombuffer(field_data, dtype=np.float16)[0] |
|
else: |
|
raise ValueError(f"Unsupported type: {field_type}") |
|
# Apply scaling |
|
|
|
decoded_data[field] = value * field_scale + field_offset |
|
return decoded_data |
|
|
|
|
|
# Decode the binary data |
|
decoded_values = decode_binary_data(binary_data, field_format) |
|
|
|
decoded_values |
|
|
|
# %% PART 4 :: COMPARAISON BEFORE AND AFTER TRANSFER |
|
comparison_table = [] |
|
|
|
for key in message_dict.keys(): |
|
print(f"checking: {key}") |
|
original_value = message_dict[key] |
|
decoded_value = decoded_values.get(key, None) |
|
print(f"{original_value = } , {decoded_value =}") |
|
|
|
try: |
|
|
|
difference = ( |
|
abs(original_value - decoded_value) if decoded_value is not None else None |
|
) |
|
comparison_table.append( |
|
{ |
|
"Field": key, |
|
"Original Value": original_value, |
|
"Decoded Value": decoded_value, |
|
"Difference": difference, |
|
} |
|
) |
|
except: |
|
print("fail") |
|
|
|
|
|
# Convert to DataFrame for better visualization |
|
comparison_df = pd.DataFrame(comparison_table) |
|
|
|
# %% ARCHIVE |
|
# %% |
|
#### FIRST APPROACH ( the simplest using fully struct.pack) |
|
# it fail because the size is too big |
|
|
|
# # Sample message_dict |
|
# message_dict = { |
|
# "timestamp": "2024-11-10T15:06:05.029", |
|
# "P0_(mbar)": -74.1467161179948, |
|
# "T0_(C)": 22.406355721950533, |
|
# "fish_depth": 0.0030501921191828972, |
|
# "sensor_used": 0, |
|
# "P_dyn_(mbar)": 2042.9901137867594, |
|
# "T_dyn_(C)": 22.69834464788437, |
|
# "fish_speed": 0, |
|
# "duration_read_sensors": 0.04412102699279785, |
|
# "elapsed_time": 5.428764343261719, |
|
# "setpoint": 0.3, |
|
# "pid_value": -2.071595431578338, |
|
# "vbus_voltage_1": 11.929389953613281, |
|
# "bat_level_1": 85.09755452473959, |
|
# "ibus_1": 0.08959825336933136, |
|
# "axis0.pos_estimate_1": -2.080005645751953, |
|
# "axis0.current_state_1": 8, |
|
# "vbus_voltage_2": 11.929389953613281, |
|
# "bat_level_2": 85.09755452473959, |
|
# "ibus_2": 0.08959825336933136, |
|
# "axis0.pos_estimate_2": -2.080005645751953, |
|
# "axis0.current_state_2": 8, |
|
# "errors": 0, |
|
# "core_frequ": 15.905589685248389, |
|
# } |
|
|
|
# # Define sender, receiver, and message type |
|
# sender_id = 1 # as exemple |
|
# receiver_id = 3 # as exemple |
|
# message_type = 3 # as exemple ( Command :0 , status:1,Telemetry :3 etc.) |
|
|
|
# # Encode the first byte |
|
# header_byte = ( |
|
# (sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6) |
|
# ) |
|
|
|
# # Convert timestamp to epoch ( we might need more precision than 1 s) |
|
# timestamp_epoch = int( |
|
# time.mktime( |
|
# datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple() |
|
# ) |
|
# ) |
|
|
|
# # Define the structure format for binary packing |
|
# packet_format = "<B I f f f B f f f f f f f f f f f f f f f f f f" |
|
|
|
|
|
# # Prepare all the values |
|
# field_values = [ |
|
# int(header_byte), # Header byte |
|
# int(timestamp_epoch), # Timestamp as an unsigned integer |
|
# float(message_dict["P0_(mbar)"]), # Float |
|
# float(message_dict["T0_(C)"]), # Float |
|
# float(message_dict["fish_depth"]), # Float |
|
# int(message_dict["sensor_used"]), # Byte |
|
# float(message_dict["P_dyn_(mbar)"]), # Float |
|
# float(message_dict["T_dyn_(C)"]), # Float |
|
# float(message_dict["fish_speed"]), # Float |
|
# float(message_dict["duration_read_sensors"]), # Float |
|
# float(message_dict["elapsed_time"]), # Float |
|
# float(message_dict["setpoint"]), # Float |
|
# float(message_dict["pid_value"]), # Float |
|
# int(message_dict["axis0.current_state_1"]), # Byte |
|
# float(message_dict["vbus_voltage_1"]), # Float |
|
# float(message_dict["bat_level_1"]), # Float |
|
# float(message_dict["ibus_1"]), # Float |
|
# float(message_dict["axis0.pos_estimate_1"]), # Float |
|
# int(message_dict["axis0.current_state_2"]), # Byte |
|
# float(message_dict["vbus_voltage_2"]), # Float |
|
# float(message_dict["bat_level_2"]), # Float |
|
# float(message_dict["ibus_2"]), # Float |
|
# float(message_dict["axis0.pos_estimate_2"]), # Float |
|
# float(message_dict["core_frequ"]), # Float |
|
# ] |
|
|
|
|
|
# # Making the packet |
|
# binary_packet = struct.pack(packet_format, *field_values) |
|
|
|
# # Outputs |
|
# print(f"Binary Packet Size: {len(binary_packet)} bytes") |
|
# print(f"Binary Packet: {binary_packet}") |
|
# struct.calcsize(packet_format) |
|
|
|
## Conclusion the packet is too big : 90 bytes |
|
## to try to reduced to float16 isntead of float32 |