Created
January 2, 2021 22:06
-
-
Save pirogoeth/aa25e72957ebe55b3c1aafbf1ba961fa to your computer and use it in GitHub Desktop.
apt metric collector for prometheus exposition
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# vim: set ai et ts=4 sts=4 sw=4: | |
import apt | |
import argparse | |
import os | |
import pathlib | |
import shutil | |
import sys | |
import tempfile | |
from typing import List | |
def packages_to_prometheus_exp(packages: List[apt.Package]) -> List[str]:
    """Render package-state counts as Prometheus text exposition lines.

    For every metric, the entries of ``packages`` whose corresponding marker
    attribute is truthy are counted, and the count is emitted as three lines:
    a ``# TYPE`` line (always ``gauge``), a ``# HELP`` line and the sample.

    :param packages: packages to inspect; each one must expose ``essential``
        and the ``marked_*`` attributes read below.
    :returns: the exposition lines, in metric order, without line terminators.
    """

    def tally(predicate) -> int:
        # Number of packages for which ``predicate`` returns a truthy value.
        return sum(1 for candidate in packages if predicate(candidate))

    # (metric name, help text, value) -- evaluated top to bottom.
    gauges = (
        (
            "apt_upgradable_total",
            "Total number of upgrades available for the host",
            tally(lambda p: p.marked_upgrade),
        ),
        (
            "apt_essential_upgradable_total",
            "Number of upgrades that are for essential packages",
            tally(lambda p: p.essential and p.marked_upgrade),
        ),
        (
            "apt_pending_installs_total",
            "Number of packages that will be installed as a part of the upgrade",
            tally(lambda p: p.marked_install),
        ),
        (
            "apt_pending_uninstalls_total",
            "Number of packages that will be uninstalled as a part of the upgrade",
            tally(lambda p: p.marked_delete),
        ),
        (
            "apt_pending_reinstalls_total",
            "Number of packages that will be reinstalled as a part of the upgrade",
            tally(lambda p: p.marked_reinstall),
        ),
        (
            "apt_pending_downgrades_total",
            "Number of packages that will be downgraded as a part of the upgrade",
            tally(lambda p: p.marked_downgrade),
        ),
        (
            "apt_keeps_total",
            "Number of packages to be held back despite having an upgraded version available",
            tally(lambda p: p.marked_keep),
        ),
    )

    lines = []
    for metric, description, value in gauges:
        lines.append("# TYPE {} gauge".format(metric))
        lines.append("# HELP {} {}".format(metric, description))
        lines.append("{} {}".format(metric, value))
    return lines
def prepare_cache(dist_upgrade: bool) -> apt.Cache:
    """Return an APT cache on which an upgrade has been marked.

    :param dist_upgrade: forwarded to ``Cache.upgrade`` to choose between a
        plain upgrade and a dist-upgrade.
    :returns: the cache after ``update()``, ``open(None)`` and ``upgrade()``
        have been called on it, in that order.
    """
    apt_cache = apt.Cache()

    # Update first, then re-open; the order of these two calls matters.
    apt_cache.update()
    apt_cache.open(None)

    # Only marks the changes on the cache object; nothing is committed here.
    apt_cache.upgrade(dist_upgrade=dist_upgrade)
    return apt_cache
def main(args: argparse.Namespace):
    """Collect the APT metrics and publish them as ``apt.prom``.

    The metrics are written to a temporary file inside the destination
    directory and then renamed over ``apt.prom``, so a concurrent reader sees
    either the previous complete file or the new complete file.

    :param args: parsed command line; ``args.destdir`` is the textfile
        directory (a ``pathlib.Path``) and ``args.dist_upgrade`` selects
        upgrade vs. dist-upgrade.
    :raises OSError: if the temporary file cannot be created, written or
        moved into place.

    Terminates the process with exit status 0 on success.
    """
    cache = prepare_cache(args.dist_upgrade)
    metrics = packages_to_prometheus_exp(cache.get_changes())

    # The text exposition format requires the last line to end with "\n".
    payload = ("\n".join(metrics) + "\n").encode("utf-8")

    destfile = str(args.destdir / "apt.prom")
    # Stage next to the destination: a rename is only atomic within one
    # filesystem. The ".tmp" suffix keeps the staging file from matching
    # "*.prom" while it is being written.
    handle, fname = tempfile.mkstemp(
        prefix="apt_collector",
        suffix=".tmp",
        dir=str(args.destdir),
    )
    try:
        with os.fdopen(handle, "wb") as staged:
            staged.write(payload)
        # mkstemp creates the file as 0600; make it readable for the
        # exporter, which usually runs as a different user.
        os.chmod(fname, 0o644)
        os.replace(fname, destfile)
    finally:
        # After a successful replace the staging name no longer exists, so
        # this only cleans up after a failure.
        if os.path.exists(fname):
            os.remove(fname)

    sys.exit(0)
def _parse_bool(text: str) -> bool:
    """Convert a command-line word such as ``true`` or ``0`` to a bool."""
    word = text.strip().lower()
    if word in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string is kept as False, as it was under ``type=bool``.
    if word in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value (true/false), got {!r}".format(text)
    )


def configure_args() -> argparse.ArgumentParser:
    """Build the command-line parser for the collector.

    Options:

    * ``--collector.textfile.directory`` (required) -> ``destdir``, a
      ``pathlib.Path`` naming the directory the metrics file is written to.
    * ``--apt.dist-upgrade [BOOL]`` -> ``dist_upgrade``; defaults to
      ``False``. May be given bare (meaning ``True``) or with an explicit
      value such as ``true``/``false``.

    :returns: the configured parser; nothing is parsed here.
    """
    parser = argparse.ArgumentParser(description="Generate Prometheus metrics from the APT cache")
    parser.add_argument(
        "--collector.textfile.directory",
        dest="destdir",
        required=True,
        type=pathlib.Path,
        action="store",
        help="Set the path where node_exporter expects textfiles to be placed",
    )
    parser.add_argument(
        "--apt.dist-upgrade",
        dest="dist_upgrade",
        # ``type=bool`` would turn every non-empty string -- including
        # "False" -- into True, so the value is parsed explicitly instead.
        type=_parse_bool,
        nargs="?",
        const=True,
        default=False,
        action="store",
        help="Set whether the metrics should be calculated from upgrade or dist-upgrade",
    )
    return parser
# Script entry point: parse the command line and hand it to main(), which
# terminates the process itself.
if __name__ == "__main__":
    main(configure_args().parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment