NVS parser WIP
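# A Berry script for Tasmota that reads the ESP32 "nvs" flash partition,
# walks its pages and entries, and reassembles multi-chunk blobs.
# Register it once, then drive it from the console with the commands
# defined at the bottom of the file ('n <level>' and 'd').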
class NC # NVS Constants
  static page_size = 4096
  static entry_size = 32
  static item_type = {
    0x01: 'uint8_t',
    0x11: 'int8_t',
    0x02: 'uint16_t',
    0x12: 'int16_t',
    0x04: 'uint32_t',
    0x14: 'int32_t',
    0x08: 'uint64_t',
    0x18: 'int64_t',
    0x21: 'string',
    0x41: 'blob',
    0x42: 'blob_data',
    0x48: 'blob_index',
  }
  static page_status = {
    0xFFFFFFFF: 'Empty',
    0xFFFFFFFE: 'Active',
    0xFFFFFFFC: 'Full',
    0xFFFFFFF8: 'Erasing',
    0x00000000: 'Corrupted',
  }
  static entry_status = {
    3: 'Empty',   # 0b11
    2: 'Written', # 0b10
    0: 'Erased',  # 0b00
  }
end
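# Layout of one 32-byte entry as parsed below (this matches the
# documented ESP-IDF NVS format):
#   byte 0       namespace index
#   byte 1       item type (see NC.item_type)
#   byte 2       span (number of consecutive entries the item occupies)
#   byte 3       chunk index (relevant for blob chunks)
#   bytes 4-7    CRC32 over bytes 0-3 and 8-31 of the entry
#   bytes 8-23   key, zero-terminated ASCII
#   bytes 24-31  data field: inline value for primitives, or a small
#                size/CRC header for variable-length items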
class NVS_Entry
  var buf, offset, state, is_empty, index, metadata, children, key, data, type
  var payload_offset, payload_length

  def raw()
    return self.buf[self.offset .. self.offset + NC.entry_size - 1]
  end

  def raw_payload()
    if self.payload_length > 0
      return self.buf[self.payload_offset .. self.payload_offset + self.payload_length - 1]
    end
    return bytes()
  end

  def init(index, partition_bytes, entry_offset, entry_state)
    self.buf = partition_bytes
    self.offset = entry_offset
    self.state = entry_state
    self.is_empty = true
    self.index = index
    self.children = [] # kept for compatibility (non-blob multi-span if ever needed)
    self.key = nil
    self.data = nil
    self.payload_offset = 0
    self.payload_length = 0
    self.metadata = {
      "namespace": 0,
      "type": nil,
      "span": 0,
      "chunk_index": 0,
      "crc": {
        "original": 0,
        "computed": 0,
        "data_original": 0,
        "data_computed": 0,
      },
    }
    # Detect empty entry
    var i = 0
    while i < NC.entry_size
      if self.buf[self.offset + i] != 0xFF
        self.is_empty = false
        break
      end
      i += 1
    end
    if !self.is_empty
      var namespace = self.buf[self.offset + 0]
      var entry_type = self.buf[self.offset + 1]
      var span = self.buf[self.offset + 2]
      var chunk_index = self.buf[self.offset + 3]
      var crc_val = self.buf[self.offset + 4 .. self.offset + 7]
      var key_bytes = self.buf[self.offset + 8 .. self.offset + 23]
      var data_bytes = self.buf[self.offset + 24 .. self.offset + 31]
      var raw_without_crc = self.buf[self.offset + 0 .. self.offset + 3] + self.buf[self.offset + 8 .. self.offset + 31]
      self.type = NC.item_type.find(entry_type, f"0x{entry_type:02x}")
      import crc
      self.metadata = {
        "namespace": namespace,
        "type": self.type,
        "span": span,
        "chunk_index": chunk_index,
        "crc": {
          "original": crc_val.get(0, 4),
          "computed": crc.crc32(0xFFFFFFFF, raw_without_crc),
          "data_original": data_bytes[-4 ..].get(0, 4),
          "data_computed": 0,
        },
      }
      self.key = self.key_decode(key_bytes)
      if self.key != nil
        self.data = self.item_convert(entry_type, data_bytes)
      end
      # For multi-span entries, record payload range (zero-copy)
      if span > 1
        self.payload_offset = self.offset + NC.entry_size
        self.payload_length = (span - 1) * NC.entry_size
      end
    end
  end
  def item_convert(i_type, data)
    var byte_size_mask = 0x0F
    var number_sign_mask = 0xF0
    var fixed_entry_length_threshold = 0x20
    if NC.item_type.find(i_type) != nil
      if i_type < fixed_entry_length_threshold
        # primitive integer: low nibble encodes the byte width,
        # high nibble marks signed types
        var sz = i_type & byte_size_mask
        var num
        try
          num = data.get(0, sz)
        except ..
          log("NVS: Corrupt entry!!!")
          return {"value": nil}
        end
        if (i_type & number_sign_mask) != 0
          # get() reads unsigned, so fold values above the sign bit
          # back into the negative range
          var bits = 8 * sz
          var signbit = 1 << (bits - 1)
          var full = 1 << bits
          if num & signbit != 0
            num = num - full
          end
        end
        return {"value": num}
      elif i_type == 0x48
        # blob_index data field: total size (u32), chunk count (u8),
        # first chunk index (u8)
        var sz = data.get(0, 4)
        var chunk_count = data[4]
        var chunk_start = data[5]
        return {
          "value": [sz, chunk_count, chunk_start],
          "size": sz,
          "chunk_count": chunk_count,
          "chunk_start": chunk_start
        }
      elif NC.item_type[i_type] == "string" || NC.item_type[i_type] == "blob_data" || NC.item_type[i_type] == "blob"
        # variable-length data field: payload size (u16), reserved (u16),
        # CRC32 of the payload (u32)
        var sz = data.get(0, 2)
        var crc_val = data.get(4, 4)
        return {"value": [sz, crc_val], "size": sz, "crc": crc_val}
      end
    end
    return {"value": nil}
  end
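  # Worked example for the sign extension above: an int16_t entry (0x12)
  # whose data field begins with FE FF yields num = 0xFFFE = 65534; the
  # sign bit 0x8000 is set, so 65534 - 0x10000 = -2 is returned.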
  def key_decode(data)
    var start = 0
    while start < size(data) && data[start] == 0x00
      start += 1
    end
    if start >= size(data)
      return nil
    end
    var decoded = ""
    var i = start
    while i < size(data)
      var b = data[i]
      if b == 0x00
        break
      end
      if b < 128
        decoded += data[i .. i].asstring()
      else
        return nil # non-ASCII byte: not a valid key
      end
      i += 1
    end
    if size(decoded) == 0
      return nil
    end
    return decoded
  end
  # Kept for compatibility (useful for non-blob multi-span), but not needed for blob_data
  def child_assign(entry)
    self.children.push(entry)
  end
  def compute_crc()
    import crc
    var t = self.metadata["type"]
    var written = (self.state == "Written")
    var has_size = self.data != nil && self.data.contains("size") && self.data["size"] != nil
    if !written || !has_size
      self.metadata["crc"]["data_computed"] = 0
      return
    end
    var size_bytes = self.data["size"]
    # Only variable-length items carry a payload CRC; it lives in the
    # header's data field and is already captured as "data_original"
    var is_var = (t == "string") || (t == "blob_data") || (t == "blob")
    if !is_var
      self.metadata["crc"]["data_computed"] = 0
      return
    end
    # Prefer the zero-copy payload range recorded for multi-span entries
    if self.payload_length > 0
      var to_hash = size_bytes
      if to_hash > self.payload_length
        to_hash = self.payload_length
      end
      var seed = 0xFFFFFFFF
      var pos = self.payload_offset
      var remaining = to_hash
      var chunk = 256
      while remaining > 0
        var take = remaining > chunk ? chunk : remaining
        var seg = self.buf[pos .. pos + take - 1]
        seed = crc.crc32(seed, seg)
        pos += take
        remaining -= take
      end
      self.metadata["crc"]["data_computed"] = seed
      return
    end
    # Fallback (legacy): reconstruct the payload from child entries;
    # each payload entry carries 32 raw data bytes
    var acc = bytes()
    var j = 0
    while j < size(self.children)
      var entry = self.children[j]
      acc = acc .. entry.buf[entry.offset .. entry.offset + 31]
      j += 1
    end
    if size_bytes < size(acc)
      acc = acc[0 .. size_bytes - 1]
    end
    self.metadata["crc"]["data_computed"] = crc.crc32(0xFFFFFFFF, acc)
  end
  def is_blob()
    var t = self.metadata["type"]
    return self.state == "Written" && (t == "blob" || t == "blob_index")
  end

  def is_blob_data()
    var t = self.metadata["type"]
    return self.state == "Written" && (t == "blob_data")
  end

  def blob_key()
    return self.key
  end

  def blob_total_size()
    if self.data != nil && self.data.contains("size") && self.data["size"] != nil
      return self.data["size"]
    end
    return 0
  end

  def blob_expected_chunks()
    if self.data != nil && self.data.contains("chunk_count") && self.data["chunk_count"] != nil
      return self.data["chunk_count"]
    end
    return 0
  end
end
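# Minimal usage sketch for NVS_Entry, assuming 'part' already holds the
# raw partition bytes: the first data entry of page 0 sits two entries
# past the page header and the state bitmap.
#   var e = NVS_Entry(0, part, 2 * NC.entry_size, "Written")
#   print(e.key, e.metadata["type"], e.data)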
class NVS_Blob
  var key
  var total_size
  var expected_chunks
  var index_entry
  var chunks # list of {offset, length, index}
  var np # reference to parent NP for access to partition_data

  def init(np, k, total_sz, expected)
    self.np = np
    self.key = k
    self.total_size = total_sz
    self.expected_chunks = expected
    self.index_entry = nil
    self.chunks = []
  end

  # Return the requested slice of the assembled blob by reading from partition_data
  def get_data(start, length)
    # Bubble sort chunk refs by "index" so they assemble in order
    var n = self.chunks.size()
    var swapped = true
    while swapped
      swapped = false
      var i = 0
      while i < n - 1
        if self.chunks[i]["index"] > self.chunks[i + 1]["index"]
          var tmp = self.chunks[i]
          self.chunks[i] = self.chunks[i + 1]
          self.chunks[i + 1] = tmp
          swapped = true
        end
        i += 1
      end
      n -= 1
    end
    # Default to the full blob if either parameter is missing
    if start == nil || length == nil
      start = 0
      length = self.total_size
    end
    # Clamp and guard
    if start < 0
      start = 0
    end
    if start >= self.total_size || length <= 0
      return bytes()
    end
    var slice_end = start + length
    if slice_end > self.total_size
      slice_end = self.total_size
    end
    # Assemble the result from every chunk that overlaps [start, slice_end)
    var buf = bytes(length)
    var assembled_offset = 0
    var ci = 0
    while ci < self.chunks.size()
      var ch = self.chunks[ci]
      var cl = ch["length"]
      var chunk_blob_start = assembled_offset
      var chunk_blob_end = assembled_offset + cl
      var os = start
      if os < chunk_blob_start
        os = chunk_blob_start
      end
      var oe = slice_end
      if oe > chunk_blob_end
        oe = chunk_blob_end
      end
      if os < oe
        var offset_into_chunk = os - chunk_blob_start
        var copy_len = oe - os
        var src_start = ch["offset"] + offset_into_chunk
        var src_end = src_start + copy_len - 1
        buf = buf .. self.np.partition_data[src_start .. src_end]
      end
      assembled_offset = chunk_blob_end
      if assembled_offset >= slice_end
        break
      end
      ci += 1
    end
    return buf
  end

  # Store only offset/length/index, no payload allocation here
  def add_chunk_ref(offset, length, index)
    self.chunks.push({
      "offset": offset,
      "length": length,
      "index": index
    })
  end

  def set_index(entry)
    self.index_entry = entry
  end
end
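# Example of the slicing semantics, assuming a hypothetical blob of
# total_size 300 stored as two chunks: blob.get_data(250, 40) copies the
# overlapping tail of chunk 0 and head of chunk 1 and returns bytes
# 250..289; blob.get_data(nil, nil) returns all 300 bytes.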
class NVS_Page
  var np, offset
  var is_empty, header, entries

  def init(np, page_offset)
    self.np = np
    self.offset = page_offset
    self.entries = []
    # Detect if page is all 0xFF
    var empty = true
    var i = 0
    while i < NC.page_size
      if self.np.partition_data[self.offset + i] != 0xFF
        empty = false
        break
      end
      i += 1
    end
    self.is_empty = empty
    # Parse header
    import crc
    var buf = self.np.partition_data
    self.header = {
      "status": NC.page_status.find(buf.get(self.offset + 0, 4), "Invalid"),
      "page_index": buf.get(self.offset + 4, 4),
      "version": 256 - buf[self.offset + 8],
      "crc": {
        "original": buf.get(self.offset + 28, 4),
        "computed": crc.crc32(0xFFFFFFFF, buf[self.offset + 4 .. self.offset + 27]),
      },
    }
    if self.is_empty
      self.header["crc"]["original"] = nil
      self.header["crc"]["computed"] = nil
      return
    end
    # Entry state bitmap (entry #1): two bits per entry
    var entry_states = []
    var map_off = self.offset + NC.entry_size
    var j = 0
    while j < NC.entry_size
      var byte = buf[map_off + j]
      var shift = 0
      while shift < 8
        var status = NC.entry_status.find((byte >> shift) & 3, "Invalid")
        entry_states.push(status)
        shift += 2
      end
      j += 1
    end
    entry_states = entry_states[0 .. 125] # states for data entries 2..127
    # Parse entries
    var entry_count = int(NC.page_size / NC.entry_size)
    i = 2
    while i < entry_count
      var entry_off = self.offset + (i * NC.entry_size)
      var span_byte = buf[entry_off + 2]
      var span = ([0xFF, 0].find(span_byte) != nil) ? 1 : span_byte
      var entry_state = entry_states[i - 2]
      var entry = NVS_Entry(i - 2, buf, entry_off, entry_state)
      self.entries.push(entry)
      # For multi-span entries that are NOT blob_data: keep legacy child assign (if you still need it)
      if span > 1 && !entry.is_blob_data()
        var s = 1
        while s < span
          var addr = i + s
          var idx = addr - 2
          if (addr * NC.entry_size) >= NC.page_size
            break
          end
          var child_off = self.offset + (addr * NC.entry_size)
          var child_state = entry_states[idx]
          var child = NVS_Entry(idx, buf, child_off, child_state)
          entry.child_assign(child)
          s += 1
        end
      end
      # Blob header/index: ensure blob object exists and update totals if needed
      if entry.is_blob()
        self.report_blob(entry)
        # Legacy 'blob' item: its payload follows the header entry, so
        # register it as chunk 0 of the blob
        if entry.metadata["type"] == "blob" && entry.state == "Written"
          var key = entry.key
          if self.np.blob_map.contains(key)
            var blob = self.np.blob_map[key]
            var chunk_size = entry.data && entry.data["size"] != nil ? entry.data["size"] : 0
            if chunk_size > 0
              var payload_off = entry_off + NC.entry_size
              blob.add_chunk_ref(payload_off, chunk_size, 0)
            end
          end
        end
      end
      # Blob data: always register chunk refs zero-copy, creating blob if missing
      if entry.is_blob_data() && entry.state == "Written"
        # Ensure blob exists (if header/index not parsed yet, create provisional)
        if !self.np.blob_map.contains(entry.key)
          var provisional_total = entry.blob_total_size() # for blob_data this is the chunk size; may be updated later
          var provisional_chunks = entry.blob_expected_chunks() # likely 0 for blob_data; updated by blob_index later
          var new_blob = NVS_Blob(self.np, entry.key, provisional_total, provisional_chunks)
          self.np.blobs.push(new_blob)
          self.np.blob_map[entry.key] = new_blob
        end
        var blob2 = self.np.blob_map[entry.key]
        # Compute payload range: (span - 1) payload entries, each NC.entry_size bytes
        var payload_offset = entry_off + NC.entry_size
        var payload_length = (span - 1) * NC.entry_size
        # Clamp to declared size in header for this chunk (avoid trailing padding)
        if entry.data && entry.data["size"] != nil && entry.data["size"] < payload_length
          payload_length = entry.data["size"]
        end
        var chunk_index = entry.metadata["chunk_index"] & 127
        if payload_length > 0
          blob2.add_chunk_ref(payload_offset, payload_length, chunk_index)
        end
      end
      # Children and payload ranges are known now, so the data CRC can be verified
      entry.compute_crc()
      i += span
    end
  end
  def report_blob(entry)
    var key = entry.blob_key()
    if self.np.blob_map.contains(key)
      # Update totals if header/index provides better information
      var existing = self.np.blob_map[key]
      var new_total = entry.blob_total_size()
      var new_expected = entry.blob_expected_chunks()
      if new_total != nil && new_total > 0
        existing.total_size = new_total
      end
      if new_expected != nil && new_expected > 0
        existing.expected_chunks = new_expected
      end
      return
    end
    # Create blob using header/index info
    var blob = NVS_Blob(self.np, entry.blob_key(), entry.blob_total_size(), entry.blob_expected_chunks())
    self.np.blobs.push(blob)
    self.np.blob_map[key] = blob
  end
end
# Define the NP class (NVS Partition)
class NP
  var name
  var partition_data
  var blobs
  var blob_map

  def init(name, partition_bytes)
    self.name = name
    self.partition_data = partition_bytes
    self.blobs = []
    self.blob_map = {}
    # no pages[] list here; NVS_Page objects are created as needed
  end

  # Optional helper to get page count without building page objects
  def page_count()
    return int(size(self.partition_data) / NC.page_size)
  end
end
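# Offline sketch, assuming a raw dump produced by the 'd' command below
# already exists on the filesystem as nvs.bin:
#   var f = open("nvs.bin", "rb")
#   var np = NP("N", f.readbytes())
#   f.close()
#   print(np.page_count(), "pages")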
class NVSInspector
  var loglevel
  var nvs
  var namespaces
  var page_index
  var phase
  var active
  var total_pages

  def init()
    self.loglevel = 1
    self.namespaces = {}
    self.page_index = 0
    self.phase = 0
    self.active = false
    self.total_pages = 0
    print(tasmota.memory())
    tasmota.log("NVSInspector loaded. Use 'n <level>' to run.")
  end

  def stop()
    tasmota.remove_driver(self)
  end
  def read_nvs_partition(bin_to_file)
    import partition_core
    import flash
    var p = partition_core.Partition()
    var nvs = nil
    log("NVS: will read NVS partition ...")
    var start_ms = tasmota.millis()
    for slot : p.slots
      if slot.label == "nvs"
        var nvs_partition = flash.read(slot.start, slot.sz)
        var elapsed = tasmota.millis() - start_ms
        var part_bytes = size(nvs_partition)
        log(f"NVS: flash read took {elapsed} ms for {part_bytes} bytes")
        if part_bytes < 12288
          log(f"NVS: [WARN] Partition size {part_bytes} bytes is smaller than 12 KiB, may be invalid.")
        end
        if part_bytes % 4096 != 0
          log(f"NVS: [WARN] Partition size {part_bytes} bytes is not a multiple of 4 KiB, may be misaligned.")
        end
        if bin_to_file
          var f = open("nvs.bin", "wb")
          if f != nil
            f.write(nvs_partition)
            f.close()
            log("NVS: raw partition dumped to nvs.bin")
          else
            log("NVS: [ERROR] Failed to open nvs.bin for writing")
          end
        else
          nvs = NP("N", nvs_partition)
        end
        break
      end
    end
    return nvs
  end
  def every_100ms()
    if !self.active
      return
    end
    if self.phase == 0
      self.namespaces = {}
      self.page_index = 0
      # streaming-friendly: compute page count from raw partition bytes
      var part_bytes = size(self.nvs.partition_data)
      self.total_pages = int(part_bytes / NC.page_size)
      # loglevel 0: stream summary without storing pages
      if self.loglevel == 0
        var page_count = self.nvs.page_count()
        var empty_count = 0
        var corrupted_count = 0
        var pi = 0
        while pi < page_count
          var page_off = pi * NC.page_size
          var page = NVS_Page(self.nvs, page_off)
          if page.header["status"] == "Corrupted" || page.header["status"] == "Invalid"
            corrupted_count += 1
          elif page.is_empty
            empty_count += 1
          end
          page.entries = nil
          pi += 1
        end
        print(f"NVS: Summary: {page_count} pages, {empty_count} empty, {corrupted_count} corrupted")
        self.active = false
        return
      end
      self.phase = 1
      tasmota.gc()
      return
    end
    if self.phase == 1
      if self.page_index < self.total_pages
        var page_off = self.page_index * NC.page_size
        var page = NVS_Page(self.nvs, page_off)
        self._process_page(page, self.loglevel, self.namespaces)
        self.page_index += 1
      else
        self.phase = 2
        tasmota.gc()
      end
      return
    end
    if self.phase == 2
      self._print_summary(self.namespaces, self.nvs.blobs, self.loglevel)
      self.active = false
      self.phase = 0
      tasmota.log("NVSInspector: Dump complete")
      tasmota.gc()
      print(tasmota.memory())
    end
  end
  def hexdump(blob)
    var width = 16
    var chunk_size = 256
    var total = blob.total_size
    var pos = 0
    while pos < total
      var len = (total - pos) > chunk_size ? chunk_size : (total - pos)
      var data = blob.get_data(pos, len)
      var local_off = 0
      var chunk_output = ""
      while local_off < data.size()
        var hexline = ""
        var i = 0
        while i < width
          if local_off + i < data.size()
            var byte = data[local_off + i]
            hexline += f"{byte:02X} "
          else
            hexline += "   " # pad with spaces for missing byte
          end
          i += 1
        end
        var raw = data[local_off .. local_off + (i - 1)]
        var j = 0
        while j < raw.size()
          var b = raw[j]
          if b < 32 || b > 126
            raw[j] = 46 # '.'
          end
          j += 1
        end
        var asciiline = raw.asstring()
        # Hex section is fixed width, tab cleanly separates ASCII
        chunk_output += f"{(pos + local_off):6d} {hexline}\t{asciiline}\n"
        local_off += width
      end
      print(chunk_output)
      pos += len
      tasmota.gc()
    end
  end
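  # Illustrative output row: "     0 54 61 73 6D 6F 74 61 ..." followed
  # by a tab and the ASCII rendering "Tasmota." with non-printable bytes
  # shown as '.'.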
  def _process_page(page, loglevel, namespaces)
    if loglevel == 1 && page.is_empty
      return
    end
    var page_log = f"\nPage {page.header['page_index']} - {page.header['status']}\n"
    if loglevel >= 2
      var hdr_crc_ok = page.header["crc"]["computed"] == page.header["crc"]["original"] ? "OK" : "BAD"
      page_log += f" Version: {page.header['version']} Header CRC: {hdr_crc_ok}\n"
    end
    var ei = 0
    while ei < size(page.entries)
      var entry = page.entries[ei]
      page_log = self._process_entry(entry, page, loglevel, namespaces, page_log)
      page.entries[ei] = nil # release entry early to keep memory pressure low
      ei += 1
    end
    page.entries = nil
    print(page_log)
  end

  def _process_entry(entry, page, loglevel, namespaces, page_log)
    if entry.key == nil
      return page_log
    end
    var ns = entry.metadata["namespace"]
    self._collect_namespaces(entry, namespaces)
    if loglevel == 1 && entry.state != "Written"
      return page_log
    end
    var line = f" #{entry.index}\tKey: {entry.key:16s} Type: {entry.metadata['type']}\tNS: {ns}\tState: {entry.state}\t"
    if loglevel >= 2 && entry.metadata["crc"]["data_computed"] != 0
      var crc_ok = entry.metadata["crc"]["data_computed"] == entry.metadata["crc"]["data_original"] ? "OK" : "FAIL"
      line += " CRC:" + crc_ok
    end
    page_log += line + "\n"
    if loglevel >= 3
      page_log += f"  RAW: {entry.raw()}\n"
      page_log += f"  META: {entry.metadata}\n"
      page_log += f"  DATA: {entry.data}\n"
    end
    return page_log
  end

  def _collect_namespaces(entry, namespaces)
    # namespace definitions live in namespace 0 as uint8_t entries:
    # key = namespace name, value = namespace index
    if entry.metadata["namespace"] == 0 && entry.metadata["type"] == "uint8_t"
      namespaces[entry.data["value"]] = entry.key
    end
  end
  def _print_blobs(blobs, loglevel)
    if size(blobs) > 0
      print("\nBlobs found:")
      var i = 0
      while i < size(blobs)
        var b = blobs[i]
        print(f"Key: {b.key:16s} TotalSize: {b.total_size:8d}\tChunks: {b.expected_chunks}")
        if loglevel > 3
          tasmota.gc()
          print("Hexdump:")
          self.hexdump(b)
        end
        i += 1
        tasmota.gc()
      end
    else
      print("\nNo blob entries found.")
    end
  end

  def _print_summary(namespaces, blobs, loglevel)
    if loglevel < 1
      return
    end
    if size(namespaces) > 0
      print("\nNamespaces found:")
      for ns_idx : namespaces.keys()
        print(f"  Index {ns_idx} -> {namespaces[ns_idx]}")
      end
    else
      print("\nNo namespace entries found.")
    end
    self._print_blobs(blobs, loglevel)
  end
  def dump_nvs(loglevel)
    self.nvs = nil
    tasmota.gc()
    self.loglevel = loglevel
    self.total_pages = 0
    tasmota.log(f"NVSInspector: Starting dump with loglevel {loglevel}")
    self.nvs = self.read_nvs_partition(false)
    if self.nvs == nil
      tasmota.log("NVSInspector: No NVS partition found")
      self.active = false
      return
    end
    self.active = true
    self.phase = 0
    self.page_index = 0
  end
end
# Register the driver
var nvs = NVSInspector()
tasmota.add_driver(nvs)

# Command: n <level 0 - 4>
def cmd_n(cmd, idx, payload, payload_json)
  var level = 1
  if payload != ""
    try
      level = int(payload)
    except ..
      tasmota.log("Invalid loglevel, using default 1")
    end
  end
  nvs.dump_nvs(level)
  tasmota.resp_cmnd_done()
end
tasmota.add_cmd("n", cmd_n)

# Command: d (dump raw partition to file: nvs.bin)
def cmd_d(cmd, idx, payload, payload_json)
  nvs.read_nvs_partition(true)
  tasmota.resp_cmnd_done()
end
tasmota.add_cmd("d", cmd_d)