|
# Copyright (c) Meta Platforms, Inc. and affiliates. |
|
# This source code is licensed under the MIT license |
|
# found in the LICENSE file. |
|
|
|
# LLDB Python script for Android native debugging |
|
|
|
import argparse |
|
import ctypes |
|
import datetime |
|
import mmap |
|
import os |
|
import shlex |
|
import socket |
|
import struct |
|
import subprocess |
|
import threading |
|
import time |
|
import traceback |
|
import uuid |
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
from dataclasses import dataclass |
|
from enum import Enum, IntEnum |
|
from pathlib import Path |
|
from tempfile import NamedTemporaryFile, TemporaryDirectory |
|
from types import TracebackType |
|
from typing import Dict, List, Optional, Type |
|
|
|
import lldb |
|
|
|
# File name of the lldb-server binary on the device (staging copy and package copy).
LLDB_SERVER_FILE = "lldb-server"
# Default device directory lldb-server is pushed to (default of
# --lldb-server-staging-base-path).
LLDB_SERVER_STAGING_BASE_PATH = "/data/local/tmp/lldb"
# Directory, relative to the run-as working directory, that receives the
# package-private copy of lldb-server on non-root devices.
LLDB_SERVER_PKG_BASE_PATH = "lldb"
# Upper bound, in seconds, for polling `ps` until the started lldb-server shows up.
LLDB_SERVER_CHECK_WAIT_IN_SEC = 10
# Default of --wait-process-in-seconds.
DEFAULT_WAIT_PROCESS_IN_SEC = 5
# NOTE(review): not referenced in the visible part of the file; presumably a
# timeout for resuming the JVM -- confirm against its users.
RESUME_JVM_TIMEOUT_IN_SEC = 100000
# Sleep, in seconds, between two polling attempts.
CHECK_SLEEP_IN_SEC = 1
# 80-column separator lines, each surrounded by newlines.
BOLDLINE = "\n" + "=" * 80 + "\n"
LINE = "\n" + "-" * 80 + "\n"
|
|
|
|
|
class LldbConnection(Enum):
    """Transport selected with --lldb-connection to reach lldb-server."""

    # lldb-server listens on a TCP port.
    TCP = 0
    # lldb-server listens on a unix-abstract socket.
    UNIX = 1
|
|
|
|
|
@dataclass
class DebuggeeProcess:
    """The process LLDB attaches to."""

    # Process ID, kept as a string so it can be spliced into commands directly.
    process_id: str
|
|
|
|
|
@dataclass
class LldbServerProcess:
    """Description of one lldb-server instance started on the device."""

    # Unique ID generated for this lldb-server run.
    lldb_server_id: str
    # URL handed to `platform connect`.
    lldb_server_endpoint: str
    # Working directory reported by run-as; None when it was not queried.
    lldb_server_cwd: Optional[str]
    # Device path of the file receiving lldb-server's stdout/stderr.
    lldb_server_log: str
    # PID of lldb-server as reported by `ps`.
    process_id: str
    # Process group ID of lldb-server as reported by `ps`.
    process_group_id: str
|
|
|
|
|
@dataclass
class LldbServerContext:
    """State recorded about the running lldb-server after an attach."""

    # The lldb-server process itself.
    process: LldbServerProcess
    # Local ends of the adb forwards attributed to lldb-server.
    forwards: List[str]
    # Unix-abstract socket names used by lldb-server.
    unix_sockets: List[str]
|
|
|
|
|
class ArgParser(argparse.ArgumentParser):
    """ArgumentParser that raises instead of printing usage and exiting."""

    def error(self, message):
        """Report a parse error by raising RuntimeError carrying *message*."""
        raise RuntimeError(message)
|
|
|
|
|
@dataclass
class ElfInfo:
    """Facts extracted from an ELF file by ElfParser."""

    # Hex string of the GNU build-ID note, or None when no such note was found.
    build_id: Optional[str]
    # True when a section whose name starts with ".debug_" exists.
    has_debug: bool
|
|
|
|
|
class ElfParser:
    """Minimal ELF reader that extracts the GNU build ID and debug-section presence.

    Intended to be used as a context manager; entering it maps the file,
    parses it and yields the resulting ElfInfo::

        with ElfParser(path) as elf_info:
            ...

    Every offset table below maps an EClass to the byte offset of a field for
    that ELF class. The identification bytes are read before the class is
    known, so their tables are keyed by EClass.Invalid.
    """

    class EClass(IntEnum):
        """Value of the EI_CLASS identification byte."""

        Invalid = 0
        E32 = 1
        E64 = 2

    OffsetType = Dict[EClass, int]

    # e_ident fields.
    EI_MAGIC = {EClass.Invalid: 0x0}
    EI_MAGIC_BYTES = b"\x7fELF"
    EI_CLASS = {EClass.Invalid: 0x4}
    EI_DATA = {EClass.Invalid: 0x5}
    ELFDATA2MSB = 0x2

    # ELF header fields.
    E_SHOFF = {EClass.E32: 0x20, EClass.E64: 0x28}
    E_SHNUM = {EClass.E32: 0x30, EClass.E64: 0x3C}
    E_SHSTRNDX = {EClass.E32: 0x32, EClass.E64: 0x3E}

    # Section header fields.
    SH_HEADER_SIZE = {EClass.E32: 0x28, EClass.E64: 0x40}
    SH_NAME = {EClass.E32: 0x0, EClass.E64: 0x0}
    SH_TYPE = {EClass.E32: 0x4, EClass.E64: 0x4}
    SH_OFFSET = {EClass.E32: 0x10, EClass.E64: 0x18}
    SH_SIZE = {EClass.E32: 0x14, EClass.E64: 0x20}
    SHT_NOTE = 0x7

    # Note entry fields.
    N_NAMSZ = {EClass.E32: 0x0, EClass.E64: 0x0}
    N_DESCSZ = {EClass.E32: 0x4, EClass.E64: 0x4}
    N_TYPE = {EClass.E32: 0x8, EClass.E64: 0x8}
    N_NAME = {EClass.E32: 0xC, EClass.E64: 0xC}
    NT_GNU_BUILD_ID = 0x3
    NT_GNU_BUILD_ID_NAME = b"GNU\0"

    def __init__(self, path: Path):
        """Remember *path*; nothing is opened until the context is entered."""
        self.path = path
        self.e_class = self.EClass.Invalid
        # struct byte-order prefix; replaced from EI_DATA while parsing.
        self.byte_order = "<"
        self.file = None
        self.mm = None

    def __enter__(self) -> "ElfInfo":
        """Open and map the file, then return the parsed ElfInfo.

        Raises whatever opening, mapping or parsing raises. The file and the
        mapping are released before the exception propagates, because
        __exit__ is not invoked when __enter__ itself fails.
        """
        try:
            self.file = open(self.path, "rb")
            self.mm = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)
            return self.parse()
        except BaseException:
            self._close()
            raise

    def __exit__(
        self,
        exctype: Optional[Type[BaseException]],
        excinst: Optional[BaseException],
        exctb: Optional[TracebackType],
    ) -> None:
        """Release the mapping and the file; exceptions are not suppressed."""
        self._close()

    def _close(self) -> None:
        """Close the mapping and the file if they are open (idempotent)."""
        if self.mm is not None:
            self.mm.close()
            self.mm = None
        if self.file is not None:
            self.file.close()
            self.file = None

    def parse(self) -> "ElfInfo":
        """Walk the section headers of the mapped file.

        Returns an ElfInfo with the GNU build ID (hex string, or None) and
        whether any ".debug_*" section exists.

        Raises RuntimeError when the file is not ELF, has an invalid class, or
        a section name is not terminated inside the string table.
        """
        # The identification bytes use the EClass.Invalid offset tables.
        self.e_class = self.EClass.Invalid
        if (
            self._bytes(0, self.EI_MAGIC, len(self.EI_MAGIC_BYTES))
            != self.EI_MAGIC_BYTES
        ):
            raise RuntimeError("file is not ELF")

        # Multi-byte fields are stored in the file's byte order, not the host's.
        ei_data = self._byte(0, self.EI_DATA)
        self.byte_order = ">" if ei_data == self.ELFDATA2MSB else "<"

        e_class = self._byte(0, self.EI_CLASS)
        if e_class not in (self.EClass.E32, self.EClass.E64):
            raise RuntimeError(f"invalid ELF class: {e_class}")
        self.e_class = self.EClass(e_class)

        e_shoff = self._offset(0, self.E_SHOFF)
        e_shnum = self._uint16(0, self.E_SHNUM)
        e_shstrndx = self._uint16(0, self.E_SHSTRNDX)

        # Get section names.
        sh_str = e_shoff + self.SH_HEADER_SIZE[self.e_class] * e_shstrndx
        str_offset = self._offset(sh_str, self.SH_OFFSET)
        str_end = str_offset + self._size(sh_str, self.SH_SIZE)

        # Iterate sections to find GNU BuildID and debug sections.
        build_id = None
        has_debug = False
        sh_header_offset = e_shoff
        for _ in range(e_shnum):
            sh_name = self._uint32(sh_header_offset, self.SH_NAME)
            name = self._string(str_offset + sh_name, str_end)
            if name.startswith(b".debug_"):
                has_debug = True
                if has_debug and build_id:
                    break

            sh_type = self._uint32(sh_header_offset, self.SH_TYPE)
            if sh_type == self.SHT_NOTE:
                note_offset = self._offset(sh_header_offset, self.SH_OFFSET)
                note_end = note_offset + self._size(sh_header_offset, self.SH_SIZE)

                while note_offset < note_end:
                    n_namsz = self._uint32(note_offset, self.N_NAMSZ)
                    n_descsz = self._uint32(note_offset, self.N_DESCSZ)
                    n_type = self._uint32(note_offset, self.N_TYPE)
                    n_name = self._bytes(note_offset, self.N_NAME, n_namsz)

                    # The descriptor follows the name, padded to 4 bytes.
                    desc_offset = (
                        note_offset + self.N_NAME[self.e_class] + ((n_namsz + 3) & ~3)
                    )
                    n_desc = self.mm[desc_offset : desc_offset + n_descsz]

                    if (
                        n_type == self.NT_GNU_BUILD_ID
                        and n_namsz == len(self.NT_GNU_BUILD_ID_NAME)
                        and n_name == self.NT_GNU_BUILD_ID_NAME
                    ):
                        build_id = n_desc.hex()
                        if has_debug and build_id:
                            break

                    # The next note starts after the 4-byte padded descriptor.
                    note_offset = desc_offset + ((n_descsz + 3) & ~3)

            sh_header_offset += self.SH_HEADER_SIZE[self.e_class]

        return ElfInfo(build_id=build_id, has_debug=has_debug)

    def _bytes(self, base: int, offset: OffsetType, size: int) -> bytes:
        """Return *size* raw bytes of the field *offset* relative to *base*."""
        start = base + offset[self.e_class]
        return self.mm[start : start + size]

    def _string(self, offset: int, end: int) -> bytes:
        """Return the NUL-terminated string at *offset*, searching up to *end*.

        Raises RuntimeError when no terminator is found before *end*.
        """
        for index in range(offset, end):
            if self.mm[index] == 0:
                return self.mm[offset:index]
        raise RuntimeError("Out of range access for string")

    def _byte(self, base: int, offset: OffsetType) -> int:
        """Return the single byte of the field *offset* relative to *base*."""
        return int(self.mm[base + offset[self.e_class]])

    def _unpack(self, code: str, base: int, offset: OffsetType) -> int:
        """Decode one integer of struct type *code* in the file's byte order."""
        return struct.unpack_from(
            self.byte_order + code, self.mm, base + offset[self.e_class]
        )[0]

    def _uint16(self, base: int, offset: OffsetType) -> int:
        """Return the unsigned 16-bit field *offset* relative to *base*."""
        return self._unpack("H", base, offset)

    def _uint32(self, base: int, offset: OffsetType) -> int:
        """Return the unsigned 32-bit field *offset* relative to *base*."""
        return self._unpack("I", base, offset)

    def _offset(self, base: int, offset: OffsetType) -> int:
        """Return a file-offset field: 4 bytes for E32, 8 bytes for E64."""
        code = {self.EClass.E32: "I", self.EClass.E64: "Q"}[self.e_class]
        return self._unpack(code, base, offset)

    def _size(self, base: int, offset: OffsetType) -> int:
        """Return a size field; it has the same width as a file offset."""
        return self._offset(base, offset)
|
|
|
|
|
@dataclass
class UnstrippedNativeLibrary:
    """An unstripped native library on the host together with its ELF facts."""

    # Host path of the library file.
    path: Path
    # Build ID / debug-section information parsed from that file.
    elf_info: ElfInfo
|
|
|
|
|
class UnstrippedNativeLibraries:
    """Index of unstripped native libraries, by GNU build ID and by file name.

    parse() fills the index from text files that list one library path per
    line; every text file and every listed library is handled as a separate
    task on a thread pool. `running` counts the tasks that are submitted but
    not finished, and parse() waits on `cv` until it drops back to zero.
    """

    def __init__(self):
        self.is_build_id_check_required = False
        self.files_for_name: Dict[str, "UnstrippedNativeLibrary"] = {}
        self.files_for_build_id: Dict[str, "UnstrippedNativeLibrary"] = {}

    def find(
        self,
        name: str,
        directory: str,
        build_id: str,
    ) -> Optional["UnstrippedNativeLibrary"]:
        """Return the library matching *build_id*, else the one named *name*.

        *directory* is accepted but not consulted. Returns None when neither
        lookup matches.
        """
        return self.files_for_build_id.get(build_id) or self.files_for_name.get(name)

    def parse(self, text_files: List[str]):
        """Parse all libraries listed in *text_files* and block until done.

        *text_files* may be None or empty, in which case nothing is parsed.
        Per-file failures are collected and logged, not raised.
        """
        start_time = time.time()
        # The command line leaves the option at None when it is never given.
        text_files = list(text_files or [])

        self.executor = ThreadPoolExecutor()
        self.lock = threading.Lock()
        self.cv = threading.Condition()
        self.running = 0
        self.errors = []

        for text_file in text_files:
            self._submit(self.parse_text, text_file)

        with self.cv:
            self.cv.wait_for(lambda: self.running == 0)
        # All tasks are finished here; release the worker threads.
        self.executor.shutdown()

        elapsed = str(round(time.time() - start_time, 2))
        log(
            f"Unstripped-native-libraries parse done: {now()}: {elapsed}s\n"
            + " ".join(text_files)
        )
        for error in self.errors:
            if error:
                log(f"Error: {error[0]}: {error[1]}")

        self.executor = None
        self.lock = None
        self.cv = None
        self.running = None
        self.errors = None

    def _submit(self, task, *args):
        """Count one more pending task and hand it to the executor.

        The count is taken back when submission itself fails, so that parse()
        cannot wait forever for a task that never ran.
        """
        with self.cv:
            self.running += 1
        try:
            self.executor.submit(task, *args)
        except BaseException:
            self._task_done()
            raise

    def _task_done(self):
        """Mark one pending task as finished and wake up parse()."""
        with self.cv:
            self.running -= 1
            self.cv.notify()

    def parse_text(self, text_file: str):
        """Task: read *text_file* and submit one parse_line task per non-blank line."""
        try:
            path = Path(text_file).resolve()
            root = path.parent
            for elf in path.read_text().splitlines():
                elf = elf.strip()
                if elf:
                    self._submit(self.parse_line, path, root, elf)
        except Exception:
            self.errors.append((text_file, _e()))
        finally:
            # Always executed, otherwise parse() would never return.
            self._task_done()

    def parse_line(self, path: Path, root: Path, line: str):
        """Task: parse the ELF file named by *line* and add it to the index.

        A relative *line* is resolved against *root*, the directory of the
        text file it came from.
        """
        # Best-known identification of the entry for the error report.
        elf_path = line
        try:
            elf_path = Path(line)
            if not elf_path.is_absolute():
                # Relative path, concat unstripped_native_libraries text parent path.
                elf_path = root / elf_path

            elf_info = parse_elf(elf_path)
            library = UnstrippedNativeLibrary(path=elf_path, elf_info=elf_info)

            # A library without a build ID can only be found by name.
            if elf_info.build_id:
                self.files_for_build_id[elf_info.build_id] = library
            self.files_for_name[elf_path.name] = library
        except Exception:
            self.errors.append((elf_path, _e()))
        finally:
            # Always executed, otherwise parse() would never return.
            self._task_done()
|
|
|
|
|
class ModuleLocator:
    """Maps modules loaded by the debuggee to unstripped libraries on the host.

    The library index is built on a background thread; `local_lock` is held
    for the whole build and for every lookup.
    """

    def __init__(
        self,
        debugger: lldb.SBDebugger,
    ):
        self.debugger = debugger
        self.local_lock = threading.Lock()
        self.unstripped_native_libraries = UnstrippedNativeLibraries()

    def parse_unstripped_native_libraries_in_background(
        self,
        options: argparse.Namespace,
    ):
        """Start a daemon thread that builds the library index from *options*."""
        threading.Thread(
            target=self.parse_unstripped_native_libraries_thread,
            args=(options,),
            daemon=True,
        ).start()

    def parse_unstripped_native_libraries_thread(self, options: argparse.Namespace):
        """Thread body: traverse the search paths, then parse all list files.

        Exceptions are logged and never propagate out of the thread.
        """
        try:
            with self.local_lock:
                # The directory holds the generated list file and must
                # therefore outlive the parse below.
                with TemporaryDirectory() as tmpdir:
                    if options.search_paths:
                        self.traverse_search_paths(options, tmpdir)

                    # The option is None when neither --search-path nor
                    # --unstripped-native-libraries was given.
                    self.unstripped_native_libraries.parse(
                        options.unstripped_native_libraries or [],
                    )
        except Exception:
            log(f"Exception: {_e()}")

    def traverse_search_paths(self, options: argparse.Namespace, tmpdir: str):
        """Collect every "*.so" below the search paths into a list file.

        The list file is created in *tmpdir* and appended to
        options.unstripped_native_libraries, so that it is parsed like a
        user-supplied one.
        """
        paths = []
        for search_path in options.search_paths:
            log(f"Traverse search path: {search_path}")
            for dirpath, _, filenames in os.walk(search_path):
                for filename in filenames:
                    if filename.endswith(".so"):
                        paths.append((Path(dirpath) / filename).resolve())

        # Write the file before registering it, so that a failed write does
        # not leave a dangling entry in the options.
        files = Path(tmpdir) / "files.txt"
        with files.open("w") as f:
            for path in sorted(set(paths)):
                print(path, file=f)

        if not options.unstripped_native_libraries:
            options.unstripped_native_libraries = [str(files)]
        else:
            options.unstripped_native_libraries.append(str(files))

    def set_callback(self):
        """Install the locate-module callback on the selected platform."""
        # NOTE(review): `locate_module_callback` is looked up as a module-level
        # name here, not as self.locate_module_callback -- confirm that a
        # module-level function of that name exists elsewhere in the file.
        self.debugger.GetSelectedPlatform().SetLocateModuleCallback(
            locate_module_callback
        )

    def locate_module_callback(
        self,
        module_spec: lldb.SBModuleSpec,
        module_file_spec: lldb.SBFileSpec,
        symbol_file_spec: lldb.SBFileSpec,
    ) -> lldb.SBError:
        """Point the given file specs at a local unstripped library, if known.

        Returns a successful SBError when a library was found; otherwise an
        SBError built from "Not found", or from "Error" when an exception
        occurred.
        """
        try:
            build_id = ctypes.string_at(
                int(module_spec.GetUUIDBytes()), module_spec.GetUUIDLength()
            ).hex()

            name = module_spec.GetFileSpec().GetFilename()
            directory = module_spec.GetFileSpec().GetDirectory()
            log(f"\nLoaded: {now()}: {build_id}\n{module_spec}")

            result = self.find(
                name=name,
                directory=directory,
                build_id=build_id,
                module_file_spec=module_file_spec,
                symbol_file_spec=symbol_file_spec,
            )
            if result.Success():
                return result

            return lldb.SBError("Not found")
        except Exception:
            log(f"locate_module_callback: Exception\n{_e()}")
            return lldb.SBError("Error")

    def find(
        self,
        name: str,
        directory: str,
        build_id: str,
        module_file_spec: lldb.SBFileSpec,
        symbol_file_spec: lldb.SBFileSpec,
    ) -> lldb.SBError:
        """Look the module up in the index and fill in the file specs.

        The module file spec is always set to the found library; the symbol
        file spec only when the library has debug sections.
        """
        with self.local_lock:
            lib = self.unstripped_native_libraries.find(
                name=name,
                directory=directory,
                build_id=build_id,
            )
            if lib:
                log(
                    "Use unstripped native library: "
                    f"has_debug={lib.elf_info.has_debug}: {lib.path}"
                )
                module_file_spec.SetDirectory(str(lib.path.parent))
                module_file_spec.SetFilename(str(lib.path.name))
                if lib.elf_info.has_debug:
                    symbol_file_spec.SetDirectory(str(lib.path.parent))
                    symbol_file_spec.SetFilename(str(lib.path.name))
                return lldb.SBError()

        return lldb.SBError("Not found")
|
|
|
|
|
@dataclass
class GlobalContext:
    """Process-wide state shared by the LLDB commands of this module.

    Fields typed Optional are None until a command fills them in.
    """

    # Open log file; the object returned by NamedTemporaryFile(...).
    log: NamedTemporaryFile
    # Debugger the commands operate on.
    debugger: Optional[lldb.SBDebugger]
    # Path of the adb executable.
    adb: Optional[str]
    # Command prefix (["run-as", <package>]) for package-private shell
    # commands; empty when not needed.
    run_as: List[str]
    # The running lldb-server and the resources attributed to it.
    lldb_server: Optional[LldbServerContext]
    # The process LLDB is attached to.
    debuggee_process: Optional[DebuggeeProcess]
    # NOTE(review): not written in the visible part of the file; presumably
    # the adb forwards created for JDWP -- confirm against its users.
    jdwp_forwards: List[str]
    # Resolver of unstripped native libraries for loaded modules.
    module_locator: Optional[ModuleLocator]
|
|
|
|
|
# Module-wide context instance. Creating it opens the log file as a side effect
# of importing this module; delete=False keeps the file on disk after it is
# closed. All other fields start empty and are filled in by the commands.
GLOBAL_CONTEXT = GlobalContext(
    log=NamedTemporaryFile(
        mode="w", prefix="lldb-", suffix=".log", delete=False, encoding="utf-8"
    ),
    debugger=None,
    adb=None,
    run_as=[],
    lldb_server=None,
    debuggee_process=None,
    jdwp_forwards=[],
    module_locator=None,
)
|
|
|
|
|
class Result:
    """Wrapper around an LLDB command return object that also writes to the log."""

    def __init__(self, result: lldb.SBCommandReturnObject):
        self.result = result

    def print(self, message: str):
        """Log *message* and print it, newline-terminated, as command output."""
        log(message)
        line = message + "\n"
        self.result.Print(line)

    def warning(self, message: str):
        """Log *message* and append it, newline-terminated, as a warning."""
        log(message)
        line = message + "\n"
        self.result.AppendWarning(line)

    def error(self, message: str, exception: Optional[str] = None):
        """Log *exception* (when given) and *message*, then set the command error.

        Only *message* becomes the command's error text; *exception* goes to
        the log only.
        """
        if exception:
            log(exception)
        log(message)
        line = message + "\n"
        self.result.SetError(line)
|
|
|
|
|
# Common base of the LLDB script commands in this module. A subclass sets
# `program` and `description` and implements create_options(); instances
# expose the help texts through get_short_help() / get_long_help().
class BaseCommand:
    program = "base"
    description = "base description"

    @classmethod
    def create_options(cls) -> ArgParser:
        """Build the argument parser of the command; subclasses must override."""
        raise RuntimeError(f"{cls} should implement create_options")

    @classmethod
    def register_lldb_command(cls, debugger: lldb.SBDebugger, module_name: str):
        """Register the class in *debugger* as the command named `cls.program`.

        The class docstring is replaced by the parser's formatted help first.
        """
        cls.__doc__ = cls.create_options().format_help()
        registration = (
            f"command script add -c {module_name}.{cls.__name__} {cls.program}"
        )
        lldb_handle_command(debugger, registration)

    def __init__(self, debugger: lldb.SBDebugger, unused: Dict):
        # Neither argument is used here; the parser and its help are cached.
        options_parser = self.create_options()
        self.parser = options_parser
        self.help_string = options_parser.format_help()

    def get_long_help(self) -> str:
        """Return the full formatted help of the command."""
        return self.help_string

    def get_short_help(self) -> str:
        """Return the one-line description of the command."""
        return self.description
|
|
|
|
|
class CommandAndroidAttach(BaseCommand): |
|
program = "android-attach" |
|
description = "Attach LLDB to target process" |
|
|
|
@classmethod |
|
def create_options(cls) -> ArgParser: |
|
parser = ArgParser( |
|
description=cls.description, |
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
|
add_help=False, |
|
) |
|
|
|
parser.add_argument( |
|
"-A", |
|
"--am-arguments", |
|
help="Set am arguments to start app. " |
|
+ "e.g. --am-arguments='start -D -n com.meta.apk/.MainActivity'. " |
|
+ f"{cls.program} can restart the app with --force-stop.", |
|
) |
|
parser.add_argument( |
|
"-C", |
|
"--lldb-connection", |
|
help="Set LLDB connection type", |
|
default="unix", |
|
choices=["tcp", "unix"], |
|
) |
|
parser.add_argument( |
|
"-L", |
|
"--lldb-server-staging-base-path", |
|
help="Set device directory path to store lldb-server", |
|
default=LLDB_SERVER_STAGING_BASE_PATH, |
|
) |
|
parser.add_argument( |
|
"-a", |
|
"--adb", |
|
help="Set adb path", |
|
default="adb", |
|
) |
|
parser.add_argument( |
|
"-l", |
|
"--lldb-server", |
|
help="Set lldb-server path", |
|
required=True, |
|
) |
|
parser.add_argument( |
|
"-q", |
|
"--force-stop-wait-in-seconds", |
|
type=float, |
|
default=1, |
|
help="Set wait for force-stop finalization.", |
|
) |
|
parser.add_argument( |
|
"-r", |
|
"--run-as", |
|
help="Set package name for run-as", |
|
) |
|
parser.add_argument( |
|
"-s", |
|
"--search-path", |
|
action="append", |
|
dest="search_paths", |
|
help="Set paths to search unstripped native libraries. " |
|
+ f"{cls.program} will traverse the paths, it may take a long time. " |
|
+ "Recommend to use --unstripped-native-libraries instead.", |
|
) |
|
parser.add_argument( |
|
"-t", |
|
"--force-stop", |
|
action="store_true", |
|
help="Use 'am force-stop' to force stop the target process. " |
|
+ "This only works with --name and APK package name.", |
|
) |
|
parser.add_argument( |
|
"-u", |
|
"--unstripped-native-libraries", |
|
action="append", |
|
help="Text files with path of unstripped shared library per line. " |
|
+ "The path can be full path or relative path to the text file.", |
|
) |
|
parser.add_argument( |
|
"-w", |
|
"--wait-process-in-seconds", |
|
type=float, |
|
default=DEFAULT_WAIT_PROCESS_IN_SEC, |
|
help="Wait the target process when it does not exist. " |
|
+ "Only works with --name.", |
|
) |
|
|
|
target_group = parser.add_mutually_exclusive_group(required=True) |
|
target_group.add_argument( |
|
"-n", |
|
"--name", |
|
help="Set target process name to attach", |
|
) |
|
target_group.add_argument( |
|
"-p", |
|
"--pid", |
|
type=int, |
|
help="Set target process id to attach", |
|
) |
|
|
|
return parser |
|
|
|
def __init__(self, debugger: lldb.SBDebugger, internal_dict: Dict): |
|
super().__init__(debugger, internal_dict) |
|
|
|
def __call__( |
|
self, |
|
debugger: lldb.SBDebugger, |
|
command: str, |
|
exe_ctx: lldb.SBExecutionContext, |
|
lldb_result: lldb.SBCommandReturnObject, |
|
): |
|
print_message(f"log: {GLOBAL_CONTEXT.log.name}") |
|
result = Result(lldb_result) |
|
log_command(self.program, command) |
|
|
|
try: |
|
options = self.parse_options(command, result) |
|
|
|
GLOBAL_CONTEXT.module_locator = ModuleLocator(debugger) |
|
GLOBAL_CONTEXT.module_locator.parse_unstripped_native_libraries_in_background( |
|
options=options, |
|
) |
|
|
|
self.attach_process(debugger, options, result) |
|
except Exception as e: |
|
result.error(f"Failed to run {self.program}\n{e}", _e()) |
|
|
|
def parse_options(self, command: str, result: Result) -> argparse.Namespace: |
|
options = self.parser.parse_args(shlex.split(command)) |
|
|
|
if not Path(options.lldb_server).exists(): |
|
raise RuntimeError(f"--lldb-server {options.lldb_server} does not exist.") |
|
|
|
if not options.search_paths and not options.unstripped_native_libraries: |
|
result.warning( |
|
"Recommend to use --unstripped-native-libraries" |
|
+ " to specify the unstripped native libraries to optimize loading symbols." |
|
) |
|
|
|
log(str(options) + "\n") |
|
return options |
|
|
|
def attach_process( |
|
self, debugger: lldb.SBDebugger, options: argparse.Namespace, result: Result |
|
) -> Optional[DebuggeeProcess]: |
|
self.check_adb(options) |
|
GLOBAL_CONTEXT.adb = options.adb |
|
|
|
is_root = self.is_root() |
|
|
|
if options.force_stop: |
|
self.force_stop(options) |
|
|
|
if options.am_arguments: |
|
self.start_process(options) |
|
|
|
run_as = self.process_run_as(debugger, options, is_root) |
|
GLOBAL_CONTEXT.run_as = run_as |
|
|
|
debuggee_process = self.get_debuggee_process(options, result) |
|
|
|
lldb_server_process = self.run_lldb_server( |
|
debugger=debugger, |
|
options=options, |
|
is_root=is_root, |
|
result=result, |
|
) |
|
|
|
self.attach( |
|
debugger=debugger, |
|
options=options, |
|
debuggee_process=debuggee_process, |
|
lldb_server_process=lldb_server_process, |
|
result=result, |
|
) |
|
|
|
self.save_contexts( |
|
options=options, |
|
debuggee_process=debuggee_process, |
|
lldb_server_process=lldb_server_process, |
|
) |
|
|
|
result.print(f"Attached pid:{debuggee_process.process_id}") |
|
|
|
def check_adb(self, options: argparse.Namespace): |
|
try: |
|
args = [options.adb, "--version"] |
|
log(" ".join(args)) |
|
p = subprocess.run( |
|
args=args, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
) |
|
if p.returncode != 0: |
|
raise RuntimeError("Failed to run adb") |
|
log(p.stderr) |
|
log(p.stdout) |
|
if "Android Debug Bridge" not in p.stdout: |
|
raise RuntimeError(f"Unexpected adb output: {p.stdout}") |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to check adb\n{e}") |
|
|
|
def is_root(self) -> bool: |
|
return adb_shell(["whoami"]).rstrip() == "root" |
|
|
|
def is_target_apk(self, options: argparse.Namespace) -> bool: |
|
return options.name and "." in options.name |
|
|
|
def force_stop(self, options: argparse.Namespace): |
|
if not self.is_target_apk(options): |
|
raise RuntimeError( |
|
"--force-stop requires --name to specify APK package name." |
|
) |
|
|
|
try: |
|
if self.get_debuggee_candidates(options): |
|
am = ["am", "force-stop", options.name] |
|
print_message("Force-stop app: " + " ".join(am)) |
|
adb_shell(am) |
|
|
|
print_message( |
|
f"Waiting force-stop finish for {options.force_stop_wait_in_seconds} seconds" |
|
) |
|
time.sleep(options.force_stop_wait_in_seconds) |
|
else: |
|
print_message(f"{options.name} not found. skip to force-stop.") |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to stop {options.name}.\n{e}") |
|
|
|
def start_process(self, options: argparse.Namespace): |
|
am_arguments = self.resolve_activity(options, options.am_arguments) |
|
am = ["am", am_arguments] |
|
print_message("Start process: " + " ".join(am)) |
|
self.run_am(options, am) |
|
|
|
def resolve_activity( |
|
self, options: argparse.Namespace, arguments: str |
|
) -> List[str]: |
|
args = shlex.split(arguments) |
|
if "-n" not in args: |
|
return arguments |
|
index = args.index("-n") |
|
component_index = index + 1 |
|
if component_index < len(args): |
|
package_name = self.get_package_name(options) |
|
if args[component_index] == "__DEFAULT_ACTIVITY__": |
|
try: |
|
activity = self.find_activity(options, package_name) |
|
print_message(f"Found default activity: {activity}") |
|
args[component_index] = activity |
|
return " ".join(args) |
|
except Exception as e: |
|
raise RuntimeError( |
|
f"Failed to find default activity: {package_name}.\n{e}" |
|
) |
|
elif "/" not in args[component_index]: |
|
component = package_name + "/" + args[component_index] |
|
print_message( |
|
f"Use '{component}' instead of" + f" '{args[component_index]}'" |
|
) |
|
args[component_index] = component |
|
return " ".join(args) |
|
return arguments |
|
|
|
def find_activity(self, options: argparse.Namespace, package_name: str) -> str: |
|
error = "" |
|
try: |
|
# Try cmd package resolve-activity first |
|
for line in adb_shell( |
|
[ |
|
"cmd", |
|
"package", |
|
"resolve-activity", |
|
"-c", |
|
"android.intent.category.LAUNCHER", |
|
package_name, |
|
"|", |
|
"grep", |
|
"name=", |
|
], |
|
).splitlines(): |
|
key, value = line.strip().split("=") |
|
if key == "name" and not value.startswith("com.android"): |
|
return f"{package_name}/{value}" |
|
except Exception as e: |
|
error = str(e) |
|
|
|
try: |
|
# Fallback to pm dump |
|
found_main_action = False |
|
main_action = "android.intent.action.MAIN:" |
|
for line in adb_shell( |
|
["pm", "dump", package_name, "|", "grep", "-A1", main_action] |
|
).splitlines(): |
|
if found_main_action: |
|
for component in line.strip().split(): |
|
if "/" in component: |
|
return component |
|
found_main_action = False |
|
elif main_action in line: |
|
found_main_action = True |
|
except Exception as e: |
|
error += str(e) |
|
|
|
raise RuntimeError(error or "package does not have activity?") |
|
|
|
def run_am(self, options: argparse.Namespace, am: List[str]): |
|
try: |
|
adb_shell(am) |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to start {options.name}.\n{e}") |
|
|
|
def get_debuggee_process( |
|
self, |
|
options: argparse.Namespace, |
|
result: Result, |
|
) -> DebuggeeProcess: |
|
if options.name: |
|
# Find the specified name process |
|
debuggee_process_id = self.find_debuggee_process(options, result) |
|
else: |
|
# Use the specified process ID |
|
debuggee_process_id = str(options.pid) |
|
|
|
return DebuggeeProcess( |
|
process_id=debuggee_process_id, |
|
) |
|
|
|
def find_debuggee_process(self, options: argparse.Namespace, result: Result) -> str: |
|
candidates = [] |
|
print_message( |
|
f"Waiting process {options.name} for {options.wait_process_in_seconds} seconds" |
|
) |
|
for _ in range(int(options.wait_process_in_seconds / CHECK_SLEEP_IN_SEC)): |
|
candidates = self.get_debuggee_candidates(options) |
|
if len(candidates) > 0: |
|
break |
|
time.sleep(CHECK_SLEEP_IN_SEC) |
|
|
|
if len(candidates) == 0: |
|
raise RuntimeError(f"Process {options.name} not found.") |
|
elif len(candidates) > 1: |
|
result.warning( |
|
f"{options.name} has multiple processes: " |
|
+ ", ".join(candidates) |
|
+ f". using {candidates[0]}" |
|
) |
|
|
|
return candidates[0] |
|
|
|
def get_debuggee_candidates( |
|
self, |
|
options: argparse.Namespace, |
|
) -> List[str]: |
|
candidates = [] |
|
for process in self.get_processes(options, "PID,ARGS=CMD", options.name): |
|
pid, cmd = process.lstrip().split(maxsplit=1) |
|
if (cmd + " ").startswith(options.name + " "): |
|
candidates.append(pid) |
|
return candidates |
|
|
|
def process_run_as( |
|
self, debugger: lldb.SBDebugger, options: argparse.Namespace, is_root: bool |
|
) -> List[str]: |
|
if is_root: |
|
# root does not need run-as |
|
return [] |
|
else: |
|
# non-root needs run-as to attach lldb-server to the target process |
|
name = options.run_as or self.get_package_name(options) |
|
lldb_handle_command( |
|
debugger, |
|
f"settings set platform.plugin.remote-android.package-name {name}", |
|
) |
|
return ["run-as", name] |
|
|
|
def get_package_name(self, options: argparse.Namespace) -> Optional[str]: |
|
name = options.name |
|
if name and ":" in name: |
|
# com.meta.apk:Subprocess -> com.meta.apk |
|
name = name.split(":", 1)[0] |
|
return name |
|
|
|
def run_lldb_server( |
|
self, |
|
debugger: lldb.SBDebugger, |
|
options: argparse.Namespace, |
|
is_root: bool, |
|
result: Result, |
|
) -> LldbServerProcess: |
|
lldb_server_args = self.push_lldb_server( |
|
debugger=debugger, |
|
options=options, |
|
is_root=is_root, |
|
) |
|
|
|
lldb_server_id = str(uuid.uuid4()) |
|
log(f"lldb-server ID: {lldb_server_id}") |
|
|
|
lldb_server_log = ( |
|
f"{options.lldb_server_staging_base_path}/.{lldb_server_id}.log" |
|
) |
|
|
|
lldb_server_cwd = None |
|
lldb_connection = ( |
|
LldbConnection.TCP |
|
if options.lldb_connection == "tcp" |
|
else LldbConnection.UNIX |
|
) |
|
if lldb_connection == LldbConnection.TCP: |
|
# mkfifo to listen the LLDB server port |
|
fifo = ( |
|
f"{options.lldb_server_staging_base_path}/.{lldb_server_id}" |
|
if is_root |
|
else f"{LLDB_SERVER_PKG_BASE_PATH}/.{lldb_server_id}" |
|
) |
|
adb_shell_run_as(["mkfifo", fifo]) |
|
|
|
lldb_connection_args = [ |
|
"--socket-file", |
|
fifo, |
|
"--listen", |
|
"*:0", |
|
] |
|
else: # lldb_connection == LldbConnection.UNIX: |
|
if is_root: |
|
unix_socket_base = options.lldb_server_staging_base_path |
|
else: |
|
lldb_server_cwd = adb_shell_run_as(["pwd"]).rstrip() |
|
unix_socket_base = f"{lldb_server_cwd}/{LLDB_SERVER_PKG_BASE_PATH}" |
|
unix_socket = f"{unix_socket_base}/.{lldb_server_id}.sock" |
|
|
|
lldb_connection_args = ["--listen", f"unix-abstract://{unix_socket}"] |
|
|
|
# Run LLDB server |
|
adb_shell( |
|
["nohup"] |
|
+ lldb_server_args |
|
+ ["platform", "--server"] |
|
+ lldb_connection_args |
|
+ [">", lldb_server_log, "2>&1", "&"], |
|
) |
|
|
|
# Wait until lldb-server shows up in ps command |
|
for _ in range(int(LLDB_SERVER_CHECK_WAIT_IN_SEC / CHECK_SLEEP_IN_SEC)): |
|
for process in self.get_processes( |
|
options, |
|
"PID,PGID,ARGS=CMD", |
|
fifo if lldb_connection == LldbConnection.TCP else unix_socket, |
|
): |
|
pid, pgid, cmd = process.lstrip().split(maxsplit=2) |
|
if lldb_connection == LldbConnection.TCP: |
|
if fifo in cmd: |
|
# Read the listening port from the FIFO |
|
lldb_server_port = adb_shell_run_as(["cat", fifo]) |
|
return LldbServerProcess( |
|
lldb_server_id=lldb_server_id, |
|
lldb_server_endpoint=( |
|
f"unix-connect://localhost:{lldb_server_port}/" |
|
), |
|
lldb_server_cwd=lldb_server_cwd, |
|
lldb_server_log=lldb_server_log, |
|
process_id=pid, |
|
process_group_id=pgid, |
|
) |
|
else: # lldb_connection == LldbConnection.UNIX: |
|
if unix_socket in cmd: |
|
return LldbServerProcess( |
|
lldb_server_id=lldb_server_id, |
|
lldb_server_endpoint=f"unix-abstract-connect://{unix_socket}", |
|
lldb_server_cwd=lldb_server_cwd, |
|
lldb_server_log=lldb_server_log, |
|
process_id=pid, |
|
process_group_id=pgid, |
|
) |
|
time.sleep(CHECK_SLEEP_IN_SEC) |
|
|
|
# Something went wrong |
|
output = adb_shell(["cat", lldb_server_log]) |
|
adb_shell(["rm", "-f", lldb_server_log]) |
|
raise RuntimeError(f"{LLDB_SERVER_FILE} did not start: \n{output}") |
|
|
|
def push_lldb_server( |
|
self, |
|
debugger: lldb.SBDebugger, |
|
options: argparse.Namespace, |
|
is_root: bool, |
|
) -> List[str]: |
|
# Push lldb-server to the staging path |
|
lldb_server_staging_path = ( |
|
f"{options.lldb_server_staging_base_path}/{LLDB_SERVER_FILE}" |
|
) |
|
try: |
|
adb_push(options.lldb_server, lldb_server_staging_path) |
|
except: |
|
pass |
|
try: |
|
adb_shell( |
|
["chown", "-R", "shell:shell", options.lldb_server_staging_base_path] |
|
) |
|
except: |
|
pass |
|
try: |
|
adb_shell(["chmod", "-R", "755", options.lldb_server_staging_base_path]) |
|
except: |
|
pass |
|
|
|
if is_root: |
|
# Use the staging path directly |
|
return [lldb_server_staging_path] |
|
else: |
|
# Need to copy the staging lldb-server into the target package |
|
lldb_server_pkg_path = f"{LLDB_SERVER_PKG_BASE_PATH}/{LLDB_SERVER_FILE}" |
|
try: |
|
adb_shell_run_as(["mkdir", "-p", LLDB_SERVER_PKG_BASE_PATH]) |
|
except: |
|
pass |
|
try: |
|
adb_shell_run_as(["cp", lldb_server_staging_path, lldb_server_pkg_path]) |
|
except: |
|
pass |
|
try: |
|
adb_shell_run_as(["chmod", "755", lldb_server_pkg_path]) |
|
except: |
|
pass |
|
return GLOBAL_CONTEXT.run_as + [lldb_server_pkg_path] |
|
|
|
def attach( |
|
self, |
|
debugger: lldb.SBDebugger, |
|
options: argparse.Namespace, |
|
debuggee_process: DebuggeeProcess, |
|
lldb_server_process: LldbServerProcess, |
|
result: Result, |
|
): |
|
try: |
|
self.connect_platform(options, debugger, lldb_server_process) |
|
GLOBAL_CONTEXT.module_locator.set_callback() |
|
|
|
return_object = lldb_handle_command_to_log( |
|
debugger, f"platform process attach -p {debuggee_process.process_id}" |
|
) |
|
if return_object.GetError(): |
|
raise RuntimeError(return_object.GetError()) |
|
except Exception as e: |
|
result.error(f"Failed to attach {debuggee_process.process_id}\n{e}", _e()) |
|
|
|
def connect_platform( |
|
self, |
|
options: argparse.Namespace, |
|
debugger: lldb.SBDebugger, |
|
lldb_server_process: LldbServerProcess, |
|
): |
|
return_object = lldb_handle_command_to_log( |
|
debugger, "platform select remote-android" |
|
) |
|
if return_object.GetError(): |
|
raise RuntimeError(return_object.GetError()) |
|
|
|
return_object = lldb_handle_command_to_log( |
|
debugger, |
|
f"platform connect {lldb_server_process.lldb_server_endpoint}", |
|
) |
|
if return_object.GetError(): |
|
raise RuntimeError(return_object.GetError()) |
|
|
|
def save_contexts( |
|
self, |
|
options: argparse.Namespace, |
|
debuggee_process: DebuggeeProcess, |
|
lldb_server_process: LldbServerProcess, |
|
): |
|
GLOBAL_CONTEXT.process = debuggee_process |
|
|
|
processes = self.get_processes( |
|
options, "PID,PGID,ARGS=CMD", f"-w {lldb_server_process.process_group_id}" |
|
) |
|
|
|
# Get forward list used by LLDB |
|
lldb_forwards = [] |
|
lldb_unix_sockets = [] |
|
try: |
|
if "-abstract-" in lldb_server_process.lldb_server_endpoint: |
|
lldb_forwards = self.get_lldb_unix_forwards( |
|
options, |
|
lldb_server_process, |
|
processes, |
|
lldb_unix_sockets, |
|
) |
|
else: |
|
lldb_forwards = self.get_lldb_tcp_forwards(options, lldb_server_process) |
|
lldb_forwards = sorted(lldb_forwards.keys()) |
|
log("lldb-server forward list: " + " ".join(lldb_forwards)) |
|
except Exception: |
|
log(f"Failed to get lldb-server forward list:\n{_e()}") |
|
|
|
GLOBAL_CONTEXT.lldb_server = LldbServerContext( |
|
process=lldb_server_process, |
|
forwards=lldb_forwards, |
|
unix_sockets=lldb_unix_sockets, |
|
) |
|
|
|
def get_lldb_unix_forwards(
    self,
    options: argparse.Namespace,
    lldb_server_process: LldbServerProcess,
    processes: List[str],
    lldb_unix_sockets: List[str],
) -> Dict[str, bool]:
    """Find the adb forwards that target lldb-server's abstract unix sockets.

    Reads ``adb forward --list`` to learn which host endpoints are forwarded
    to ``localabstract:<name>`` sockets, then scans ``processes`` for
    ``unix-abstract://<name>`` arguments to learn which socket names are in
    use by the listed processes.

    Args:
        options: Parsed command options (not used here).
        lldb_server_process: The lldb-server process (not used here).
        processes: Process listing lines to scan for socket URLs.
        lldb_unix_sockets: Output list; every socket name found in
            ``processes`` is appended to it, whether or not a matching
            forward exists.

    Returns:
        A dict whose keys are the host-side endpoints of the matching
        forwards (values are always ``True``).

    Raises:
        RuntimeError: If the ``adb forward --list`` command fails.
    """
    UNIX_PREFIX = "localabstract:"
    UNIX_URL_MARKER = "unix-abstract://"

    # Map each forwarded abstract socket name to its host-side endpoint.
    forwards = {}
    for forward in run_adb(["forward", "--list"]).splitlines():
        fields = forward.split()
        if len(fields) != 3:
            # Blank or unexpected line: skip it instead of failing the
            # whole lookup on a tuple-unpack error.
            continue
        _, local_port, remote_port = fields
        if remote_port.startswith(UNIX_PREFIX):
            forwards[remote_port[len(UNIX_PREFIX) :]] = local_port

    lldb_forwards = {}
    unix_sockets = {}
    for process in processes:
        pieces = process.split(UNIX_URL_MARKER)
        if len(pieces) != 2:
            continue
        tokens = pieces[1].split()
        if not tokens:
            # The marker was the last thing on the line; no socket name.
            continue
        unix_socket = tokens[0]
        unix_sockets[unix_socket] = True
        # A socket without an adb forward must not abort the scan (this
        # mirrors the membership check in the TCP variant); it is still
        # reported through lldb_unix_sockets.
        if unix_socket in forwards:
            lldb_forwards[forwards[unix_socket]] = True

    lldb_unix_sockets.extend(unix_sockets.keys())
    return lldb_forwards
|
|
|
def get_lldb_tcp_forwards(
    self,
    options: argparse.Namespace,
    lldb_server_process: LldbServerProcess,
) -> Dict[str, bool]:
    """Find the adb forwards that target TCP ports listed for lldb-server.

    Reads ``adb forward --list`` to learn which host endpoints are forwarded
    to ``tcp:<port>`` on the device, then reads
    ``/proc/<lldb-server pid>/net/tcp`` on the device and keeps the forwards
    whose device port appears there as a local port.

    Args:
        options: Parsed command options (not used here).
        lldb_server_process: Supplies the pid whose ``net/tcp`` table is read.

    Returns:
        A dict whose keys are the host-side endpoints of the matching
        forwards (values are always ``True``).

    Raises:
        RuntimeError: If one of the adb commands fails.
    """
    TCP_PREFIX = "tcp:"

    # Map each forwarded device TCP port (decimal string) to its host endpoint.
    forwards = {}
    for forward in run_adb(["forward", "--list"]).splitlines():
        fields = forward.split()
        if len(fields) != 3:
            # Blank or unexpected line: skip it instead of failing the
            # whole lookup on a tuple-unpack error.
            continue
        _, local_port, remote_port = fields
        if remote_port.startswith(TCP_PREFIX):
            forwards[remote_port[len(TCP_PREFIX) :]] = local_port

    lldb_forwards = {}
    tcp_table = adb_shell(
        ["cat", f"/proc/{lldb_server_process.process_id}/net/tcp"],
    )
    for tcp_connection in tcp_table.splitlines():
        fields = tcp_connection.split(maxsplit=2)
        if len(fields) < 2:
            continue
        # The second column is the local address as "<hex addr>:<hex port>";
        # the header row has no ":" there and is skipped.
        _, separator, hex_port = fields[1].rpartition(":")
        if not separator:
            continue
        try:
            tcp_port = str(int(hex_port, 16))
        except ValueError:
            # Not a hexadecimal port; ignore the row.
            continue
        if tcp_port in forwards:
            lldb_forwards[forwards[tcp_port]] = True
    return lldb_forwards
|
|
|
def get_processes(
    self, options: argparse.Namespace, fields: str, filter: str
) -> List[str]:
    """List device processes through ``adb shell`` and return the output lines.

    The command passed to ``adb shell`` is ``ps -A -w -o <fields> | grep
    <filter>``; the pipe and ``grep`` are handed over as plain arguments,
    presumably to be interpreted by the device shell. ``options`` is not
    used here.
    """
    ps_pipeline = ["ps", "-A", "-w", "-o", fields, "|", "grep", filter]
    listing = adb_shell(ps_pipeline)
    return listing.splitlines()
|
|
|
|
|
class CommandAndroidExit(BaseCommand):
    """LLDB command ``android-exit``: tear down the current debug session.

    When an lldb-server context has been saved, the command removes any
    recorded JDWP forwards and terminates lldb-server. Without a saved
    context nothing is cleaned up, but success is still reported.
    """

    program = "android-exit"
    description = "Clean up debug session"

    @classmethod
    def create_options(cls) -> ArgParser:
        """Build the argument parser for this command (no options, no help)."""
        parser = ArgParser(description=cls.description, add_help=False)
        return parser

    def __init__(self, debugger: lldb.SBDebugger, internal_dict: Dict):
        super().__init__(debugger, internal_dict)

    def __call__(
        self,
        debugger: lldb.SBDebugger,
        command: str,
        exe_ctx: lldb.SBExecutionContext,
        lldb_result: lldb.SBCommandReturnObject,
    ):
        """Run the cleanup and report the outcome through ``lldb_result``.

        Any exception raised during cleanup is reported as a command error
        together with its traceback instead of propagating.
        """
        result = Result(lldb_result)
        log_command(self.program, command)

        try:
            server_context = GLOBAL_CONTEXT.lldb_server
            if server_context:
                pending_forwards = GLOBAL_CONTEXT.jdwp_forwards
                if pending_forwards:
                    log("Remove JDWP forwards: " + " ".join(pending_forwards))
                    for host_port in pending_forwards:
                        adb_forward(["--remove", host_port])
                    # Only forget the forwards once all of them were removed
                    pending_forwards.clear()

                terminate_lldb_server(server_context)

            result.print("Successfully cleaned up debug session")
        except Exception as e:
            result.error(f"Exception while clean up debug session\n{e}", _e())
|
|
|
|
|
class CommandAndroidResumeJvm(BaseCommand):
    """LLDB command ``android-resume-jvm``: resume the debuggee's JVM.

    The command returns immediately; the actual work happens on a daemon
    thread that talks JDWP to the process over a temporary adb forward and
    sends it a VirtualMachine.Resume command.
    """

    program = "android-resume-jvm"
    description = "Resume JVM"

    @classmethod
    def create_options(cls) -> ArgParser:
        """Build the argument parser for this command (no options, no help)."""
        return ArgParser(description=cls.description, add_help=False)

    def __init__(self, debugger: lldb.SBDebugger, internal_dict: Dict):
        super().__init__(debugger, internal_dict)

    def __call__(
        self,
        debugger: lldb.SBDebugger,
        command: str,
        exe_ctx: lldb.SBExecutionContext,
        lldb_result: lldb.SBCommandReturnObject,
    ):
        """Start the resume in the background, or report that no process is attached."""
        result = Result(lldb_result)
        log_command(self.program, command)

        if not GLOBAL_CONTEXT.process:
            result.error("Run android-attach first")
            return

        result.print(f"Resume pid:{GLOBAL_CONTEXT.process.process_id}")
        # Run on a daemon thread so the LLDB command returns right away
        threading.Thread(target=self.resume, daemon=True).start()

    @staticmethod
    def _recv_exact(sock: socket.socket, size: int) -> bytes:
        """Read exactly ``size`` bytes from ``sock``.

        A single ``recv`` call may return fewer bytes than requested, so keep
        reading until the full amount has arrived.

        Raises:
            ConnectionError: If the peer closes the connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    f"JDWP connection closed with {remaining} of {size} bytes still expected"
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def resume(self):
        """Send a JDWP VirtualMachine.Resume command to the attached process.

        Forwards a free host TCP port to ``jdwp:<pid>``, performs the JDWP
        handshake, sends the resume command, logs the reply header and then
        removes the forward again. Failures are printed and logged rather
        than raised. On failure the forward stays recorded in
        ``GLOBAL_CONTEXT.jdwp_forwards``.
        """
        try:
            jdwp_port = unused_tcp_port()
            forward = f"tcp:{jdwp_port}"
            process_id = GLOBAL_CONTEXT.process.process_id

            # Forward JDWP to the temporary TCP port. The forward is recorded
            # before it is created so that it is tracked even if a later
            # step fails.
            GLOBAL_CONTEXT.jdwp_forwards.append(forward)
            adb_forward([forward, f"jdwp:{process_id}"])

            # Send 'resume' command to JVM
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(RESUME_JVM_TIMEOUT_IN_SEC)
                s.connect(("127.0.0.1", jdwp_port))
                # sendall: plain send() may transmit only part of the data
                s.sendall("JDWP-Handshake".encode())

                # JVM returns "JDWP-Handshake" (14 bytes)
                handshake = self._recv_exact(s, 14)
                log(f"handshake: jdwp:{process_id}: {handshake}")

                # Command packet: length=11, id=0, flags=0,
                # command set VirtualMachine(1), command Resume(9)
                s.sendall(struct.pack(">IIBBB", 11, 0, 0, 1, 9))

                # Reply: 11 bytes unpacked as two 32-bit values, one byte
                # and one 16-bit value
                reply = struct.unpack(">IIBH", self._recv_exact(s, 11))
                log(f"reply: jdwp:{process_id}: {reply}")

            # Wait for the process to be resumed
            time.sleep(1)

            # Remove the forward
            adb_forward(["--remove", forward])
            GLOBAL_CONTEXT.jdwp_forwards.remove(forward)

            print_message("Resumed.")

        except Exception as e:
            print_message(f"Failed to resume\n:{e}")
            log(_e())
|
|
|
|
|
def locate_module_callback(
    module_spec: lldb.SBModuleSpec,
    module_file_spec: lldb.SBFileSpec,
    symbol_file_spec: lldb.SBFileSpec,
) -> lldb.SBError:
    """Module-level entry point that delegates to the global module locator.

    All three arguments are passed through unchanged to
    ``GLOBAL_CONTEXT.module_locator.locate_module_callback`` and its result
    is returned as is.
    """
    locator = GLOBAL_CONTEXT.module_locator
    return locator.locate_module_callback(
        module_spec=module_spec,
        module_file_spec=module_file_spec,
        symbol_file_spec=symbol_file_spec,
    )
|
|
|
|
|
def parse_elf(path: Path) -> Optional[ElfInfo]:
    """Parse the ELF file at ``path``, returning ``None`` on any failure.

    Opens ``ElfParser(path)`` as a context manager and returns the value it
    yields. Any exception raised while doing so is logged with its
    traceback and turned into a ``None`` result.
    """
    try:
        with ElfParser(path) as elf_info:
            parsed = elf_info
    except Exception:
        log(f"Failed to parse ELF: {path}\n{_e()}")
        parsed = None
    return parsed
|
|
|
|
|
def terminate_lldb_server(
    ctx: LldbServerContext,
):
    """Stop lldb-server on the device and remove what was set up for it.

    In order: sends SIGTERM to lldb-server's process group (failures of the
    kill itself are ignored by the shell command), deletes the lldb-server
    log file, removes every adb forward recorded in ``ctx.forwards`` and
    finally deletes the unix socket files recorded in ``ctx.unix_sockets``.
    Socket names that are not absolute paths are resolved against
    ``ctx.process.lldb_server_cwd``.
    """
    server = ctx.process
    log(
        f"Terminate lldb-server pid:{server.process_id} pgid:{server.process_group_id}"
    )

    # A negative id makes kill target the whole process group; "|| true"
    # keeps the command successful when nothing is left to kill.
    kill_group = [
        "kill",
        "-TERM",
        f"-{server.process_group_id}",
        "2>/dev/null",
        "||",
        "true",
    ]
    adb_shell_run_as(kill_group)
    adb_shell(["rm", "-f", server.lldb_server_log])

    log("Remove forwards: " + " ".join(ctx.forwards))
    for forward in ctx.forwards:
        adb_forward(["--remove", forward])

    if not ctx.unix_sockets:
        return

    log("Remove unix domain sockets:")
    for unix_socket in ctx.unix_sockets:
        if unix_socket.startswith("/"):
            socket_path = unix_socket
        else:
            socket_path = f"{server.lldb_server_cwd}/{unix_socket}"
        log(" " + socket_path)
        adb_shell_run_as(["rm", "-f", socket_path])
|
|
|
|
|
def adb_push(src: str, dst: str):
    """Copy ``src`` to ``dst`` with ``adb push``."""
    push_command = ["push", src, dst]
    run_adb(push_command)
|
|
|
|
|
def adb_forward(args: List[str]) -> str:
    """Run ``adb forward`` with the given arguments and return its stdout."""
    forward_command = ["forward", *args]
    return run_adb(forward_command)
|
|
|
|
|
def adb_shell_run_as(args: List[str]) -> str:
    """Run a device shell command prefixed with ``GLOBAL_CONTEXT.run_as``.

    Returns the command's stdout.
    """
    prefixed_command = GLOBAL_CONTEXT.run_as + args
    return adb_shell(prefixed_command)
|
|
|
|
|
def adb_shell(args: List[str]) -> str:
    """Run ``adb shell`` with the given arguments and return its stdout."""
    shell_command = ["shell", *args]
    return run_adb(shell_command)
|
|
|
|
|
def run_adb(args: List[str], enable_logging: bool = True) -> str:
    """Run the configured adb executable with ``args`` and return its stdout.

    Output is captured as text, with undecodable bytes rendered as
    backslash escapes.

    Args:
        args: Arguments placed after the adb executable.
        enable_logging: When true, the command line, any stderr output and
            the stdout are written to the session log.

    Returns:
        The captured stdout.

    Raises:
        RuntimeError: If adb exits with a non-zero status; the message
            contains the command line, stderr and stdout.
    """
    cmds = [GLOBAL_CONTEXT.adb] + args
    if enable_logging:
        log("$ " + " ".join(cmds))

    completed = subprocess.run(
        cmds,
        capture_output=True,
        text=True,
        errors="backslashreplace",
    )

    if enable_logging:
        if completed.stderr:
            log(f"{LINE}stderr:\n{completed.stderr}{LINE}")
        log(completed.stdout)

    if completed.returncode == 0:
        return completed.stdout

    raise RuntimeError(
        f"Error running: {' '.join(cmds)}{LINE}"
        f"stderr:\n{completed.stderr}{LINE}"
        f"stdout:\n{completed.stdout}"
    )
|
|
|
|
|
def lldb_handle_command(debugger: lldb.SBDebugger, command: str):
    """Log ``command`` and execute it through ``debugger.HandleCommand``."""
    logged_line = f"(lldb) {command}"
    log(logged_line)
    debugger.HandleCommand(command)
|
|
|
|
|
def lldb_handle_command_to_log(
    debugger: lldb.SBDebugger, command: str
) -> lldb.SBCommandReturnObject:
    """Run ``command`` in the command interpreter, logging what it produced.

    The command line is logged first; afterwards the command's output and
    error text are logged when they are non-empty.

    Returns:
        The ``SBCommandReturnObject`` filled in by the interpreter, so the
        caller can inspect output and errors itself.
    """
    log(f"(lldb) {command}")

    return_object = lldb.SBCommandReturnObject()
    interpreter = debugger.GetCommandInterpreter()
    interpreter.HandleCommand(command, return_object)

    output_text = return_object.GetOutput()
    if output_text:
        log(output_text)

    error_text = return_object.GetError()
    if error_text:
        log("ERROR:\n" + error_text)

    return return_object
|
|
|
|
|
def unused_tcp_port() -> int:
    """Return a TCP port number that was free at the time of the call.

    Binds a temporary socket to port 0 so the operating system picks a port,
    reads the chosen port back and closes the socket again. The port is not
    reserved after that.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("", 0))
        # getsockname() yields (host, port) for AF_INET sockets
        return probe.getsockname()[1]
|
|
|
|
|
def log_command(program: str, command: str):
    """Write a banner to the log with the command line and the current time."""
    banner = f"{BOLDLINE}{program} {command}\n{now()}{BOLDLINE}"
    log(banner)
|
|
|
|
|
def now():
    """Return the current local time formatted as ``HH:MM:SS.ffffff``."""
    current = datetime.datetime.now()
    return current.strftime("%H:%M:%S.%f")
|
|
|
|
|
def _e() -> str: |
|
return traceback.format_exc() |
|
|
|
|
|
def print_message(*args, **kwargs):
    """Print a message to the session log and to standard output.

    Accepts the same positional and keyword arguments as ``print``; ``file``
    and ``flush`` are supplied here. Both writes are flushed immediately.
    """
    # file=None makes print() write to sys.stdout
    for stream in (GLOBAL_CONTEXT.log, None):
        print(*args, **kwargs, file=stream, flush=True)
|
|
|
|
|
def log(message: str):
    """Append ``message`` plus a newline to the session log and flush it."""
    log_file = GLOBAL_CONTEXT.log.file
    line = message + "\n"
    log_file.write(line)
    log_file.flush()
|
|
|
|
|
def __lldb_init_module(debugger: lldb.SBDebugger, dict: Dict):
    """Register this module's Android commands with ``debugger``.

    Registers, in this order, the attach, exit and resume-JVM commands under
    this module's name.
    """
    command_classes = (
        CommandAndroidAttach,
        CommandAndroidExit,
        CommandAndroidResumeJvm,
    )
    for command_class in command_classes:
        command_class.register_lldb_command(debugger, __name__)