Skip to content

Instantly share code, notes, and snippets.

@julian-klode
Created July 23, 2024 07:16
Show Gist options
  • Save julian-klode/95818246eaef0ac6a54588f7f368e25c to your computer and use it in GitHub Desktop.
Save julian-klode/95818246eaef0ac6a54588f7f368e25c to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
#
# Copyright (C) 2024 Canonical Ltd
#
# Authors:
# Julian Andres Klode<[email protected]>
#
# SPDX-License-Identifier: GPL-3.0
import argparse
import asyncio
import json
import os
import shutil
import subprocess
from elftools.elf.elffile import ELFFile
class DistroAnalyser:
    """Analyse an entire distribution using containers.

    This creates a container image, analyse-<distro>, then gets the list of
    packages for the components and creates a bunch of workers (OCI
    containers) that each run the FileAnalyser for one package.

    Each message from the FileAnalyser is mapped into a dict, so if we analyse
    'packagename' with the messages:

        {"error": object1}
        {"success": object2}
        {"summary": summaryObject}

    we get an object:

        {packagename: {
            "errors": [object1],
            "successes": [object2],
            "summary": summaryObject,
        }}

    If we analyse multiple packages, there will be multiple keys in the root
    object. The root object is persisted to ``args.output`` after every
    successfully analysed package, and packages already present in it are
    skipped on the next run.
    """

    def __init__(self, args: argparse.Namespace) -> None:
        """Load any previous state and pick an OCI runner.

        Raises:
            ValueError: if neither podman nor docker is found on PATH.
        """
        self.args = args
        try:
            with open(self.args.output) as state:
                self.state = json.load(state)
        except FileNotFoundError:
            # First run: nothing analysed yet.
            self.state = {}
        self.image = f"analyse-{args.distro}"
        for oci_runner in ["podman", "docker"]:
            if shutil.which(oci_runner):
                self.oci_runner = oci_runner
                break
        else:
            raise ValueError("Need a runner for OCI images: podman or docker")

    @staticmethod
    def _image_listed(stdout: bytes) -> bool:
        """Return True if the output of ``<runner> images --format=json`` lists an image."""
        listing = stdout.strip()
        if not listing:
            # Nothing printed at all: no matching image.
            return False
        try:
            return bool(json.loads(listing))
        except ValueError:
            # NOTE(review): docker is expected to print one JSON object per
            # line rather than a single array; several lines do not parse as
            # one document but still mean that images matched.
            return True

    def _containerfile(self) -> str:
        """Return the Containerfile text used to build the analysis image."""
        distro = self.args.distro
        # Noble has incomplete ddebs, so we append oracular to it.
        aux_suite = "oracular" if distro == "noble" else ""
        # Pin the auxiliary suite low so it is only used when explicitly needed.
        aux_pin = (
            f"printf >> /etc/apt/preferences.d/{aux_suite} '"
            f"Package: *\\n"
            f"Pin: release {aux_suite}\\n"
            f"Pin-Priority: 1\\n' &&"
            if aux_suite
            else ""
        )
        return (
            f"FROM ubuntu:{distro}\n"
            f"ENV DEBIAN_FRONTEND=noninteractive\n"
            f"ENV http_proxy=http://host.containers.internal:8000\n"
            f"RUN echo force-unsafe-io >> /etc/dpkg/dpkg.cfg &&"
            f" rm /etc/apt/apt.conf.d/01autoremove &&"
            f" apt update &&"
            f" apt install dctrl-tools eatmydata moreutils ubuntu-dbgsym-keyring python3-pyelftools -qq -y --no-install-recommends parallel adduser &&"
            f"printf >> /etc/apt/sources.list.d/ubuntu.sources '"
            f"Types: deb\\n"
            f"URIs: http://ddebs.ubuntu.com/ubuntu/\\n"
            # Use the ddebs of the distro being analysed, not always noble.
            f"Suites: {distro} {distro}-updates {aux_suite}\\n"
            f"Components: main universe multiverse restricted\\n"
            f"Signed-By: /usr/share/keyrings/ubuntu-dbgsym-keyring.gpg\\n' &&"
            f" apt update &&"
            f"{aux_pin}"
            f"true"  # lazy ending
        )

    async def ensure_image(self) -> None:
        """Build the analysis image unless the runner already has it.

        Raises:
            RuntimeError: if the image build exits with a non-zero status.
        """
        images_proc = await asyncio.create_subprocess_exec(
            self.oci_runner,
            "images",
            self.image,
            "--format=json",
            stdout=subprocess.PIPE,
        )
        stdout, _ = await images_proc.communicate()
        if self._image_listed(stdout):
            print(f"Reusing existing image {self.image}")
            return

        print(f"Building image {self.image}")
        proc = await asyncio.create_subprocess_exec(
            self.oci_runner, "build", "-t", self.image, "-", stdin=subprocess.PIPE
        )
        # The Containerfile is fed on stdin ("-" build context).
        await proc.communicate(self._containerfile().encode("utf-8"))
        if proc.returncode:
            raise RuntimeError(
                f"Building image {self.image} failed with exit status {proc.returncode}"
            )

    async def get_pkgnames(self) -> list[str]:
        """Return the names of the packages to analyse.

        If packages were given on the command line they are returned as-is;
        otherwise the package lists inside the image are queried for the
        requested components (amd64, excluding -dbgsym and linux-* packages).

        Raises:
            ValueError: if neither packages nor components were specified.
        """
        if self.args.packages:
            return self.args.packages  # type: ignore
        if not self.args.components:
            raise ValueError("Need at least one component or package to analyse")

        print("Getting names of packages to analyse")
        proc = await asyncio.create_subprocess_exec(
            # Use the detected runner; podman may not be installed.
            self.oci_runner,
            "run",
            "--rm",
            self.image,
            "sh",
            "-c",
            f"for comp in {' '.join(self.args.components)}; do /usr/lib/apt/apt-helper cat-file /var/lib/apt/lists/*_${{comp}}_*amd64*Packages*; done | awk '/^Package:/ {{print $2}}' | grep -v -- -dbgsym$ | grep -v ^linux- | sort -u",
            stdout=subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        return stdout.decode("utf-8").splitlines()

    def _run_command(self, package: str) -> list[str]:
        """Return the runner command line that analyses *package* in a container."""
        command = [
            self.oci_runner,
            "run",
            "--rm",
            "--name",
            # "+" is replaced so names such as g++ yield a usable container name.
            "analyse" + "-" + package.replace("+", "_plus_"),
        ]
        if self.oci_runner == "podman":
            # --replace is a podman extension; docker rejects it.
            command.append("--replace")
        command += [
            "-v",
            # Mount this very script into the container and run it there.
            f"{os.path.realpath(__file__)}:/analyse.py",
            self.image,
            "python3",
            "/analyse.py",
            "package",
            package,
        ]
        return command

    def _collect_results(self, package: str, stdout: bytes) -> dict:
        """Fold the JSON lines printed by the FileAnalyser into one state dict.

        Status messages are echoed to the terminal. Lines that are not JSON
        objects are recorded as "invalid-output" errors instead of aborting
        the whole run.
        """
        state: dict[str, list[dict[str, str]]] = {
            "errors": [],
            "successes": [],
        }
        for line in stdout.splitlines():
            if not line.strip():
                continue
            try:
                obj = json.loads(line)
            except ValueError:  # JSONDecodeError and UnicodeDecodeError
                obj = None
            if not isinstance(obj, dict):
                state["errors"].append(
                    {
                        "package": package,
                        "filename": None,  # type: ignore[dict-item]
                        "category": "invalid-output",
                        "note": line.decode("utf-8", errors="replace"),
                    }
                )
                continue
            if "status" in obj:
                print(f"{package}:", obj["status"]["message"])
            if "error" in obj:
                state["errors"].append(obj["error"])
            if "success" in obj:
                state["successes"].append(obj["success"])
            if "summary" in obj:
                state["summary"] = obj["summary"]
        return state

    def _save_state(self) -> None:
        """Write the state file via a temporary file and rename."""
        with open(self.args.output + ".new", "w") as out:
            json.dump(self.state, out, indent=2)
        os.rename(self.args.output + ".new", self.args.output)

    async def worker(self, queue: asyncio.Queue[str]) -> None:
        """Analyse packages from *queue* forever; the caller cancels the task."""
        while True:
            package = await queue.get()
            try:
                if package in self.state:
                    # Already analysed in a previous run.
                    continue
                print(f"{package}:", "analysing")
                proc = await asyncio.create_subprocess_exec(
                    *self._run_command(package),
                    stdout=subprocess.PIPE,
                )
                stdout, _ = await proc.communicate()
                state = self._collect_results(package, stdout)
                if "summary" not in state:
                    # Without a summary the analysis did not run to completion;
                    # do not record it so that it is retried next time.
                    print(f"Failed to analyse {package}:")
                    print(json.dumps(state, indent=4))
                else:
                    self.state[package] = state
                    self._save_state()
            finally:
                queue.task_done()

    async def run(self) -> None:
        """Ensure the image exists, then analyse all packages with ``jobs`` workers."""
        await self.ensure_image()
        names = await self.get_pkgnames()
        queue = asyncio.Queue[str]()
        for name in names:
            queue.put_nowait(name)
        # At least one worker is needed, otherwise queue.join() never returns.
        jobs = max(1, self.args.jobs or 1)
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.worker(queue)) for _ in range(jobs)]
            await queue.join()
            # Workers loop forever; stop them once every package is done.
            for task in tasks:
                task.cancel()
class FileAnalyser:
"""Analyse a file.
This is run by the DistroAnalyser inside a container. It may install additional packages if run as root.
It logs single-line JSON objects of the form
{"error": {"package": <name or null>, "filename": <name or null>, "category": <string>, "note": <string>"}}
{"status": {"package": <name or null>, "filename": <name or null>, "message": <string>"}}
{"success": {"package": <name or null>, "filename": <name or null>, "message": <string>"}}
"""
def __init__(self, package: str | None = None):
self.package = package
def get_producers(self, filename: str, elffile: ELFFile) -> list[dict[str, str]]:
"""Get all producers of the given ELF binary."""
dwarfinfo = elffile.get_dwarf_info(follow_links=True) # type: ignore
producers = []
for CU in dwarfinfo.iter_CUs():
die = CU.get_top_DIE()
if "DW_AT_producer" not in die.attributes:
continue
producers.append(
{
"unit": die.attributes["DW_AT_name"].value.decode("utf-8"),
"producer": die.attributes["DW_AT_producer"].value.decode("utf-8"),
}
)
return producers
def get_package(self, filename: str | None) -> str | None:
"""Get the name of the package of the file.
If a package has been set in the object, it will be returned.
"""
if self.package:
return self.package
if not filename:
return None
try:
return (
subprocess.check_output(f"dpkg -S {filename} | cut -f1 -d:", shell=True)
.splitlines()[0]
.decode("utf-8")
)
except Exception:
return None
def install_dbgsym(self, filename: str, build_id: str) -> None:
"""Install the debug symbols for the given build id."""
packages = (
subprocess.check_output(
f"/usr/lib/apt/apt-helper cat-file /var/lib/apt/lists/*Packages* | grep-dctrl -nsPackage -FBuild-IDs {build_id} | xargs",
shell=True,
)
.decode("utf-8")
.strip()
)
if not packages:
self.status(filename, f"no debugging symbols for {build_id}")
return
if os.getuid() == 0:
self.status(filename, f"Installing {packages}")
subprocess.check_call(
f"DEBIAN_FRONTEND=noninteractive eatmydata chronic apt install -y -qq {packages}",
shell=True,
)
else:
self.status(filename, f"Would have installed {packages}")
def status(self, filename: str | None, message: str) -> None:
"""Print a status message."""
print(
json.dumps(
{
"status": {
"package": self.get_package(filename),
"filename": filename,
"message": message,
}
}
)
)
def error(self, filename: str | None, category: str, note: str) -> None:
"""Print an error message."""
print(
json.dumps(
{
"error": {
"package": self.get_package(filename),
"filename": filename,
"category": category,
"note": note,
}
}
)
)
def success(self, filename: str, units: list[dict[str, str]]) -> None:
"""Print a success message."""
print(
json.dumps(
{
"success": {
"package": self.get_package(filename),
"filename": filename,
"units": units,
}
}
)
)
def process_file(self, filename: str) -> bool | None:
"""Process a single file.
Returns true if the file could be analysed, false is if it could not
be analysed, and None if the file was not an ELF binary.
"""
try:
elffile = ELFFile.load_from_path(filename) # type:ignore
except Exception:
return None
missing_dbgsym = []
producers: list[dict[str, str]] = []
for sect in elffile.iter_sections():
if sect.name != ".note.gnu.build-id":
continue
for note in sect.iter_notes():
build_id = note["n_desc"]
debug_file = (
f"/usr/lib/debug/.build-id/{build_id[:2]}/{build_id[2:]}.debug"
)
if not os.path.exists(debug_file):
self.install_dbgsym(filename, build_id)
if not os.path.exists(debug_file):
missing_dbgsym.append(debug_file)
continue
if debug_file != filename:
debug_elf = ELFFile.load_from_path(debug_file) # type: ignore
producers += self.get_producers(filename, debug_elf)
try:
producers += self.get_producers(filename, elffile)
except FileNotFoundError as e:
missing_dbgsym.append(e.filename.decode("utf-8"))
legacy_debug_filename = "/usr/lib/debug" + os.path.realpath(filename)
if os.path.exists(legacy_debug_filename):
legacy_debug_elf = ELFFile.load_from_path(legacy_debug_filename) # type: ignore
producers += self.get_producers(filename, legacy_debug_elf)
else:
missing_dbgsym.append(legacy_debug_filename)
if not producers:
self.error(
filename, "missing-dbgsym", ",".join(sorted(set(missing_dbgsym)))
)
return False
else:
self.success(filename, producers)
return True
def process(self) -> None:
"""Process the package the object was initialized with.
This iterates over all real files in the package, and analyses them. At the end
it logs a summary object.
"""
if not self.package:
raise ValueError("Need to initialize FileAnalyser with a package name")
repkg = (
self.package.replace("+", "\\+")
.replace("t64", "(t64)?")
.replace("-dev", "(-dev)?")
)
if os.getuid() == 0:
self.status(None, f"Installing {self.package} and {repkg}-dbg")
status, out = subprocess.getstatusoutput(
f"DEBIAN_FRONTEND=noninteractive eatmydata apt install --auto-remove -y -qq {self.package} '?name(\"^{repkg}-dbg$\")?version(.)' --no-install-recommends 2>&1",
)
if status:
self.error(None, "install-failed", out)
print(
json.dumps(
{
"summary": {
"package": self.package,
"analysed": 0,
"failed": 0,
"skipped": 0,
}
}
)
)
return
else:
self.status(None, f"Would have installed {self.package} and {repkg}-dbg")
files = (
subprocess.check_output(
f"dpkg -L {self.package} | grep -v '^/usr/lib/debug/'", shell=True
)
.decode("utf-8")
.splitlines()
)
success = not_elf = failed = 0
for file in files:
if os.path.isfile(file) and not os.path.islink(file):
res = self.process_file(file)
if res:
success += 1
elif res is None:
not_elf += 1
else:
failed += 1
print(
json.dumps(
{
"summary": {
"package": self.package,
"analysed": success,
"failed": failed,
"skipped": not_elf,
}
}
)
)
def main() -> None:
    """Parse the command line and dispatch to the selected subcommand.

    Subcommands:
        distro   analyse a whole distribution in containers (DistroAnalyser)
        package  analyse all files of one package (FileAnalyser.process)
        file     analyse a single file (FileAnalyser.process_file)
        report   write a human-readable report from a distro state file

    Invalid or incomplete command lines exit through argparse with status 2.
    """
    parser = argparse.ArgumentParser()
    # A subcommand is mandatory; without one there is nothing to do.
    subparsers = parser.add_subparsers(
        title="subcommands", dest="command", required=True
    )

    distro = subparsers.add_parser(
        "distro", help="Analyse an entire distribution suite"
    )
    distro.add_argument("distro", help="Distribution to analyse")
    distro.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=1,
        help="Number of packages to analyse in parallel.",
    )
    # The state file is always read and written, so the option is mandatory.
    distro.add_argument(
        "-o", "--output", required=True, help="Where to store output."
    )
    distro.add_argument(
        "-p", "--packages", help="Packages to limit analysis to", action="append"
    )
    distro.add_argument(
        "-c", "--components", help="Components to analyse", action="append"
    )

    package = subparsers.add_parser("package", help="Analyse an entire package")
    package.add_argument("package", help="Package to analyse")

    file_parser = subparsers.add_parser("file", help="Analyse an entire file")
    file_parser.add_argument("file", help="File to analyse")

    # Every subparser must be registered before parse_args(), otherwise the
    # subcommand is rejected as an invalid choice.
    report = subparsers.add_parser(
        "report", help="Write a human-readable report on a given distro state"
    )
    report.add_argument("file", help="File to analyse")

    args = parser.parse_args()

    if args.command == "distro":
        if not (args.packages or args.components):
            parser.error("distro: need at least one of --packages or --components")
        if args.jobs < 1:
            parser.error("distro: --jobs must be at least 1")
        analyser = DistroAnalyser(args)
        asyncio.run(analyser.run())
    elif args.command == "package":
        FileAnalyser(args.package).process()
    elif args.command == "file":
        FileAnalyser().process_file(args.file)
    elif args.command == "report":
        # NOTE(review): no DistroReporter class is defined in the visible
        # file; fail with a clear message instead of a NameError until it is.
        reporter = globals().get("DistroReporter")
        if reporter is None:
            parser.error("report: not available, DistroReporter is not defined")
        reporter().process_file(args.file)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment