Skip to content

Instantly share code, notes, and snippets.

@coroa
Forked from slotrans/dotfilter.py
Last active November 24, 2024 19:29
Show Gist options
  • Save coroa/405c9cf032d419b4a067489db665f829 to your computer and use it in GitHub Desktop.
Save coroa/405c9cf032d419b4a067489db665f829 to your computer and use it in GitHub Desktop.
Simple filter for the DOT-language (Graphviz) graph file describing a DAG.
#!python
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "networkx",
# "pydot",
# ]
# ///
import sys
from shutil import copyfileobj
import networkx as nx
# Usage message printed by help(). The ``{name}`` placeholder is filled in
# with sys.argv[0] via str.format, so any literal braces added to this text
# would have to be doubled.
HELP_TEXT = """
usage: {name} [-h] [pattern ...]
Tool for filtering Graphviz/DOT directed graphs. Pass the source graph on STDIN, the filtered graph
will be sent to STDOUT.
positional arguments:
pattern One or more patterns for selecting nodes starting from the empty set
"foo" adds anything that matches foo
"foo.." also adds downstream of foo
"..bar" adds upstream of bar
"foo..bar" adds everything between foo and bar
".." adds everything
individual patterns can also be prefixed by "+", "-" or "&"
to change how they modify the previous selection.
options:
-h, --help show this help message and exit
""".strip()
def extract_modifier(pattern, modifiers, default):
    """Split an optional one-character modifier prefix off ``pattern``.

    Args:
        pattern: Pattern string, possibly starting with a modifier character.
            May be empty.
        modifiers: Mapping from modifier character to the value it selects.
        default: Value returned when ``pattern`` has no known modifier prefix.

    Returns:
        A ``(pattern, value)`` tuple. When the first character of ``pattern``
        is a key of ``modifiers`` (mapped to a non-``None`` value), that
        character is stripped and the mapped value is returned; otherwise
        ``pattern`` is returned unchanged together with ``default``.
    """
    # Slice rather than index: an empty pattern then yields "" (no modifier)
    # instead of raising IndexError.
    value = modifiers.get(pattern[:1])
    if value is None:
        return pattern, default
    return pattern[1:], value
def discard_stdin_until(matches):
    """Consume binary STDIN up to the first occurrence of any of ``matches``.

    The stream is left positioned at the start of the earliest match, so a
    subsequent reader sees the match itself. If STDIN ends before any match
    is found, everything is consumed.

    Args:
        matches: Non-empty iterable of ``bytes`` markers to look for.

    NOTE(review): each peeked chunk is searched on its own, so a marker that
    straddles two chunks is not detected.
    """
    stream = sys.stdin.buffer
    while True:
        chunk = stream.peek(1024)
        if not chunk:
            # closed / exhausted stream
            return

        # Offset of each marker within the chunk; "not found" counts as the
        # end of the chunk so that the whole chunk gets discarded.
        offsets = []
        for marker in matches:
            found_at = chunk.find(marker)
            offsets.append(len(chunk) if found_at == -1 else found_at)
        cut = min(offsets)

        if cut:
            # drop everything before the match (or the whole chunk)
            stream.read(cut)
        if cut < len(chunk):
            # a marker was found inside this chunk; stop right in front of it
            return
def help():
    """Print the usage message to STDOUT and terminate with exit status 0."""
    print(HELP_TEXT.format(name=sys.argv[0]))
    # Use sys.exit rather than the ``exit`` builtin: the latter is injected by
    # the ``site`` module for interactive sessions and is not available when
    # the interpreter runs without it.
    sys.exit(0)
def main(args=sys.argv[1:]):
    """Filter the DOT graph read from STDIN and write the result to STDOUT.

    Args:
        args: Selection patterns as described in ``HELP_TEXT``. Each pattern
            may start with ``+`` (add, the default), ``-`` (remove) or ``&``
            (intersect) to choose how it modifies the running selection.
            ``-h`` / ``--help`` anywhere in ``args`` prints the usage and
            exits. Without any pattern the graph is passed through unparsed.
    """
    if any(arg in ("--help", "-h") for arg in args):
        help()

    # Drop whatever precedes the graph definition on STDIN.
    discard_stdin_until([b"digraph ", b"graph "])

    if not args:
        # fast lane: nothing to filter, so copy the graph through verbatim
        copyfileobj(sys.stdin, sys.stdout)
        sys.exit(0)

    # How a matched node is expanded: "<" upstream, ">" downstream, "=" not at all.
    SELECTORS = {"<": nx.ancestors, ">": nx.descendants, "=": lambda g, n: set()}

    # How a pattern's node set is merged into the running selection.
    OPERATIONS = {
        "+": set.update,
        "-": set.difference_update,
        "&": set.intersection_update,
    }

    def select(graph, pattern, selector=None):
        """Return nodes whose label contains ``pattern``, expanded by ``selector``.

        When ``selector`` is ``None`` it is taken from an optional leading
        ``<`` / ``>`` / ``=`` character of ``pattern`` (default: no expansion).
        """
        if selector is None:
            pattern, selector = extract_modifier(pattern, SELECTORS, SELECTORS["="])

        matched = set()
        for node, attrs in graph.nodes.items():
            # Nodes without an explicit label are matched by their name
            # (Graphviz's default label) instead of raising KeyError.
            if pattern in attrs.get("label", node):
                matched.add(node)
                matched |= selector(graph, node)
        return matched

    graph = nx.nx_pydot.read_dot(sys.stdin)

    selected = set()
    for pattern in args:
        pattern, operation = extract_modifier(pattern, OPERATIONS, set.update)

        if pattern == "":
            # a bare modifier applies to every node
            sel = graph.nodes.keys()
        elif ".." in pattern:
            pat1, pat2 = pattern.split("..", 1)
            if pat1 and pat2:
                # everything downstream of pat1 that is also upstream of pat2
                sel = select(graph, pat1, SELECTORS[">"]) & select(
                    graph, pat2, SELECTORS["<"]
                )
            elif pat1:
                sel = select(graph, pat1, SELECTORS[">"])
            elif pat2:
                sel = select(graph, pat2, SELECTORS["<"])
            else:
                # ".." on its own selects everything
                sel = graph.nodes.keys()
        else:
            sel = select(graph, pattern)

        operation(selected, sel)

    nx.nx_pydot.write_dot(graph.subgraph(selected), sys.stdout)
# Script entry point: hand the command-line arguments (without the program
# name) to main().
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment