-
-
Save coroa/405c9cf032d419b4a067489db665f829 to your computer and use it in GitHub Desktop.
Simple filter for the DOT-language (Graphviz) graph file describing a DAG.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python | |
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "networkx", | |
# "pydot", | |
# ] | |
# /// | |
import sys | |
from shutil import copyfileobj | |
import networkx as nx | |
# Usage/help message printed by help(). The "{name}" placeholder is filled in
# with str.format, and leading/trailing whitespace is removed by .strip().
# NOTE(review): the trailing "| |" on these lines looks like residue from a
# web-page copy rather than intended content — confirm and remove.
HELP_TEXT = """ | |
usage: {name} [-h] [pattern ...] | |
Tool for filtering Graphviz/DOT directed graphs. Pass the source graph on STDIN, the filtered graph | |
will be sent to STDOUT. | |
positional arguments: | |
pattern One or more patterns for selecting nodes starting from the empty set | |
"foo" adds anything that matches foo | |
"foo.." also adds downstream of foo | |
"..bar" adds upstream of bar | |
"foo..bar" adds everything between foo and bar | |
".." adds everything | |
individual patterns can also be prefixed by "+", "-" or "&" | |
to change how they modify the previous selection. | |
options: | |
-h, --help show this help message and exit | |
""".strip() | |
def extract_modifier(pattern, modifiers, default):
    """Split an optional one-character modifier prefix off *pattern*.

    Args:
        pattern: Pattern string that may start with one of the keys of
            *modifiers*. May be empty.
        modifiers: Mapping from a single-character prefix to the value that
            prefix selects.
        default: Value to return when *pattern* has no recognised prefix.

    Returns:
        ``(pattern, default)`` when no prefix is recognised, otherwise
        ``(pattern_without_prefix, modifiers[prefix])``. A prefix that maps
        to ``None`` is treated as not recognised.
    """
    # Slice instead of indexing so that an empty pattern yields "" rather
    # than raising IndexError.
    value = modifiers.get(pattern[:1])
    if value is None:
        return pattern, default
    return pattern[1:], value
def discard_stdin_until(matches):
    """Consume STDIN's binary buffer up to the earliest of *matches*.

    Bytes are read from ``sys.stdin.buffer`` and thrown away until one of
    the byte strings in *matches* is the next thing in the stream, or until
    the stream is exhausted.

    Args:
        matches: Iterable of ``bytes`` markers to look for.
    """
    stream = sys.stdin.buffer
    while True:
        window = stream.peek(1024)
        if not window:
            # Nothing left to look at: the stream is exhausted.
            return

        # Offset of the earliest marker in this window; markers that are not
        # present count as "at the end of the window".
        # NOTE(review): each window is searched on its own, so a marker split
        # across two windows would be skipped — confirm this is acceptable.
        offsets = [window.find(marker) for marker in matches]
        cut = min(len(window) if off < 0 else off for off in offsets)

        if cut:
            # Drop everything before the marker (or the whole window).
            stream.read(cut)
        if cut < len(window):
            # A marker was found and is now at the head of the stream.
            return
def help():
    """Print the usage message to STDOUT and exit with status 0.

    The program name shown in the message is taken from ``sys.argv[0]``.

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print(HELP_TEXT.format(name=sys.argv[0]))
    # Use sys.exit rather than the bare exit() helper: the latter is injected
    # by the site module for interactive sessions and is not meant for programs.
    sys.exit(0)
def main(args=sys.argv[1:]):
    """Filter the DOT graph arriving on STDIN and write the result to STDOUT.

    Args:
        args: Pattern arguments as described in HELP_TEXT. Defaults to the
            command-line arguments captured when the module was loaded.

    Behaviour:
        * ``-h`` / ``--help`` anywhere in *args* prints the help and exits.
        * With no patterns, the input (from the first graph keyword onwards)
          is copied through unchanged and the process exits with status 0.
        * Otherwise the patterns are applied in order to an initially empty
          node selection, and the induced subgraph is written as DOT.
    """
    if any(arg in ("--help", "-h") for arg in args):
        help()

    # Drop any bytes that precede the first graph keyword so that only the
    # graph definition itself is left on STDIN.
    discard_stdin_until([b"digraph ", b"graph "])

    if not args:
        # Fast lane: nothing to filter, pass the remaining input straight through.
        copyfileobj(sys.stdin, sys.stdout)
        # sys.exit instead of the site-injected exit(), which is intended for
        # interactive use only.
        sys.exit(0)

    # Pattern prefixes choosing which relatives of a matching node are added.
    SELECTORS = {"<": nx.ancestors, ">": nx.descendants, "=": lambda g, n: set()}

    # Argument prefixes choosing how a selection is merged into the running
    # result. Built once instead of once per argument.
    OPERATIONS = {
        "+": set.update,
        "-": set.difference_update,
        "&": set.intersection_update,
    }

    def select(graph, pattern, selector=None):
        """Return nodes whose label contains *pattern*, plus *selector*'s additions."""
        if selector is None:
            pattern, selector = extract_modifier(pattern, SELECTORS, SELECTORS["="])
        matched = set()
        for node, attrs in graph.nodes.items():
            # Nodes without an explicit label are matched by their name, which
            # is what Graphviz displays by default. Indexing attrs["label"]
            # directly would raise KeyError for such nodes.
            if pattern in attrs.get("label", str(node)):
                matched.add(node)
                matched |= selector(graph, node)
        return matched

    graph = nx.nx_pydot.read_dot(sys.stdin)

    selected = set()
    for arg in args:
        pattern, operation = extract_modifier(arg, OPERATIONS, set.update)

        if pattern == "":
            # A bare modifier (e.g. "-") applies to every node.
            sel = graph.nodes.keys()
        elif ".." in pattern:
            source, target = pattern.split("..", 1)
            if source and target:
                # Nodes lying between the two: downstream of source AND upstream of target.
                sel = select(graph, source, SELECTORS[">"]) & select(
                    graph, target, SELECTORS["<"]
                )
            elif source:
                sel = select(graph, source, SELECTORS[">"])
            elif target:
                sel = select(graph, target, SELECTORS["<"])
            else:
                # A plain ".." selects everything.
                sel = graph.nodes.keys()
        else:
            sel = select(graph, pattern)

        # The set methods are called unbound, mutating `selected` in place.
        operation(selected, sel)

    nx.nx_pydot.write_dot(graph.subgraph(selected), sys.stdout)
# Script entry point: when executed directly, pass the command-line arguments
# (without the program name) to main().
if __name__ == "__main__": | |
main(sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment