Created
July 23, 2024 07:16
-
-
Save julian-klode/95818246eaef0ac6a54588f7f368e25c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
#
# Copyright (C) 2024 Canonical Ltd
#
# Authors:
#   Julian Andres Klode<[email protected]>
#
# SPDX-License-Identifier: GPL-3.0
import argparse
import asyncio
import json
import os
import shlex
import shutil
import subprocess

from elftools.elf.elffile import ELFFile
class DistroAnalyser:
    """Analyses an entire distribution using containers.

    This creates a container image, analyse-<distro> and then gets the list
    of packages for the components and creates a bunch of workers (OCI
    containers) that will run the FileAnalyser below.

    Each message from the FileAnalyser is mapped into a dict, so if we analyse
    'packagename' with the messages:

        {"error": object1}
        {"success": object2}
        {"summary": summaryObject}

    we get an object:

        {packagename: {
            "errors": [object1],
            "successes": [object2],
            "summary": summaryObject,
        }}

    If we analyse multiple packages, there will be multiple keys in the root
    object.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Load any previously saved state and pick an OCI runner.

        Args:
            args: Parsed arguments of the ``distro`` sub-command. The
                attributes ``output``, ``distro``, ``packages``,
                ``components`` and ``jobs`` are read by this class.

        Raises:
            ValueError: If neither podman nor docker is found on ``PATH``.
        """
        self.args = args
        try:
            with open(self.args.output, encoding="utf-8") as state:
                self.state = json.load(state)
        except FileNotFoundError:
            # First run: nothing has been analysed yet.
            self.state = {}
        self.image = f"analyse-{args.distro}"

        # Prefer podman, fall back to docker.
        runner = next(filter(shutil.which, ("podman", "docker")), None)
        if runner is None:
            raise ValueError("Need a runner for OCI images: podman or docker")
        self.oci_runner = runner

    @staticmethod
    def _has_images(listing: bytes) -> bool:
        """Return whether an ``images --format=json`` listing names an image.

        Empty output means "no image". Output that is not a single JSON
        document is parsed line by line, which covers runners that print one
        JSON object per image instead of one array.
        """
        text = listing.decode("utf-8").strip()
        if not text:
            return False
        try:
            return bool(json.loads(text))
        except json.JSONDecodeError:
            return any(
                json.loads(line) for line in text.splitlines() if line.strip()
            )

    def _dockerfile(self) -> str:
        """Build the Dockerfile text for the analysis image of ``args.distro``."""
        distro = self.args.distro
        # Noble has incomplete ddebs, so we append oracular to it.
        aux_suite = "oracular" if distro == "noble" else ""
        # The debug symbol suites must match the distro the image is based on.
        suites = " ".join(filter(None, (distro, f"{distro}-updates", aux_suite)))
        aux_pin = ""
        if aux_suite:
            # Pin the auxiliary suite low so it only supplies packages that
            # the main suites lack.
            aux_pin = (
                f"printf >> /etc/apt/preferences.d/{aux_suite} '"
                "Package: *\\n"
                f"Pin: release {aux_suite}\\n"
                "Pin-Priority: 1\\n' &&"
            )
        return (
            f"FROM ubuntu:{distro}\n"
            "ENV DEBIAN_FRONTEND=noninteractive\n"
            # NOTE(review): assumes an HTTP proxy is listening on the host at
            # port 8000 - confirm for your environment.
            "ENV http_proxy=http://host.containers.internal:8000\n"
            "RUN echo force-unsafe-io >> /etc/dpkg/dpkg.cfg &&"
            " rm /etc/apt/apt.conf.d/01autoremove &&"
            " apt update &&"
            " apt install dctrl-tools eatmydata moreutils ubuntu-dbgsym-keyring python3-pyelftools -qq -y --no-install-recommends parallel adduser &&"
            "printf >> /etc/apt/sources.list.d/ubuntu.sources '"
            "Types: deb\\n"
            "URIs: http://ddebs.ubuntu.com/ubuntu/\\n"
            f"Suites: {suites}\\n"
            "Components: main universe multiverse restricted\\n"
            "Signed-By: /usr/share/keyrings/ubuntu-dbgsym-keyring.gpg\\n' &&"
            " apt update &&"
            f"{aux_pin}"
            "true"  # lazy ending
        )

    async def ensure_image(self) -> None:
        """Build the analysis image unless the runner already lists it.

        Raises:
            RuntimeError: If the image build exits with a non-zero status.
        """
        images_proc = await asyncio.create_subprocess_exec(
            self.oci_runner,
            "images",
            self.image,
            "--format=json",
            stdout=subprocess.PIPE,
        )
        stdout, _ = await images_proc.communicate()
        if self._has_images(stdout):
            print(f"Reusing existing image {self.image}")
            return

        print(f"Building image {self.image}")
        proc = await asyncio.create_subprocess_exec(
            self.oci_runner, "build", "-t", self.image, "-", stdin=subprocess.PIPE
        )
        await proc.communicate(self._dockerfile().encode("utf-8"))
        if proc.returncode != 0:
            # Without an image every later container run would fail.
            raise RuntimeError(
                f"Building image {self.image} failed with exit status {proc.returncode}"
            )

    async def get_pkgnames(self) -> list[str]:
        """Return the names of the packages to analyse.

        Explicitly requested packages (``args.packages``) win. Otherwise the
        package lists inside the image are scanned for the requested
        components, skipping ``-dbgsym`` and ``linux-`` packages.

        Raises:
            ValueError: If neither packages nor components were given.
        """
        if self.args.packages:
            return self.args.packages  # type: ignore
        components = self.args.components or []
        if not components:
            raise ValueError("Need at least one component when no packages are given")

        print("Getting names of packages to analyse")
        script = (
            f"for comp in {' '.join(components)}; do"
            " /usr/lib/apt/apt-helper cat-file /var/lib/apt/lists/*_${comp}_*amd64*Packages*;"
            " done"
            " | awk '/^Package:/ {print $2}'"
            " | grep -v -- -dbgsym$"
            " | grep -v ^linux-"
            " | sort -u"
        )
        proc = await asyncio.create_subprocess_exec(
            self.oci_runner,  # use the detected runner, not a fixed one
            "run",
            "--rm",
            self.image,
            "sh",
            "-c",
            script,
            stdout=subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        return stdout.decode("utf-8").splitlines()

    def _collect_messages(self, package: str, stdout: bytes) -> dict:
        """Fold the JSON lines printed by one analysis container into a dict.

        Status messages are echoed to the terminal. Lines that are not JSON
        objects are recorded as ``invalid-output`` errors instead of aborting
        the whole run.
        """
        state: dict = {"errors": [], "successes": []}
        for line in stdout.splitlines():
            if not line.strip():
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                obj = None
            if not isinstance(obj, dict):
                state["errors"].append(
                    {
                        "package": package,
                        "filename": None,
                        "category": "invalid-output",
                        "note": line.decode("utf-8", errors="replace"),
                    }
                )
                continue
            if "status" in obj:
                print(f"{package}:", obj["status"]["message"])
            if "error" in obj:
                state["errors"].append(obj["error"])
            if "success" in obj:
                state["successes"].append(obj["success"])
            if "summary" in obj:
                state["summary"] = obj["summary"]
        return state

    async def _analyse_package(self, package: str) -> None:
        """Run one analysis container and persist its result."""
        print(f"{package}:", "analysing")
        command = [
            self.oci_runner,
            "run",
            "--rm",
            "--name",
            # "+" is replaced so the package name can serve as a container name.
            "analyse" + "-" + package.replace("+", "_plus_"),
        ]
        if self.oci_runner == "podman":
            # --replace is a podman option; docker run does not accept it.
            command.append("--replace")
        command += [
            "-v",
            f"{os.path.realpath(__file__)}:/analyse.py",
            self.image,
            "python3",
            "/analyse.py",
            "package",
            package,
        ]
        proc = await asyncio.create_subprocess_exec(*command, stdout=subprocess.PIPE)
        stdout, _ = await proc.communicate()

        state = self._collect_messages(package, stdout)
        if "summary" not in state:
            # No summary means the container died early; do not record the
            # package so that a later run retries it.
            print(f"Failed to analyse {package}:")
            print(json.dumps(state, indent=4))
            return

        self.state[package] = state
        # Write to a temporary file and rename it so the state file is never
        # left half-written.
        scratch = self.args.output + ".new"
        with open(scratch, "w", encoding="utf-8") as out:
            json.dump(self.state, out, indent=2)
        os.replace(scratch, self.args.output)

    async def worker(self, queue: asyncio.Queue[str]) -> None:
        """Analyse packages from ``queue`` until cancelled.

        Packages already present in the saved state are skipped.
        """
        while True:
            package = await queue.get()
            try:
                if package not in self.state:
                    await self._analyse_package(package)
            finally:
                # Always acknowledge the item so queue.join() cannot hang.
                queue.task_done()

    async def run(self) -> None:
        """Ensure the image exists, then analyse all packages with ``jobs`` workers."""
        await self.ensure_image()
        names = await self.get_pkgnames()
        queue: asyncio.Queue[str] = asyncio.Queue()
        for name in names:
            queue.put_nowait(name)

        # At least one worker is needed, otherwise queue.join() never returns.
        jobs = max(1, self.args.jobs or 1)
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.worker(queue)) for _ in range(jobs)]
            await queue.join()
            # Workers loop forever; stop them once the queue is drained.
            for task in tasks:
                task.cancel()
class FileAnalyser: | |
"""Analyse a file. | |
This is run by the DistroAnalyser inside a container. It may install additional packages if run as root. | |
It logs single-line JSON objects of the form | |
{"error": {"package": <name or null>, "filename": <name or null>, "category": <string>, "note": <string>"}} | |
{"status": {"package": <name or null>, "filename": <name or null>, "message": <string>"}} | |
{"success": {"package": <name or null>, "filename": <name or null>, "message": <string>"}} | |
""" | |
def __init__(self, package: str | None = None): | |
self.package = package | |
def get_producers(self, filename: str, elffile: ELFFile) -> list[dict[str, str]]: | |
"""Get all producers of the given ELF binary.""" | |
dwarfinfo = elffile.get_dwarf_info(follow_links=True) # type: ignore | |
producers = [] | |
for CU in dwarfinfo.iter_CUs(): | |
die = CU.get_top_DIE() | |
if "DW_AT_producer" not in die.attributes: | |
continue | |
producers.append( | |
{ | |
"unit": die.attributes["DW_AT_name"].value.decode("utf-8"), | |
"producer": die.attributes["DW_AT_producer"].value.decode("utf-8"), | |
} | |
) | |
return producers | |
def get_package(self, filename: str | None) -> str | None: | |
"""Get the name of the package of the file. | |
If a package has been set in the object, it will be returned. | |
""" | |
if self.package: | |
return self.package | |
if not filename: | |
return None | |
try: | |
return ( | |
subprocess.check_output(f"dpkg -S {filename} | cut -f1 -d:", shell=True) | |
.splitlines()[0] | |
.decode("utf-8") | |
) | |
except Exception: | |
return None | |
def install_dbgsym(self, filename: str, build_id: str) -> None: | |
"""Install the debug symbols for the given build id.""" | |
packages = ( | |
subprocess.check_output( | |
f"/usr/lib/apt/apt-helper cat-file /var/lib/apt/lists/*Packages* | grep-dctrl -nsPackage -FBuild-IDs {build_id} | xargs", | |
shell=True, | |
) | |
.decode("utf-8") | |
.strip() | |
) | |
if not packages: | |
self.status(filename, f"no debugging symbols for {build_id}") | |
return | |
if os.getuid() == 0: | |
self.status(filename, f"Installing {packages}") | |
subprocess.check_call( | |
f"DEBIAN_FRONTEND=noninteractive eatmydata chronic apt install -y -qq {packages}", | |
shell=True, | |
) | |
else: | |
self.status(filename, f"Would have installed {packages}") | |
def status(self, filename: str | None, message: str) -> None: | |
"""Print a status message.""" | |
print( | |
json.dumps( | |
{ | |
"status": { | |
"package": self.get_package(filename), | |
"filename": filename, | |
"message": message, | |
} | |
} | |
) | |
) | |
def error(self, filename: str | None, category: str, note: str) -> None: | |
"""Print an error message.""" | |
print( | |
json.dumps( | |
{ | |
"error": { | |
"package": self.get_package(filename), | |
"filename": filename, | |
"category": category, | |
"note": note, | |
} | |
} | |
) | |
) | |
def success(self, filename: str, units: list[dict[str, str]]) -> None: | |
"""Print a success message.""" | |
print( | |
json.dumps( | |
{ | |
"success": { | |
"package": self.get_package(filename), | |
"filename": filename, | |
"units": units, | |
} | |
} | |
) | |
) | |
def process_file(self, filename: str) -> bool | None: | |
"""Process a single file. | |
Returns true if the file could be analysed, false is if it could not | |
be analysed, and None if the file was not an ELF binary. | |
""" | |
try: | |
elffile = ELFFile.load_from_path(filename) # type:ignore | |
except Exception: | |
return None | |
missing_dbgsym = [] | |
producers: list[dict[str, str]] = [] | |
for sect in elffile.iter_sections(): | |
if sect.name != ".note.gnu.build-id": | |
continue | |
for note in sect.iter_notes(): | |
build_id = note["n_desc"] | |
debug_file = ( | |
f"/usr/lib/debug/.build-id/{build_id[:2]}/{build_id[2:]}.debug" | |
) | |
if not os.path.exists(debug_file): | |
self.install_dbgsym(filename, build_id) | |
if not os.path.exists(debug_file): | |
missing_dbgsym.append(debug_file) | |
continue | |
if debug_file != filename: | |
debug_elf = ELFFile.load_from_path(debug_file) # type: ignore | |
producers += self.get_producers(filename, debug_elf) | |
try: | |
producers += self.get_producers(filename, elffile) | |
except FileNotFoundError as e: | |
missing_dbgsym.append(e.filename.decode("utf-8")) | |
legacy_debug_filename = "/usr/lib/debug" + os.path.realpath(filename) | |
if os.path.exists(legacy_debug_filename): | |
legacy_debug_elf = ELFFile.load_from_path(legacy_debug_filename) # type: ignore | |
producers += self.get_producers(filename, legacy_debug_elf) | |
else: | |
missing_dbgsym.append(legacy_debug_filename) | |
if not producers: | |
self.error( | |
filename, "missing-dbgsym", ",".join(sorted(set(missing_dbgsym))) | |
) | |
return False | |
else: | |
self.success(filename, producers) | |
return True | |
def process(self) -> None: | |
"""Process the package the object was initialized with. | |
This iterates over all real files in the package, and analyses them. At the end | |
it logs a summary object. | |
""" | |
if not self.package: | |
raise ValueError("Need to initialize FileAnalyser with a package name") | |
repkg = ( | |
self.package.replace("+", "\\+") | |
.replace("t64", "(t64)?") | |
.replace("-dev", "(-dev)?") | |
) | |
if os.getuid() == 0: | |
self.status(None, f"Installing {self.package} and {repkg}-dbg") | |
status, out = subprocess.getstatusoutput( | |
f"DEBIAN_FRONTEND=noninteractive eatmydata apt install --auto-remove -y -qq {self.package} '?name(\"^{repkg}-dbg$\")?version(.)' --no-install-recommends 2>&1", | |
) | |
if status: | |
self.error(None, "install-failed", out) | |
print( | |
json.dumps( | |
{ | |
"summary": { | |
"package": self.package, | |
"analysed": 0, | |
"failed": 0, | |
"skipped": 0, | |
} | |
} | |
) | |
) | |
return | |
else: | |
self.status(None, f"Would have installed {self.package} and {repkg}-dbg") | |
files = ( | |
subprocess.check_output( | |
f"dpkg -L {self.package} | grep -v '^/usr/lib/debug/'", shell=True | |
) | |
.decode("utf-8") | |
.splitlines() | |
) | |
success = not_elf = failed = 0 | |
for file in files: | |
if os.path.isfile(file) and not os.path.islink(file): | |
res = self.process_file(file) | |
if res: | |
success += 1 | |
elif res is None: | |
not_elf += 1 | |
else: | |
failed += 1 | |
print( | |
json.dumps( | |
{ | |
"summary": { | |
"package": self.package, | |
"analysed": success, | |
"failed": failed, | |
"skipped": not_elf, | |
} | |
} | |
) | |
) | |
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with all sub-commands registered."""
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title="subcommands", dest="command")

    distro_cmd = subparsers.add_parser(
        "distro", help="Analyse an entire distribution suite"
    )
    distro_cmd.add_argument("distro", help="Distribution to analyse")
    distro_cmd.add_argument(
        "-j", "--jobs", type=int, default=1, help="Number of parallel workers."
    )
    # The state file is opened unconditionally, so it must be given.
    distro_cmd.add_argument(
        "-o", "--output", required=True, help="Where to store output."
    )
    distro_cmd.add_argument(
        "-p", "--packages", help="Packages to limit analysis to", action="append"
    )
    distro_cmd.add_argument(
        "-c", "--components", help="Components to analyse", action="append"
    )

    package_cmd = subparsers.add_parser("package", help="Analyse an entire package")
    package_cmd.add_argument("package", help="Package to analyse")

    file_cmd = subparsers.add_parser("file", help="Analyse an entire file")
    file_cmd.add_argument("file", help="File to analyse")

    report_cmd = subparsers.add_parser(
        "report", help="Write a human-readable report on a given distro state"
    )
    report_cmd.add_argument("file", help="File to analyse")
    return parser


def main() -> None:
    """Parse the command line and dispatch to the selected sub-command.

    All sub-commands are registered before the arguments are parsed, so that
    ``report`` is actually selectable. Without a sub-command the usage help
    is printed.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if args.command == "distro":
        analyser = DistroAnalyser(args)
        asyncio.run(analyser.run())
    elif args.command == "package":
        FileAnalyser(args.package).process()
    elif args.command == "file":
        FileAnalyser().process_file(args.file)
    elif args.command == "report":
        # NOTE(review): DistroReporter is not defined in the visible part of
        # this file - confirm it exists before relying on this sub-command.
        DistroReporter().process_file(args.file)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment