@fredvol
Last active January 23, 2025 22:32
Telemetry packet specifications

LoRa Transmission Packet Definition

This document describes the specification of the Fish telemetry data packets.

Packet Structure Definition

The packet structure is described using a dictionary that specifies the size, type, scale, and offset for each field. This dictionary ensures consistency during both encoding and decoding processes.

The fields were taken from the current log DATA from the CLI, with an artificial duplication of the ODrive data to simulate 2 motors.

Field Definitions

| Field Name | Bytes Used | Type | Scale | Offset |
| --- | --- | --- | --- | --- |
| header_byte | 1 | uint8 | 1 | 0 |
| timestamp | 4 | uint32 | 0.1 | 1704063600 |
| sensor_used | 1 | uint8 | 1 | 0 |
| axis0.current_state_1 | 1 | uint8 | 1 | 0 |
| axis0.current_state_2 | 1 | uint8 | 1 | 0 |
| P0_(mbar) | 2 | float16 | 1 | 0 |
| T0_(C) | 2 | float16 | 1 | 0 |
| fish_depth | 2 | float16 | 1 | 0 |
| P_dyn_(mbar) | 2 | float16 | 1 | 0 |
| T_dyn_(C) | 2 | float16 | 1 | 0 |
| fish_speed | 2 | float16 | 1 | 0 |
| duration_read_sensors | 2 | float16 | 1 | 0 |
| elapsed_time | 2 | float16 | 1 | 0 |
| setpoint | 2 | float16 | 1 | 0 |
| pid_value | 2 | float16 | 1 | 0 |
| vbus_voltage_1 | 2 | float16 | 1 | 0 |
| bat_level_1 | 2 | float16 | 1 | 0 |
| ibus_1 | 2 | float16 | 1 | 0 |
| axis0.pos_estimate_1 | 2 | float16 | 1 | 0 |
| vbus_voltage_2 | 2 | float16 | 1 | 0 |
| bat_level_2 | 2 | float16 | 1 | 0 |
| ibus_2 | 2 | float16 | 1 | 0 |
| axis0.pos_estimate_2 | 2 | float16 | 1 | 0 |
| core_frequ | 2 | float16 | 1 | 0 |

The total size is 46 bytes (8 bytes of integer fields plus 19 float16 fields of 2 bytes each), which is considered acceptable.
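
The packet size can be cross-checked with the standard library. The sketch below is only an illustration: `FIELD_TYPES` is a hypothetical compact form of the field table, and it relies on the `struct` format character `e` (IEEE 754 half precision) rather than on numpy.

```python
import struct

# Hypothetical compact form of the field table (not part of the original script):
# "B" = uint8, "I" = uint32, "e" = IEEE 754 half precision (float16).
FIELD_TYPES = ["B", "I", "B", "B", "B"] + ["e"] * 19

# "<" selects little-endian byte order with no padding between fields.
packet_format = "<" + "".join(FIELD_TYPES)
packet_size = struct.calcsize(packet_format)
print(f"Packet size: {packet_size} bytes")
```

Using `<` matters here: with native alignment, `struct` may insert padding after the first byte and the computed size would no longer match the table.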

Notes

  1. Headers: The header byte is an 8-bit value that encodes three key metadata fields:

    1. Sender ID (3 bits)
    2. Receiver ID (3 bits)
    3. Message Type (2 bits)

    The header byte is divided into specific bit sections:

    • Bits 0–2: Sender ID (values range from 0 to 7)
    • Bits 3–5: Receiver ID (values range from 0 to 7), assuming that 7 means "all"
    • Bits 6–7: Message Type (values range from 0 to 3)

    Example: Header Byte Construction

     - **Sender ID** = `1` (binary: `001`)
     - **Receiver ID** = `3` (binary: `011`)
     - **Message Type** = `3` (binary: `11`)
    
     ### Step-by-Step Encoding
    
     1. Sender ID (`001`): Set bits 0–2 → `001`.
     2. Receiver ID (`011`): Shift left 3 bits → `011000`.
     3. Message Type (`11`): Shift left 6 bits → `11000000`.
     4. Combine the three values with a bitwise OR → `11011001` (`0xD9`, decimal 217).
    
     The **header byte** is an 8-bit value, and each section of the byte is assigned specific bits for encoding metadata:
    
    | Bit Position | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
    | --- | --- | --- | --- | --- | --- | --- | --- | --- |
    | Field Name | MsgType | MsgType | RecID | RecID | RecID | SendID | SendID | SendID |
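
The header layout can be sketched as a pair of helper functions. The names `pack_header` and `unpack_header` are hypothetical (the script builds the byte inline and does not decode it); the masks and shifts follow the bit layout described above.

```python
def pack_header(sender_id: int, receiver_id: int, message_type: int) -> int:
    """Build the header byte: bits 0-2 sender, bits 3-5 receiver, bits 6-7 type."""
    return (
        (sender_id & 0b111)
        | ((receiver_id & 0b111) << 3)
        | ((message_type & 0b11) << 6)
    )


def unpack_header(header_byte: int) -> tuple[int, int, int]:
    """Split a header byte back into (sender_id, receiver_id, message_type)."""
    sender_id = header_byte & 0b111
    receiver_id = (header_byte >> 3) & 0b111
    message_type = (header_byte >> 6) & 0b11
    return sender_id, receiver_id, message_type


header = pack_header(sender_id=1, receiver_id=3, message_type=3)
print(f"{header:#04x} {header:08b}")  # 0xd9 11011001
print(unpack_header(header))  # (1, 3, 3)
```

Note that the masks silently truncate out-of-range IDs (a sender ID of 9 becomes 1); a stricter variant could raise an error instead.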
  2. Using float32 for all data fields:

    For simplicity, I originally tried float32 for all values, but the total packet was 90 bytes, which is too big. I converted the values to float16; all the values seem to fall within its range. If that turns out not to be the case, we can adjust the offset and scale.
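
Range is not the only concern with float16: its precision is limited to roughly three significant decimal digits. The sketch below estimates the round-trip error for a few values from the sample log. It is an illustration only: `float16_roundtrip` is a hypothetical helper, and it uses the standard-library `struct` format `e` instead of the numpy conversion used in the script.

```python
import struct


def float16_roundtrip(value: float) -> float:
    """Encode a value as little-endian float16 and decode it again."""
    return struct.unpack("<e", struct.pack("<e", value))[0]


# Values copied from the sample log entry used in the script.
samples = {
    "P_dyn_(mbar)": 2042.9901137867594,
    "T0_(C)": 22.406355721950533,
    "setpoint": 0.3,
}
for name, value in samples.items():
    decoded = float16_roundtrip(value)
    print(f"{name}: {value} -> {decoded} (error {abs(value - decoded):.3g})")
```

Between 1024 and 2048 the float16 step is 1, so `P_dyn_(mbar)` is effectively rounded to the nearest millibar. If finer resolution is needed, the scale and offset columns of the field table are the place to adjust it.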

  3. Timestamp Conversion:

    Assumptions:

    • we want a resolution of at least 0.1 second

    • we want to store the date, not only the time of day

    • no record will be made before 1 January 2024

    • the data format should survive at least 5 years.

    Failed approaches:

    • Using the Unix timestamp with a resolution of 0.1 s requires about 35 bits, so 5 bytes; it does not fit in 32 bits, even without the sign bit.

    • Storing only hh:mm:ss.s requires 21 bits but does not store the date. Adding the day of the year (1-365) requires 32 bits.

    Solution: The Unix timestamp is stored as a uint32 count of 0.1-second steps since an epoch starting 1 January 2024 (an offset of 1704063600 s, which is 2024-01-01 00:00:00 at UTC+1). A uint32 at this resolution spans about 13.6 years.
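
The timestamp conversion can be sketched as follows. This is a minimal illustration, not the script's exact code: `encode_timestamp` and `decode_timestamp` are hypothetical helpers, and the sample time is assumed to be local time at UTC+1, which is what the 1704063600 offset implies.

```python
from datetime import datetime, timedelta, timezone

EPOCH_OFFSET_S = 1704063600  # offset from the field table
SCALE_S = 0.1  # seconds per count, from the field table


def encode_timestamp(unix_seconds: float) -> int:
    """Convert a Unix timestamp to a uint32 count of 0.1 s steps."""
    counts = round((unix_seconds - EPOCH_OFFSET_S) / SCALE_S)
    if not 0 <= counts < 2**32:
        raise ValueError("timestamp outside the uint32 range")
    return counts


def decode_timestamp(counts: int) -> float:
    """Convert a uint32 count back to a Unix timestamp in seconds."""
    return counts * SCALE_S + EPOCH_OFFSET_S


# Assumption: the log timestamp is local time at UTC+1.
cet = timezone(timedelta(hours=1))
sample = datetime(2024, 11, 10, 15, 6, 5, 329000, tzinfo=cet)

counts = encode_timestamp(sample.timestamp())
decoded = datetime.fromtimestamp(decode_timestamp(counts), tz=cet)
max_years = 2**32 * SCALE_S / (365.25 * 24 * 3600)

print(counts, decoded.isoformat(), f"{max_years:.1f} years")
```

Using timezone-aware datetimes keeps the result independent of the machine's local time zone, which naive `datetime` objects would not.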

## LoRa transmission packet definition (TELEMETRY)
# This document outlines the process of encoding and decoding telemetry data into a compact binary format suitable for LoRa transmission.
# useful link: https://evanw.github.io/float-toy/
# This document has 4 parts:
# * creating a dict that holds the structure
# * the code that encodes a telemetry data dict
# * the code that decodes it
# * a comparison between the original data dict and the reconstructed one.
# %% import
import struct
from datetime import datetime
import time
import numpy as np
import pandas as pd
# %% INTRO :: General info on the data types
# get the info on float16, float32 and int32
print(np.finfo(np.float16))
print("size float16 (bytes): ", np.dtype("float16").itemsize)
print(np.finfo(np.float32))
print("size float32 (bytes): ", np.dtype("float32").itemsize)
print(np.iinfo(np.int32))
print("size int32 (bytes): ", np.dtype("int32").itemsize)
# %% ### PART 1 :: CREATING STRUCTURE DICT
# code to create the dictionary that holds the structure
# I add scale and offset entries as they might be needed later: the maximum value for float16 is 65504.0
# Define float fields
float_fields = [
"P0_(mbar)",
"T0_(C)",
"fish_depth",
"P_dyn_(mbar)",
"T_dyn_(C)",
"fish_speed",
"duration_read_sensors",
"elapsed_time",
"setpoint",
"pid_value",
"vbus_voltage_1",
"bat_level_1",
"ibus_1",
"axis0.pos_estimate_1",
"vbus_voltage_2",
"bat_level_2",
"ibus_2",
"axis0.pos_estimate_2",
"core_frequ",
]
# Create a dictionary to hold the format for all fields in the packet with offsets in seconds
field_format_dict_with_seconds_offset = {
"header_byte": {
"bytes": 1,
"type": "uint8",
"scale": 1,
"offset": 0,
},
"timestamp": {
"bytes": 4,
"type": "uint32",
"scale": 0.1,
"offset": 1704063600,
}, # Offset for timestamp in seconds
"sensor_used": {
"bytes": 1,
"type": "uint8",
"scale": 1,
"offset": 0,
},
"axis0.current_state_1": {
"bytes": 1,
"type": "uint8",
"scale": 1,
"offset": 0,
},
"axis0.current_state_2": {
"bytes": 1,
"type": "uint8",
"scale": 1,
"offset": 0,
},
}
# Add float fields with no offset but with default scale and size
for field in float_fields:
    field_format_dict_with_seconds_offset[field] = {
        "bytes": 2,
        "type": "float16",
        "scale": 1,
        "offset": 0,
    }
# Convert to a DataFrame for better visualization
field_format_df = pd.DataFrame(field_format_dict_with_seconds_offset).T.reset_index()
field_format_df.columns = ["Field Name", "Bytes Used", "Type", "Scale", "Offset"]
print(field_format_df)
# %% #### PART 2 :: ENCODING (using struct.pack, and numpy for float16)
# extract from the fish eye log (I duplicated the ODrive data to simulate 2 ODrives)
message_dict = {
"timestamp": "2024-11-10T15:06:05.329",
"P0_(mbar)": -74.1467161179948,
"T0_(C)": 22.406355721950533,
"fish_depth": 0.0030501921191828972,
"sensor_used": 0,
"P_dyn_(mbar)": 2042.9901137867594,
"T_dyn_(C)": 22.69834464788437,
"fish_speed": 0,
"duration_read_sensors": 0.04412102699279785,
"elapsed_time": 5.428764343261719,
"setpoint": 0.3,
"pid_value": -2.071595431578338,
"vbus_voltage_1": 11.929389953613281,
"bat_level_1": 85.09755452473959,
"ibus_1": 0.08959825336933136,
"axis0.pos_estimate_1": -2.080005645751953,
"axis0.current_state_1": 8,
"vbus_voltage_2": 11.929389953613281,
"bat_level_2": 85.09755452473959,
"ibus_2": 0.08959825336933136,
"axis0.pos_estimate_2": -2.080005645751953,
"axis0.current_state_2": 8,
"errors": 0,
"core_frequ": 15.905589685248389,
}
# Define sender, receiver, and message type
sender_id = 1  # as an example
receiver_id = 3  # as an example
message_type = 3  # as an example (Command: 0, Status: 1, Telemetry: 3, etc.)
# Encode the first byte
header_byte = (
(sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6)
)
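# Convert the timestamp to a count of 0.1 s steps since 1 January 2024.
# Both datetimes are naive, so they are interpreted in the local time zone;
# start2024 matches the 1704063600 offset only when that zone is UTC+1.
timestamp_epoch = datetime.strptime(
    message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f"
).timestamp()
start2024 = datetime(2024, 1, 1, 0, 0, 0).timestamp()
time_difference = timestamp_epoch - start2024
timestamp_int32 = round(time_difference * 10)  # unsigned count, packed as "I"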
# Fields to convert to float16
float_fields = [
"P0_(mbar)",
"T0_(C)",
"fish_depth",
"P_dyn_(mbar)",
"T_dyn_(C)",
"fish_speed",
"duration_read_sensors",
"elapsed_time",
"setpoint",
"pid_value",
"vbus_voltage_1",
"bat_level_1",
"ibus_1",
"axis0.pos_estimate_1",
"vbus_voltage_2",
"bat_level_2",
"ibus_2",
"axis0.pos_estimate_2",
"core_frequ",
]
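# Convert each float field to 2 little-endian float16 bytes
# ("<f2" matches the "<" byte order used by struct below)
float16_values = [
    np.asarray(message_dict[field], dtype="<f2").tobytes() for field in float_fields
]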
# Define the structure format for binary packing
packet_format = (
"<B I B B B" # Header byte, timestamp (unsigned int32), sensor_used, axis states
)
# Pack non-float fields
binary_packet = struct.pack(
packet_format,
header_byte, # Header byte
timestamp_int32, # Timestamp
int(message_dict["sensor_used"]), # Sensor used
int(message_dict["axis0.current_state_1"]), # Current state 1
int(message_dict["axis0.current_state_2"]), # Current state 2
)
# Append all float16 fields
for float16 in float16_values:
    binary_packet += float16
# Output the binary packet size and content
print(f"Binary Packet Size: {len(binary_packet)} bytes")
print(f"Binary Packet: {binary_packet}")
## Binary Packet Size: 46 bytes = acceptable
# %%
#### PART 3 :: DECODE BINARY
## to check how the value is decoded
# binary_data = b"\xd9\xcd\xbd0g\x1eK\x94\xc27@\xb3A\xbb\xe5G;\x00\xaf_\xffD6\x96\xb5A\x00\x00\x00\x00@\xb84=p\xb8\xad@\x9a\x99\x99>\x05\x95\x04\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0\x00\x00\x00A\xc8\xde>A\xf31\xaaBJ\x7f\xb7=\xd0\x1e\x05\xc0L}~A"
binary_data = binary_packet
# Field dict format from the code above
field_format = {
"header_byte": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0},
"timestamp": {"bytes": 4, "type": "uint32", "scale": 0.1, "offset": 1704063600},
"sensor_used": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0},
"axis0.current_state_1": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0},
"axis0.current_state_2": {"bytes": 1, "type": "uint8", "scale": 1, "offset": 0},
"P0_(mbar)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"T0_(C)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"fish_depth": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"P_dyn_(mbar)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"T_dyn_(C)": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"fish_speed": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"duration_read_sensors": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"elapsed_time": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"setpoint": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"pid_value": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"vbus_voltage_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"bat_level_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"ibus_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"axis0.pos_estimate_1": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"vbus_voltage_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"bat_level_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"ibus_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"axis0.pos_estimate_2": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
"core_frequ": {"bytes": 2, "type": "float16", "scale": 1, "offset": 0},
}
# Decode binary data based on the field format
def decode_binary_data(binary_data, field_format):
    offset = 0
    decoded_data = {}
    for field, specs in field_format.items():
        field_bytes = specs["bytes"]
        field_type = specs["type"]
        field_scale = specs["scale"]
        field_offset = specs["offset"]
        # Extract bytes for the current field
        field_data = binary_data[offset : offset + field_bytes]
        offset += field_bytes
        # Decode based on type (all fields are little-endian)
        if field_type == "uint8":
            value = struct.unpack("<B", field_data)[0]
        elif field_type == "uint32":
            value = struct.unpack("<I", field_data)[0]
        elif field_type == "float16":
            value = np.frombuffer(field_data, dtype="<f2")[0]
        else:
            raise ValueError(f"Unsupported type: {field_type}")
        # Apply scaling and offset
        decoded_data[field] = value * field_scale + field_offset
    return decoded_data
# Decode the binary data
decoded_values = decode_binary_data(binary_data, field_format)
decoded_values
# %% PART 4 :: COMPARISON BEFORE AND AFTER TRANSFER
comparison_table = []
for key in message_dict.keys():
    print(f"checking: {key}")
    original_value = message_dict[key]
    decoded_value = decoded_values.get(key, None)
    print(f"{original_value = } , {decoded_value = }")
    try:
        difference = (
            abs(original_value - decoded_value) if decoded_value is not None else None
        )
        comparison_table.append(
            {
                "Field": key,
                "Original Value": original_value,
                "Decoded Value": decoded_value,
                "Difference": difference,
            }
        )
    except TypeError:
        # e.g. "timestamp": the original is an ISO string, the decoded value is a float
        print(f"cannot compare {key}: types differ")
# Convert to DataFrame for better visualization
comparison_df = pd.DataFrame(comparison_table)
# %% ARCHIVE
# %%
#### FIRST APPROACH (the simplest one, using only struct.pack)
# it fails because the packet is too big
# # Sample message_dict
# message_dict = {
# "timestamp": "2024-11-10T15:06:05.029",
# "P0_(mbar)": -74.1467161179948,
# "T0_(C)": 22.406355721950533,
# "fish_depth": 0.0030501921191828972,
# "sensor_used": 0,
# "P_dyn_(mbar)": 2042.9901137867594,
# "T_dyn_(C)": 22.69834464788437,
# "fish_speed": 0,
# "duration_read_sensors": 0.04412102699279785,
# "elapsed_time": 5.428764343261719,
# "setpoint": 0.3,
# "pid_value": -2.071595431578338,
# "vbus_voltage_1": 11.929389953613281,
# "bat_level_1": 85.09755452473959,
# "ibus_1": 0.08959825336933136,
# "axis0.pos_estimate_1": -2.080005645751953,
# "axis0.current_state_1": 8,
# "vbus_voltage_2": 11.929389953613281,
# "bat_level_2": 85.09755452473959,
# "ibus_2": 0.08959825336933136,
# "axis0.pos_estimate_2": -2.080005645751953,
# "axis0.current_state_2": 8,
# "errors": 0,
# "core_frequ": 15.905589685248389,
# }
# # Define sender, receiver, and message type
# sender_id = 1 # as an example
# receiver_id = 3 # as an example
# message_type = 3 # as an example (Command: 0, Status: 1, Telemetry: 3, etc.)
# # Encode the first byte
# header_byte = (
# (sender_id & 0b111) | ((receiver_id & 0b111) << 3) | ((message_type & 0b11) << 6)
# )
# # Convert timestamp to epoch ( we might need more precision than 1 s)
# timestamp_epoch = int(
# time.mktime(
# datetime.strptime(message_dict["timestamp"], "%Y-%m-%dT%H:%M:%S.%f").timetuple()
# )
# )
# # Define the structure format for binary packing
# packet_format = "<B I f f f B f f f f f f f f f f f f f f f f f f"
# # Prepare all the values
# field_values = [
# int(header_byte), # Header byte
# int(timestamp_epoch), # Timestamp as an unsigned integer
# float(message_dict["P0_(mbar)"]), # Float
# float(message_dict["T0_(C)"]), # Float
# float(message_dict["fish_depth"]), # Float
# int(message_dict["sensor_used"]), # Byte
# float(message_dict["P_dyn_(mbar)"]), # Float
# float(message_dict["T_dyn_(C)"]), # Float
# float(message_dict["fish_speed"]), # Float
# float(message_dict["duration_read_sensors"]), # Float
# float(message_dict["elapsed_time"]), # Float
# float(message_dict["setpoint"]), # Float
# float(message_dict["pid_value"]), # Float
# int(message_dict["axis0.current_state_1"]), # Byte
# float(message_dict["vbus_voltage_1"]), # Float
# float(message_dict["bat_level_1"]), # Float
# float(message_dict["ibus_1"]), # Float
# float(message_dict["axis0.pos_estimate_1"]), # Float
# int(message_dict["axis0.current_state_2"]), # Byte
# float(message_dict["vbus_voltage_2"]), # Float
# float(message_dict["bat_level_2"]), # Float
# float(message_dict["ibus_2"]), # Float
# float(message_dict["axis0.pos_estimate_2"]), # Float
# float(message_dict["core_frequ"]), # Float
# ]
# # Making the packet
# binary_packet = struct.pack(packet_format, *field_values)
# # Outputs
# print(f"Binary Packet Size: {len(binary_packet)} bytes")
# print(f"Binary Packet: {binary_packet}")
# struct.calcsize(packet_format)
## Conclusion: the packet is too big (90 bytes)
## Next step: try to reduce it by using float16 instead of float32