Created
June 30, 2022 14:07
-
-
Save plowsec/5e243d6e367030b2d9ed188158f9c00c to your computer and use it in GitHub Desktop.
Quick and dirty script to display systemd services as a directed graph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import networkx as nx | |
import sys | |
import os | |
import logging | |
import configparser | |
import traceback | |
from typing import List | |
from collections import OrderedDict | |
# Configure the root logger so the debug traces emitted below are shown.
logging.basicConfig(level=logging.DEBUG)
# Allow-list of unit file names: main() skips every unit whose file name
# contains "service" and is not listed here, so with an empty list no
# .service unit is graphed. Paste the names produced by the command below.
# run `find . -name '*.service' | xargs basename {} | sort -u | paste -d ' ' -s`
only_enabled = []
class MultiOrderedDict(OrderedDict):
    """
    Ordered dict that accumulates list values instead of overwriting them.

    Assigning a list to a key that is already present extends the stored
    value in place; every other assignment behaves like a normal dict store.
    """

    def __setitem__(self, key, value):
        should_merge = isinstance(value, list) and key in self
        if not should_merge:
            super().__setitem__(key, value)
            return
        # The key already exists and a list is being assigned: append the new
        # items to the value stored earlier rather than replacing it.
        self[key].extend(value)
def get_files_with_extension(root_dir, extensions=(".service",)) -> List:
    """
    Recursively collect the files below ``root_dir`` that have one of the
    given extensions.

    :param root_dir: path to a folder
    :param extensions: extensions to search for, including the leading dot
    :return: the list of files found, each path joined onto the directory
        in which ``os.walk`` reported it
    """
    logging.debug(f"Walking in {root_dir}")
    tunits = []
    # os.walk already descends into every sub-directory, so no explicit
    # recursion is needed. Recursing on the bare directory names would resolve
    # them against the current working directory instead of their parent and
    # report the same files a second time (or nothing at all).
    for dirpath, _dirs, files in os.walk(root_dir):
        for filename in files:
            if os.path.splitext(filename)[1] in extensions:
                tunits.append(os.path.join(dirpath, filename))
    return tunits
def parse_systemd_service(path):
    """
    Extract the relations declared by a systemd unit file.

    :param path: path to the unit file to parse
    :return: a ``(deps, requirements)`` tuple of sets of unit names.
        ``deps`` holds the names listed under ``After`` and ``Requires`` in
        the ``[Unit]`` section; ``requirements`` holds the names listed under
        ``Before`` in ``[Unit]`` and ``WantedBy`` in ``[Install]``. Both sets
        are empty when the file declares none of these (or cannot be read).
    """
    # MultiOrderedDict merges the values of a key that appears several times,
    # and strict=False lets the parser accept such duplicates.
    service = configparser.ConfigParser(dict_type=MultiOrderedDict, strict=False, interpolation=None)
    service.read(path)
    unit_name = os.path.basename(path)
    deps = set()
    requirements = set()

    # Indexing a missing section raises KeyError, which would abort the whole
    # run for a file that has no [Unit] section (ConfigParser.read() also
    # silently skips files it cannot open). Guard the lookup so the [Install]
    # section below is still honoured.
    if "Unit" in service:
        unit = service["Unit"]
        for directive in ("After", "Requires"):
            if directive in unit:
                dep = unit[directive].split()
                logging.debug(f"Service {unit_name} depends on {dep}")
                deps.update(dep)
        if "Before" in unit:
            requirements.update(unit["Before"].split())
            logging.debug(f"Service {requirements} depends on {unit_name}")

    if "Install" in service and "WantedBy" in service["Install"]:
        requirements.update(service["Install"]["WantedBy"].split())
        logging.debug(f"Service {requirements} depends on {unit_name} ")

    return deps, requirements
def main(argv):
    """
    Print, as indented trees, the relations between the systemd units found
    below the current working directory.

    :param argv: command-line arguments; not used at the moment
    """
    # enumerate systemd files
    systemd_files = get_files_with_extension(os.getcwd())
    logging.debug(systemd_files)

    # init digraph: every edge points from a unit named in a directive to the
    # service whose file names it
    graph = nx.DiGraph()
    roots = set()

    # collect the directives of every enabled unit
    for path in systemd_files:
        svc_name = os.path.basename(path)
        if "service" in svc_name and svc_name not in only_enabled:
            logging.debug(f"Skipping {svc_name} since it is not enabled")
            continue
        deps, reqs = parse_systemd_service(path)
        # Sets have no stable iteration order; sorting keeps the edge
        # insertion order (and therefore the printed trees) reproducible.
        for req in sorted(reqs):
            roots.add(req)
            graph.add_edge(req, svc_name)
        for dep in sorted(deps):
            graph.add_edge(dep, svc_name)

    # walk the digraph from every root and print one tree per root
    already_printed = set()
    for root in sorted(roots):
        edges = list(nx.dfs_edges(graph, root))
        logging.debug(root)
        if not edges:
            # Explicit check instead of raising into a bare ``except``, which
            # also swallowed KeyboardInterrupt and genuine errors.
            logging.debug(f"No dependency on {root}")
            print()
            continue

        print(root)
        # indentation (in spaces) at which the children of each node are drawn
        spacer = {root: 0}
        for prereq, target in edges:
            spacer[target] = spacer[prereq] + 4
            if spacer[target] == 4:
                # Direct child of the root: stop printing this tree as soon as
                # a child already shown under an earlier root is reached.
                if target in already_printed:
                    break
                already_printed.add(target)
            print('{spacer}+-{t}'.format(spacer=' ' * spacer[prereq], t=target))
        print()
# Run the graph printer only when the file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment