Skip to content

Instantly share code, notes, and snippets.

@sloonz
Created November 22, 2020 19:41
Show Gist options
  • Save sloonz/32e69bd7b11e1a245329ac38056b4a22 to your computer and use it in GitHub Desktop.
Save sloonz/32e69bd7b11e1a245329ac38056b4a22 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""
MDB_page_header:
pgno: size_t
keysize: uint16_t // for LEAF2 pages, unused otherwise
flags: uint16_t // 1 = BRANCH, 2 = LEAF, 4 = OVERFLOW, 8 = META, 16 = DIRTY, 32 = LEAF2, 64 = SUBP, 0x4000 = LOOSE, 0x8000 = KEEP
union {
{ lower, upper: uint16_t }
pages: uint32_t // overflow pages
}
MDB_node_header:
// Branch node: PGNO(node) = lo | (hi << 16) | (flags << 32)
// Leaf node: DATASZ(node) = lo | (hi << 16)
// KEYSZ(node) = ksize
lo, hi: uint16_t
flags: uint16_t
ksize: uint16_t
MDB_db:
pad: uint32 // = ksize for LEAF2
flags: uint16 // 2 = MDB_REVERSEKEY, 4 = MDB_DUPSORT, 8 = MDB_INTEGERKEY, 0x10 = MDB_DUPFIXED, 0x20 = MDB_INTEGERDUP, 0x40 = MDB_REVERSEDUP
depth: uint16
branch_pages: uint64
leaf_pages: uint64
overflow_pages: uint64
entries: uint64
root: uint64
MDB_meta:
magic: uint32 = 0xBEEFC0DE
version: uint32 = 1
address: void*
mapsize: uint64
dbs: MDB_db[2] // 0 = FREE_DBI, 1 = MAIN_DBI
// psize = dbs[FREE_DBI].pad = 4096
// flags = dbs[FREE_DBI].flags
lastpg: uint64
txnid: uint64
"""
import collections
import struct
class Flags:
    """Pair a raw bit-field value with the name table used to display it.

    ``flags_list`` is a mapping of flag name -> bit mask and
    ``actual_flags`` is the integer value being described.
    """

    def __init__(self, flags_list, actual_flags):
        self.actual_flags = actual_flags
        self.flags_list = flags_list

    def __repr__(self):
        # Collect the names whose mask overlaps the stored value, in the
        # iteration order of the name table.
        active = []
        for label, mask in self.flags_list.items():
            if mask & self.actual_flags:
                active.append(label)
        return "<%d = %s>" % (self.actual_flags, "|".join(active))
# Fixed sizes, in bytes, that the decoders below rely on.
PAGE_HEADER_SIZE = 16
NODE_HEADER_SIZE = 8
PAGE_SIZE = 4096

# Bit masks for the "flags" field of a page header.
PAGE_FLAGS = {
    "BRANCH": 1 << 0,
    "LEAF": 1 << 1,
    "OVERFLOW": 1 << 2,
    "META": 1 << 3,
    "DIRTY": 1 << 4,
    "LEAF2": 1 << 5,
    "SUBP": 1 << 6,
    "LOOSE": 1 << 14,
    "KEEP": 1 << 15,
}

# Bit masks for the "flags" field of a database record.
DB_FLAGS = {
    "REVERSEKEY": 1 << 1,
    "DUPSORT": 1 << 2,
    "INTEGERKEY": 1 << 3,
    "DUPFIXED": 1 << 4,
    "INTEGERDUP": 1 << 5,
    "REVERSEDUP": 1 << 6,
}

# Bit masks for the "flags" field of a node header.
NODE_FLAGS = {
    "BIGDATA": 1 << 0,
    "SUBDATA": 1 << 1,
    "DUPDATA": 1 << 2,
}

# Plain record types returned by the decode_* functions.
Page = collections.namedtuple(
    "Page",
    ("pgno", "keysize", "flags", "lower", "upper", "pages", "data"),
)
DB = collections.namedtuple(
    "DB",
    (
        "pad",
        "flags",
        "depth",
        "branch_pages",
        "leaf_pages",
        "overflow_pages",
        "entries",
        "root",
    ),
)
Meta = collections.namedtuple("Meta", ("mapsize", "dbs", "lastpg", "txnid"))
BranchNode = collections.namedtuple("BranchNode", ("pgno", "key"))
Branch = collections.namedtuple("Branch", ("nodes",))
LeafNode = collections.namedtuple("LeafNode", ("key", "value", "flags", "dsize"))
Leaf = collections.namedtuple("Leaf", ("nodes",))
def read_page(pgno):
    """Return the raw bytes of page number ``pgno``.

    Reads from the module-level file object ``fd`` at byte offset
    ``pgno * PAGE_SIZE`` and asserts that a full page was obtained.
    """
    fd.seek(PAGE_SIZE * pgno)
    raw = fd.read(PAGE_SIZE)
    assert PAGE_SIZE == len(raw)
    return raw
def decode_page(data):
    """Decode the header of a raw page and return a ``Page`` tuple.

    ``data`` is the raw bytes of one page. The first ``PAGE_HEADER_SIZE``
    bytes are parsed as pgno (8 bytes), keysize, flags, lower and upper
    (2 bytes each). ``pages`` is the 32-bit value stored in the same four
    bytes as lower/upper. ``Page.data`` is everything after the header.

    Raises:
        struct.error: if ``data`` is shorter than the page header.
    """
    header = data[:PAGE_HEADER_SIZE]
    # "=" selects standard sizes with no padding, so Q is always 8 bytes.
    # Native-mode "L" is only 4 bytes on some platforms (e.g. Windows),
    # which makes the format 12 bytes and the 16-byte unpack fail.
    pgno, keysize, flags, lower, upper = struct.unpack("=QHHHH", header)
    # Bytes 12..15 are read a second time as one 32-bit value.
    (pages,) = struct.unpack("=I", header[12:16])
    return Page(
        pgno=pgno,
        keysize=keysize,
        flags=Flags(PAGE_FLAGS, flags),
        lower=lower,
        upper=upper,
        pages=pages,
        data=data[PAGE_HEADER_SIZE:],
    )
def decode_db(data):
    """Decode a 48-byte database record into a ``DB`` tuple.

    Layout: pad (4 bytes), flags (2), depth (2), then branch_pages,
    leaf_pages, overflow_pages, entries and root (8 bytes each). Only the
    first 48 bytes of ``data`` are used.

    Raises:
        struct.error: if ``data`` is shorter than 48 bytes.
    """
    # "=" selects standard sizes with no padding, so the format is exactly
    # 48 bytes everywhere. Native-mode "L" is 4 bytes on some platforms
    # (e.g. Windows), which would make the unpack of 48 bytes fail.
    fields = struct.unpack("=IHHQQQQQ", data[:48])
    pad, flags, depth, branch_pages, leaf_pages, overflow_pages, entries, root = fields
    return DB(
        pad=pad,
        flags=Flags(DB_FLAGS, flags),
        depth=depth,
        branch_pages=branch_pages,
        leaf_pages=leaf_pages,
        overflow_pages=overflow_pages,
        entries=entries,
        root=root,
    )
def decode_meta(page):
    """Decode a meta page into a ``Meta`` tuple.

    ``page`` is an object with a ``data`` attribute holding the page bytes
    that follow the page header. Layout of ``page.data``: magic (4 bytes),
    version (4), address (8), mapsize (8), two 48-byte database records,
    then lastpg (8) and txnid (8).

    Raises:
        ValueError: if magic is not 0xBEEFC0DE, version is not 1, or the
            address field is not 0.
        struct.error: if ``page.data`` is too short.
    """
    body = page.data
    # "=" selects standard sizes with no padding (I = 4 bytes, Q = 8 bytes).
    # Native-mode "L" is 4 bytes on some platforms (e.g. Windows), which
    # would make these fixed-width unpacks fail.
    magic, version, addr, mapsize = struct.unpack("=IIQQ", body[:24])

    # Validate with real exceptions: assert statements are removed when
    # Python runs with -O, which would let a non-LMDB file through.
    if magic != 0xBEEFC0DE:
        raise ValueError("bad meta page magic: 0x%08x" % magic)
    if version != 1:
        raise ValueError("unsupported meta page version: %d" % version)
    if addr != 0:
        raise ValueError("unexpected non-zero address in meta page: 0x%x" % addr)

    dbs = [decode_db(body[24:72]), decode_db(body[72:120])]
    lastpg, txnid = struct.unpack("=QQ", body[120:136])
    return Meta(mapsize=mapsize, dbs=dbs, lastpg=lastpg, txnid=txnid)
def decode_branch(page):
    """Decode the nodes of a branch page and return a ``Branch`` tuple.

    The start of ``page.data`` is read as an array of 16-bit node offsets;
    the entry count is derived from ``page.lower``. Offsets are relative
    to the start of the page, so ``PAGE_HEADER_SIZE`` is subtracted before
    indexing into ``page.data``.
    """
    slot_count = (page.lower - PAGE_HEADER_SIZE) // 2
    slots = struct.unpack("H" * slot_count, page.data[:slot_count * 2])

    def parse_node(slot):
        raw = page.data[slot - PAGE_HEADER_SIZE:]
        lo, hi, flags, ksize = struct.unpack("HHHH", raw[:NODE_HEADER_SIZE])
        key_end = NODE_HEADER_SIZE + ksize
        # The child page number is spread over lo, hi and flags.
        child = lo + (hi << 16) + (flags << 32)
        return BranchNode(pgno=child, key=raw[NODE_HEADER_SIZE:key_end])

    return Branch([parse_node(slot) for slot in slots])
def decode_leaf(page):
    """Decode the nodes of a leaf page and return a ``Leaf`` tuple.

    The start of ``page.data`` is read as an array of 16-bit node offsets;
    the entry count is derived from ``page.lower``. Offsets are relative
    to the start of the page, so ``PAGE_HEADER_SIZE`` is subtracted before
    indexing into ``page.data``.

    Each ``LeafNode.value`` is one of:
      * an int read from the 8 bytes after the key, when BIGDATA is set
        (``read_overflow_node`` uses it as a page number);
      * a ``DB`` tuple, when SUBDATA is set;
      * otherwise the ``dsize`` bytes that follow the key.

    Raises:
        struct.error: if a node header or value is truncated.
    """
    keys = (page.lower - PAGE_HEADER_SIZE) // 2
    offsets = struct.unpack("H" * keys, page.data[:2 * keys])
    nodes = []
    for off in offsets:
        node_data = page.data[off - PAGE_HEADER_SIZE:]
        lo, hi, flags, ksize = struct.unpack("HHHH", node_data[:NODE_HEADER_SIZE])
        dsize = lo + (hi << 16)
        value_start = NODE_HEADER_SIZE + ksize
        key = node_data[NODE_HEADER_SIZE:value_start]
        if flags & NODE_FLAGS["BIGDATA"]:
            # "=Q" is always 8 bytes. Native-mode "L" is 4 bytes on some
            # platforms (e.g. Windows), so unpacking this 8-byte slice
            # with it would raise struct.error there.
            (value,) = struct.unpack("=Q", node_data[value_start:value_start + 8])
        else:
            value = node_data[value_start:value_start + dsize]
        if flags & NODE_FLAGS["SUBDATA"]:
            value = decode_db(value)
        nodes.append(LeafNode(key=key, value=value, dsize=dsize, flags=Flags(NODE_FLAGS, flags)))
    return Leaf(nodes)
def read_overflow_node(node):
    """Return the value bytes of a node whose ``value`` is a page number.

    Reads the page at ``node.value``, takes its data after the header,
    appends the following ``pages - 1`` raw pages in full, and truncates
    the result to ``node.dsize`` bytes.
    """
    first_pgno = node.value
    head = decode_page(read_page(first_pgno))
    chunks = [head.data]
    # Continuation pages are appended whole, without decoding a header.
    for extra in range(1, head.pages):
        chunks.append(read_page(first_pgno + extra))
    return b"".join(chunks)[:node.dsize]
# Module-level handle used by read_page().
fd = open("/tmp/test.mdb/data.mdb", "rb")

# Pages 0 and 1 are both decoded as meta pages; the one with the larger
# txnid is used (page 0 when they are equal).
metas = [
    decode_meta(decode_page(read_page(0))),
    decode_meta(decode_page(read_page(1))),
]
meta = metas[1] if metas[1].txnid > metas[0].txnid else metas[0]

# dbs[1] is the second database record of the chosen meta page.
root_page = decode_page(read_page(meta.dbs[1].root))
print(root_page)
#print(decode_branch(root_page))
#print(decode_leaf(decode_page(read_page(3))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment