Skip to content

Instantly share code, notes, and snippets.

@jamesu
Created March 15, 2026 20:34
Show Gist options
  • Select an option

  • Save jamesu/bec2ed4224c1933a9bf69d26709756c8 to your computer and use it in GitHub Desktop.

Select an option

Save jamesu/bec2ed4224c1933a9bf69d26709756c8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# DevonThink database decoder
#
# Stuck on a desert island with no internet, unable to load your meticulously categorized DevonThink database?
# This could come in handy!
#
# This code should probably be considered in the public domain since it is basically just vibe coded with a load of prompts,
# though feel free to fork and vibe code your own enhancements!
# Lovingly vibe coded 100% in ChatGPT.
#
from __future__ import annotations
import argparse, json, tempfile, zipfile, struct, datetime
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Set
# Exact byte content of an empty (unpopulated) .dtMeta page file.
STUB = bytes.fromhex('011401050000000000003f00000000400000000000')
# Two-byte tags that introduce the known field types inside a page object.
OBJ_TAG=b'\x18\x10'; STRING_TAG=b'\x19\x0e'; UP_TAG=b'\x14\x07'; CHILD_TAG=b'\x15\x07'; SCALAR_TAG=b'\x1a\x06'
# Names of built-in smart groups; items with these names are classified as
# 'smart_group' rather than ordinary groups.
BUILTIN_SMART = {'Duplicate','Duplicates','All PDF Documents','All Image','All Images'}
# Reference date for stored timestamps (seconds since 2001-01-01).
EPOCH = datetime.datetime(2001,1,1)
# Object id of the wrapper that parents all tag records (empirical value).
TAGS_ROOT_WRAPPER_ID = 10
@dataclass
class Obj:
    """A single object record decoded from a .dtMeta page file."""
    page: str               # name of the page file the object came from
    offset: int             # byte offset of the object header within the page
    obj_id: int             # object id (header bytes 6..10, big-endian)
    flags: int              # flags word (header bytes 2..6)
    meta: int               # 8-byte meta value (header bytes 10..18)
    text: Optional[str]     # ASCII text from a STRING field, if any
    up: List[int]           # parent references collected from UP fields
    children: List[int]     # child references collected from CHILD fields
    scalar: Optional[int]   # value of the last SCALAR field, if any
    raw_len: int            # total byte length of the record
    chunk: bytes            # raw bytes of the whole record
def maybe_extract(path: Path):
    """If *path* is a .zip file, unpack it into a temporary directory.

    Returns (root, tempdir) where tempdir is the TemporaryDirectory handle
    the caller must keep alive and eventually clean up, or (path, None)
    when no extraction was needed.
    """
    if not (path.is_file() and path.suffix.lower() == '.zip'):
        return path, None
    tmp = tempfile.TemporaryDirectory(prefix='dtbase2_')
    with zipfile.ZipFile(path) as archive:
        archive.extractall(tmp.name)
    return Path(tmp.name), tmp
def find_dbs(root: Path) -> List[Path]:
    """Locate .dtBase2 database bundles (directories) under *root*.

    If *root* itself is a bundle it is returned directly; otherwise the
    tree is searched recursively, skipping macOS zip artifacts
    (__MACOSX folders and '._' resource forks).
    """
    if root.is_dir() and root.name.endswith('.dtBase2'):
        return [root]
    candidates = sorted(p for p in root.rglob('*.dtBase2') if p.is_dir())
    return [p for p in candidates
            if '/__MACOSX/' not in str(p) and not p.name.startswith('._')]
def parse_string_field(chunk: bytes, pos: int) -> Tuple[Optional[str], int]:
    """Parse a STRING field at *pos*.

    Layout: 16-byte header with the payload length stored twice (2 bytes at
    +6, 4 bytes at +8), followed by that many printable-ASCII bytes.
    Returns (text, next_pos); on any mismatch returns (None, pos + 2) so the
    caller can resync just past the two-byte tag.
    """
    if pos + 16 <= len(chunk):
        len_short = int.from_bytes(chunk[pos + 6:pos + 8], 'big')
        len_long = int.from_bytes(chunk[pos + 8:pos + 12], 'big')
        body_end = pos + 16 + len_short
        if 0 < len_short < 8192 and len_short == len_long and body_end <= len(chunk):
            body = chunk[pos + 16:body_end]
            if all(32 <= b < 127 for b in body):
                return body.decode('ascii', 'replace'), body_end
    return None, pos + 2
def parse_up_ref_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a compact UP reference field.

    Layout: 2-byte tag, 4-byte count, 4-byte mode, then *count* 3-byte
    big-endian object ids. Returns (ids, next_pos); a failed parse yields
    ([], pos + 2), and a truncated buffer yields ([], len(chunk)).
    """
    if pos + 10 > len(chunk):
        return [], len(chunk)
    count = int.from_bytes(chunk[pos + 2:pos + 6], 'big')
    field_end = pos + 10 + 3 * count
    if not (0 <= count < 2048) or field_end > len(chunk):
        return [], pos + 2
    base = pos + 10
    ids = [int.from_bytes(chunk[base + 3 * i:base + 3 * i + 3], 'big')
           for i in range(count)]
    return ids, field_end
def parse_child_ref_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a compact CHILD reference field.

    Layout (empirical): 2-byte tag, 4-byte count, then *count* 4-byte
    big-endian object ids starting at offset +9. Returns (ids, next_pos);
    ([], pos + 2) on a failed parse, ([], len(chunk)) when truncated.
    """
    if pos + 10 > len(chunk):
        return [], len(chunk)
    count = int.from_bytes(chunk[pos + 2:pos + 6], 'big')
    field_end = pos + 9 + 4 * count
    if not (0 <= count < 2048) or field_end > len(chunk):
        return [], pos + 2
    base = pos + 9
    ids = [int.from_bytes(chunk[base + 4 * i:base + 4 * i + 4], 'big')
           for i in range(count)]
    return ids, field_end
def parse_tail_ref4_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a 4-byte-id reference field from a rich record's tail section.

    Same layout as parse_child_ref_field (4-byte count at +2, ids at +9):
    page-1 tail sections store both UP and CHILD references this way.
    Returns (ids, next_pos); ([], pos + 2) on failure, ([], len(chunk))
    when truncated.
    """
    if pos + 10 > len(chunk):
        return [], len(chunk)
    count = int.from_bytes(chunk[pos + 2:pos + 6], 'big')
    field_end = pos + 9 + 4 * count
    if not (0 <= count < 2048) or field_end > len(chunk):
        return [], pos + 2
    base = pos + 9
    ids = [int.from_bytes(chunk[base + 4 * i:base + 4 * i + 4], 'big')
           for i in range(count)]
    return ids, field_end
def parse_scalar_field(chunk: bytes, pos: int) -> Tuple[Optional[int], int]:
    """Parse a SCALAR field: an 8-byte big-endian value at offset +8.

    Returns (value, pos + 16), or (None, pos + 2) when the buffer is too
    short for a full 16-byte field.
    """
    if pos + 16 > len(chunk):
        return None, pos + 2
    return int.from_bytes(chunk[pos + 8:pos + 16], 'big'), pos + 16
# Page header length equals the empty-page stub length (21 bytes).
PAGE_HEADER_LEN = len(STUB)
# Fixed object header: 2-byte tag + 4-byte flags + 4-byte id + 8-byte meta.
OBJ_HEADER_LEN = 18
def parse_page(path: Path) -> List[Obj]:
    """Parse one .dtMeta page file into a list of Obj records.

    Walks the page from just past the header, resynchronising on OBJ_TAG,
    and greedily parses the known field types (string, up-refs, child-refs,
    scalar) inside each object body until the next object header.
    """
    data = path.read_bytes()
    if len(data) <= PAGE_HEADER_LEN or data == STUB:
        return []
    # Empirically, populated pages have a fixed page header (len(STUB) bytes)
    # and the first object begins immediately after it. Object bodies are
    # variable length; there is no evidence of a fixed record size.
    out: List[Obj] = []
    pos = PAGE_HEADER_LEN
    while pos + OBJ_HEADER_LEN <= len(data):
        if data[pos:pos+2] != OBJ_TAG:
            # Resync conservatively rather than scanning the whole page up front.
            nxt = data.find(OBJ_TAG, pos + 1)
            if nxt < 0:
                break
            pos = nxt
            continue
        start = pos
        flags = int.from_bytes(data[pos+2:pos+6], 'big')
        obj_id = int.from_bytes(data[pos+6:pos+10], 'big')
        meta = int.from_bytes(data[pos+10:pos+18], 'big')
        if obj_id <= 0 or obj_id > 10_000_000:
            # Implausible id: treat the tag match as a false positive and resync.
            nxt = data.find(OBJ_TAG, pos + 2)
            if nxt < 0:
                break
            pos = nxt
            continue
        p = pos + OBJ_HEADER_LEN
        text: Optional[str] = None
        up: List[int] = []
        children: List[int] = []
        scalar: Optional[int] = None
        # Scan the body field-by-field until the next object header starts.
        # Each field parser signals failure by returning next_pos == p + 2,
        # in which case we fall through and advance a single byte.
        while p + 2 <= len(data):
            if data[p:p+2] == OBJ_TAG:
                break
            tag = data[p:p+2]
            if tag == STRING_TAG:
                s, p2 = parse_string_field(data, p)
                if p2 != p + 2:
                    if s is not None:
                        text = s
                    p = p2
                    continue
            elif tag == UP_TAG:
                refs, p2 = parse_up_ref_field(data, p)
                if p2 != p + 2:
                    up.extend(refs)
                    p = p2
                    continue
            elif tag == CHILD_TAG:
                refs, p2 = parse_child_ref_field(data, p)
                if p2 != p + 2:
                    children.extend(refs)
                    p = p2
                    continue
            elif tag == SCALAR_TAG:
                val, p2 = parse_scalar_field(data, p)
                if p2 != p + 2:
                    scalar = val
                    p = p2
                    continue
            p += 1
        if p <= start:
            # Safety net: guarantee forward progress.
            break
        chunk = data[start:p]
        out.append(Obj(path.name, start, obj_id, flags, meta, text, up, children, scalar, p-start, chunk))
        pos = p
    return out
def decode_utf16be_text(blob: bytes) -> str:
    """Decode *blob* as UTF-16-BE with replacement; return '' on any error."""
    try:
        return blob.decode('utf-16-be', 'replace')
    except Exception:  # defensive only: 'replace' should never raise
        return ''
def parse_wide_prop_from(chunk: bytes, key: bytes, start: int = 0) -> Optional[Tuple[str,int,int]]:
    """Find the first occurrence of *key* at/after *start* and parse the
    wide property stored as: key (4 bytes), big-endian length (4 bytes),
    then *length* UTF-16-BE code units (2 bytes each).

    Returns (text, key_index, end_offset), or None when the key is absent
    or the length header is implausible. Trailing NULs are stripped.
    """
    idx = chunk.find(key, start)
    if idx < 0 or idx + 8 > len(chunk):
        return None
    ln = int.from_bytes(chunk[idx+4:idx+8], 'big')
    end = idx + 8 + (2*ln)
    if ln <= 0 or end > len(chunk):
        return None
    txt = decode_utf16be_text(chunk[idx+8:end]).rstrip('\x00')
    return txt, idx, end
def parse_all_wide_props(chunk: bytes) -> Dict[str, str]:
    """Collect every known wide-string property found in *chunk*.

    Bug fix: properties are now consumed in the order they appear in the
    chunk instead of key-priority order. The previous version searched for
    b'NAME' first and jumped past its match, silently skipping any other
    property (e.g. PATH) that occurred *before* the NAME property.
    """
    props: Dict[str, str] = {}
    keys = [b'NAME', b'PATH', b'URL ', b'UUID', b'SMGR', b'DBID']
    pos = 0
    while pos < len(chunk) - 8:
        # Of all keys that parse from here, take the earliest occurrence.
        best = None  # (idx, key, text, end)
        for k in keys:
            got = parse_wide_prop_from(chunk, k, pos)
            if got is not None and (best is None or got[1] < best[0]):
                best = (got[1], k, got[0], got[2])
        if best is None:
            # No parsable key from this position. A key byte pattern with a
            # bad length header may still hide a later valid occurrence, so
            # advance one byte at a time (same resync as before).
            pos += 1
            continue
        _, key, text, end = best
        props[key.decode('ascii').strip()] = text
        pos = end
    return props
def parse_plausible_times(chunk: bytes, limit: int, epoch: Optional[datetime.datetime] = None) -> List[str]:
    """Scan 8-byte-aligned offsets in chunk[:limit] for big-endian doubles
    that look like timestamps in seconds since the reference epoch.

    A value is accepted when it lies in (6.0e8, 1.1e9) seconds and the
    resulting date falls in 2010..2038. Returns ISO-formatted strings in
    offset order.

    epoch: reference date the stored doubles are relative to. Defaults to
    2001-01-01 (the module-level EPOCH) for backward compatibility; made a
    parameter so the scanner no longer depends on a hidden module global.
    """
    if epoch is None:
        epoch = datetime.datetime(2001, 1, 1)
    found: List[str] = []
    last = min(limit, len(chunk) - 8)
    for off in range(0, last + 1, 8):
        try:
            seconds = struct.unpack('>d', chunk[off:off + 8])[0]
        except Exception:
            continue
        if not (6.0e8 < seconds < 1.1e9):
            continue
        stamp = epoch + datetime.timedelta(seconds=seconds)
        if 2010 <= stamp.year <= 2038:
            found.append(stamp.replace(microsecond=0).isoformat(sep=' '))
    return found
def pick_size_bytes(chunk: bytes, first_prop_idx: int) -> Optional[int]:
    """Probe a handful of fixed offsets (empirically where the byte size
    lives, before the first wide property) for a plausible 32-bit size.

    Returns the first candidate in [1, 2**31], or None.
    """
    limit = min(first_prop_idx, len(chunk))
    for off in (108, 104, 112, 116, 100, 120):
        if off + 4 > limit:
            continue
        candidate = int.from_bytes(chunk[off:off + 4], 'big')
        if 1 <= candidate <= (1 << 31):
            return candidate
    return None
def human_size(n: Optional[int]) -> Optional[str]:
    """Format *n* bytes with decimal (1000-based) units, or None for None.

    Bytes are shown as integers ('512 B'); larger units with one decimal
    place ('1.5 KB').
    """
    if n is None:
        return None
    suffixes = ['B','KB','MB','GB','TB']
    value = float(n)
    idx = 0
    while value >= 1000 and idx < len(suffixes) - 1:
        value /= 1000.0
        idx += 1
    if idx == 0:
        return f'{int(value)} {suffixes[idx]}'
    return f'{value:.1f} {suffixes[idx]}'
def parse_tail_refs(chunk: bytes) -> Tuple[List[int], List[int]]:
    """Scan the section after the last 'KSTP' marker for UP/CHILD reference
    fields (both stored as 4-byte-id lists in rich records).

    Returns (up_ids, child_ids); both empty when no marker is present.
    """
    marker = chunk.rfind(b'KSTP')
    if marker < 0:
        return [], []
    up_ids: List[int] = []
    child_ids: List[int] = []
    pos = marker + 4
    while pos + 2 <= len(chunk):
        tag = chunk[pos:pos + 2]
        if tag == UP_TAG:
            dest = up_ids
        elif tag == CHILD_TAG:
            dest = child_ids
        else:
            dest = None
        if dest is not None:
            refs, nxt = parse_tail_ref4_field(chunk, pos)
            if nxt != pos + 2:  # nxt == pos + 2 signals a failed parse
                dest.extend(refs)
                pos = nxt
                continue
        pos += 1
    return up_ids, child_ids
def parse_rich_records(page1: Path) -> List[Dict[str, Any]]:
    """Extract 'rich' records (objects carrying wide NAME/PATH/... props)
    from page 1, including plausible timestamps, sizes and tail refs."""
    records: List[Dict[str, Any]] = []
    for obj in parse_page(page1):
        props = parse_all_wide_props(obj.chunk)
        if not props:
            continue  # plain wrapper object, not a rich record
        # Offset of the first wide property; timestamps and the size word
        # are only searched before it.
        first_prop_idx = min((obj.chunk.find(k) for k in [b'NAME', b'PATH', b'URL ', b'UUID'] if obj.chunk.find(k) >= 0), default=len(obj.chunk))
        times = parse_plausible_times(obj.chunk, first_prop_idx)
        tail_up, tail_children = parse_tail_refs(obj.chunk)
        rec = {
            'obj_id': obj.obj_id,
            'offset': obj.offset,
            'flags': obj.flags,
            'meta': obj.meta,
            # Prefer the 4-byte tail refs; fall back to compact body refs.
            'up': tail_up or obj.up,
            'children': tail_children or obj.children,
            'props': props,
        }
        if times:
            # First three plausible doubles are labelled created/modified/
            # added in that order -- assumption, TODO confirm on real data.
            labels = ['created', 'modified', 'added']
            rec['times'] = {labels[i] if i < len(labels) else f'time_{i+1}': t for i, t in enumerate(times[:6])}
        size_bytes = pick_size_bytes(obj.chunk, first_prop_idx) if ('PATH' in props or 'URL' in props) else None
        if size_bytes is not None:
            rec['size_bytes'] = size_bytes
            rec['size_human'] = human_size(size_bytes)
        records.append(rec)
    return records
def resolve_parent(start_id: int, rich_ids: Set[int], wrappers: Dict[int, Obj]) -> Optional[int]:
    """Follow UP links from *start_id* until reaching the database root (1),
    the tags-root wrapper, or a rich-record id.

    Returns that id, or None when the chain dead-ends in an unknown
    wrapper, has no parent, or cycles.
    """
    visited = set()
    node = start_id
    while node and node not in visited:
        visited.add(node)
        if node == 1 or node == TAGS_ROOT_WRAPPER_ID or node in rich_ids:
            return node
        wrapper = wrappers.get(node)
        if wrapper is None or not wrapper.up:
            return None
        node = wrapper.up[0]
    return None
def resolve_to_rich(start_id: int, rich_ids: Set[int], wrappers: Dict[int, Obj]) -> Optional[int]:
    """Walk UP links from *start_id* until a rich-record id is reached.

    Returns that id, or None when the chain ends in an unknown wrapper,
    has no parent, or cycles.
    """
    visited = set()
    node = start_id
    while node and node not in visited:
        visited.add(node)
        if node in rich_ids:
            return node
        wrapper = wrappers.get(node)
        if wrapper is None or not wrapper.up:
            return None
        node = wrapper.up[0]
    return None
def summarize_db(db: Path) -> Dict[str, Any]:
    """Decode a single .dtBase2 bundle directory into a summary dict.

    Result keys: 'db' (name), 'pages' (raw per-page dumps),
    'rich_records' (decoded page-1 records), 'files_noindex' (payload
    files on disk) and 'ui' (derived view: items, tree, tags).
    """
    out: Dict[str, Any] = {'db': db.name, 'pages': {}, 'rich_records': [], 'files_noindex': []}
    all_objs: List[Obj] = []
    # Raw-parse every page file; stub pages are recorded but not parsed.
    for p in sorted(db.glob('DEVONthink-*.dtMeta')):
        data = p.read_bytes()
        info = {'size': len(data), 'stub': data == STUB}
        if data != STUB:
            objs = parse_page(p)
            all_objs.extend(objs)
            info['objects'] = [{
                'id': o.obj_id, 'offset': o.offset, 'flags': o.flags, 'meta': o.meta,
                'text': o.text, 'up': o.up, 'children': o.children, 'scalar': o.scalar
            } for o in objs]
        out['pages'][p.name] = info
    # Rich (property-carrying) records live on page 1.
    page1 = db / 'DEVONthink-1.dtMeta'
    rich_records = parse_rich_records(page1) if page1.exists() else []
    out['rich_records'] = rich_records
    # id -> ASCII text of plain text atoms (used for symbolic child names).
    atom_text = {o.obj_id:o.text for o in all_objs if o.text}
    rich: Dict[int, Dict[str, Any]] = {}
    # Build a normalized per-item dict for every named rich record.
    for rec in rich_records:
        props = rec['props']
        if 'NAME' not in props:
            continue
        name = props['NAME']
        symbolic=[]
        for cid in rec.get('children', []):
            if cid in atom_text:
                symbolic.append(atom_text[cid])
        rich[rec['obj_id']] = {
            'obj_id': rec['obj_id'],
            'name': name,
            'path': props.get('PATH'),
            'url': props.get('URL'),
            'uuid': props.get('UUID'),
            'kind': 'unknown',  # classified below
            'up': rec.get('up', []),
            'children': rec.get('children', []),
            'symbolic': symbolic,
            'times': rec.get('times', {}),
            'size_bytes': rec.get('size_bytes'),
            'size_human': rec.get('size_human'),
            'fields': props,
            'tags': [],
        }
    rich_ids = set(rich)
    # Non-rich page-1 objects act as wrappers linking items to parents.
    wrappers = {o.obj_id:o for o in all_objs if o.page=='DEVONthink-1.dtMeta' and o.obj_id not in rich_ids}
    # Resolve each item's effective parent (root, tags root or a rich id).
    parent_of: Dict[int, Optional[int]] = {}
    for rid, r in rich.items():
        parent = None
        if r['up']:
            parent = resolve_parent(r['up'][0], rich_ids, wrappers)
        parent_of[rid] = parent
    # Classify each item: file / smart_group / tag / group.
    for rid, r in rich.items():
        name = r['name']
        parent = parent_of[rid]
        if ('PATH' in r['fields']) or ('URL' in r['fields']):
            r['kind'] = 'file'
        elif name in BUILTIN_SMART:
            r['kind'] = 'smart_group'
        elif parent == TAGS_ROOT_WRAPPER_ID:
            r['kind'] = 'tag'
        else:
            r['kind'] = 'group'
    # Collect tag names for files: extra UP refs (beyond the first, which
    # is the parent) plus any child refs that resolve to tag records.
    for rid, r in rich.items():
        if r['kind'] != 'file':
            continue
        tags: List[str] = []
        for ref in r.get('up', [])[1:]:
            tr = resolve_to_rich(ref, rich_ids, wrappers)
            if tr and tr in rich and rich[tr]['kind'] == 'tag':
                tags.append(rich[tr]['name'])
        # also support direct child/tag refs if they ever appear that way
        for ref in r.get('children', []):
            tr = resolve_to_rich(ref, rich_ids, wrappers)
            if tr and tr in rich and rich[tr]['kind'] == 'tag':
                tags.append(rich[tr]['name'])
        # De-duplicate while preserving first-seen order.
        seen=set(); dedup=[]
        for t in tags:
            if t not in seen:
                seen.add(t); dedup.append(t)
        r['tags'] = dedup
    # Build the parent -> children map and the list of root-level items.
    children_of = {rid: [] for rid in rich}
    root_children = []
    tag_ids = []
    for rid, parent in parent_of.items():
        if rich[rid]['kind'] == 'tag':
            tag_ids.append(rid)
            continue
        if parent == 1 or parent is None or parent == TAGS_ROOT_WRAPPER_ID:
            root_children.append(rid)
        elif parent in children_of:
            children_of[parent].append(rid)
    # Invert tags: tag id -> ids of file items carrying that tag.
    tag_to_items: Dict[int, List[int]] = {tid: [] for tid in tag_ids}
    for rid, r in rich.items():
        if r['kind'] != 'file':
            continue
        for tag_name in r.get('tags', []):
            for tid in tag_ids:
                if rich[tid]['name'] == tag_name:
                    tag_to_items[tid].append(rid)
    # List the payload files stored inside the bundle, if present.
    for p in sorted((db / 'Files.noindex').rglob('*')) if (db / 'Files.noindex').exists() else []:
        if p.is_file():
            out['files_noindex'].append(str(p.relative_to(db)))
    out['ui'] = {
        'rich': rich,
        'children_of': children_of,
        'root_children': sorted(set(root_children)),
        'tag_ids': sorted(tag_ids, key=lambda tid: rich[tid]['name'].lower()),
        # tag_to_items keys are stringified for JSON friendliness.
        'tag_to_items': {str(k): sorted(v, key=lambda rid: rich[rid]['name'].lower()) for k,v in tag_to_items.items()},
    }
    return out
def render_tree(summary: Dict[str, Any]) -> str:
    """Render a summarize_db() result as a human-readable text report:
    item tree, tag listing, payload file list and a flat item index."""
    ui = summary.get('ui', {})
    rich = ui.get('rich', {})
    children_of = ui.get('children_of', {})
    root_children = ui.get('root_children', [])
    tag_ids = ui.get('tag_ids', [])
    # tag_to_items keys are stored stringified; convert back to int ids.
    tag_to_items = {int(k): v for k,v in ui.get('tag_to_items', {}).items()}
    lines=[f"=== {summary['db']} ===", 'Root/']
    def walk(rid: int, indent: int):
        # Recursively emit one item and its children (groups sorted first).
        r = rich[rid]
        if r['kind'] in ('smart_group', 'tag'):
            return  # those are rendered in their own sections
        nm = r['name']
        suffix = '/' if r['kind']=='group' else ''
        detail = ''
        if r['kind'] == 'file':
            extras = []
            if r.get('tags'): extras.append('tags=' + ', '.join(r['tags']))
            if r.get('url'): extras.append(f"URL={r['url']}")
            if r.get('size_human'): extras.append(f"size={r['size_human']}")
            if r.get('times', {}).get('created'): extras.append(f"created={r['times']['created']}")
            if extras:
                detail = ' [' + '; '.join(extras) + ']'
        lines.append(' '*indent + f'{nm}{suffix}{detail}')
        for child in sorted(children_of.get(rid, []), key=lambda x:(rich[x]['kind']!='group', rich[x]['name'].lower())):
            walk(child, indent+1)
    for rid in sorted(root_children, key=lambda x:(rich[x]['kind']!='group', rich[x]['name'].lower())):
        walk(rid, 1)
    lines.append('')
    lines.append('Tags/')
    for tid in tag_ids:
        tag = rich[tid]
        lines.append(f" {tag['name']}/")
        for rid in tag_to_items.get(tid, []):
            lines.append(f" {rich[rid]['name']}")
    lines.append('')
    lines.append('Files.noindex:')
    for p in summary.get('files_noindex', []):
        lines.append(f' - {p}')
    lines.append('')
    lines.append('Items:')
    # Invert children_of so each item can report its parent by name.
    parent_of = {}
    for pid, kids in children_of.items():
        for kid in kids:
            parent_of[kid] = pid
    for rid in sorted(rich, key=lambda x:(rich[x]['kind'], rich[x]['name'].lower())):
        r = rich[rid]
        if r['kind'] == 'smart_group':
            continue
        if r['kind'] == 'tag':
            parent = 'Tags'
        else:
            parent = 'Root' if rid in root_children else (rich[parent_of[rid]]['name'] if rid in parent_of else None)
        bits = [f"[{rid}] {r['kind']}: {r['name']}"]
        if r.get('path'): bits.append(f"PATH={r['path']}")
        if r.get('url'): bits.append(f"URL={r['url']}")
        if r.get('uuid'): bits.append(f"UUID={r['uuid']}")
        if parent: bits.append(f"parent={parent}")
        if r.get('tags'): bits.append('tags=' + ','.join(r['tags']))
        if r.get('times'): bits.extend(f"{k}={v}" for k,v in r['times'].items())
        if r.get('size_bytes') is not None: bits.append(f"size={r['size_bytes']} ({r['size_human']})")
        if r.get('symbolic'): bits.append('symbolic=' + ','.join(r['symbolic']))
        lines.append(' - ' + ' | '.join(bits))
    return '\n'.join(lines)
def print_db_summary(summary: Dict[str, Any], full: bool=False):
    """Print the rendered tree; with *full*, also dump raw per-page objects."""
    print(render_tree(summary))
    if not full:
        return
    print('')
    populated = [name for name, info in summary['pages'].items() if not info['stub']]
    print('Populated pages:', ', '.join(populated) if populated else '(none)')
    for name, info in summary['pages'].items():
        if info['stub']:
            continue
        print(f'Page {name}:')
        for obj in info.get('objects', []):
            print(f" obj {obj['id']}: text={obj['text']!r} up={obj['up']} children={obj['children']} flags={obj['flags']} meta={obj['meta']}")
def main():
    """CLI entry point: decode one path (zip archive or .dtBase2 tree) and
    print a summary per database, as text or JSON."""
    parser = argparse.ArgumentParser()
    parser.add_argument('path', type=Path)
    parser.add_argument('--json', action='store_true')
    parser.add_argument('--full', action='store_true')
    args = parser.parse_args()
    root, tmpdir = maybe_extract(args.path)
    try:
        dbs = find_dbs(root)
        if not dbs and root.is_dir() and root.name.endswith('.dtBase2'):
            dbs = [root]
        summaries = [summarize_db(db) for db in dbs]
        if args.json:
            # A single database is emitted unwrapped for convenience.
            payload = summaries[0] if len(summaries) == 1 else summaries
            print(json.dumps(payload, indent=2))
        else:
            for i, summary in enumerate(summaries):
                if i:
                    print()
                print_db_summary(summary, full=args.full)
    finally:
        # Clean up the extraction directory, if we created one.
        if tmpdir is not None:
            tmpdir.cleanup()
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment