Last active
April 24, 2024 12:24
-
-
Save calebccff/435d944eb1a7c518d6029badf6894950 to your computer and use it in GitHub Desktop.
A script to watch U-Boot serial output and automatically decode register dumps when a synchronous abort occurs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# SPDX-License-Identifier: GPL-2.0 | |
# Copyright 2023 Linaro Ltd. | |
# Author: Caleb Connolly <[email protected]> | |
# | |
# Listens to a serial port and automatically detects and decodes | |
# Synchronous aborts. DO NOT run your serial monitor at the same | |
# time as this tool! It will configure a PTY and symlink it to | |
# /tmp/abortwatch.pty, you should point your serial monitor there | |
# instead! | |
from argparse import ArgumentParser | |
import serial | |
import os | |
import pty | |
import select | |
from threading import Thread | |
from queue import Queue, Empty | |
from time import sleep | |
from pathlib import Path | |
import re | |
def enqueue_output(in_fd, queue: Queue): | |
in_fh = os.fdopen(in_fd, 'rb') | |
while True: | |
b = in_fh.read(1) | |
queue.put(b) | |
class Port(serial.Serial): | |
def __init__(self, port: str, baud: int): | |
super().__init__(port, baud, timeout=0) | |
# Master , Slave | |
(pty_in, pty_out) = pty.openpty() | |
self.pty_name = os.ttyname(pty_out) | |
self.pty_out = os.fdopen(pty_in, 'ab', buffering=0) | |
self.pty_fd = pty_in | |
self.q = Queue() | |
self.t = Thread(target=enqueue_output, args=(pty_in, self.q)) | |
self.t.daemon = True | |
self.t.start() | |
def __del__(self) -> None: | |
os.close(self.pty_fd) # This ought to cause the thread to die | |
return super().__del__() | |
# Wrap serial read to write all bytes read to | |
# the PTY | |
def read(self, size: int = 1) -> bytes: | |
data = super().read(size) | |
#print(f"READ {data}") | |
self.pty_out.write(data) | |
return data | |
def open_port(port: str, baud: int, wait_for_port: bool=True) -> Port: | |
retry = 3 | |
if not os.path.exists(port): | |
if not wait_for_port: | |
print(f"Port '{port}' doesn't exit and --no-wait specified, exiting") | |
raise FileNotFoundError() | |
print(f"Waiting for port '{port}'...") | |
while not os.path.exists(port): | |
sleep(0.1) | |
error: Exception = None | |
for _ in range(retry): | |
try: | |
console = Port(port, baud) | |
print(f"Opened port '{port}'") | |
print(f"Port mirror is {console.pty_name}") | |
return console | |
except PermissionError as e: | |
error = e | |
sleep(0.05) # We might be racing with udev to set correct permissions | |
print(f"Failed to open port after {retry} tries...") | |
print(error) | |
def record_abort(console: Port, firstline: str) -> [str]: | |
print(f"{firstline}", end="") | |
abort = [firstline] | |
console.timeout = None # Blocking read | |
state = 0 | |
while True: | |
line = console.readline().decode() | |
abort.append(line) | |
if state == 1: | |
if line.startswith("Backtrace:"): | |
pass | |
elif line.strip().startswith("<0x"): | |
pass | |
elif len(line.strip()) == 0: | |
pass | |
else: | |
console.timeout = 0 | |
return abort | |
if line.startswith("Code: "): | |
state = 1 | |
# console.timeout = 0 # Non-blocking again | |
# return abort | |
print(f"{line}", end="") | |
if len(abort) > 24 + 10 * state: | |
print("!!! Code line not found after 24 lines, giving up") | |
console.timeout = 0 | |
return abort | |
def detect_abort(console: Port) -> [str]: | |
line = "" | |
while True: | |
(fds, _, _) = select.select([console.fd], [], [], 0.05) | |
for fd in fds: | |
match fd: | |
case console.fd: | |
data = console.read_until() | |
try: | |
line += data.decode() | |
except UnicodeDecodeError: | |
line += repr(data) | |
while True: | |
try: | |
c = console.q.get_nowait() | |
console.write(c) | |
except Empty: | |
break | |
if len(line) == 0 or line[-1] != '\n': | |
continue | |
# If we see a new line, check if the last line was an abort | |
if "\"Synchronous Abort\" handler" not in line: | |
line = "" | |
continue | |
print("\n\n") | |
# Strip the start of the line | |
line = line[line.index("\"Synchronous Abort\" handler"):] | |
return record_abort(console, line) | |
# Parse the u-boot.map file and find address, offset, file, and function | |
# that the LR register address corresponds to (ie, where u-boot crashed) | |
def find_func_file(addr: int, outdir: str) -> tuple[int, int, str, str]: | |
syms = open(os.path.join(outdir, "u-boot.sym")).readlines() | |
syms.sort() | |
with open(os.path.join(outdir, "u-boot.sym.sorted"), "w") as f: | |
f.write("".join(syms)) | |
func = None | |
last_func = None | |
fn_addr = 0 | |
last_addr = 0 | |
file = None | |
match_sym = re.compile(r"^([a-fA-F0-9]+).* ([a-zA-Z0-9_.-]+)$") | |
while len(syms) > 0: | |
line = syms.pop(0) | |
m = match_sym.search(line) | |
#print(f"{line} -- {m}") | |
if not m: | |
continue | |
fn_addr = int(m.group(1), 16) | |
if fn_addr == 0: | |
continue | |
last_func = func | |
func = m.group(2) | |
#print(m.groups()) | |
if fn_addr > addr: | |
break | |
last_addr = fn_addr | |
if len(syms) == 0: | |
print(f"Couldn't find function for address {addr:#x}") | |
return (0, 0, "", "") | |
# now find the file... | |
lines = open(os.path.join(outdir, "u-boot.map")).readlines() | |
while "Discarded input sections" not in lines.pop(0): | |
continue | |
is_next = False | |
for line in lines: | |
if is_next: | |
file = line.strip().split()[-1] | |
break | |
if last_func in line: | |
is_next = True | |
return (last_addr, addr - last_addr, file, last_func) | |
# Parse the register dump, find the nearest function symbol and decode the object | |
# file it belongs to, then find the line number in the file of the instruction that | |
# caused the abort and print it. Or just print the file path | |
def decode_abort(abort: [str], outdir: str): | |
cross_compile = "" | |
if "CROSS_COMPILE" in os.environ: | |
cross_compile = os.environ["CROSS_COMPILE"] | |
code = os.popen(f"bash -c 'echo \"{abort[-1]}\" | scripts/decodecode'").readlines() | |
for line in code: | |
print(f"\t{line}", end="") | |
print() | |
elr = int(abort[1].split(":")[1].strip().split(" ")[0].strip(), 16) | |
#print(f"Searching for function @ [{elr:#x}]") | |
(addr, offset, file, func) = find_func_file(elr, outdir) | |
if addr == 0: | |
return | |
print(f"Crashed in {func}() +{offset:#x} (symbol addr {addr:#x})") | |
decomp_path = os.path.join("/tmp/", f"{os.path.basename(file)[:-2]}.S") | |
os.popen(f"bash -c '{cross_compile}objdump -Sdl {os.path.join(outdir, file)} > {decomp_path}'").readlines() | |
decomp = open(decomp_path).readlines() | |
m_func = None | |
path = None | |
match_path = re.compile(r"^([/a-zA-Z0-9_.-]+:\d+)") | |
match_func = re.compile(r"^[a-fA-F0-9]+ <([a-zA-Z0-9_]+)>:$") | |
match_off = re.compile(r"^\s*([a-fA-F0-9]+):") | |
for (i, line) in enumerate(decomp): | |
# Keep track of the last source path in the decomp | |
m = match_path.search(line) | |
if m: | |
path = m.group(1) | |
continue | |
m = match_func.search(line) | |
if not m and not m_func: | |
continue | |
elif m: | |
m_func = m.group(1) | |
if m_func != func: | |
continue | |
# if ":" not in line: | |
# continue | |
m = match_off.search(line) | |
if not m: | |
continue | |
off = int(m.group(1), 16) | |
if off == offset: | |
print() | |
if path: | |
print(f"U-Boot crashed in {Path(path).resolve()}") | |
print(f"Disassembly: {decomp_path}:{i+1}") | |
return | |
print(f"Couldn't automatically determine crashing line in {decomp_path}") | |
if __name__ == '__main__': | |
ap = ArgumentParser(description='Synchronous abort decoder') | |
ap.add_argument('--outdir', '-o', help='U-Boot output directory', default='.') | |
ap_action = ap.add_subparsers(dest='action', required=True) | |
ap_decode = ap_action.add_parser('decode', help='Decode a single abort from a file') | |
ap_decode.add_argument('decode', help='File to decode') | |
ap_port = ap_action.add_parser('port', help='Serial port to watch on') | |
ap_port.add_argument('--baudrate', '-b', help='Serial port baud rate', type=int, | |
default=115200) | |
ap_port.add_argument('--no-wait', help="Don't wait for serial port device", | |
action='store_false', default=False) | |
ap_port.add_argument('port', help='Serial port device') | |
args = ap.parse_args() | |
if args.action == 'decode': | |
abort = open(args.decode).readlines() | |
decode_abort(abort, args.outdir) | |
exit(0) | |
console = None | |
while True: | |
if console is None: | |
console = open_port(args.port, args.baudrate, not args.no_wait) | |
abort = "" | |
try: | |
abort = detect_abort(console) | |
decode_abort(abort, args.outdir) | |
except serial.serialutil.SerialException as e: | |
print(f"Serial exception {e}") | |
if not args.no_wait: | |
console = None | |
continue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment