Created
January 22, 2025 21:37
-
-
Save fredvol/5e342205a21c07a385d05a41e1d099d8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## TRIAL on packet size | |
# useful link: https://evanw.github.io/float-toy/
# %% import | |
import struct | |
from datetime import datetime | |
import time | |
import numpy as np | |
import pandas as pd | |
#%% | |
#### FIRST APPROACH ( using fully struct.pack) | |
# # Sample message_dict | |
# message_dict = { | |
# "timestamp": "2024-11-10T15:06:05.029", | |
# "P0_(mbar)": -74.1467161179948, | |
# "T0_(C)": 22.406355721950533, | |
# "fish_depth": 0.0030501921191828972, | |
# "sensor_used": 0, | |
# "P_dyn_(mbar)": 2042.9901137867594, | |
# "T_dyn_(C)": 22.69834464788437, | |
# "fish_speed": 0, | |
# "duration_read_sensors": 0.04412102699279785, | |
# "elapsed_time": 5.428764343261719, | |
# "setpoint": 0.3, | |
# "pid_value": -2.071595431578338, | |
# "vbus_voltage_1": 11.929389953613281, | |
# "bat_level_1": 85.09755452473959, | |
# "ibus_1": 0.08959825336933136, | |
# "axis0.pos_estimate_1": -2.080005645751953, | |
# "axis0.current_state_1": 8, | |
# "vbus_voltage_2": 11.929389953613281, | |
# "bat_level_2": 85.09755452473959, | |
# "ibus_2": 0.08959825336933136, | |
# "axis0.pos_estimate_2": -2.080005645751953, | |
# "axis0.current_state_2": 8, | |
# "errors": 0, | |
# "core_frequ": 15.905589685248389, | |
# } | |
# # Define sender, receiver, and message type | |
# sender_id = 1 # as exemple | |
# receiver_id = 3 # as exemple | |
# message_type = 3 # as exemple ( Command :0 , status:1,Telemetry :3 etc.) | |
# # Encode the first byte | |
# header_byte = ( | |
# (sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6) | |
# ) | |
# # Convert timestamp to epoch ( we might need more precision than 1 s) | |
# timestamp_epoch = int( | |
# time.mktime( | |
# datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple() | |
# ) | |
# ) | |
# # Define the structure format for binary packing | |
# packet_format = "<B I f f f B f f f f f f f f f f f f f f f f f f" | |
# # Prepare all the values | |
# field_values = [ | |
# int(header_byte), # Header byte | |
# int(timestamp_epoch), # Timestamp as an unsigned integer | |
# float(message_dict["P0_(mbar)"]), # Float | |
# float(message_dict["T0_(C)"]), # Float | |
# float(message_dict["fish_depth"]), # Float | |
# int(message_dict["sensor_used"]), # Byte | |
# float(message_dict["P_dyn_(mbar)"]), # Float | |
# float(message_dict["T_dyn_(C)"]), # Float | |
# float(message_dict["fish_speed"]), # Float | |
# float(message_dict["duration_read_sensors"]), # Float | |
# float(message_dict["elapsed_time"]), # Float | |
# float(message_dict["setpoint"]), # Float | |
# float(message_dict["pid_value"]), # Float | |
# int(message_dict["axis0.current_state_1"]), # Byte | |
# float(message_dict["vbus_voltage_1"]), # Float | |
# float(message_dict["bat_level_1"]), # Float | |
# float(message_dict["ibus_1"]), # Float | |
# float(message_dict["axis0.pos_estimate_1"]), # Float | |
# int(message_dict["axis0.current_state_2"]), # Byte | |
# float(message_dict["vbus_voltage_2"]), # Float | |
# float(message_dict["bat_level_2"]), # Float | |
# float(message_dict["ibus_2"]), # Float | |
# float(message_dict["axis0.pos_estimate_2"]), # Float | |
# float(message_dict["core_frequ"]), # Float | |
# ] | |
# # Making the packet | |
# binary_packet = struct.pack(packet_format, *field_values) | |
# # Outputs | |
# print(f"Binary Packet Size: {len(binary_packet)} bytes") | |
# print(f"Binary Packet: {binary_packet}") | |
# struct.calcsize(packet_format) | |
## Conclusion: the packet is too big (90 bytes).
## Next step: try to reduce it by using float16 instead of float32.
# %% | |
#### SECOND APPROACH (struct.pack for the integer fields, NumPy for float16)
# Print the float16 / float32 characteristics for reference.
print(np.finfo(np.float16))
print(np.finfo(np.float32))

message_dict = {
    "timestamp": "2024-11-10T15:06:05.029",
    "P0_(mbar)": -74.1467161179948,
    "T0_(C)": 22.406355721950533,
    "fish_depth": 0.0030501921191828972,
    "sensor_used": 0,
    "P_dyn_(mbar)": 2042.9901137867594,
    "T_dyn_(C)": 22.69834464788437,
    "fish_speed": 0,
    "duration_read_sensors": 0.04412102699279785,
    "elapsed_time": 5.428764343261719,
    "setpoint": 0.3,
    "pid_value": -2.071595431578338,
    "vbus_voltage_1": 11.929389953613281,
    "bat_level_1": 85.09755452473959,
    "ibus_1": 0.08959825336933136,
    "axis0.pos_estimate_1": -2.080005645751953,
    "axis0.current_state_1": 8,
    "vbus_voltage_2": 11.929389953613281,
    "bat_level_2": 85.09755452473959,
    "ibus_2": 0.08959825336933136,
    "axis0.pos_estimate_2": -2.080005645751953,
    "axis0.current_state_2": 8,
    "errors": 0,
    "core_frequ": 15.905589685248389,
}

# Define sender, receiver, and message type
sender_id = 1  # as example
receiver_id = 3  # as example
message_type = 3  # as example (Command: 0, Status: 1, Telemetry: 3, etc.)

# Encode the first byte: bits 0-2 = sender, bits 3-5 = receiver,
# bits 6-7 = message type.
header_byte = (
    (sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6)
)

# Convert timestamp to epoch seconds.
# NOTE: time.mktime() interprets the (naive) timestamp as local time and
# int() drops the fractional part, so the result is host-timezone dependent
# and limited to 1 s resolution -- more precision is needed here as well.
timestamp_epoch = int(
    time.mktime(
        datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple()
    )
)

# Fields to convert to float16 (order defines the order in the packet)
float_fields = [
    "P0_(mbar)",
    "T0_(C)",
    "fish_depth",
    "P_dyn_(mbar)",
    "T_dyn_(C)",
    "fish_speed",
    "duration_read_sensors",
    "elapsed_time",
    "setpoint",
    "pid_value",
    "vbus_voltage_1",
    "bat_level_1",
    "ibus_1",
    "axis0.pos_estimate_1",
    "vbus_voltage_2",
    "bat_level_2",
    "ibus_2",
    "axis0.pos_estimate_2",
    "core_frequ",
]

# Convert each value to 2 bytes of float16. The dtype is spelled "<f2"
# (explicit little-endian) so the payload byte order always matches the "<"
# used in packet_format below, instead of following the host's native order.
float16_values = [
    np.array(message_dict[field], dtype="<f2").tobytes() for field in float_fields
]

# Define the structure format for binary packing:
# header byte, timestamp, sensor_used, axis states (current_state_1 and current_state_2)
packet_format = "<B I B B B"

# Pack the non-float fields
packed_header = struct.pack(
    packet_format,
    header_byte,  # Header byte
    timestamp_epoch,  # Timestamp
    int(message_dict["sensor_used"]),  # Sensor used
    int(message_dict["axis0.current_state_1"]),  # Current state 1
    int(message_dict["axis0.current_state_2"]),  # Current state 2
)

# Append all float16 fields in one join rather than repeated bytes "+=".
binary_packet = b"".join([packed_header, *float16_values])

# Output the binary packet size and content
print(f"Binary Packet Size: {len(binary_packet)} bytes")
print(f"Binary Packet: {binary_packet}")
## Binary Packet Size: 46 bytes = acceptable
# %% ### CREATING STRUCTURE DICT
# Build the dictionary that holds the packet structure.
# Every field carries a "scale" entry because scaling might be needed later:
# the largest finite value a float16 can represent is 65504.
float_fields = [
    "P0_(mbar)",
    "T0_(C)",
    "fish_depth",
    "P_dyn_(mbar)",
    "T_dyn_(C)",
    "fish_speed",
    "duration_read_sensors",
    "elapsed_time",
    "setpoint",
    "pid_value",
    "vbus_voltage_1",
    "bat_level_1",
    "ibus_1",
    "axis0.pos_estimate_1",
    "vbus_voltage_2",
    "bat_level_2",
    "ibus_2",
    "axis0.pos_estimate_2",
    "core_frequ",
]

# Format description of every field in the packet: the fixed integer fields
# come first, followed by each float field stored as a 16-bit float
# (2 bytes) with scale = 1.
field_format_dict = {
    "header_byte": dict(bytes=1, type="uint8", scale=1),  # Header byte
    "timestamp": dict(bytes=4, type="uint32", scale=1),  # Timestamp
    "sensor_used": dict(bytes=1, type="uint8", scale=1),  # Sensor used
    "axis0.current_state_1": dict(bytes=1, type="uint8", scale=1),  # Axis state 1
    "axis0.current_state_2": dict(bytes=1, type="uint8", scale=1),  # Axis state 2
    **{name: dict(bytes=2, type="float16", scale=1) for name in float_fields},
}

# Show the layout as a DataFrame (one row per field) for easier reading
layout_table = pd.DataFrame(field_format_dict).T
field_format_df = layout_table.reset_index()
field_format_df.columns = ["Field Name", "Bytes Used", "Type", "Scale"]
print(field_format_df)
# %% | |
#### DECODE BINARY
## Check how the values come back after decoding.
# binary_data = b"\xd9\xcd\xbd0g\x1eK\x94\xc27@\xb3A\xbb\xe5G;\x00\xaf_\xffD6\x96\xb5A\x00\x00\x00\x00@\xb84=p\xb8\xad@\x9a\x99\x99>\x05\x95\x04\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0L}~A"
binary_data = binary_packet

# Packet layout, in wire order: the header byte and timestamp, then three
# single-byte fields, then the 2-byte float16 fields.
field_format = {
    "header_byte": {"bytes": 1, "type": "uint8", "scale": 1},
    "timestamp": {"bytes": 4, "type": "uint32", "scale": 1},
    **{
        name: {"bytes": 1, "type": "uint8", "scale": 1}
        for name in (
            "sensor_used",
            "axis0.current_state_1",
            "axis0.current_state_2",
        )
    },
    **{
        name: {"bytes": 2, "type": "float16", "scale": 1}
        for name in (
            "P0_(mbar)",
            "T0_(C)",
            "fish_depth",
            "P_dyn_(mbar)",
            "T_dyn_(C)",
            "fish_speed",
            "duration_read_sensors",
            "elapsed_time",
            "setpoint",
            "pid_value",
            "vbus_voltage_1",
            "bat_level_1",
            "ibus_1",
            "axis0.pos_estimate_1",
            "vbus_voltage_2",
            "bat_level_2",
            "ibus_2",
            "axis0.pos_estimate_2",
            "core_frequ",
        )
    },
}
# Decode binary data based on the field format | |
def decode_binary_data(binary_data, field_format):
    """Decode a packed binary packet into a dict of field values.

    Fields are read back-to-back from the start of ``binary_data``, in the
    iteration order of ``field_format``. Multi-byte values are read as
    little-endian. Bytes left over after the last field are ignored.

    Args:
        binary_data: Bytes-like object holding the packet.
        field_format: Mapping of field name to a spec dict with the keys
            ``"bytes"`` (field width), ``"type"`` (``"uint8"``, ``"uint32"``
            or ``"float16"``) and ``"scale"`` (multiplier applied to the
            decoded value).

    Returns:
        Dict mapping each field name to its decoded value multiplied by its
        scale. float16 fields are decoded as ``np.float16`` before scaling.

    Raises:
        ValueError: If a field type is unsupported, if a spec's ``"bytes"``
            does not match the width of its type, or if ``binary_data`` is
            too short to contain the field.
    """
    # Width in bytes of each supported type, and the struct format used for
    # the integer types (float16 is decoded through NumPy).
    type_sizes = {"uint8": 1, "uint32": 4, "float16": 2}
    struct_formats = {"uint8": "<B", "uint32": "<I"}

    offset = 0
    decoded_data = {}
    for field, specs in field_format.items():
        field_bytes = specs["bytes"]
        field_type = specs["type"]
        field_scale = specs["scale"]

        if field_type not in type_sizes:
            raise ValueError(f"Unsupported type: {field_type}")
        if field_bytes != type_sizes[field_type]:
            # A wrong width would otherwise fail obscurely (integers) or be
            # silently mis-read (float16).
            raise ValueError(
                f"Field {field!r}: type {field_type} needs "
                f"{type_sizes[field_type]} byte(s), spec says {field_bytes}"
            )

        # Extract bytes for the current field
        field_data = binary_data[offset : offset + field_bytes]
        if len(field_data) != field_bytes:
            raise ValueError(
                f"Packet too short: field {field!r} needs {field_bytes} byte(s) "
                f"at offset {offset}, only {len(field_data)} available"
            )
        offset += field_bytes

        # Decode based on type
        if field_type == "float16":
            # "<f2" = little-endian 16-bit float, independent of the host.
            value = np.frombuffer(field_data, dtype="<f2")[0]
        else:
            value = struct.unpack(struct_formats[field_type], field_data)[0]

        # Apply scaling
        decoded_data[field] = value * field_scale
    return decoded_data
# Decode the binary data
decoded_values = decode_binary_data(binary_data, field_format)
decoded_values  # bare expression: shows the dict when run as an interactive cell
# %% COMPARISON BEFORE AND AFTER TRANSFER
# For every original field, record the original value, the decoded value and
# their absolute difference. Fields absent from the decoded packet get a
# difference of None; fields whose values cannot be subtracted are reported
# with "fail" and left out of the table.
comparison_table = []
for key, original_value in message_dict.items():
    print(f"checking: {key}")
    decoded_value = decoded_values.get(key, None)
    print(f"{original_value = } , {decoded_value =}")
    if decoded_value is None:
        difference = None
    else:
        try:
            difference = abs(original_value - decoded_value)
        except TypeError:
            # Only the subtraction is guarded, and only against operand
            # types that cannot be subtracted; any other error (or Ctrl-C)
            # propagates instead of being swallowed.
            print("fail")
            continue
    comparison_table.append(
        {
            "Field": key,
            "Original Value": original_value,
            "Decoded Value": decoded_value,
            "Difference": difference,
        }
    )
# Convert to DataFrame for better visualization
comparison_df = pd.DataFrame(comparison_table)
# %% |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment