Created
April 12, 2026 03:15
-
-
Save joshterrill/89d3f5bc8b235f5cbf31ce7610d78a73 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| LLDB helper to trace skencv1 anchor functions in SummarizationKit. | |
| Usage: | |
| (lldb) command script import /path/to/text-to-summary/lldb_skencv1_trace.py | |
| (lldb) sktrace_init # auto-detects SummarizationKit load base | |
| (lldb) # or: sktrace_init 0x287000000 | |
| (lldb) c | |
| Optional: | |
| (lldb) sktrace_show_globals | |
| (lldb) sktrace_set_dump_dir /tmp/sktrace-dumps | |
| (lldb) sktrace_set_max_dumps 128 | |
| """ | |
| import time | |
| import struct | |
| import os | |
| import lldb | |
# Image base used by the static (Ghidra) addresses below. sktrace_init derives
# the runtime slide as (module load base - GHIDRA_IMAGE_BASE).
GHIDRA_IMAGE_BASE = 0x268BC0000

# Static addresses of the functions that receive entry breakpoints.
ANCHOR_FUNCS = {
    "main_handler": 0x268CFDAF8,  # FUN_268cfdaf8
    "key_init": 0x268CFE2BC,  # FUN_268cfe2bc
    "key_source": 0x268CFEF18,  # FUN_268cfef18
}

# Static addresses of globals that _snapshot_key_globals reads (32 bytes each).
KEY_GLOBALS = {
    "key_g0": 0x29946ABF8,
    "key_g1": 0x29946AC00,
    "key_g2": 0x29946AC08,
    "key_g3": 0x29946AC10,
}

# Upper bound on bytes rendered by _dump_ptr, and its default read size.
MAX_HEX_DUMP = 96
MAX_PTR_DUMP = 64

# Mutable tracing state shared by every command and breakpoint callback.
_STATE = {
    # Address translation, filled in by sktrace_init.
    "slide": None,
    "module_base": None,
    "installed": False,
    # Per-label entry hit counters (see _inc_hit).
    "hit_counts": {},
    # (pc, x0, x1) of the last logged return, used to skip immediate repeats.
    "last_main_ret_sig": None,
    "last_key_init_ret_sig": None,
    "last_key_source_ret_sig": None,
    # label -> hex string of the last snapshot taken for that global.
    "last_globals_hex": {},
    # thread id -> time.time() of the last main-handler entry on that thread.
    "main_thread_seen_at": {},
    # Seconds after a main-handler entry during which key_* hits are traced.
    "context_window_s": 0.4,
    "main_entry_count": 0,
    "main_return_count": 0,
    # Buffer dump bookkeeping (see _maybe_dump_buffer).
    "input_dump_count": 0,
    "output_dump_count": 0,
    "dump_dir": "/tmp/sktrace-dumps",
    "max_auto_dumps": 32,
    # Caps on how many key_init / key_source entries are traced.
    "key_init_log_limit": 4,
    "key_source_log_limit": 4,
    "key_init_entry_seen": 0,
    "key_source_entry_seen": 0,
}
def _log(msg):
    """Print *msg* on stdout, prefixed with the ``[sktrace]`` tag."""
    line = "[sktrace] {}".format(msg)
    print(line)
def _read_mem(process, addr, size):
    """Read *size* bytes at *addr* from *process*.

    Returns the data from ``process.ReadMemory``, or None when *addr* is
    falsy (0/None) or the read reports an error.
    """
    if not addr:
        return None
    error = lldb.SBError()
    payload = process.ReadMemory(addr, size, error)
    return payload if error.Success() else None
def _untag_ptr(ptr):
    """Return *ptr* with its top byte cleared (low 56 bits kept)."""
    # arm64e user pointers in this target frequently carry top-byte tags (for example 0x40...)
    # TBI means masking to 56 bits is usually the right canonical form for LLDB memory reads.
    low56_mask = 0x00FFFFFFFFFFFFFF
    return ptr & low56_mask
def _u64_le(buf, off):
    """Decode the little-endian unsigned 64-bit value at ``buf[off:off + 8]``.

    Returns 0 when *buf* is None or does not extend to ``off + 8``.
    """
    if buf is not None and off + 8 <= len(buf):
        (value,) = struct.unpack_from("<Q", buf, off)
        return value
    return 0
def _hexdump(data, width=16):
    """Format *data* as rows of space-separated two-digit hex bytes.

    Each row covers *width* bytes and rows are joined with newlines. Falsy
    input yields ``"<empty>"``; a ``str`` is first encoded as latin-1 with
    unencodable characters dropped.
    """
    if not data:
        return "<empty>"
    if isinstance(data, str):
        data = data.encode("latin1", errors="ignore")
    rows = [
        " ".join("{:02x}".format(byte) for byte in data[start : start + width])
        for start in range(0, len(data), width)
    ]
    return "\n".join(rows)
def _reg_u64(frame, reg_name):
    """Return register *reg_name* of *frame* as an unsigned int (0 if not found)."""
    register = frame.FindRegister(reg_name)
    return register.GetValueAsUnsigned() if register.IsValid() else 0
def _dump_ptr(process, label, ptr, size=MAX_PTR_DUMP):
    """Log a hex dump of the memory that *ptr* points to.

    The pointer is untagged before reading. A zero pointer and an unreadable
    address each produce a one-line log entry instead of a dump. At most
    MAX_HEX_DUMP bytes are rendered regardless of *size*.
    """
    if ptr == 0:
        _log("{}: 0x0".format(label))
        return
    canonical = _untag_ptr(ptr)
    contents = _read_mem(process, canonical, size)
    if contents is not None:
        rendered = _hexdump(contents[:MAX_HEX_DUMP])
        _log("{}: 0x{:x} (canonical 0x{:x})\n{}".format(label, ptr, canonical, rendered))
        return
    _log("{}: 0x{:x} (canonical 0x{:x}, unreadable)".format(label, ptr, canonical))
def _ascii_preview(blob):
    """Render *blob* as text, replacing bytes outside 32..126 with ``.``.

    Falsy input yields an empty string.
    """
    if not blob:
        return ""
    return "".join(chr(b) if 32 <= b <= 126 else "." for b in blob)
def _extract_swift_like_buffer(process, obj_ptr):
    """Heuristically decode a (pointer, length, capacity) buffer from an object.

    Reads 0x30 bytes at the untagged *obj_ptr* and treats the qwords at
    offsets 0x10, 0x18 and 0x20 as candidate data pointer, length and
    capacity. (Presumably this mirrors a Swift storage layout, as the name
    suggests; the layout itself is not verified here.)

    Returns None when the object is null or unreadable, the candidate pointer
    or length is zero, the length exceeds 8 MiB, or the data cannot be read.
    Otherwise returns a dict with keys ``obj_ptr``, ``ptr``, ``len``, ``cap``,
    ``preview`` (first 64 bytes) and ``data``. ``data`` holds at most 256 KiB
    even when ``len`` is larger.
    """
    if obj_ptr == 0:
        return None
    header = _read_mem(process, _untag_ptr(obj_ptr), 0x30)
    if header is None or len(header) < 0x28:
        return None

    cand_ptr, cand_len, cand_cap = (
        _u64_le(header, field_off) for field_off in (0x10, 0x18, 0x20)
    )
    # Reject empty candidates and implausibly large lengths.
    if cand_ptr == 0 or cand_len == 0 or cand_len > 8 * 1024 * 1024:
        return None

    buf_ptr = _untag_ptr(cand_ptr)
    data = _read_mem(process, buf_ptr, min(cand_len, 256 * 1024))
    if data is None:
        return None
    return {
        "obj_ptr": obj_ptr,
        "ptr": buf_ptr,
        "len": cand_len,
        "cap": cand_cap,
        "preview": data[:64],
        "data": data,
    }
def _maybe_dump_buffer(kind, info):
    """Write ``info["data"]`` to a numbered file in the dump directory.

    *kind* selects the counter: ``"input"`` uses ``input_dump_count``, any
    other value uses ``output_dump_count``. The counter is incremented on
    every call with a non-None *info*, including calls past the
    ``max_auto_dumps`` limit, which write nothing.

    Returns the path written, or None when *info* is None or the limit for
    this kind has been exceeded.
    """
    if info is None:
        return None

    counter_key = "input_dump_count" if kind == "input" else "output_dump_count"
    _STATE[counter_key] += 1
    seq = _STATE[counter_key]
    if seq > _STATE["max_auto_dumps"]:
        return None

    os.makedirs(_STATE["dump_dir"], exist_ok=True)
    file_name = "{}_{:03d}_len{}.bin".format(kind, seq, info["len"])
    path = os.path.join(_STATE["dump_dir"], file_name)
    with open(path, "wb") as out:
        out.write(info["data"])
    return path
def _dump_swift_like_buffer(process, label, obj_ptr):
    """Decode the buffer behind *obj_ptr* and log a summary plus hex preview.

    Returns the dict from _extract_swift_like_buffer, or None (logging
    nothing) when no buffer could be decoded.
    """
    info = _extract_swift_like_buffer(process, obj_ptr)
    if info is None:
        return None

    preview = info["preview"]
    summary = "{} decoded-buffer obj=0x{:x} ptr=0x{:x} len={} cap={} head8={} ascii='{}'".format(
        label,
        info["obj_ptr"],
        info["ptr"],
        info["len"],
        info["cap"],
        preview[:8].hex(),
        _ascii_preview(preview[:32]),
    )
    _log(summary)
    _log("{} decoded-buffer hex:\n{}".format(label, _hexdump(preview)))
    return info
def _inc_hit(name):
    """Increment the hit counter stored under *name* and return the new count."""
    counts = _STATE["hit_counts"]
    counts[name] = counts.get(name, 0) + 1
    return counts[name]
def _remember_main_thread(thread_id):
    """Record the current wall-clock time as *thread_id*'s last main-handler entry."""
    now = time.time()
    _STATE["main_thread_seen_at"][thread_id] = now
def _is_recent_main_thread(thread_id):
    """Return True if *thread_id* was recorded within ``context_window_s`` seconds.

    Threads that were never recorded by _remember_main_thread yield False.
    """
    seen_at = _STATE["main_thread_seen_at"].get(thread_id)
    return seen_at is not None and (time.time() - seen_at) <= _STATE["context_window_s"]
def _runtime_addr(static_addr):
    """Translate a static address to a runtime one by adding the stored slide.

    Returns None while no slide has been stored in ``_STATE``.
    """
    slide = _STATE.get("slide")
    return None if slide is None else static_addr + slide
def _snapshot_key_globals(process):
    """Log 32 bytes from each KEY_GLOBALS address, rebased by the slide.

    Every readable global is logged as hex on each call. The deeper candidate
    dump only runs when the bytes differ from the previous snapshot of that
    global. Logs a notice and returns when the slide is not yet known.
    """
    slide = _STATE.get("slide")
    if slide is None:
        _log("slide not initialized; cannot snapshot key globals")
        return

    previous = _STATE["last_globals_hex"]
    for label, static_addr in KEY_GLOBALS.items():
        runtime_addr = static_addr + slide
        blob = _read_mem(process, runtime_addr, 32)
        if blob is None:
            _log("{} @ 0x{:x}: unreadable".format(label, runtime_addr))
            continue
        as_hex = blob.hex()
        _log("{} @ 0x{:x}: {}".format(label, runtime_addr, as_hex))
        if previous.get(label) != as_hex:
            # Contents changed since the last snapshot: remember and dig deeper.
            previous[label] = as_hex
            _dump_key_global_candidates(process, label, blob)
def _dump_key_global_candidates(process, label, blob):
    """Treat the first four qwords of *blob* as possible pointers and dump them.

    Logs the four little-endian qwords, then for each distinct non-zero value
    whose untagged form is at least 0x1000, logs 0x40 bytes at that address
    and attempts a buffer decode. Does nothing when *blob* is None or shorter
    than 32 bytes.
    """
    if blob is None or len(blob) < 32:
        return

    qwords = struct.unpack_from("<4Q", blob, 0)
    _log(
        "{} qwords: [{}]".format(
            label, ", ".join("0x{:x}".format(x) for x in qwords)
        )
    )

    visited = set()
    for idx, raw in enumerate(qwords):
        if raw == 0:
            continue
        ptr = _untag_ptr(raw)
        if ptr in visited:
            continue
        visited.add(ptr)
        # Values below 0x1000 are skipped rather than dereferenced.
        if ptr < 0x1000:
            continue
        chunk = _read_mem(process, ptr, 0x40)
        if chunk is None:
            _log("{} q{} ptr=0x{:x} unreadable".format(label, idx, ptr))
        else:
            _log("{} q{} ptr=0x{:x} hdr:\n{}".format(label, idx, ptr, _hexdump(chunk[:0x40])))
            _dump_swift_like_buffer(process, "{} q{}".format(label, idx), raw)
def _set_temp_return_bp(target, return_addr, callback_name):
    """Create a one-shot, auto-continuing breakpoint at *return_addr*.

    *callback_name* names a function in this module that is attached as the
    breakpoint's script callback. Returns the created breakpoint.
    """
    callback_path = "{}.{}".format(__name__, callback_name)
    temp_bp = target.BreakpointCreateByAddress(return_addr)
    temp_bp.SetOneShot(True)
    temp_bp.SetAutoContinue(True)
    temp_bp.SetScriptCallbackFunction(callback_path)
    return temp_bp
def _common_entry_trace(frame, label):
    """Log an anchor-function entry: hit count, thread, pc, x0-x3 and pointees.

    For the main handler (label ``FUN_268cfdaf8``) the thread is also
    remembered and the entry counted, and a decoded x1 buffer that starts
    with ``skencv1`` is written to the dump directory.
    """
    thread = frame.GetThread()
    process = thread.GetProcess()
    pc = frame.GetPCAddress().GetLoadAddress(process.GetTarget())
    tid = thread.GetThreadID()
    hit_no = _inc_hit(label)
    x0 = _reg_u64(frame, "x0")
    x1 = _reg_u64(frame, "x1")
    x2 = _reg_u64(frame, "x2")
    x3 = _reg_u64(frame, "x3")
    _log(
        "{} entry #{} t={} pc=0x{:x} x0=0x{:x} x1=0x{:x} x2=0x{:x} x3=0x{:x}".format(
            label, hit_no, tid, pc, x0, x1, x2, x3
        )
    )

    is_main = label == "FUN_268cfdaf8"
    if is_main:
        _remember_main_thread(tid)
        _STATE["main_entry_count"] += 1

    # A non-zero upper half of x0 is reported as a candidate length.
    x0_hi = x0 >> 32
    if x0_hi:
        _log("{} x0_hi32(candidate_len)={}".format(label, x0_hi))

    _dump_ptr(process, "{} x0".format(label), x0)
    _dump_ptr(process, "{} x1".format(label), x1)
    info = _dump_swift_like_buffer(process, "{} x1".format(label), x1)

    if is_main and info is not None and info["preview"][:7] == b"skencv1":
        dumped = _maybe_dump_buffer("input", info)
        if dumped:
            _log("input buffer dumped: {}".format(dumped))
def main_handler_ret_cb(frame, bp_loc, _dict):
    """Breakpoint callback for the main handler's return address.

    Logs x0/x1 and their pointees, dumps a decoded x1 buffer as an "output"
    file, and snapshots the key globals. A hit whose (pc, x0, x1) equals the
    previously logged one is ignored. Always returns False.
    """
    process = frame.GetThread().GetProcess()
    pc = frame.GetPCAddress().GetLoadAddress(process.GetTarget())
    x0 = _reg_u64(frame, "x0")
    x1 = _reg_u64(frame, "x1")

    signature = (pc, x0, x1)
    if signature == _STATE.get("last_main_ret_sig"):
        return False
    _STATE["last_main_ret_sig"] = signature
    _STATE["main_return_count"] += 1

    _log("main_handler return pc=0x{:x} x0=0x{:x} x1=0x{:x}".format(pc, x0, x1))
    x0_hi = x0 >> 32
    if x0_hi:
        _log("main_handler return x0_hi32(candidate_len)={}".format(x0_hi))
    _dump_ptr(process, "main_handler ret x0", x0)
    _dump_ptr(process, "main_handler ret x1", x1)

    decoded = _dump_swift_like_buffer(process, "main_handler ret x1", x1)
    if decoded is not None:
        dumped = _maybe_dump_buffer("output", decoded)
        if dumped:
            _log("output buffer dumped: {}".format(dumped))

    _snapshot_key_globals(process)
    return False
def key_init_ret_cb(frame, bp_loc, _dict):
    """Breakpoint callback for key_init's return address.

    Logs x0/x1 and their pointees, attempts a buffer decode of x1 and
    snapshots the key globals. A hit whose (pc, x0, x1) equals the previously
    logged one is ignored. Always returns False.
    """
    process = frame.GetThread().GetProcess()
    pc = frame.GetPCAddress().GetLoadAddress(process.GetTarget())
    x0 = _reg_u64(frame, "x0")
    x1 = _reg_u64(frame, "x1")

    signature = (pc, x0, x1)
    if signature != _STATE.get("last_key_init_ret_sig"):
        _STATE["last_key_init_ret_sig"] = signature
        _log("key_init return pc=0x{:x} x0=0x{:x} x1=0x{:x}".format(pc, x0, x1))
        _dump_ptr(process, "key_init ret x0", x0)
        _dump_ptr(process, "key_init ret x1", x1)
        _dump_swift_like_buffer(process, "key_init ret x1", x1)
        _snapshot_key_globals(process)
    return False
def key_source_ret_cb(frame, bp_loc, _dict):
    """Breakpoint callback for key_source's return address.

    Logs x0/x1 and their pointees, attempts a buffer decode of x1 and
    snapshots the key globals. A hit whose (pc, x0, x1) equals the previously
    logged one is ignored. Always returns False.
    """
    process = frame.GetThread().GetProcess()
    pc = frame.GetPCAddress().GetLoadAddress(process.GetTarget())
    x0 = _reg_u64(frame, "x0")
    x1 = _reg_u64(frame, "x1")

    signature = (pc, x0, x1)
    if signature != _STATE.get("last_key_source_ret_sig"):
        _STATE["last_key_source_ret_sig"] = signature
        _log("key_source return pc=0x{:x} x0=0x{:x} x1=0x{:x}".format(pc, x0, x1))
        _dump_ptr(process, "key_source ret x0", x0)
        _dump_ptr(process, "key_source ret x1", x1)
        _dump_swift_like_buffer(process, "key_source ret x1", x1)
        _snapshot_key_globals(process)
    return False
def main_handler_entry_cb(frame, bp_loc, _dict):
    """Entry breakpoint callback for the main handler (FUN_268cfdaf8).

    Traces the entry and, when x30 is non-zero, plants a one-shot breakpoint
    at that address wired to main_handler_ret_cb. Always returns False.
    """
    _common_entry_trace(frame, "FUN_268cfdaf8")
    return_addr = _reg_u64(frame, "x30")
    if return_addr:
        target = frame.GetThread().GetProcess().GetTarget()
        _set_temp_return_bp(target, return_addr, "main_handler_ret_cb")
    return False
def key_init_entry_cb(frame, bp_loc, _dict):
    """Entry breakpoint callback for key_init (FUN_268cfe2bc).

    Only hits on a thread that recently entered the main handler are
    considered, and only the first ``key_init_log_limit`` of those are traced.
    A traced hit also plants a one-shot return breakpoint at x30 when it is
    non-zero. Always returns False.
    """
    thread = frame.GetThread()
    if _is_recent_main_thread(thread.GetThreadID()):
        _STATE["key_init_entry_seen"] += 1
        if _STATE["key_init_entry_seen"] <= _STATE["key_init_log_limit"]:
            _common_entry_trace(frame, "FUN_268cfe2bc")
            return_addr = _reg_u64(frame, "x30")
            if return_addr:
                target = thread.GetProcess().GetTarget()
                _set_temp_return_bp(target, return_addr, "key_init_ret_cb")
    return False
def key_source_entry_cb(frame, bp_loc, _dict):
    """Entry breakpoint callback for key_source (FUN_268cfef18).

    Only hits on a thread that recently entered the main handler are
    considered, and only the first ``key_source_log_limit`` of those are
    traced. A traced hit also plants a one-shot return breakpoint at x30 when
    it is non-zero. Always returns False.
    """
    thread = frame.GetThread()
    if _is_recent_main_thread(thread.GetThreadID()):
        _STATE["key_source_entry_seen"] += 1
        if _STATE["key_source_entry_seen"] <= _STATE["key_source_log_limit"]:
            _common_entry_trace(frame, "FUN_268cfef18")
            return_addr = _reg_u64(frame, "x30")
            if return_addr:
                target = thread.GetProcess().GetTarget()
                _set_temp_return_bp(target, return_addr, "key_source_ret_cb")
    return False
def _detect_module_base(target, module_name_substr="SummarizationKit"):
    """Return the load address of the first module whose filename matches.

    Scans the target's modules for one whose filename contains
    *module_name_substr* and returns the load address of its object-file
    header. Modules that match but have no valid header address, or whose
    header is not currently loaded, are skipped and the scan continues.

    Returns None when no matching, loaded module is found.
    """
    for index in range(target.GetNumModules()):
        module = target.GetModuleAtIndex(index)
        if not module.IsValid():
            continue
        filename = module.GetFileSpec().GetFilename()
        if not filename or module_name_substr not in filename:
            continue
        header = module.GetObjectFileHeaderAddress()
        if not header.IsValid():
            continue
        load_addr = header.GetLoadAddress(target)
        # GetLoadAddress reports LLDB_INVALID_ADDRESS for an unloaded section;
        # returning that sentinel would yield a bogus slide, so keep looking.
        if load_addr == lldb.LLDB_INVALID_ADDRESS:
            continue
        return load_addr
    return None
def _install_bp(target, runtime_addr, cb_name, label):
    """Create an auto-continuing breakpoint at *runtime_addr* and log it.

    *cb_name* names a function in this module that is attached as the script
    callback; *label* is only used in the log line. Returns the breakpoint.
    """
    callback_path = "{}.{}".format(__name__, cb_name)
    anchor_bp = target.BreakpointCreateByAddress(runtime_addr)
    anchor_bp.SetAutoContinue(True)
    anchor_bp.SetScriptCallbackFunction(callback_path)
    _log("{} breakpoint #{} @ 0x{:x}".format(label, anchor_bp.GetID(), runtime_addr))
    return anchor_bp
def sktrace_init(debugger, command, exe_ctx, result, _dict):
    """LLDB command: compute the slide and install the anchor breakpoints.

    *command* is either empty (the module base is auto-detected) or a
    hexadecimal module load base. On success the per-run tracing state is
    reset, one entry breakpoint per ANCHOR_FUNCS entry is installed, and the
    key globals are snapshotted if a process exists.

    Running the command again first removes the anchor breakpoints installed
    by the previous run, so callbacks do not fire more than once per hit.
    """
    target = debugger.GetSelectedTarget()
    if not target.IsValid():
        result.PutCString("No valid target.")
        return

    arg = command.strip()
    if arg:
        try:
            module_base = int(arg, 16)
        except ValueError:
            result.PutCString("Invalid address: {}".format(arg))
            return
    else:
        module_base = _detect_module_base(target)
        if module_base is None:
            result.PutCString(
                "Unable to auto-detect SummarizationKit base. Pass one manually: sktrace_init 0x..."
            )
            return

    # Drop breakpoints left by an earlier sktrace_init. The recorded address is
    # re-checked so an unrelated breakpoint that merely shares the ID (for
    # example after switching targets) is never deleted.
    for bp_id, bp_addr in _STATE.get("anchor_bps", ()):
        old_bp = target.FindBreakpointByID(bp_id)
        if (
            old_bp.IsValid()
            and old_bp.GetNumLocations() > 0
            and old_bp.GetLocationAtIndex(0).GetLoadAddress() == bp_addr
        ):
            target.BreakpointDelete(bp_id)
    _STATE["anchor_bps"] = []

    slide = module_base - GHIDRA_IMAGE_BASE
    _STATE["slide"] = slide
    _STATE["module_base"] = module_base
    # Reset everything that is specific to one tracing run.
    _STATE["hit_counts"] = {}
    _STATE["last_main_ret_sig"] = None
    _STATE["last_key_init_ret_sig"] = None
    _STATE["last_key_source_ret_sig"] = None
    _STATE["last_globals_hex"] = {}
    _STATE["main_thread_seen_at"] = {}
    _STATE["main_entry_count"] = 0
    _STATE["main_return_count"] = 0
    _STATE["input_dump_count"] = 0
    _STATE["output_dump_count"] = 0
    _STATE["key_init_entry_seen"] = 0
    _STATE["key_source_entry_seen"] = 0
    _log(
        "module_base=0x{:x} ghidra_base=0x{:x} slide=0x{:x}".format(
            module_base, GHIDRA_IMAGE_BASE, slide
        )
    )

    anchors = (
        ("main_handler", "main_handler_entry_cb", "FUN_268cfdaf8"),
        ("key_init", "key_init_entry_cb", "FUN_268cfe2bc"),
        ("key_source", "key_source_entry_cb", "FUN_268cfef18"),
    )
    for anchor_key, cb_name, label in anchors:
        runtime_addr = _runtime_addr(ANCHOR_FUNCS[anchor_key])
        bp = _install_bp(target, runtime_addr, cb_name, label)
        _STATE["anchor_bps"].append((bp.GetID(), runtime_addr))

    process = target.GetProcess()
    if process and process.IsValid():
        _snapshot_key_globals(process)
    _STATE["installed"] = True
    result.PutCString(
        "sktrace initialized at {}. Dumps dir: {}. Continue with 'c' and trigger config load.".format(
            time.strftime("%Y-%m-%d %H:%M:%S"), _STATE["dump_dir"]
        )
    )
def sktrace_show_globals(debugger, command, exe_ctx, result, _dict):
    """LLDB command: snapshot and log the key globals on demand.

    Reports a message instead when there is no valid target, no valid
    process, or the slide has not been set yet.
    """
    target = debugger.GetSelectedTarget()
    if not target.IsValid():
        result.PutCString("No valid target.")
        return
    process = target.GetProcess()
    if not process.IsValid():
        message = "No running process."
    elif _STATE.get("slide") is None:
        message = "Run sktrace_init first."
    else:
        _snapshot_key_globals(process)
        message = "Done."
    result.PutCString(message)
def sktrace_set_dump_dir(debugger, command, exe_ctx, result, _dict):
    """LLDB command: set (and create) the directory used for buffer dumps.

    *command* is the directory path. A leading ``~`` is expanded and the path
    is made absolute. The stored ``dump_dir`` is only updated once the
    directory exists or has been created; a creation failure is reported
    through *result* and leaves the previous setting untouched.
    """
    raw_path = command.strip()
    if not raw_path:
        result.PutCString("Usage: sktrace_set_dump_dir /absolute/or/relative/path")
        return

    # expanduser first: abspath alone would turn "~/x" into "<cwd>/~/x".
    dump_dir = os.path.abspath(os.path.expanduser(raw_path))
    try:
        os.makedirs(dump_dir, exist_ok=True)
    except OSError as exc:
        result.PutCString("Unable to create dump_dir {}: {}".format(dump_dir, exc))
        return

    _STATE["dump_dir"] = dump_dir
    result.PutCString("dump_dir set to: {}".format(_STATE["dump_dir"]))
def sktrace_set_max_dumps(debugger, command, exe_ctx, result, _dict):
    """LLDB command: set ``max_auto_dumps`` from a positive integer argument.

    An empty, non-integer or non-positive argument is reported through
    *result* and leaves the current limit unchanged.
    """
    text = command.strip()
    if not text:
        message = "Usage: sktrace_set_max_dumps <positive_int>"
    else:
        try:
            limit = int(text)
        except ValueError:
            message = "Invalid integer: {}".format(text)
        else:
            if limit <= 0:
                message = "max dumps must be > 0"
            else:
                _STATE["max_auto_dumps"] = limit
                message = "max_auto_dumps set to: {}".format(limit)
    result.PutCString(message)
def __lldb_init_module(debugger, _dict):
    """Register the sktrace_* commands with *debugger* and log that they loaded."""
    registrations = (
        "command script add -f {}.sktrace_init sktrace_init",
        "command script add -f {}.sktrace_show_globals sktrace_show_globals",
        "command script add -f {}.sktrace_set_dump_dir sktrace_set_dump_dir",
        "command script add -f {}.sktrace_set_max_dumps sktrace_set_max_dumps",
    )
    for template in registrations:
        debugger.HandleCommand(template.format(__name__))
    _log(
        "Loaded. Commands: sktrace_init [module_base_hex], sktrace_show_globals, "
        "sktrace_set_dump_dir <path>, sktrace_set_max_dumps <n>"
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment