Last active
October 7, 2025 12:54
-
-
Save Staars/af9565cee7e6655726efedfe2f9773e5 to your computer and use it in GitHub Desktop.
csi tests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CSI_Driver: Tasmota Berry driver that starts a "CSI" session through the
# TFL module, polls TFL's log output every 100 ms, decodes each hex-encoded
# packet and prints a one-line summary of it.
#
# Members:
#   csi_started  - value returned by TFL.begin(); polling is skipped when falsy
#   type         - feature mode written into the config (0=RAW, 1=LIGHTWEIGHT)
#   packet_count - total number of log entries pulled from TFL
#   stats        - per-category counters: 'raw', 'light', 'unknown', 'errors'
class CSI_Driver
  var csi_started, type
  var packet_count
  var stats

  # Reset counters, register this instance as a Tasmota driver and start CSI.
  def init()
    self.csi_started = false
    self.packet_count = 0
    self.stats = {'raw': 0, 'light': 0, 'unknown': 0, 'errors': 0}
    tasmota.add_driver(self)
    self.start_csi()
  end

  # Build the 5-byte configuration buffer and hand it to TFL.begin("CSI", ...).
  # The result of TFL.begin() is stored in csi_started and printed.
  def start_csi()
    import TFL
    self.type = 0            # feature_mode (0=RAW, 1=LIGHTWEIGHT)
    var config = bytes(-5)   # negative size: fixed-size 5-byte buffer
    config[0] = 5            # sample_rate (packets/sec)
    config[1] = self.type
    config[2] = 1            # use_quantization
    config[3] = 1            # training_mode (enable data output)
    config[4] = 4            # max_invocations
    self.csi_started = TFL.begin("CSI", config)
    print("CSI started:", self.csi_started)
  end

  # Tasmota periodic callback: drain at most 5 log entries per tick.
  def every_100ms()
    if !self.csi_started return end
    import TFL
    var data
    var count = 0
    # The budget test comes first so that TFL.log() is not called (and its
    # entry thrown away) once the per-tick limit has been reached.
    # NOTE(review): relies on TFL.log() returning a falsy value when the
    # queue is empty - confirm against the TFL module.
    while count < 5 && (data := TFL.log())
      self.packet_count += 1
      count += 1
      self.process_data(bytes().fromhex(data))
    end
  end

  # Dispatch one decoded packet to the parser matching self.type and bump
  # the corresponding counter. Packets shorter than 9 bytes are counted as
  # errors and dropped.
  #   bin_data - bytes object holding the binary packet
  def process_data(bin_data)
    var data_size = size(bin_data)
    # NOTE(review): the raw header parsed below is 8 bytes, so this also
    # rejects a header-only packet - confirm that is intended.
    if data_size < 9
      self.stats['errors'] += 1
      print("ERROR: Packet too short:", data_size)
      return
    end
    if self.type == 0
      self.stats['raw'] += 1
      self.parse_raw_packet(bin_data)
    elif self.type == 1
      self.stats['light'] += 1
      self.parse_lightweight_packet(bin_data)
    else
      self.stats['unknown'] += 1
      # Report the configured mode (the member), not the builtin type().
      print("UNKNOWN type:", self.type)
    end
  end

  # Decode and print a RAW packet.
  # Layout read here: [0..3] timestamp, [4] rssi (signed), [5] noise floor
  # (signed), [6..7] payload length, [8..] payload.
  #   data - bytes object holding the packet
  def parse_raw_packet(data)
    # Use native bytes functions (little-endian by default)
    var timestamp = data.get(0, 4)
    var rssi = data.geti(4, 1)
    var noise_floor = data.geti(5, 1)
    var data_len = data.get(6, 2)
    if size(data) < 8 + data_len
      self.stats['errors'] += 1
      print(format("TRUNCATED: expected %d got %d", 8 + data_len, size(data)))
      return
    end
    # Preview at most the first 16 payload bytes as hex
    var preview_end = (data_len < 16) ? data_len : 16
    var preview = data[8 .. 7 + preview_end].tohex()
    print(format("RAW[%d] T:%d RSSI:%d NF:%d Len:%d CSI:%s",
                 self.stats['raw'], timestamp, rssi, noise_floor, data_len, preview))
  end

  # Decode and print a LIGHTWEIGHT packet (34 bytes are read).
  # Packets shorter than 34 bytes are counted as errors and dropped.
  #   data - bytes object holding the packet
  def parse_lightweight_packet(data)
    if size(data) < 34
      self.stats['errors'] += 1
      print("LIGHT packet too short:", size(data))
      return
    end
    # Use native bytes functions for all fields
    var timestamp = data.get(0, 4)
    var rssi = data.geti(4, 1)
    var noise_floor = data.geti(5, 1)
    # Statistical features (scaled by 1000, little-endian signed)
    var mean = data.geti(6, 4) / 1000.0
    var std_dev = data.geti(10, 4) / 1000.0
    var min_val = data.geti(14, 1)
    var max_val = data.geti(15, 1)
    var energy = data.geti(16, 4)
    var mad = data.geti(20, 4)
    var zero_crossings = data.get(24, 2)
    var skewness = data.geti(26, 4) / 1000.0
    var kurtosis = data.geti(30, 4) / 1000.0
    print(format("LIGHT[%d] T:%d RSSI:%d NF:%d | μ:%.2f σ:%.2f | [%d,%d] E:%d MAD:%d ZC:%d | Skew:%.2f Kurt:%.2f",
                 self.stats['light'], timestamp, rssi, noise_floor,
                 mean, std_dev, min_val, max_val, energy, mad, zero_crossings,
                 skewness, kurtosis))
  end
end
# Create the driver when the script loads; CSI_Driver.init() registers the
# instance with Tasmota and starts CSI capture.
var csi_driver = CSI_Driver()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment