Created
March 15, 2026 20:34
-
-
Save jamesu/bec2ed4224c1933a9bf69d26709756c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # DevonThink database decoder | |
| # | |
| # Stuck on a desert island with no internet, unable to load your meticulously categorized DevonThink database? | |
| # This could come in handy! | |
| # | |
| # This code should probably be considered in the public domain since it is basically just vibe coded with a load of prompts, | |
| # though feel free to fork and vibe code your own enhancements! | |
| # Lovingly vibe coded 100% in ChatGPT. | |
| # | |
| from __future__ import annotations | |
| import argparse, json, tempfile, zipfile, struct, datetime | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Any, Set | |
# 21-byte header that an empty (stub) .dtMeta page consists of entirely.
STUB = bytes.fromhex('011401050000000000003f00000000400000000000')
# Two-byte field tags observed in the binary page format (empirical).
OBJ_TAG=b'\x18\x10'; STRING_TAG=b'\x19\x0e'; UP_TAG=b'\x14\x07'; CHILD_TAG=b'\x15\x07'; SCALAR_TAG=b'\x1a\x06'
# Names of DEVONthink's built-in smart groups, used to classify records.
BUILTIN_SMART = {'Duplicate','Duplicates','All PDF Documents','All Image','All Images'}
# Reference date for timestamps: seconds are counted from 2001-01-01
# (Apple/Core Data epoch). NOTE(review): naive datetime — timezone not modeled.
EPOCH = datetime.datetime(2001,1,1)
# Object ID of the wrapper object that parents all tags (empirical).
TAGS_ROOT_WRAPPER_ID = 10
@dataclass
class Obj:
    """One decoded object record from a .dtMeta page."""
    page: str                # filename of the page this object came from
    offset: int              # byte offset of the record within the page
    obj_id: int              # 4-byte big-endian object identifier
    flags: int               # raw 4-byte flags field from the header
    meta: int                # raw 8-byte metadata field from the header
    text: Optional[str]      # last ASCII string field found in the body, if any
    up: List[int]            # parent/up references (compact 3-byte IDs)
    children: List[int]      # child references (4-byte IDs)
    scalar: Optional[int]    # last scalar field value found, if any
    raw_len: int             # total byte length of the record's span
    chunk: bytes             # raw bytes of the whole record
def maybe_extract(path: Path):
    """If *path* is a .zip file, unpack it into a temporary directory.

    Returns (extracted_root, TemporaryDirectory handle) for zips, or
    (path, None) otherwise. The caller owns the handle and must call
    .cleanup() on it when done.
    """
    is_zip = path.is_file() and path.suffix.lower() == '.zip'
    if not is_zip:
        return path, None
    tmp = tempfile.TemporaryDirectory(prefix='dtbase2_')
    with zipfile.ZipFile(path) as archive:
        archive.extractall(tmp.name)
    return Path(tmp.name), tmp
def find_dbs(root: Path) -> List[Path]:
    """Locate .dtBase2 bundle directories under *root* (or *root* itself)."""
    if root.is_dir() and root.name.endswith('.dtBase2'):
        return [root]
    hits = sorted(p for p in root.rglob('*.dtBase2') if p.is_dir())

    def keep(p: Path) -> bool:
        # Skip macOS resource-fork artifacts left behind by zip extraction.
        return '/__MACOSX/' not in str(p) and not p.name.startswith('._')

    return [p for p in hits if keep(p)]
def parse_string_field(chunk: bytes, pos: int) -> Tuple[Optional[str], int]:
    """Decode an ASCII string field at *pos*.

    Layout: 6 bytes of header, u16 length, u32 duplicate length, 4 pad
    bytes, then the payload. On any mismatch returns (None, pos + 2) so
    the caller resynchronizes two bytes further on.
    """
    fail = (None, pos + 2)
    if pos + 16 > len(chunk):
        return fail
    length = int.from_bytes(chunk[pos+6:pos+8], 'big')
    length_dup = int.from_bytes(chunk[pos+8:pos+12], 'big')
    end = pos + 16 + length
    if not (0 < length < 8192) or length != length_dup or end > len(chunk):
        return fail
    payload = chunk[pos+16:end]
    # Only accept printable 7-bit ASCII; anything else is a false positive.
    if any(b < 32 or b >= 127 for b in payload):
        return fail
    return payload.decode('ascii', 'replace'), end
def parse_up_ref_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a compact UP-reference field: 3-byte IDs after a 4-byte mode.

    Returns ([], len(chunk)) when truncated, ([], pos + 2) when the count
    is implausible, otherwise (ids, end_offset).
    """
    if pos + 10 > len(chunk):
        return [], len(chunk)
    n = int.from_bytes(chunk[pos+2:pos+6], 'big')
    stop = pos + 10 + 3 * n
    if not (0 <= n < 2048) or stop > len(chunk):
        return [], pos + 2
    base = pos + 10
    ids = [int.from_bytes(chunk[base + 3*i:base + 3*i + 3], 'big') for i in range(n)]
    return ids, stop
def parse_child_ref_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a compact CHILD-reference field: 4-byte IDs after a 4-byte mode.

    Note the payload starts at pos + 9, one byte earlier than the UP
    variant — empirical, per the original reverse engineering.
    """
    if pos + 10 > len(chunk):
        return [], len(chunk)
    n = int.from_bytes(chunk[pos+2:pos+6], 'big')
    stop = pos + 9 + 4 * n
    if not (0 <= n < 2048) or stop > len(chunk):
        return [], pos + 2
    base = pos + 9
    ids = [int.from_bytes(chunk[base + 4*i:base + 4*i + 4], 'big') for i in range(n)]
    return ids, stop
def parse_tail_ref4_field(chunk: bytes, pos: int) -> Tuple[List[int], int]:
    """Parse a rich page-1 tail field: 4-byte IDs for both UP and CHILD refs."""
    if pos + 10 > len(chunk):
        return [], len(chunk)
    n = int.from_bytes(chunk[pos+2:pos+6], 'big')
    stop = pos + 9 + 4 * n
    if not (0 <= n < 2048) or stop > len(chunk):
        return [], pos + 2
    ids: List[int] = []
    cursor = pos + 9
    for _ in range(n):
        ids.append(int.from_bytes(chunk[cursor:cursor+4], 'big'))
        cursor += 4
    return ids, stop
def parse_scalar_field(chunk: bytes, pos: int) -> Tuple[Optional[int], int]:
    """Parse a 16-byte scalar field; the value is the trailing 8 bytes."""
    if pos + 16 > len(chunk):
        return None, pos + 2
    value = int.from_bytes(chunk[pos+8:pos+16], 'big')
    return value, pos + 16
# Populated pages begin with a fixed header the same length as the stub.
PAGE_HEADER_LEN = len(STUB)
# Object header: 2-byte tag + 4-byte flags + 4-byte id + 8-byte meta.
OBJ_HEADER_LEN = 18
def parse_page(path: Path) -> List[Obj]:
    """Parse one .dtMeta page file into a list of Obj records.

    Returns [] for stub (empty) pages. Scans forward from the page
    header, resynchronizing on OBJ_TAG whenever the stream does not
    parse cleanly.
    """
    data = path.read_bytes()
    if len(data) <= PAGE_HEADER_LEN or data == STUB:
        return []
    # Empirically, populated pages have a fixed 22-byte page header and the
    # first object begins immediately after it. Object bodies are variable
    # length; there is no evidence of a fixed record size.
    out: List[Obj] = []
    pos = PAGE_HEADER_LEN
    while pos + OBJ_HEADER_LEN <= len(data):
        if data[pos:pos+2] != OBJ_TAG:
            # Resync conservatively rather than scanning the whole page up front.
            nxt = data.find(OBJ_TAG, pos + 1)
            if nxt < 0:
                break
            pos = nxt
            continue
        start = pos
        # Object header: tag(2) + flags(4) + id(4) + meta(8), big-endian.
        flags = int.from_bytes(data[pos+2:pos+6], 'big')
        obj_id = int.from_bytes(data[pos+6:pos+10], 'big')
        meta = int.from_bytes(data[pos+10:pos+18], 'big')
        if obj_id <= 0 or obj_id > 10_000_000:
            # Implausible id: treat this tag hit as a false positive and resync.
            nxt = data.find(OBJ_TAG, pos + 2)
            if nxt < 0:
                break
            pos = nxt
            continue
        p = pos + OBJ_HEADER_LEN
        text: Optional[str] = None
        up: List[int] = []
        children: List[int] = []
        scalar: Optional[int] = None
        # Walk the body field-by-field until the next object tag.
        # Each parser returns p + 2 to signal "not actually this field".
        while p + 2 <= len(data):
            if data[p:p+2] == OBJ_TAG:
                break
            tag = data[p:p+2]
            if tag == STRING_TAG:
                s, p2 = parse_string_field(data, p)
                if p2 != p + 2:
                    if s is not None:
                        text = s
                    p = p2
                    continue
            elif tag == UP_TAG:
                refs, p2 = parse_up_ref_field(data, p)
                if p2 != p + 2:
                    up.extend(refs)
                    p = p2
                    continue
            elif tag == CHILD_TAG:
                refs, p2 = parse_child_ref_field(data, p)
                if p2 != p + 2:
                    children.extend(refs)
                    p = p2
                    continue
            elif tag == SCALAR_TAG:
                val, p2 = parse_scalar_field(data, p)
                if p2 != p + 2:
                    scalar = val
                    p = p2
                    continue
            p += 1  # unknown byte: advance one and keep scanning
        if p <= start:
            break  # no forward progress: avoid an infinite loop
        chunk = data[start:p]
        out.append(Obj(path.name, start, obj_id, flags, meta, text, up, children, scalar, p-start, chunk))
        pos = p
    return out
def decode_utf16be_text(blob: bytes) -> str:
    """Best-effort UTF-16-BE decode; returns '' if decoding fails outright."""
    try:
        text = blob.decode('utf-16-be', 'replace')
    except Exception:
        return ''
    return text
def parse_wide_prop_from(chunk: bytes, key: bytes, start: int = 0) -> Optional[Tuple[str,int,int]]:
    """Find *key* at or after *start* and decode its UTF-16-BE payload.

    Layout: 4-byte key, big-endian u32 character count, then 2*count
    payload bytes. Returns (text, key_index, end_offset), or None when
    the key is absent or the length is implausible.
    """
    idx = chunk.find(key, start)
    if idx < 0 or idx + 8 > len(chunk):
        return None
    nchars = int.from_bytes(chunk[idx+4:idx+8], 'big')
    end = idx + 8 + 2 * nchars
    if nchars <= 0 or end > len(chunk):
        return None
    # 'replace' never raises, so decoding is safe; trailing NULs are noise.
    text = chunk[idx+8:end].decode('utf-16-be', 'replace').rstrip('\x00')
    return text, idx, end
def parse_all_wide_props(chunk: bytes) -> Dict[str, str]:
    """Scan *chunk* for all known wide-string properties and collect them.

    Keys are checked in a fixed priority order at each scan position;
    property names are returned with padding spaces stripped.
    """
    known = (b'NAME', b'PATH', b'URL ', b'UUID', b'SMGR', b'DBID')
    found: Dict[str, str] = {}
    cursor = 0
    while cursor < len(chunk) - 8:
        for key in known:
            hit = parse_wide_prop_from(chunk, key, cursor)
            if hit is None:
                continue
            value, _idx, end = hit
            found[key.decode('ascii').strip()] = value
            cursor = end
            break
        else:
            cursor += 1  # no key matched here: advance one byte
    return found
def parse_plausible_times(chunk: bytes, limit: int) -> List[str]:
    """Extract plausible timestamps (big-endian doubles) before *limit*.

    Values are seconds since EPOCH; the 6.0e8..1.1e9 window plus the
    year sanity check filters out random bit patterns.
    """
    found: List[str] = []
    last = min(limit, len(chunk) - 8)
    for off in range(0, last + 1, 8):
        try:
            seconds = struct.unpack('>d', chunk[off:off+8])[0]
        except Exception:
            continue
        if not (6.0e8 < seconds < 1.1e9):
            continue
        stamp = EPOCH + datetime.timedelta(seconds=seconds)
        if 2010 <= stamp.year <= 2038:
            found.append(stamp.replace(microsecond=0).isoformat(sep=' '))
    return found
def pick_size_bytes(chunk: bytes, first_prop_idx: int) -> Optional[int]:
    """Probe a few fixed offsets (before the first property) for a size field.

    Offsets are empirical candidates; the first plausible positive u32
    wins. Returns None when nothing fits.
    """
    ceiling = min(first_prop_idx, len(chunk))
    for off in (108, 104, 112, 116, 100, 120):
        if off + 4 > ceiling:
            continue
        candidate = int.from_bytes(chunk[off:off+4], 'big')
        if 1 <= candidate <= (1 << 31):
            return candidate
    return None
def human_size(n: Optional[int]) -> Optional[str]:
    """Format *n* bytes with decimal (1000-based) units; None passes through."""
    if n is None:
        return None
    value = float(n)
    suffixes = ['B','KB','MB','GB','TB']
    idx = 0
    while value >= 1000 and idx < len(suffixes) - 1:
        value /= 1000.0
        idx += 1
    # Whole bytes are printed without a decimal point.
    return f'{int(value)} {suffixes[idx]}' if idx == 0 else f'{value:.1f} {suffixes[idx]}'
def parse_tail_refs(chunk: bytes) -> Tuple[List[int], List[int]]:
    """Parse UP/CHILD reference fields in the tail section after 'KSTP'.

    Returns (up_ids, child_ids); both empty when no KSTP marker exists.
    """
    anchor = chunk.rfind(b'KSTP')
    if anchor < 0:
        return [], []
    ups: List[int] = []
    kids: List[int] = []
    cursor = anchor + 4
    while cursor + 2 <= len(chunk):
        tag = chunk[cursor:cursor+2]
        target = ups if tag == UP_TAG else kids if tag == CHILD_TAG else None
        if target is not None:
            refs, nxt = parse_tail_ref4_field(chunk, cursor)
            if nxt != cursor + 2:  # field actually parsed
                target.extend(refs)
                cursor = nxt
                continue
        cursor += 1
    return ups, kids
def parse_rich_records(page1: Path) -> List[Dict[str, Any]]:
    """Extract rich records (objects carrying wide-string properties) from page 1.

    Rich records hold NAME/PATH/URL/UUID properties, plausible
    timestamps, a size field, and tail reference lists used later to
    rebuild the item tree.
    """
    records: List[Dict[str, Any]] = []
    for obj in parse_page(page1):
        props = parse_all_wide_props(obj.chunk)
        if not props:
            continue  # plain wrapper object, not a rich record
        # Timestamps and the size field live before the first property key.
        first_prop_idx = min((obj.chunk.find(k) for k in [b'NAME', b'PATH', b'URL ', b'UUID'] if obj.chunk.find(k) >= 0), default=len(obj.chunk))
        times = parse_plausible_times(obj.chunk, first_prop_idx)
        tail_up, tail_children = parse_tail_refs(obj.chunk)
        rec = {
            'obj_id': obj.obj_id,
            'offset': obj.offset,
            'flags': obj.flags,
            'meta': obj.meta,
            # Prefer the richer 4-byte tail refs; fall back to compact ones.
            'up': tail_up or obj.up,
            'children': tail_children or obj.children,
            'props': props,
        }
        if times:
            # Label order assumed created/modified/added — TODO confirm against app.
            labels = ['created', 'modified', 'added']
            rec['times'] = {labels[i] if i < len(labels) else f'time_{i+1}': t for i, t in enumerate(times[:6])}
        size_bytes = pick_size_bytes(obj.chunk, first_prop_idx) if ('PATH' in props or 'URL' in props) else None
        if size_bytes is not None:
            rec['size_bytes'] = size_bytes
            rec['size_human'] = human_size(size_bytes)
        records.append(rec)
    return records
def resolve_parent(start_id: int, rich_ids: Set[int], wrappers: Dict[int, Obj]) -> Optional[int]:
    """Walk up-pointers from *start_id* until a root or rich object is hit.

    Returns 1 (database root), TAGS_ROOT_WRAPPER_ID (tags root), a rich
    object id, or None when the chain dead-ends or cycles.
    """
    visited: Set[int] = set()
    node = start_id
    while node and node not in visited:
        visited.add(node)
        if node in (1, TAGS_ROOT_WRAPPER_ID) or node in rich_ids:
            return node
        wrapper = wrappers.get(node)
        if wrapper is None or not wrapper.up:
            return None
        node = wrapper.up[0]
    return None
def resolve_to_rich(start_id: int, rich_ids: Set[int], wrappers: Dict[int, Obj]) -> Optional[int]:
    """Follow first up-pointers from *start_id* to the nearest rich object id.

    Returns None if the chain dead-ends or loops before reaching one.
    """
    visited: Set[int] = set()
    node = start_id
    while node and node not in visited:
        visited.add(node)
        if node in rich_ids:
            return node
        wrapper = wrappers.get(node)
        if wrapper is None or not wrapper.up:
            return None
        node = wrapper.up[0]
    return None
def summarize_db(db: Path) -> Dict[str, Any]:
    """Decode one .dtBase2 bundle into a summary dict.

    The result holds raw per-page object dumps ('pages'), the rich
    records from page 1 ('rich_records'), the on-disk payload listing
    ('files_noindex'), and a precomputed tree/tag model under 'ui'.
    """
    out: Dict[str, Any] = {'db': db.name, 'pages': {}, 'rich_records': [], 'files_noindex': []}
    all_objs: List[Obj] = []
    # Pass 1: parse every populated page into raw objects.
    for p in sorted(db.glob('DEVONthink-*.dtMeta')):
        data = p.read_bytes()
        info = {'size': len(data), 'stub': data == STUB}
        if data != STUB:
            objs = parse_page(p)
            all_objs.extend(objs)
            info['objects'] = [{
                'id': o.obj_id, 'offset': o.offset, 'flags': o.flags, 'meta': o.meta,
                'text': o.text, 'up': o.up, 'children': o.children, 'scalar': o.scalar
            } for o in objs]
        out['pages'][p.name] = info
    # Pass 2: rich records (names, paths, times, ...) come from page 1 only.
    page1 = db / 'DEVONthink-1.dtMeta'
    rich_records = parse_rich_records(page1) if page1.exists() else []
    out['rich_records'] = rich_records
    # Map object id -> ASCII text, used to resolve "symbolic" child refs.
    atom_text = {o.obj_id:o.text for o in all_objs if o.text}
    rich: Dict[int, Dict[str, Any]] = {}
    for rec in rich_records:
        props = rec['props']
        if 'NAME' not in props:
            continue  # a rich record without a NAME is not a displayable item
        name = props['NAME']
        symbolic=[]
        for cid in rec.get('children', []):
            if cid in atom_text:
                symbolic.append(atom_text[cid])
        rich[rec['obj_id']] = {
            'obj_id': rec['obj_id'],
            'name': name,
            'path': props.get('PATH'),
            'url': props.get('URL'),
            'uuid': props.get('UUID'),
            'kind': 'unknown',  # classified below
            'up': rec.get('up', []),
            'children': rec.get('children', []),
            'symbolic': symbolic,
            'times': rec.get('times', {}),
            'size_bytes': rec.get('size_bytes'),
            'size_human': rec.get('size_human'),
            'fields': props,
            'tags': [],
        }
    rich_ids = set(rich)
    # Wrapper objects sit between rich items in the parent chains.
    wrappers = {o.obj_id:o for o in all_objs if o.page=='DEVONthink-1.dtMeta' and o.obj_id not in rich_ids}
    # Resolve each rich item's first up-pointer to a stable parent id.
    parent_of: Dict[int, Optional[int]] = {}
    for rid, r in rich.items():
        parent = None
        if r['up']:
            parent = resolve_parent(r['up'][0], rich_ids, wrappers)
        parent_of[rid] = parent
    # Classify each record: file, smart group, tag, or ordinary group.
    for rid, r in rich.items():
        name = r['name']
        parent = parent_of[rid]
        if ('PATH' in r['fields']) or ('URL' in r['fields']):
            r['kind'] = 'file'
        elif name in BUILTIN_SMART:
            r['kind'] = 'smart_group'
        elif parent == TAGS_ROOT_WRAPPER_ID:
            r['kind'] = 'tag'
        else:
            r['kind'] = 'group'
    # Collect tags for every file from its extra up-refs (beyond the first).
    for rid, r in rich.items():
        if r['kind'] != 'file':
            continue
        tags: List[str] = []
        for ref in r.get('up', [])[1:]:
            tr = resolve_to_rich(ref, rich_ids, wrappers)
            if tr and tr in rich and rich[tr]['kind'] == 'tag':
                tags.append(rich[tr]['name'])
        # also support direct child/tag refs if they ever appear that way
        for ref in r.get('children', []):
            tr = resolve_to_rich(ref, rich_ids, wrappers)
            if tr and tr in rich and rich[tr]['kind'] == 'tag':
                tags.append(rich[tr]['name'])
        # De-duplicate while preserving first-seen order.
        seen=set(); dedup=[]
        for t in tags:
            if t not in seen:
                seen.add(t); dedup.append(t)
        r['tags'] = dedup
    # Build the group tree; tags are kept in their own flat section.
    children_of = {rid: [] for rid in rich}
    root_children = []
    tag_ids = []
    for rid, parent in parent_of.items():
        if rich[rid]['kind'] == 'tag':
            tag_ids.append(rid)
            continue
        if parent == 1 or parent is None or parent == TAGS_ROOT_WRAPPER_ID:
            root_children.append(rid)
        elif parent in children_of:
            children_of[parent].append(rid)
    # Invert the file->tags mapping (by tag name) for the Tags/ listing.
    tag_to_items: Dict[int, List[int]] = {tid: [] for tid in tag_ids}
    for rid, r in rich.items():
        if r['kind'] != 'file':
            continue
        for tag_name in r.get('tags', []):
            for tid in tag_ids:
                if rich[tid]['name'] == tag_name:
                    tag_to_items[tid].append(rid)
    # Raw payload files stored inside the bundle, if present.
    for p in sorted((db / 'Files.noindex').rglob('*')) if (db / 'Files.noindex').exists() else []:
        if p.is_file():
            out['files_noindex'].append(str(p.relative_to(db)))
    out['ui'] = {
        'rich': rich,
        'children_of': children_of,
        'root_children': sorted(set(root_children)),
        'tag_ids': sorted(tag_ids, key=lambda tid: rich[tid]['name'].lower()),
        'tag_to_items': {str(k): sorted(v, key=lambda rid: rich[rid]['name'].lower()) for k,v in tag_to_items.items()},
    }
    return out
def render_tree(summary: Dict[str, Any]) -> str:
    """Render a summarize_db() result as a human-readable text report.

    Sections: the group/file tree, the tag listing, the Files.noindex
    inventory, and a flat per-item detail list.
    """
    ui = summary.get('ui', {})
    rich = ui.get('rich', {})
    children_of = ui.get('children_of', {})
    root_children = ui.get('root_children', [])
    tag_ids = ui.get('tag_ids', [])
    # tag_to_items keys were stringified for JSON; restore ints here.
    tag_to_items = {int(k): v for k,v in ui.get('tag_to_items', {}).items()}
    lines=[f"=== {summary['db']} ===", 'Root/']
    def walk(rid: int, indent: int):
        # Depth-first render of groups and files; tags/smart groups are
        # rendered in their own sections, not inside the tree.
        r = rich[rid]
        if r['kind'] in ('smart_group', 'tag'):
            return
        nm = r['name']
        suffix = '/' if r['kind']=='group' else ''
        detail = ''
        if r['kind'] == 'file':
            extras = []
            if r.get('tags'): extras.append('tags=' + ', '.join(r['tags']))
            if r.get('url'): extras.append(f"URL={r['url']}")
            if r.get('size_human'): extras.append(f"size={r['size_human']}")
            if r.get('times', {}).get('created'): extras.append(f"created={r['times']['created']}")
            if extras:
                detail = ' [' + '; '.join(extras) + ']'
        lines.append(' '*indent + f'{nm}{suffix}{detail}')
        # Groups sort before files; both alphabetical, case-insensitive.
        for child in sorted(children_of.get(rid, []), key=lambda x:(rich[x]['kind']!='group', rich[x]['name'].lower())):
            walk(child, indent+1)
    for rid in sorted(root_children, key=lambda x:(rich[x]['kind']!='group', rich[x]['name'].lower())):
        walk(rid, 1)
    lines.append('')
    lines.append('Tags/')
    for tid in tag_ids:
        tag = rich[tid]
        lines.append(f" {tag['name']}/")
        for rid in tag_to_items.get(tid, []):
            lines.append(f" {rich[rid]['name']}")
    lines.append('')
    lines.append('Files.noindex:')
    for p in summary.get('files_noindex', []):
        lines.append(f' - {p}')
    lines.append('')
    lines.append('Items:')
    # Invert children_of to find each item's display parent name.
    parent_of = {}
    for pid, kids in children_of.items():
        for kid in kids:
            parent_of[kid] = pid
    for rid in sorted(rich, key=lambda x:(rich[x]['kind'], rich[x]['name'].lower())):
        r = rich[rid]
        if r['kind'] == 'smart_group':
            continue
        if r['kind'] == 'tag':
            parent = 'Tags'
        else:
            parent = 'Root' if rid in root_children else (rich[parent_of[rid]]['name'] if rid in parent_of else None)
        bits = [f"[{rid}] {r['kind']}: {r['name']}"]
        if r.get('path'): bits.append(f"PATH={r['path']}")
        if r.get('url'): bits.append(f"URL={r['url']}")
        if r.get('uuid'): bits.append(f"UUID={r['uuid']}")
        if parent: bits.append(f"parent={parent}")
        if r.get('tags'): bits.append('tags=' + ','.join(r['tags']))
        if r.get('times'): bits.extend(f"{k}={v}" for k,v in r['times'].items())
        if r.get('size_bytes') is not None: bits.append(f"size={r['size_bytes']} ({r['size_human']})")
        if r.get('symbolic'): bits.append('symbolic=' + ','.join(r['symbolic']))
        lines.append(' - ' + ' | '.join(bits))
    return '\n'.join(lines)
def print_db_summary(summary: Dict[str, Any], full: bool=False):
    """Print the rendered tree; with full=True, also dump raw page objects."""
    print(render_tree(summary))
    if not full:
        return
    print('')
    populated = [name for name, info in summary['pages'].items() if not info['stub']]
    print('Populated pages:', ', '.join(populated) if populated else '(none)')
    for name, info in summary['pages'].items():
        if info['stub']:
            continue
        print(f'Page {name}:')
        for obj in info.get('objects', []):
            print(f" obj {obj['id']}: text={obj['text']!r} up={obj['up']} children={obj['children']} flags={obj['flags']} meta={obj['meta']}")
def main():
    """CLI entry point: decode one or more .dtBase2 databases.

    Accepts a bundle directory, a directory containing bundles, or a
    zip of either; prints a text report or, with --json, the raw data.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('path', type=Path)
    parser.add_argument('--json', action='store_true')
    parser.add_argument('--full', action='store_true')
    opts = parser.parse_args()
    root, tmpdir = maybe_extract(opts.path)
    try:
        dbs = find_dbs(root)
        if not dbs and root.is_dir() and root.name.endswith('.dtBase2'):
            dbs = [root]
        summaries = [summarize_db(db) for db in dbs]
        if opts.json:
            # A single database is emitted unwrapped for convenience.
            payload = summaries[0] if len(summaries) == 1 else summaries
            print(json.dumps(payload, indent=2))
        else:
            first = True
            for summary in summaries:
                if not first:
                    print()  # blank line between databases
                print_db_summary(summary, full=opts.full)
                first = False
    finally:
        # Remove the temporary extraction directory, if one was created.
        if tmpdir is not None:
            tmpdir.cleanup()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment