Skip to content

Instantly share code, notes, and snippets.

@Staars
Last active October 7, 2025 12:54
Show Gist options
  • Select an option

  • Save Staars/af9565cee7e6655726efedfe2f9773e5 to your computer and use it in GitHub Desktop.

Select an option

Save Staars/af9565cee7e6655726efedfe2f9773e5 to your computer and use it in GitHub Desktop.
csi tests
class CSI_Driver
var csi_started, type
var packet_count
var stats
def init()
self.csi_started = false
self.packet_count = 0
self.stats = {'raw': 0, 'light': 0, 'unknown': 0, 'errors': 0}
tasmota.add_driver(self)
self.start_csi()
end
def start_csi()
import TFL
self.type = 0 # feature_mode (0=RAW, 1=LIGHTWEIGHT)
var config = bytes(-5)
config[0] = 5 # sample_rate (packets/sec)
config[1] = self.type
config[2] = 1 # use_quantization
config[3] = 1 # training_mode (enable data output)
config[4] = 4 # max_invocations
self.csi_started = TFL.begin("CSI", config)
print("CSI started:", self.csi_started)
end
def every_100ms()
if !self.csi_started return end
import TFL
var data
var count = 0
while (data := TFL.log()) && count < 5
self.packet_count += 1
count += 1
self.process_data(bytes().fromhex(data))
end
end
def process_data(bin_data)
var data_size = size(bin_data)
if data_size < 9
self.stats['errors'] += 1
print("ERROR: Packet too short:", data_size)
return
end
if self.type == 0
self.stats['raw'] += 1
self.parse_raw_packet(bin_data)
elif self.type == 1
self.stats['light'] += 1
self.parse_lightweight_packet(bin_data)
else
self.stats['unknown'] += 1
print("UNKNOWN type:", type)
end
end
def parse_raw_packet(data)
# Use native bytes functions (little-endian by default)
var timestamp = data.get(0, 4)
var rssi = data.geti(4, 1)
var noise_floor = data.geti(5, 1)
var data_len = data.get(6, 2)
if size(data) < 8 + data_len
self.stats['errors'] += 1
print(format("TRUNCATED: expected %d got %d", 8 + data_len, size(data)))
return
end
# Extract CSI data slice
var csi_data = data[8 .. 7 + data_len]
# Preview first 16 bytes
var preview_end = (data_len < 16) ? data_len : 16
var preview = data[8 .. 7 + preview_end].tohex()
print(format("RAW[%d] T:%d RSSI:%d NF:%d Len:%d CSI:%s",
self.stats['raw'], timestamp, rssi, noise_floor, data_len, preview))
end
def parse_lightweight_packet(data)
if size(data) < 34
self.stats['errors'] += 1
print("LIGHT packet too short:", size(data))
return
end
# Use native bytes functions for all fields
var timestamp = data.get(0, 4)
var rssi = data.geti(4, 1)
var noise_floor = data.geti(5, 1)
# Statistical features (scaled by 1000, little-endian signed)
var mean = data.geti(6, 4) / 1000.0
var std_dev = data.geti(10, 4) / 1000.0
var min_val = data.geti(14, 1)
var max_val = data.geti(15, 1)
var energy = data.geti(16, 4)
var mad = data.geti(20, 4)
var zero_crossings = data.get(24, 2)
var skewness = data.geti(26, 4) / 1000.0
var kurtosis = data.geti(30, 4) / 1000.0
print(format("LIGHT[%d] T:%d RSSI:%d NF:%d | μ:%.2f σ:%.2f | [%d,%d] E:%d MAD:%d ZC:%d | Skew:%.2f Kurt:%.2f",
self.stats['light'], timestamp, rssi, noise_floor,
mean, std_dev, min_val, max_val, energy, mad, zero_crossings,
skewness, kurtosis))
end
end
var csi_driver = CSI_Driver()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment