Skip to content

Instantly share code, notes, and snippets.

@fredvol
Created January 22, 2025 21:37
Show Gist options
  • Save fredvol/5e342205a21c07a385d05a41e1d099d8 to your computer and use it in GitHub Desktop.
Save fredvol/5e342205a21c07a385d05a41e1d099d8 to your computer and use it in GitHub Desktop.
## TRIAL on packet size
# usefull link : https://evanw.github.io/float-toy/
# %% import
import struct
from datetime import datetime
import time
import numpy as np
import pandas as pd
#%%
#### FIRST APPROACH ( using fully struct.pack)
# # Sample message_dict
# message_dict = {
# "timestamp": "2024-11-10T15:06:05.029",
# "P0_(mbar)": -74.1467161179948,
# "T0_(C)": 22.406355721950533,
# "fish_depth": 0.0030501921191828972,
# "sensor_used": 0,
# "P_dyn_(mbar)": 2042.9901137867594,
# "T_dyn_(C)": 22.69834464788437,
# "fish_speed": 0,
# "duration_read_sensors": 0.04412102699279785,
# "elapsed_time": 5.428764343261719,
# "setpoint": 0.3,
# "pid_value": -2.071595431578338,
# "vbus_voltage_1": 11.929389953613281,
# "bat_level_1": 85.09755452473959,
# "ibus_1": 0.08959825336933136,
# "axis0.pos_estimate_1": -2.080005645751953,
# "axis0.current_state_1": 8,
# "vbus_voltage_2": 11.929389953613281,
# "bat_level_2": 85.09755452473959,
# "ibus_2": 0.08959825336933136,
# "axis0.pos_estimate_2": -2.080005645751953,
# "axis0.current_state_2": 8,
# "errors": 0,
# "core_frequ": 15.905589685248389,
# }
# # Define sender, receiver, and message type
# sender_id = 1 # as exemple
# receiver_id = 3 # as exemple
# message_type = 3 # as exemple ( Command :0 , status:1,Telemetry :3 etc.)
# # Encode the first byte
# header_byte = (
# (sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6)
# )
# # Convert timestamp to epoch ( we might need more precision than 1 s)
# timestamp_epoch = int(
# time.mktime(
# datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple()
# )
# )
# # Define the structure format for binary packing
# packet_format = "<B I f f f B f f f f f f f f f f f f f f f f f f"
# # Prepare all the values
# field_values = [
# int(header_byte), # Header byte
# int(timestamp_epoch), # Timestamp as an unsigned integer
# float(message_dict["P0_(mbar)"]), # Float
# float(message_dict["T0_(C)"]), # Float
# float(message_dict["fish_depth"]), # Float
# int(message_dict["sensor_used"]), # Byte
# float(message_dict["P_dyn_(mbar)"]), # Float
# float(message_dict["T_dyn_(C)"]), # Float
# float(message_dict["fish_speed"]), # Float
# float(message_dict["duration_read_sensors"]), # Float
# float(message_dict["elapsed_time"]), # Float
# float(message_dict["setpoint"]), # Float
# float(message_dict["pid_value"]), # Float
# int(message_dict["axis0.current_state_1"]), # Byte
# float(message_dict["vbus_voltage_1"]), # Float
# float(message_dict["bat_level_1"]), # Float
# float(message_dict["ibus_1"]), # Float
# float(message_dict["axis0.pos_estimate_1"]), # Float
# int(message_dict["axis0.current_state_2"]), # Byte
# float(message_dict["vbus_voltage_2"]), # Float
# float(message_dict["bat_level_2"]), # Float
# float(message_dict["ibus_2"]), # Float
# float(message_dict["axis0.pos_estimate_2"]), # Float
# float(message_dict["core_frequ"]), # Float
# ]
# # Making the packet
# binary_packet = struct.pack(packet_format, *field_values)
# # Outputs
# print(f"Binary Packet Size: {len(binary_packet)} bytes")
# print(f"Binary Packet: {binary_packet}")
# struct.calcsize(packet_format)
## Conclusion the packet is too big : 90 bytes
## to try to reduced to float16 isntead of float32
# %%
#### SECOND APPROACH ( using struct.pack and numpy for use Float16)
# get the info on float16
print(np.finfo(np.float16))
print(np.finfo(np.float32))
message_dict = {
"timestamp": "2024-11-10T15:06:05.029",
"P0_(mbar)": -74.1467161179948,
"T0_(C)": 22.406355721950533,
"fish_depth": 0.0030501921191828972,
"sensor_used": 0,
"P_dyn_(mbar)": 2042.9901137867594,
"T_dyn_(C)": 22.69834464788437,
"fish_speed": 0,
"duration_read_sensors": 0.04412102699279785,
"elapsed_time": 5.428764343261719,
"setpoint": 0.3,
"pid_value": -2.071595431578338,
"vbus_voltage_1": 11.929389953613281,
"bat_level_1": 85.09755452473959,
"ibus_1": 0.08959825336933136,
"axis0.pos_estimate_1": -2.080005645751953,
"axis0.current_state_1": 8,
"vbus_voltage_2": 11.929389953613281,
"bat_level_2": 85.09755452473959,
"ibus_2": 0.08959825336933136,
"axis0.pos_estimate_2": -2.080005645751953,
"axis0.current_state_2": 8,
"errors": 0,
"core_frequ": 15.905589685248389,
}
# # Define sender, receiver, and message type
sender_id = 1 # as exemple
receiver_id = 3 # as exemple
message_type = 3 # as exemple ( Command :0 , status:1,Telemetry :3 etc.)
# Encode the first byte
header_byte = (
(sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6)
)
# Convert timestamp to epoch ( same here as well we need more presision)
timestamp_epoch = int(
time.mktime(
datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple()
)
)
# Fields to convert to float16
float_fields = [
"P0_(mbar)",
"T0_(C)",
"fish_depth",
"P_dyn_(mbar)",
"T_dyn_(C)",
"fish_speed",
"duration_read_sensors",
"elapsed_time",
"setpoint",
"pid_value",
"vbus_voltage_1",
"bat_level_1",
"ibus_1",
"axis0.pos_estimate_1",
"vbus_voltage_2",
"bat_level_2",
"ibus_2",
"axis0.pos_estimate_2",
"core_frequ",
]
# Convert
float16_values = [np.float16(message_dict[field]).tobytes() for field in float_fields]
# Define the structure format for binary packing
packet_format = "<B I B B B" # Header byte, timestamp, sensor_used, axis states (current_state_1 and current_state_2)
# Pack non-float fields
binary_packet = struct.pack(
packet_format,
header_byte, # Header byte
timestamp_epoch, # Timestamp
int(message_dict["sensor_used"]), # Sensor used
int(message_dict["axis0.current_state_1"]), # Current state 1
int(message_dict["axis0.current_state_2"]), # Current state 2
)
# Append all float16 fields
for float16 in float16_values:
binary_packet += float16
# Output the binary packet size and content
print(f"Binary Packet Size: {len(binary_packet)} bytes")
print(f"Binary Packet: {binary_packet}")
## Binary Packet Size: 46 bytes = acceptable
# %% ### CRATING STRUCTURE DICT
# code to create the dictionarry that hold the structure
# I add some scaling has it might be need later as the maxium range for float16 is
float_fields = [
"P0_(mbar)",
"T0_(C)",
"fish_depth",
"P_dyn_(mbar)",
"T_dyn_(C)",
"fish_speed",
"duration_read_sensors",
"elapsed_time",
"setpoint",
"pid_value",
"vbus_voltage_1",
"bat_level_1",
"ibus_1",
"axis0.pos_estimate_1",
"vbus_voltage_2",
"bat_level_2",
"ibus_2",
"axis0.pos_estimate_2",
"core_frequ",
]
# Create a dictionary to hold the format for all fields in the packet
field_format_dict = {
"header_byte": {"bytes": 1, "type": "uint8", "scale": 1}, # Header byte
"timestamp": {"bytes": 4, "type": "uint32", "scale": 1}, # Timestamp
"sensor_used": {"bytes": 1, "type": "uint8", "scale": 1}, # Sensor used
"axis0.current_state_1": {"bytes": 1, "type": "uint8", "scale": 1}, # Axis state 1
"axis0.current_state_2": {"bytes": 1, "type": "uint8", "scale": 1}, # Axis state 2
}
# Add float fields as 16-bit floats (2 bytes each) with scale = 1
for field in float_fields:
field_format_dict[field] = {"bytes": 2, "type": "float16", "scale": 1}
# Display the dictionary as a DataFrame for better visualization
field_format_df = pd.DataFrame(field_format_dict).T.reset_index()
field_format_df.columns = ["Field Name", "Bytes Used", "Type", "Scale"]
print(field_format_df)
# %%
#### DECODE BINARY
## to check how the value is decoded
# binary_data = b"\xd9\xcd\xbd0g\x1eK\x94\xc27@\xb3A\xbb\xe5G;\x00\xaf_\xffD6\x96\xb5A\x00\x00\x00\x00@\xb84=p\xb8\xad@\x9a\x99\x99>\x05\x95\x04\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0L}~A"
binary_data = binary_packet
# Field dict format from the code above
field_format = {
"header_byte": {"bytes": 1, "type": "uint8", "scale": 1},
"timestamp": {"bytes": 4, "type": "uint32", "scale": 1},
"sensor_used": {"bytes": 1, "type": "uint8", "scale": 1},
"axis0.current_state_1": {"bytes": 1, "type": "uint8", "scale": 1},
"axis0.current_state_2": {"bytes": 1, "type": "uint8", "scale": 1},
"P0_(mbar)": {"bytes": 2, "type": "float16", "scale": 1},
"T0_(C)": {"bytes": 2, "type": "float16", "scale": 1},
"fish_depth": {"bytes": 2, "type": "float16", "scale": 1},
"P_dyn_(mbar)": {"bytes": 2, "type": "float16", "scale": 1},
"T_dyn_(C)": {"bytes": 2, "type": "float16", "scale": 1},
"fish_speed": {"bytes": 2, "type": "float16", "scale": 1},
"duration_read_sensors": {"bytes": 2, "type": "float16", "scale": 1},
"elapsed_time": {"bytes": 2, "type": "float16", "scale": 1},
"setpoint": {"bytes": 2, "type": "float16", "scale": 1},
"pid_value": {"bytes": 2, "type": "float16", "scale": 1},
"vbus_voltage_1": {"bytes": 2, "type": "float16", "scale": 1},
"bat_level_1": {"bytes": 2, "type": "float16", "scale": 1},
"ibus_1": {"bytes": 2, "type": "float16", "scale": 1},
"axis0.pos_estimate_1": {"bytes": 2, "type": "float16", "scale": 1},
"vbus_voltage_2": {"bytes": 2, "type": "float16", "scale": 1},
"bat_level_2": {"bytes": 2, "type": "float16", "scale": 1},
"ibus_2": {"bytes": 2, "type": "float16", "scale": 1},
"axis0.pos_estimate_2": {"bytes": 2, "type": "float16", "scale": 1},
"core_frequ": {"bytes": 2, "type": "float16", "scale": 1},
}
# Decode binary data based on the field format
def decode_binary_data(binary_data, field_format):
offset = 0
decoded_data = {}
for field, specs in field_format.items():
field_bytes = specs["bytes"]
field_type = specs["type"]
field_scale = specs["scale"]
# Extract bytes for the current field
field_data = binary_data[offset : offset + field_bytes]
offset += field_bytes
# Decode based on type
if field_type == "uint8":
value = struct.unpack("<B", field_data)[0]
elif field_type == "uint32":
value = struct.unpack("<I", field_data)[0]
elif field_type == "float16":
value = np.frombuffer(field_data, dtype=np.float16)[0]
else:
raise ValueError(f"Unsupported type: {field_type}")
# Apply scaling
decoded_data[field] = value * field_scale
return decoded_data
# Decode the binary data
decoded_values = decode_binary_data(binary_data, field_format)
decoded_values
# %% COMPARAISON BEFORE AND AFTER TRANSFER
comparison_table = []
for key in message_dict.keys():
print(f"checking: {key}")
original_value = message_dict[key]
decoded_value = decoded_values.get(key, None)
print(f"{original_value = } , {decoded_value =}")
try:
difference = (
abs(original_value - decoded_value) if decoded_value is not None else None
)
comparison_table.append(
{
"Field": key,
"Original Value": original_value,
"Decoded Value": decoded_value,
"Difference": difference,
}
)
except:
print( "fail")
# Convert to DataFrame for better visualization
comparison_df = pd.DataFrame(comparison_table)
# %%
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment