Skip to content

Instantly share code, notes, and snippets.

@Staars
Last active August 28, 2025 08:38
Show Gist options
  • Save Staars/2b006c15b9332bbaa794e0bff23a4517 to your computer and use it in GitHub Desktop.
Save Staars/2b006c15b9332bbaa794e0bff23a4517 to your computer and use it in GitHub Desktop.
NVS parser WIP
class NC # NVS Constants
static page_size = 4096
static entry_size = 32
static item_type = {
0x01: 'uint8_t',
0x11: 'int8_t',
0x02: 'uint16_t',
0x12: 'int16_t',
0x04: 'uint32_t',
0x14: 'int32_t',
0x08: 'uint64_t',
0x18: 'int64_t',
0x21: 'string',
0x41: 'blob',
0x42: 'blob_data',
0x48: 'blob_index',
}
static page_status = {
0xFFFFFFFF: 'Empty',
0xFFFFFFFE: 'Active',
0xFFFFFFFC: 'Full',
0xFFFFFFF8: 'Erasing',
0x00000000: 'Corrupted',
}
static entry_status = {
3: 'Empty', # 0b11
2: 'Written', # 0b10
0: 'Erased', # 0b00
}
end
class NVS_Entry
var buf, offset, state, is_empty, index, metadata, children, key, data, type
var payload_offset, payload_length
def raw()
return self.buf[self.offset .. self.offset + NC.entry_size - 1]
end
def raw_payload()
if self.payload_length > 0
return self.buf[self.payload_offset .. self.payload_offset + self.payload_length - 1]
end
return bytes()
end
def init(index, partition_bytes, entry_offset, entry_state)
self.buf = partition_bytes
self.offset = entry_offset
self.state = entry_state
self.is_empty = true
self.index = index
self.children = [] # kept for compatibility (non-blob multi-span if ever needed)
self.key = nil
self.data = nil
self.payload_offset = 0
self.payload_length = 0
self.metadata = {
"namespace": 0,
"type": nil,
"span": 0,
"chunk_index": 0,
"crc": {
"original": 0,
"computed": 0,
"data_original": 0,
"data_computed": 0,
},
}
# Detect empty entry
var i = 0
while i < NC.entry_size
if self.buf[self.offset + i] != 0xFF
self.is_empty = false
break
end
i += 1
end
if !self.is_empty
var namespace = self.buf[self.offset + 0]
var entry_type = self.buf[self.offset + 1]
var span = self.buf[self.offset + 2]
var chunk_index = self.buf[self.offset + 3]
var crc_val = self.buf[self.offset + 4 .. self.offset + 7]
var key_bytes = self.buf[self.offset + 8 .. self.offset + 23]
var data_bytes = self.buf[self.offset + 24 .. self.offset + 31]
var raw_without_crc = self.buf[self.offset + 0 .. self.offset + 3] + self.buf[self.offset + 8 .. self.offset + 31]
self.type = NC.item_type.find(entry_type, f"0x{entry_type:02x}")
import crc
self.metadata = {
"namespace": namespace,
"type": self.type,
"span": span,
"chunk_index": chunk_index,
"crc": {
"original": crc_val.get(0, 4),
"computed": crc.crc32(0xFFFFFFFF, raw_without_crc),
"data_original": data_bytes[-4 ..].get(0, 4),
"data_computed": 0,
},
}
self.key = self.key_decode(key_bytes)
if self.key != nil
self.data = self.item_convert(entry_type, data_bytes)
end
# For multi-span entries, record payload range (zero-copy)
if span > 1
self.payload_offset = self.offset + NC.entry_size
self.payload_length = (span - 1) * NC.entry_size
end
end
end
def item_convert(i_type, data)
var byte_size_mask = 0x0F
var number_sign_mask = 0xF0
var fixed_entry_length_threshold = 0x20
if NC.item_type.find(i_type) != nil
if i_type < fixed_entry_length_threshold
var sz = i_type & byte_size_mask
var num
try
num = data.get(0, sz)
except ..
log("NVS: Corrupt entry!!!")
return {"value": nil}
end
if (i_type & number_sign_mask) != 0
var bits = 8 * sz
var signbit = 1 << (bits - 1)
var full = 1 << bits
if num & signbit != 0
num = num - full
end
end
return {"value": num}
elif i_type == 0x41 || i_type == 0x48
var sz = data.get(0, 4)
var chunk_count = data[4]
var chunk_start = data[5]
return {
"value": [sz, chunk_count, chunk_start],
"size": sz,
"chunk_count": chunk_count,
"chunk_start": chunk_start
}
elif NC.item_type[i_type] == "string" || NC.item_type[i_type] == "blob_data"
var sz = data.get(0, 2)
var crc_val = data.get(4, 4)
return {"value": [sz, crc_val], "size": sz, "crc": crc_val}
end
end
return {"value": nil}
end
def key_decode(data)
var start = 0
while start < size(data) && data[start] == 0x00
start += 1
end
if start >= size(data)
return nil
end
var decoded = ""
var i = start
while i < size(data)
var b = data[i]
if b == 0x00
break
end
if b < 128
decoded += data[i .. i].asstring()
else
return nil
end
i += 1
end
if size(decoded) == 0
return nil
end
return decoded
end
# Kept for compatibility (useful for non-blob multi-span), but not needed for blob_data
def child_assign(entry)
self.children.push(entry)
end
def compute_crc()
import crc
var t = self.metadata["type"]
var written = (self.state == "Written")
var has_size = self.data != nil && self.data.contains("size") && self.data["size"] != nil
if !written || !has_size
self.metadata["crc"]["data_computed"] = 0
return
end
var size_bytes = self.data["size"]
var is_var = (t == "string") || (t == "blob_data") || (t == "blob") || (t == "blob_index")
if !is_var
self.metadata["crc"]["data_computed"] = 0
return
end
# Prefer zero-copy payload range if present (blob_data)
if t == "blob_data" && self.payload_length > 0
var to_hash = size_bytes
if to_hash > self.payload_length
to_hash = self.payload_length
end
var seed = 0xFFFFFFFF
var pos = self.payload_offset
var remaining = to_hash
var chunk = 256
while remaining > 0
var take = remaining > chunk ? chunk : remaining
var seg = self.buf[pos .. pos + take - 1]
seed = crc.crc32(seed, seg)
pos += take
remaining -= take
end
self.metadata["crc"]["data_computed"] = seed
# Data CRC is stored at the end of the last span entry
var last_entry_off = self.offset + (self.metadata["span"] - 1) * NC.entry_size
self.metadata["crc"]["data_original"] = self.buf[last_entry_off + NC.entry_size - 4 .. last_entry_off + NC.entry_size - 1].get(0, 4)
return
end
# Fallback (legacy): reconstruct small variable data from header + children
var acc = bytes()
if t == "blob_data"
acc = acc .. self.buf[self.offset + 24 .. self.offset + 31]
else
var head_val = self.buf[self.offset + 24 .. self.offset + 31]
if size(head_val) > 8
acc = acc .. head_val[8 .. size(head_val) - 1]
end
end
var j = 0
while j < size(self.children)
var entry = self.children[j]
if entry.metadata["type"] == "blob_data"
acc = acc .. entry.buf[entry.offset + 24 .. entry.offset + 31]
else
acc = acc .. entry.buf[entry.offset + 32 .. entry.offset + 31]
end
j += 1
end
if size_bytes < size(acc)
acc = acc[0 .. size_bytes - 1]
end
self.metadata["crc"]["data_computed"] = crc.crc32(0xFFFFFFFF, acc)
var last_entry = size(self.children) > 0 ? self.children[-1] : self
self.metadata["crc"]["data_original"] = last_entry.buf[last_entry.offset + NC.entry_size - 4 .. last_entry.offset + NC.entry_size - 1].get(0, 4)
end
def is_blob()
var t = self.metadata["type"]
return self.state == "Written" && (t == "blob" || t == "blob_index")
end
def is_blob_data()
var t = self.metadata["type"]
return self.state == "Written" && (t == "blob_data")
end
def blob_key()
return self.key
end
def blob_total_size()
if self.data != nil && self.data.contains("size") && self.data["size"] != nil
return self.data["size"]
end
return 0
end
def blob_expected_chunks()
if self.data != nil && self.data.contains("chunk_count") && self.data["chunk_count"] != nil
return self.data["chunk_count"]
end
return 0
end
end
class NVS_Blob
var key
var total_size
var expected_chunks
var index_entry
var chunks # list of {offset, length, index}
var np # reference to parent NP for access to partition_data
def init(np, k, total_sz, expected)
self.np = np
self.key = k
self.total_size = total_sz
self.expected_chunks = expected
self.index_entry = nil
self.chunks = []
end
# Return the fully assembled blob by reading from partition_data
def get_data(start, length)
# Bubble sort by "index"
var n = self.chunks.size()
var swapped = true
while swapped
swapped = false
var i = 0
while i < n - 1
if self.chunks[i]["index"] > self.chunks[i + 1]["index"]
var tmp = self.chunks[i]
self.chunks[i] = self.chunks[i + 1]
self.chunks[i + 1] = tmp
swapped = true
end
i += 1
end
n -= 1
end
# Default to full blob if both missing OR one missing
if start == nil || length == nil
start = 0
length = self.total_size
end
# Clamp and guard
if start < 0
start = 0
end
if start >= self.total_size || length <= 0
return bytes()
end
var slice_end = start + length
if slice_end > self.total_size
slice_end = self.total_size
end
# Assemble result
var buf = bytes(length)
var assembled_offset = 0
var ci = 0
while ci < self.chunks.size()
var ch = self.chunks[ci]
var cl = ch["length"]
var chunk_blob_start = assembled_offset
var chunk_blob_end = assembled_offset + cl
var os = start
if os < chunk_blob_start
os = chunk_blob_start
end
var oe = slice_end
if oe > chunk_blob_end
oe = chunk_blob_end
end
if os < oe
var offset_into_chunk = os - chunk_blob_start
var copy_len = oe - os
var src_start = ch["offset"] + offset_into_chunk
var src_end = src_start + copy_len - 1
buf = buf .. self.np.partition_data[src_start .. src_end]
end
assembled_offset = chunk_blob_end
if assembled_offset >= slice_end
break
end
ci += 1
end
return buf
end
# Store only offset/length/index, no payload allocation here
def add_chunk_ref(offset, length, index)
self.chunks.push({
"offset": offset,
"length": length,
"index": index
})
end
def set_index(entry)
self.index_entry = entry
end
end
class NVS_Page
var np, offset
var is_empty, header, entries
def init(np, page_offset)
self.np = np
self.offset = page_offset
self.entries = []
# Detect if page is all 0xFF
var empty = true
var i = 0
while i < NC.page_size
if self.np.partition_data[self.offset + i] != 0xFF
empty = false
break
end
i += 1
end
self.is_empty = empty
# Parse header
import crc
var buf = self.np.partition_data
self.header = {
"status": NC.page_status.find(buf.get(self.offset + 0, 4), "Invalid"),
"page_index": buf.get(self.offset + 4, 4),
"version": 256 - buf[self.offset + 8],
"crc": {
"original": buf.get(self.offset + 28, 4),
"computed": crc.crc32(0xFFFFFFFF, buf[self.offset + 4 .. self.offset + 27]),
},
}
if self.is_empty
self.header["crc"]["original"] = nil
self.header["crc"]["computed"] = nil
return
end
# Entry state bitmap (entry #1)
var entry_states = []
var map_off = self.offset + NC.entry_size
var j = 0
while j < NC.entry_size
var byte = buf[map_off + j]
var shift = 0
while shift < 8
var status = NC.entry_status.find((byte >> shift) & 3, "Invalid")
entry_states.push(status)
shift += 2
end
j += 1
end
entry_states = entry_states[0 .. 125] # entries 2..127
# Parse entries
var entry_count = int(NC.page_size / NC.entry_size)
i = 2
while i < entry_count
var entry_off = self.offset + (i * NC.entry_size)
var span_byte = buf[entry_off + 2]
var span = ([0xFF, 0].find(span_byte) != nil) ? 1 : span_byte
var entry_state = entry_states[i - 2]
var entry = NVS_Entry(i - 2, buf, entry_off, entry_state)
self.entries.push(entry)
# For multi-span entries that are NOT blob_data: keep legacy child assign (if you still need it)
if span > 1 && !entry.is_blob_data()
var s = 1
while s < span
var addr = i + s
var idx = addr - 2
if (addr * NC.entry_size) >= NC.page_size
break
end
var child_off = self.offset + (addr * NC.entry_size)
var child_state = entry_states[idx]
var child = NVS_Entry(idx, buf, child_off, child_state)
entry.child_assign(child)
s += 1
end
end
# Blob header/index: ensure blob object exists and update totals if needed
if entry.is_blob()
self.report_blob(entry)
# Inline blob payload inside 'blob' header (small blobs): store as a chunk ref
if entry.metadata["type"] == "blob" && entry.state == "Written"
var key = entry.key
if self.np.blob_map.contains(key)
var blob = self.np.blob_map[key]
var chunk_size = entry.data && entry.data["size"] != nil ? entry.data["size"] : 0
if chunk_size > 0
var inline_off = entry_off + 24
var inline_len = chunk_size
blob.add_chunk_ref(inline_off, inline_len, 0)
end
end
end
end
# Blob data: always register chunk refs zero-copy, creating blob if missing
if entry.is_blob_data() && entry.state == "Written"
# Ensure blob exists (if header/index not parsed yet, create provisional)
if !self.np.blob_map.contains(entry.key)
var provisional_total = entry.blob_total_size() # for blob_data this is chunk size; may be updated later
var provisional_chunks = entry.blob_expected_chunks() # likely 0 for blob_data; updated by blob_index later
var new_blob = NVS_Blob(self.np, entry.key, provisional_total, provisional_chunks)
self.np.blobs.push(new_blob)
self.np.blob_map[entry.key] = new_blob
end
var blob2 = self.np.blob_map[entry.key]
# Compute payload range: (span - 1) payload entries, each NC.entry_size bytes
var payload_offset = entry_off + NC.entry_size
var payload_length = (span - 1) * NC.entry_size
# Clamp to declared size in header for this chunk (avoid trailing padding)
if entry.data && entry.data["size"] != nil && entry.data["size"] < payload_length
payload_length = entry.data["size"]
end
var chunk_index = entry.metadata["chunk_index"] & 127
if payload_length > 0
blob2.add_chunk_ref(payload_offset, payload_length, chunk_index)
end
end
i += span
end
end
def report_blob(entry)
var key = entry.blob_key()
if self.np.blob_map.contains(key)
# Update totals if header/index provides better information
var existing = self.np.blob_map[key]
var new_total = entry.blob_total_size()
var new_expected = entry.blob_expected_chunks()
if new_total != nil && new_total > 0
existing.total_size = new_total
end
if new_expected != nil && new_expected > 0
existing.expected_chunks = new_expected
end
return
end
# Create blob using header/index info
var blob = NVS_Blob(self.np, entry.blob_key(), entry.blob_total_size(), entry.blob_expected_chunks())
self.np.blobs.push(blob)
self.np.blob_map[key] = blob
end
end
# Define the NP class (NVS Partition)
class NP
var name
var partition_data
var blobs
var blob_map
def init(name, partition_bytes)
self.name = name
self.partition_data = partition_bytes
self.blobs = []
self.blob_map = {}
# no pages[] list here — we will create NVS_Page objects as needed
end
# Optional helper to get page count without building page objects
def page_count()
return int(size(self.partition_data) / NC.page_size)
end
end
class NVSInspector
var loglevel
var nvs
var namespaces
var page_index
var phase
var active
var total_pages
def init()
self.loglevel = 1
self.namespaces = {}
self.page_index = 0
self.phase = 0
self.active = false
self.total_pages = 0
print(tasmota.memory())
tasmota.log("NVSInspector loaded. Use 'n <level>' to run.")
end
def stop()
tasmota.remove_driver(self)
end
def read_nvs_partition(bin_to_file)
import partition_core
import flash
var p = partition_core.Partition()
var nvs = nil
log("NVS: will read NVS partition ...")
var start_ms = tasmota.millis()
for slot : p.slots
if slot.label == "nvs"
var nvs_partition = flash.read(slot.start, slot.sz)
var elapsed = tasmota.millis() - start_ms
var part_bytes = size(nvs_partition)
log(f"NVS: flash read took {elapsed} ms for {part_bytes} bytes")
if part_bytes < 12288
log(f"NVS: [WARN] Partition size {part_bytes} bytes is smaller than 12 KiB — may be invalid.")
end
if part_bytes % 4096 != 0
log(f"NVS: [WARN] Partition size {part_bytes} bytes is not a multiple of 4 KiB — may be misaligned.")
end
if bin_to_file
var f = open("nvs.bin", "wb")
if f != nil
f.write(nvs_partition)
f.close()
log("NVS: raw partition dumped to nvs.bin")
else
log("NVS: [ERROR] Failed to open nvs.bin for writing")
end
else
nvs = NP("N", nvs_partition)
end
break
end
end
return nvs
end
def every_100ms()
if !self.active
return
end
if self.phase == 0
self.namespaces = {}
self.page_index = 0
# streaming-friendly: compute page count from raw partition bytes
var part_bytes = size(self.nvs.partition_data)
self.total_pages = int(part_bytes / NC.page_size)
# loglevel 0: stream summary without storing pages
if self.loglevel == 0
var page_count = self.nvs.page_count()
var empty_count = 0
var corrupted_count = 0
var pi = 0
while pi < page_count
var page_off = pi * NC.page_size
var page = NVS_Page(self.nvs, page_off)
if page.header == nil || page.entries == nil
corrupted_count += 1
elif page.is_empty
empty_count += 1
end
page.entries = nil
pi += 1
end
print(f"NVS: Summary — {page_count} pages, {empty_count} empty, {corrupted_count} corrupted")
self.active = false
return
end
self.phase = 1
tasmota.gc()
return
end
if self.phase == 1
if self.page_index < self.total_pages
var page_off = self.page_index * NC.page_size
var page = NVS_Page(self.nvs, page_off)
self._process_page(page, self.loglevel, self.namespaces)
self.page_index += 1
else
self.phase = 2
tasmota.gc()
end
return
end
if self.phase == 2
self._print_summary(self.namespaces, self.nvs.blobs, self.loglevel)
self.active = false
self.phase = 0
tasmota.log("NVSInspector: Dump complete")
tasmota.gc()
print(tasmota.memory())
end
end
def hexdump(blob)
var width = 16
var chunk_size = 256
var total = blob.total_size
var pos = 0
while pos < total
var len = (total - pos) > chunk_size ? chunk_size : (total - pos)
var data = blob.get_data(pos, len)
var local_off = 0
var chunk_output = ""
while local_off < data.size()
var hexline = ""
var i = 0
while i < width
if local_off + i < data.size()
var byte = data[local_off + i]
hexline += f"{byte:02X} "
else
hexline += " " # pad with spaces for missing byte
end
i += 1
end
var raw = data[local_off .. local_off + (i - 1)]
var j = 0
while j < raw.size()
var b = raw[j]
if b < 32 || b > 126
raw[j] = 46 # '.'
end
j += 1
end
var asciiline = raw.asstring()
# Hex section is fixed width, tab cleanly separates ASCII
chunk_output += f"{(pos + local_off):6s} {hexline}\t{asciiline}\n"
local_off += width
end
print(chunk_output)
pos += len
tasmota.gc()
end
end
def _process_page(page, loglevel, namespaces)
if loglevel == 1 && page.is_empty
return
end
var page_log = f"\nPage {page.header['page_index']} - {page.header['status']}\n"
if loglevel >= 2
var hdr_crc_ok = page.header["crc"]["computed"] == page.header["crc"]["original"] ? "OK" : "BAD"
page_log += f" Version: {page.header['version']} Header CRC: {hdr_crc_ok}\n"
end
var ei = 0
while ei < size(page.entries)
var entry = page.entries[ei]
page_log = self._process_entry(entry, page, loglevel, namespaces, page_log)
page.entries[ei] = nil
ei += 1
end
page.entries = nil
print(page_log)
end
def _process_entry(entry, page, loglevel, namespaces, page_log)
if entry.key == nil
return page_log
end
var ns = entry.metadata["namespace"]
self._collect_namespaces(entry, namespaces)
if loglevel == 1 && entry.state != "Written"
return page_log
end
var line = f" #{entry.index}\tKey: {entry.key:16s} Type: {entry.metadata['type']}\tNS: {ns}\tState: {entry.state}\t"
if loglevel >= 2 && entry.metadata["crc"]["data_computed"] != 0
var crc_ok = entry.metadata["crc"]["data_computed"] == entry.metadata["crc"]["data_original"] ? "OK" : "FAIL"
line += " CRC:" + crc_ok
end
page_log += line + "\n"
if loglevel >= 3
page_log += f" RAW: {entry.raw()}\n"
page_log += f" META: {entry.metadata}\n"
page_log += f" DATA: {entry.data}\n"
end
return page_log
end
def _collect_namespaces(entry, namespaces)
if entry.metadata["namespace"] == 0 && entry.metadata["type"] == "uint8_t"
namespaces[entry.data["value"]] = entry.key
end
end
def _print_blobs(blobs, loglevel)
if size(blobs) > 0
print("\nBlobs found:")
var i = 0
while i < size(blobs)
var b = blobs[i]
print(f"Key: {b.key:16s} TotalSize: {b.total_size:8s}\tChunks: {b.expected_chunks}")
if loglevel > 3
tasmota.gc()
print("Hexdump:")
self.hexdump(b)
end
i += 1
tasmota.gc()
end
else
print("\nNo blob entries found.")
end
end
def _print_summary(namespaces, blobs, loglevel)
if loglevel < 1
return
end
if size(namespaces) > 0
print("\nNamespaces found:")
for ns_idx : namespaces.keys()
print(f" Index {ns_idx} -> {namespaces[ns_idx]}")
end
else
print("\nNo namespace entries found.")
end
self._print_blobs(blobs, loglevel)
end
def dump_nvs(loglevel)
self.nvs = nil
tasmota.gc()
self.loglevel = loglevel
self.total_pages = 0
tasmota.log(f"NVSInspector: Starting dump with loglevel {loglevel}")
self.nvs = self.read_nvs_partition(false)
if self.nvs == nil
tasmota.log("NVSInspector: No NVS partition found")
self.active = false
return
end
self.active = true
self.phase = 0
self.page_index = 0
end
end
# Register the driver
var nvs = NVSInspector()
tasmota.add_driver(nvs)
# Command: n <level 0 - 4>
def cmd_n(cmd, idx, payload, payload_json)
var level = 1
if payload != ""
try
level = int(payload)
except ..
tasmota.log("Invalid loglevel, using default 1")
end
end
nvs.dump_nvs(level)
tasmota.resp_cmnd_done()
end
tasmota.add_cmd("n", cmd_n)
# Command: d (dump raw to file: nvs.bin)
def cmd_d(cmd, idx, payload, payload_json)
nvs.read_nvs_partition(true)
tasmota.resp_cmnd_done()
end
tasmota.add_cmd("d", cmd_d)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment