Skip to content

Instantly share code, notes, and snippets.

@chriscarrollsmith
Last active June 8, 2026 03:07
Show Gist options
  • Select an option

  • Save chriscarrollsmith/b5891873aef4e6cca1f71124cf9de0fb to your computer and use it in GitHub Desktop.

Select an option

Save chriscarrollsmith/b5891873aef4e6cca1f71124cf9de0fb to your computer and use it in GitHub Desktop.
From open GitHub issues, auto-generate a workflow DAG that uses "blocking" relationships and subgraphs to prioritize work
#!/usr/bin/env python3
"""Build an interactive workflow graph from GitHub issues across one or more repos.
Fetches open issues, groups them by label hierarchy, draws blocking dependencies,
and renders either a Graphviz DOT file or a Cytoscape preset page for browsing.
**Why:** The graph helps teams coordinate—see what blocks what, pick work that
unblocks others, and focus on different areas of the codebase in parallel without
overlap.
**Grouping issues (slash labels):** Add labels like `api_audit/leaf_config` to nest
issues under sections of the codebase. The part before `/` is the top-level group;
the part after is a sub-group. Single-level labels (e.g. `major`) group without
nesting. Issues labeled only with `deliverables` or `deliverables/...` are omitted.
**Blocking relationships:** In GitHub, mark issue B as blocked by issue A (issue
dependencies). The script reads those links and draws `blocks` edges so blockers
stand out in the graph.
**Requirements:** Python 3.10+, Graphviz `dot` on PATH (for `cytoscape` output and `dot --render-svg`),
and a GitHub token (`GITHUB_TOKEN`, `gh auth token`, or `--github-token`). The
token needs repo read access and permission to read issue dependencies.
**Usage:**
python build_workflow.py dot --owner ORG --repos repo-a repo-b
python build_workflow.py dot --render-svg
python build_workflow.py cytoscape --docs-dir docs
Open `docs/index.html` (after `cytoscape`) to explore the graph; click issue
nodes to open them on GitHub.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Value sent in the `X-GitHub-Api-Version` header of every REST request.
GITHUB_API_VERSION = "2026-03-10"
# Repositories used when `--repos` is not passed on the command line.
DEFAULT_REPOS: list[str] = [
    # Your repository names here
]
# Owner used when `--owner` is not passed on the command line.
DEFAULT_OWNER: str | None = None  # Your user/org name here
# WIDTH attribute of the HTML-like table cell that holds each issue label.
ISSUE_LABEL_WIDTH = 200
# Font name applied to nodes and edges of the generated DOT graph.
GRAPH_FONT_NAME = "Arial"
# Font size applied to nodes (edges use a fixed size of 9 in `build_dot`).
GRAPH_FONT_SIZE = 10
# Values of the Graphviz `nodesep` and `ranksep` graph attributes.
GRAPH_NODE_SEP = 0.5
GRAPH_RANK_SEP = 0.8
@dataclass(frozen=True)
class WorkflowIssue:
    """One issue together with the label-derived groups it is drawn inside.

    ``pre_group_*`` describes the outer group and ``post_group_*`` the nested
    one; each pair is ``None`` when the issue has no group at that level.
    """

    repo: str
    number: int
    title: str
    pre_group_key: str | None
    pre_group_label: str | None
    post_group_key: str | None
    post_group_label: str | None

    @property
    def node_id(self) -> str:
        """Return the graph node name: repo with dashes as underscores, then the number."""
        underscored_repo = "_".join(self.repo.split("-"))
        return f"{underscored_repo}_{self.number}"
def _quote(value: str) -> str:
    """Wrap *value* in double quotes, backslash-escaping backslashes and double quotes."""
    pieces = ['"']
    for char in value:
        if char in ("\\", '"'):
            pieces.append("\\")
        pieces.append(char)
    pieces.append('"')
    return "".join(pieces)
def _escape_html(value: str) -> str:
    """Replace ``&``, ``<``, ``>`` and ``"`` in *value* with their HTML entities."""
    entities = {"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;"}
    return "".join(entities.get(char, char) for char in value)
def _normalize_repo_key(repo: str) -> str:
    """Return *repo* with every dash turned into an underscore."""
    return "_".join(repo.split("-"))
def _request_with_retry(request: urllib.request.Request) -> Any:
    """Send *request* and return its decoded JSON body, retrying transient failures.

    HTTP 429/502/503/504 responses are retried, up to five attempts in total.
    The wait before a retry is the ``Retry-After`` header when it holds a
    number of seconds, otherwise an exponential backoff starting at one second.

    Returns:
        The parsed JSON document, or ``None`` when the response body is empty.

    Raises:
        RuntimeError: On a non-retryable HTTP error, or once the retryable
            attempts are used up. The message carries the status and body.
    """
    retryable_statuses = {429, 502, 503, 504}
    backoff = 1.0
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            with urllib.request.urlopen(request, timeout=30) as response:
                body = response.read()
            return json.loads(body.decode()) if body else None
        except urllib.error.HTTPError as error:
            if error.code in retryable_statuses and attempt < max_attempts - 1:
                retry_after = error.headers.get("Retry-After") if error.headers else None
                try:
                    wait_for = float(retry_after) if retry_after else backoff
                except ValueError:
                    # Retry-After may also be an HTTP-date; fall back to our own
                    # backoff instead of crashing on the float conversion.
                    wait_for = backoff
                # time.sleep rejects negative values, so clamp at zero.
                time.sleep(max(wait_for, 0.0))
                backoff *= 2
                continue
            detail = error.read().decode(errors="replace")
            raise RuntimeError(f"GitHub API request failed ({error.code}): {detail}") from error
    raise RuntimeError("GitHub API retry loop exhausted")
def _github_request(
    owner: str,
    repo: str,
    path: str,
    token: str,
    *,
    query: dict[str, str] | None = None,
) -> Any:
    """Call a repository-scoped GitHub REST endpoint and return the parsed JSON.

    *path* is appended to ``https://api.github.com/repos/{owner}/{repo}`` and
    *query*, when non-empty, is URL-encoded into the query string. The request
    is sent through ``_request_with_retry`` with bearer-token authentication.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}{path}"
    if query:
        endpoint = "?".join((endpoint, urllib.parse.urlencode(query)))
    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {token}",
        "X-GitHub-Api-Version": GITHUB_API_VERSION,
    }
    return _request_with_retry(urllib.request.Request(url=endpoint, headers=headers))
def _resolve_token(token: str | None) -> str:
    """Return a GitHub token from the first source that provides one.

    Sources are tried in order: the *token* argument, the ``GITHUB_TOKEN``
    environment variable, then the output of ``gh auth token``.

    Raises:
        RuntimeError: When no source yields a token, including when the ``gh``
            executable is missing or cannot be run.
    """
    if token:
        return token
    env_token = os.environ.get("GITHUB_TOKEN")
    if env_token:
        return env_token
    missing_message = "No GitHub token provided. Set GITHUB_TOKEN or pass --github-token."
    try:
        result = subprocess.run(
            ["gh", "auth", "token"],
            capture_output=True,
            text=True,
            check=False,
        )
    except OSError as error:
        # `gh` is optional: when it is not installed (FileNotFoundError) or not
        # executable, report the missing token rather than a raw OS error.
        raise RuntimeError(missing_message) from error
    cli_token = result.stdout.strip()
    if result.returncode != 0 or not cli_token:
        raise RuntimeError(missing_message)
    return cli_token
def fetch_open_issues(owner: str, repo: str, token: str) -> list[dict[str, Any]]:
    """Return every open issue of ``owner/repo``, excluding pull requests.

    Pages of 100 are requested from ``/issues`` until an empty page comes
    back. Rows carrying a ``pull_request`` key are dropped.

    Raises:
        RuntimeError: When a page is not a JSON list, or the request fails.
    """
    page = 1
    issues: list[dict[str, Any]] = []
    while True:
        rows = _github_request(
            owner,
            repo,
            "/issues",
            token,
            query={
                "state": "open",
                "per_page": "100",
                "page": str(page),
            },
        )
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Unexpected GitHub response for {owner}/{repo} issues: "
                f"expected a list, got {type(rows).__name__}"
            )
        if not rows:
            break
        issues.extend(row for row in rows if "pull_request" not in row)
        page += 1
    return issues
def fetch_label_descriptions(owner: str, repo: str, token: str) -> dict[str, str]:
    """Return a mapping of label name to description for ``owner/repo``.

    Pages of 100 are requested from ``/labels`` until an empty page comes
    back. A label with no description maps to its own name.

    Raises:
        RuntimeError: When a page is not a JSON list, or the request fails.
    """
    page = 1
    descriptions: dict[str, str] = {}
    while True:
        rows = _github_request(
            owner,
            repo,
            "/labels",
            token,
            query={"per_page": "100", "page": str(page)},
        )
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(rows, list):
            raise RuntimeError(
                f"Unexpected GitHub response for {owner}/{repo} labels: "
                f"expected a list, got {type(rows).__name__}"
            )
        if not rows:
            break
        for row in rows:
            descriptions[row["name"]] = row.get("description") or row["name"]
        page += 1
    return descriptions
def _choose_group_label(label_names: list[str]) -> str | None:
    """Pick the label that decides which group an issue is drawn in.

    ``deliverables`` and ``deliverables/...`` labels are never chosen. Labels
    of the form ``a/b`` (both sides non-empty) take precedence over labels
    without a slash; within either kind the alphabetically first one wins.
    Returns ``None`` when no label qualifies.
    """
    nested: list[str] = []
    flat: list[str] = []
    for name in label_names:
        if name == "deliverables" or name.startswith("deliverables/"):
            continue
        head, slash, tail = name.partition("/")
        if slash:
            # A slash label with an empty side is ignored altogether.
            if head and tail:
                nested.append(name)
        elif name:
            flat.append(name)
    for pool in (nested, flat):
        if pool:
            return min(pool)
    return None
def _has_deliverables_group_label(label_names: list[str]) -> bool:
    """Return True when any label is ``deliverables`` or starts with ``deliverables/``."""
    for name in label_names:
        if name == "deliverables":
            return True
        if name.startswith("deliverables/"):
            return True
    return False
def build_workflow_issues(
    issues_by_repo: dict[str, list[dict[str, Any]]],
    label_descriptions_by_repo: dict[str, dict[str, str]],
) -> list[WorkflowIssue]:
    """Convert raw issue rows into ``WorkflowIssue`` records with group info.

    For each issue the grouping label is picked by ``_choose_group_label``.
    An issue with no grouping label is kept ungrouped, unless it carries a
    ``deliverables`` label, in which case it is left out. Group display labels
    come from the repo's label descriptions, falling back to the key itself.
    The result is sorted by repo, outer group, inner group and issue number.
    """
    collected: list[WorkflowIssue] = []
    for repo, raw_issues in issues_by_repo.items():
        descriptions = label_descriptions_by_repo.get(repo, {})
        for raw in raw_issues:
            names = [label["name"] for label in raw.get("labels", [])]
            chosen = _choose_group_label(names)
            if chosen is None and _has_deliverables_group_label(names):
                continue
            pre_key = post_key = pre_label = post_label = None
            if chosen is not None:
                if "/" in chosen:
                    pre_key, post_key = chosen.split("/", 1)
                    pre_label = descriptions.get(pre_key, pre_key)
                    post_label = descriptions.get(chosen, post_key)
                else:
                    pre_key = chosen
                    pre_label = descriptions.get(chosen, chosen)
            collected.append(
                WorkflowIssue(
                    repo=repo,
                    number=int(raw["number"]),
                    title=raw["title"],
                    pre_group_key=pre_key,
                    pre_group_label=pre_label,
                    post_group_key=post_key,
                    post_group_label=post_label,
                )
            )

    def ordering(item: WorkflowIssue) -> tuple[str, str, str, int]:
        return (item.repo, item.pre_group_key or "", item.post_group_key or "", item.number)

    return sorted(collected, key=ordering)
def fetch_blocks_edges(
    owner: str,
    token: str,
    workflow_issues: list[WorkflowIssue],
) -> list[tuple[str, int, str, int]]:
    """Return the sorted "blocks" edges between the given issues.

    For every issue the ``/issues/{number}/dependencies/blocked_by`` endpoint
    is queried. Each edge is ``(blocker_repo, blocker_number, blocked_repo,
    blocked_number)``. Blockers that are not in *workflow_issues* are skipped.

    Raises:
        RuntimeError: When a response is not a JSON list, or a request fails.
    """
    # Only membership is needed, so keep the known (repo, number) pairs in a set.
    known_issues = {(issue.repo, issue.number) for issue in workflow_issues}
    edges: set[tuple[str, int, str, int]] = set()
    for issue in workflow_issues:
        blocked_by = _github_request(
            owner,
            issue.repo,
            f"/issues/{issue.number}/dependencies/blocked_by",
            token,
        )
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(blocked_by, list):
            raise RuntimeError(
                f"Unexpected GitHub response for {owner}/{issue.repo}#{issue.number} "
                f"dependencies: expected a list, got {type(blocked_by).__name__}"
            )
        for blocker in blocked_by:
            blocker_repo = blocker["repository"]["name"]
            blocker_number = int(blocker["number"])
            if (blocker_repo, blocker_number) not in known_issues:
                continue
            edges.add((blocker_repo, blocker_number, issue.repo, issue.number))
    return sorted(edges)
def build_dot(
    issues: list[WorkflowIssue],
    blocks_edges: list[tuple[str, int, str, int]],
    *,
    owner: str | None = None,
    rankdir: str = "LR",
) -> str:
    """Render issues and their blocking edges as Graphviz DOT source.

    Each repo becomes a cluster. Inside it, ungrouped issues come first, then
    one cluster per single-level group, then one cluster per two-level group
    with a nested cluster for every sub-group. Every blocking edge is drawn
    from blocker to blocked issue with the label ``blocks``.

    Args:
        issues: Issues to draw.
        blocks_edges: ``(blocker_repo, blocker_number, blocked_repo,
            blocked_number)`` tuples.
        owner: When given, each node links to its issue page on GitHub.
        rankdir: Value of the Graphviz ``rankdir`` graph attribute.

    Returns:
        The DOT document as a single string ending with a newline.
    """
    by_repo_direct: dict[str, list[WorkflowIssue]] = {}
    by_repo_pre_only: dict[str, dict[str, list[WorkflowIssue]]] = {}
    by_repo_grouped: dict[str, dict[str, dict[str, list[WorkflowIssue]]]] = {}
    for issue in issues:
        if issue.pre_group_key is None:
            by_repo_direct.setdefault(issue.repo, []).append(issue)
            continue
        if issue.post_group_key is None:
            pre_only_map = by_repo_pre_only.setdefault(issue.repo, {})
            pre_only_map.setdefault(issue.pre_group_key, []).append(issue)
            continue
        pre_map = by_repo_grouped.setdefault(issue.repo, {})
        post_map = pre_map.setdefault(issue.pre_group_key, {})
        post_map.setdefault(issue.post_group_key, []).append(issue)

    pre_labels = {
        (issue.repo, issue.pre_group_key): issue.pre_group_label
        for issue in issues
        if issue.pre_group_key is not None and issue.pre_group_label is not None
    }
    post_labels = {
        (issue.repo, issue.pre_group_key, issue.post_group_key): issue.post_group_label
        for issue in issues
        if issue.pre_group_key is not None
        and issue.post_group_key is not None
        and issue.post_group_label is not None
    }
    all_repos = sorted(set(by_repo_direct) | set(by_repo_pre_only) | set(by_repo_grouped))

    lines: list[str] = []
    lines.append("digraph workflow {")
    lines.append(
        f" graph [compound=true, rankdir={rankdir}, nodesep={GRAPH_NODE_SEP}, ranksep={GRAPH_RANK_SEP}];"
    )
    lines.append(
        f' node [shape=box, style="rounded", fontname={_quote(GRAPH_FONT_NAME)}, fontsize={GRAPH_FONT_SIZE}];'
    )
    lines.append(f" edge [fontname={_quote(GRAPH_FONT_NAME)}, fontsize=9];")
    lines.append("")

    def cluster_key(raw: str) -> str:
        # Cluster names only keep characters that are safe in an identifier.
        return re.sub(r"[^A-Za-z0-9_]", "_", raw)

    def by_number(item: WorkflowIssue) -> int:
        return item.number

    def append_issue_node(indent: str, issue: WorkflowIssue) -> None:
        number_html = _escape_html(f"#{issue.number}")
        title_html = _escape_html(issue.title)
        label_html = (
            '<<TABLE BORDER="0" CELLBORDER="0" CELLPADDING="4" CELLSPACING="0">'
            f'<TR><TD WIDTH="{ISSUE_LABEL_WIDTH}" ALIGN="LEFT"><B>{number_html}</B> &#183; {title_html}</TD></TR>'
            "</TABLE>>"
        )
        node_label = f"#{issue.number} · {issue.title}"
        attrs = [f"label={label_html}"]
        if owner is not None:
            issue_url = f"https://github.com/{owner}/{issue.repo}/issues/{issue.number}"
            attrs.extend(
                [
                    f"URL={_quote(issue_url)}",
                    f"href={_quote(issue_url)}",
                    f"tooltip={_quote(node_label)}",
                    'target="_blank"',
                ]
            )
        # Quote the node name: repo names may contain dots or start with a
        # digit, which is not a valid bare DOT identifier.
        lines.append(f"{indent}{_quote(issue.node_id)} [{', '.join(attrs)}];")

    for repo in all_repos:
        repo_key = _normalize_repo_key(repo)
        lines.append(f' subgraph "cluster_repo_{repo_key}" {{')
        lines.append(f" label={_quote(f'{repo} GitHub issues')};")
        lines.append("")
        for issue in sorted(by_repo_direct.get(repo, []), key=by_number):
            append_issue_node(" ", issue)
        if by_repo_direct.get(repo):
            lines.append("")
        for pre_key in sorted(by_repo_pre_only.get(repo, {})):
            pre_key_safe = cluster_key(pre_key)
            # `.get` so that a group without a display label falls back to its key.
            pre_label = pre_labels.get((repo, pre_key)) or pre_key
            lines.append(f' subgraph "cluster_repo_{repo_key}_{pre_key_safe}" {{')
            lines.append(f" label={_quote(pre_label)};")
            for issue in sorted(by_repo_pre_only[repo][pre_key], key=by_number):
                append_issue_node(" ", issue)
            lines.append(" }")
            lines.append("")
        for pre_key in sorted(by_repo_grouped.get(repo, {})):
            pre_key_safe = cluster_key(pre_key)
            pre_label = pre_labels.get((repo, pre_key)) or pre_key
            lines.append(f' subgraph "cluster_repo_{repo_key}_{pre_key_safe}" {{')
            lines.append(f" label={_quote(pre_label)};")
            lines.append("")
            for post_key in sorted(by_repo_grouped[repo][pre_key]):
                post_key_safe = cluster_key(post_key)
                post_label = post_labels.get((repo, pre_key, post_key)) or post_key
                lines.append(
                    f' subgraph "cluster_repo_{repo_key}_{pre_key_safe}_{post_key_safe}" {{'
                )
                lines.append(f" label={_quote(post_label)};")
                for issue in sorted(by_repo_grouped[repo][pre_key][post_key], key=by_number):
                    append_issue_node(" ", issue)
                lines.append(" }")
                lines.append("")
            lines.append(" }")
            lines.append("")
        lines.append(" }")
        lines.append("")

    for blocker_repo, blocker_number, blocked_repo, blocked_number in blocks_edges:
        # Same naming scheme as WorkflowIssue.node_id, quoted like the nodes.
        blocker_id = _quote(f"{_normalize_repo_key(blocker_repo)}_{blocker_number}")
        blocked_id = _quote(f"{_normalize_repo_key(blocked_repo)}_{blocked_number}")
        lines.append(f' {blocker_id} -> {blocked_id} [label="blocks"];')
    lines.append("}")
    lines.append("")
    return "\n".join(lines)
def render_dot_to_svg(*, dot_path: Path, svg_path: Path) -> None:
    """Render the DOT file at *dot_path* to an SVG file at *svg_path*.

    The Graphviz executable is taken from the ``DOT_BIN`` environment
    variable, defaulting to ``dot``.

    Raises:
        RuntimeError: When the executable cannot be started or exits with a
            non-zero status.
    """
    dot_binary = os.environ.get("DOT_BIN", "dot")
    try:
        result = subprocess.run(
            [dot_binary, "-Tsvg", str(dot_path), "-o", str(svg_path)],
            capture_output=True,
            text=True,
            check=False,
        )
    except OSError as error:
        # A missing or non-executable binary would otherwise surface as a raw
        # FileNotFoundError/PermissionError instead of a Graphviz error.
        raise RuntimeError(
            f"Graphviz render failed: could not run {dot_binary!r}: {error}"
        ) from error
    if result.returncode != 0:
        detail = result.stderr.strip() or result.stdout.strip() or "unknown graphviz error"
        raise RuntimeError(f"Graphviz render failed: {detail}")
def _parse_graphviz_json(dot_text: str, *, dot_bin: str = "dot") -> dict[str, Any]:
    """Lay out *dot_text* with ``dot -Tjson`` and return the parsed JSON document.

    Args:
        dot_text: DOT source, passed to Graphviz on standard input.
        dot_bin: Name or path of the Graphviz ``dot`` executable.

    Raises:
        RuntimeError: When the executable cannot be started or exits with a
            non-zero status.
    """
    try:
        result = subprocess.run(
            [dot_bin, "-Tjson"],
            input=dot_text,
            capture_output=True,
            text=True,
            check=False,
        )
    except OSError as error:
        # A missing or non-executable binary would otherwise surface as a raw
        # FileNotFoundError/PermissionError instead of a Graphviz error.
        raise RuntimeError(
            f"Graphviz JSON render failed: could not run {dot_bin!r}: {error}"
        ) from error
    if result.returncode != 0:
        detail = result.stderr.strip() or result.stdout.strip() or "unknown graphviz error"
        raise RuntimeError(f"Graphviz JSON render failed: {detail}")
    return json.loads(result.stdout)
def _parse_xy(pos: str) -> tuple[float, float]:
    """Parse an ``"x,y"`` position string into a pair of floats.

    Raises ``ValueError`` when either side of the first comma is not a number.
    """
    x_part, _, y_part = pos.partition(",")
    return (float(x_part), float(y_part))
def _graphviz_size_inches_to_points(value: Any) -> float | None:
    """Convert a Graphviz JSON width/height (inches, string or number) to points.

    Returns ``None`` when *value* is ``None`` or cannot be read as a float.
    """
    try:
        # float(None) raises TypeError, so a missing value also ends up as None.
        return float(value) * 72.0
    except (TypeError, ValueError):
        return None
def _graphviz_max_y(graph_json: dict[str, Any]) -> float:
    """Return the fourth field (``y1``) of the graph's ``bb`` bounding box.

    ``bb`` is expected as ``"x0,y0,x1,y1"``. When it is absent, not a string,
    or does not have exactly four fields, ``0.0`` is returned.
    """
    bounding_box = graph_json.get("bb")
    if isinstance(bounding_box, str):
        fields = bounding_box.split(",")
        if len(fields) == 4:
            return float(fields[3])
    return 0.0
def _cluster_depths(cluster_parents: dict[int, int | None]) -> dict[int, int]:
    """Return the nesting depth of every cluster, with 0 for a parentless one.

    *cluster_parents* maps a cluster id to its parent id, or ``None``. An id
    that only appears as a parent is treated as a root and also appears in the
    result.
    """
    memo: dict[int, int] = {}

    def resolve(node: int) -> int:
        known = memo.get(node)
        if known is not None:
            return known
        above = cluster_parents.get(node)
        level = 0 if above is None else resolve(above) + 1
        memo[node] = level
        return level

    for node in cluster_parents:
        resolve(node)
    return memo
def build_cytoscape_preset_payload(
    *,
    owner: str,
    workflow_issues: list[WorkflowIssue],
    graphviz_json: dict[str, Any],
) -> dict[str, Any]:
    """Turn Graphviz ``-Tjson`` layout output into a Cytoscape preset payload.

    Clusters become compound parent nodes (``type: "cluster"``). Issues become
    positioned child nodes (``type: "issue"``) linking to their GitHub page.
    Graphviz edges between two issue nodes become ``blocks`` edges.

    Args:
        owner: Repository owner, used to build issue URLs.
        workflow_issues: Issues that were rendered into the DOT source.
        graphviz_json: Parsed output of ``dot -Tjson`` for that DOT source.

    Returns:
        A dict with ``meta`` (owner, repos, counts) and ``elements`` (``nodes``
        and ``edges`` lists).

    Raises:
        RuntimeError: When ``graphviz_json["objects"]`` is not a list.
    """
    objects = graphviz_json.get("objects", [])
    if not isinstance(objects, list):
        raise RuntimeError("Graphviz JSON missing objects list")

    # Index every named object by its Graphviz id; clusters are recognised by
    # their "cluster_" name prefix.
    objects_by_gvid: dict[int, dict[str, Any]] = {}
    node_gvid_by_name: dict[str, int] = {}
    cluster_gvids: set[int] = set()
    for obj in objects:
        gvid = obj.get("_gvid")
        name = obj.get("name")
        if not isinstance(gvid, int) or not isinstance(name, str):
            continue
        objects_by_gvid[gvid] = obj
        node_gvid_by_name[name] = gvid
        if name.startswith("cluster_"):
            cluster_gvids.add(gvid)

    # Parent links taken from each cluster's explicit `subgraphs` list.
    cluster_parents: dict[int, int | None] = {cluster_id: None for cluster_id in cluster_gvids}
    for cluster_id in cluster_gvids:
        for child in objects_by_gvid[cluster_id].get("subgraphs", []) or []:
            if isinstance(child, int) and child in cluster_gvids:
                cluster_parents[child] = cluster_id

    # Fallback parent inference when Graphviz JSON lacks explicit `subgraphs`:
    # the parent is the smallest cluster whose node set strictly contains ours.
    cluster_nodes: dict[int, set[int]] = {
        cluster_id: {
            n for n in (objects_by_gvid[cluster_id].get("nodes", []) or []) if isinstance(n, int)
        }
        for cluster_id in cluster_gvids
    }
    for cluster_id in cluster_gvids:
        if cluster_parents[cluster_id] is not None:
            continue
        my_nodes = cluster_nodes[cluster_id]
        if not my_nodes:
            continue
        candidates: list[tuple[int, int]] = []
        for other_id in cluster_gvids:
            if other_id == cluster_id:
                continue
            other_nodes = cluster_nodes[other_id]
            if my_nodes < other_nodes:
                candidates.append((len(other_nodes), other_id))
        if candidates:
            candidates.sort()
            cluster_parents[cluster_id] = candidates[0][1]

    cluster_depth = _cluster_depths(cluster_parents)

    # Every cluster (at any nesting level) that lists a given node.
    node_to_clusters: dict[int, set[int]] = {}
    for cluster_id, member_nodes in cluster_nodes.items():
        for node_gvid in member_nodes:
            node_to_clusters.setdefault(node_gvid, set()).add(cluster_id)

    elements_nodes: list[dict[str, Any]] = []
    elements_edges: list[dict[str, Any]] = []
    max_y = _graphviz_max_y(graphviz_json)
    cluster_node_id_by_gvid = {
        gvid: f"cluster::{objects_by_gvid[gvid]['name']}" for gvid in cluster_gvids
    }

    for cluster_id in sorted(cluster_gvids):
        cluster_obj = objects_by_gvid[cluster_id]
        cluster_data: dict[str, Any] = {
            "id": cluster_node_id_by_gvid[cluster_id],
            "label": cluster_obj.get("label") or cluster_obj.get("name"),
            "type": "cluster",
            "cluster_name": cluster_obj.get("name"),
        }
        parent_id = cluster_parents.get(cluster_id)
        if parent_id is not None:
            cluster_data["parent"] = cluster_node_id_by_gvid[parent_id]
        elements_nodes.append({"data": cluster_data})
    cluster_count = len(elements_nodes)

    for issue in workflow_issues:
        node_gvid = node_gvid_by_name.get(issue.node_id)
        if node_gvid is None:
            continue
        node_obj = objects_by_gvid[node_gvid]
        pos = node_obj.get("pos")
        if not isinstance(pos, str):
            continue
        x, y = _parse_xy(pos)
        # Mirror the y coordinate against the top edge of the bounding box.
        y = max_y - y
        issue_data: dict[str, Any] = {
            "id": issue.node_id,
            "label": f"#{issue.number} · {issue.title}",
            "type": "issue",
            "repo": issue.repo,
            "number": issue.number,
            "title": issue.title,
            "url": f"https://github.com/{owner}/{issue.repo}/issues/{issue.number}",
        }
        w_pt = _graphviz_size_inches_to_points(node_obj.get("width"))
        h_pt = _graphviz_size_inches_to_points(node_obj.get("height"))
        if w_pt is not None and h_pt is not None:
            issue_data["gv_width"] = w_pt
            issue_data["gv_height"] = h_pt
        containing_clusters = node_to_clusters.get(node_gvid, set())
        if containing_clusters:
            # The deepest containing cluster is the node's direct parent.
            leaf_cluster = max(containing_clusters, key=lambda cid: cluster_depth.get(cid, 0))
            issue_data["parent"] = cluster_node_id_by_gvid[leaf_cluster]
        elements_nodes.append({"data": issue_data, "position": {"x": x, "y": y}})
    issue_count = len(elements_nodes) - cluster_count

    issue_node_names = {issue.node_id for issue in workflow_issues}
    for edge in graphviz_json.get("edges", []) or []:
        tail = edge.get("tail")
        head = edge.get("head")
        if not isinstance(tail, int) or not isinstance(head, int):
            continue
        tail_obj = objects_by_gvid.get(tail)
        head_obj = objects_by_gvid.get(head)
        if tail_obj is None or head_obj is None:
            continue
        source = tail_obj.get("name")
        target = head_obj.get("name")
        if not isinstance(source, str) or not isinstance(target, str):
            continue
        if source not in issue_node_names or target not in issue_node_names:
            continue
        elements_edges.append(
            {
                "data": {
                    "id": f"edge::{source}->{target}",
                    "source": source,
                    "target": target,
                    "type": "blocks",
                    "label": "blocks",
                }
            }
        )

    return {
        "meta": {
            "owner": owner,
            "repos": sorted({issue.repo for issue in workflow_issues}),
            "issue_count": issue_count,
            "cluster_count": cluster_count,
            "edge_count": len(elements_edges),
        },
        "elements": {
            "nodes": elements_nodes,
            "edges": elements_edges,
        },
    }
def build_index_html(*, json_filename: str = "workflow.json") -> str:
    """Return the static HTML page that loads the workflow JSON into Cytoscape.

    *json_filename* is embedded twice: JSON-encoded as the argument of the
    page's ``fetch`` call, and verbatim inside the load-failure error message.
    Doubled braces in the template are literal braces in the rendered page.
    """
    return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Workflow Graph (Graphviz preset + Cytoscape)</title>
<style>
:root {{
color-scheme: light dark;
font-family: Inter, Segoe UI, Roboto, Helvetica, Arial, sans-serif;
}}
body {{
margin: 0;
display: grid;
grid-template-columns: 320px 1fr;
height: 100vh;
}}
#sidebar {{
border-right: 1px solid #8884;
padding: 12px;
overflow: auto;
}}
#cy {{
width: 100%;
height: 100%;
display: block;
}}
input, select, button {{
width: 100%;
margin: 0.3rem 0;
padding: 0.45rem;
box-sizing: border-box;
}}
.muted {{
opacity: 0.8;
font-size: 0.9rem;
}}
</style>
</head>
<body>
<aside id="sidebar">
<h3>Workflow (Graphviz preset)</h3>
<div id="summary" class="muted">Loading...</div>
<label for="repoFilter">Repo filter</label>
<select id="repoFilter">
<option value="__all__">All repos</option>
</select>
<label for="searchBox">Search issues</label>
<input id="searchBox" type="text" placeholder="issue #, title..." />
<button id="resetView">Reset view</button>
<p class="muted">
Positions come from Graphviz (`dot -Tjson`) and are rendered with Cytoscape preset layout.
Click issue nodes to open GitHub.
</p>
</aside>
<main id="cy"></main>
<script src="https://unpkg.com/cytoscape@3.30.2/dist/cytoscape.min.js"></script>
<script>
async function init() {{
const res = await fetch({json.dumps(json_filename)});
if (!res.ok) {{
throw new Error(`Failed to load {json_filename}: ${{res.status}}`);
}}
const graph = await res.json();
const nodes = graph.elements.nodes || [];
const edges = graph.elements.edges || [];
const cy = cytoscape({{
container: document.getElementById('cy'),
elements: [...nodes, ...edges],
style: [
{{
selector: 'node',
style: {{
'label': 'data(label)',
'font-size': 11,
'text-wrap': 'wrap',
'text-max-width': 220,
'text-valign': 'center',
'text-halign': 'center',
'shape': 'round-rectangle',
}}
}},
{{
selector: 'node[type = "cluster"]',
style: {{
'background-opacity': 0.06,
'border-width': 1.4,
'border-style': 'dashed',
'border-color': '#666',
'font-size': 12,
'font-weight': 600,
'text-valign': 'top',
'text-halign': 'center',
'text-wrap': 'wrap',
'text-max-width': 260,
'text-margin-y': -8,
'padding': '14px',
}}
}},
{{
selector: 'node[type = "issue"]',
style: {{
'background-opacity': 1,
'background-color': '#6DA6FF',
'font-size': 10,
'text-wrap': 'wrap',
// Match Graphviz layout: node bbox from `dot -Tjson` (gv_width/gv_height), not wrapped-label bounds.
'text-max-width': (ele) => {{
const w = ele.data('gv_width');
if (w == null || Number.isNaN(Number(w))) return 220;
return Math.max(40, Number(w) - 24);
}},
'width': (ele) => {{
const w = ele.data('gv_width');
return (w != null && !Number.isNaN(Number(w))) ? Number(w) : 'label';
}},
'height': (ele) => {{
const h = ele.data('gv_height');
return (h != null && !Number.isNaN(Number(h))) ? Number(h) : 'label';
}},
'padding': '12px',
}}
}},
{{
selector: 'edge',
style: {{
'curve-style': 'bezier',
'target-arrow-shape': 'triangle',
'width': 1.6,
'line-color': '#666',
'target-arrow-color': '#666',
'label': 'data(label)',
'font-size': 9,
'text-background-opacity': 1,
'text-background-color': '#fff',
'text-background-padding': 1,
}}
}},
{{
selector: '.hidden',
style: {{
'display': 'none'
}}
}}
],
layout: {{
name: 'preset',
fit: true,
padding: 30
}}
}});
const summary = document.getElementById('summary');
summary.textContent = `${{graph.meta.issue_count}} issues · ${{graph.meta.edge_count}} blocks edges`;
const repoFilter = document.getElementById('repoFilter');
for (const repo of graph.meta.repos || []) {{
const option = document.createElement('option');
option.value = repo;
option.textContent = repo;
repoFilter.appendChild(option);
}}
const applyFilters = () => {{
const selectedRepo = repoFilter.value;
const query = document.getElementById('searchBox').value.trim().toLowerCase();
cy.elements().removeClass('hidden');
if (selectedRepo !== '__all__') {{
cy.nodes('[type = "issue"]').forEach((node) => {{
if (node.data('repo') !== selectedRepo) {{
node.addClass('hidden');
}}
}});
}}
if (query) {{
cy.nodes('[type = "issue"]').forEach((node) => {{
const hay = `${{node.data('number')}} ${{node.data('title')}}`.toLowerCase();
if (!hay.includes(query)) {{
node.addClass('hidden');
}}
}});
}}
cy.edges().forEach((edge) => {{
if (edge.source().hasClass('hidden') || edge.target().hasClass('hidden')) {{
edge.addClass('hidden');
}}
}});
}};
repoFilter.addEventListener('change', applyFilters);
document.getElementById('searchBox').addEventListener('input', applyFilters);
document.getElementById('resetView').addEventListener('click', () => {{
repoFilter.value = '__all__';
document.getElementById('searchBox').value = '';
cy.elements().removeClass('hidden');
cy.fit();
}});
cy.on('tap', 'node[type = "issue"]', (evt) => {{
const url = evt.target.data('url');
if (url) {{
window.open(url, '_blank', 'noopener');
}}
}});
}}
init().catch((error) => {{
document.getElementById('summary').textContent = String(error);
console.error(error);
}});
</script>
</body>
</html>
"""
def _fetch_workflow_data(
    owner: str,
    repos: list[str],
    token: str,
) -> tuple[list[WorkflowIssue], list[tuple[str, int, str, int]]]:
    """Fetch issues, labels and blocking edges for *repos* under *owner*.

    Returns the grouped ``WorkflowIssue`` list together with the blocking
    edges found between those issues.
    """
    raw_issues: dict[str, list[dict[str, Any]]] = {}
    label_texts: dict[str, dict[str, str]] = {}
    for name in repos:
        raw_issues[name] = fetch_open_issues(owner, name, token)
        label_texts[name] = fetch_label_descriptions(owner, name, token)
    grouped = build_workflow_issues(raw_issues, label_texts)
    return grouped, fetch_blocks_edges(owner, token, grouped)
def _add_shared_args(parser: argparse.ArgumentParser) -> None:
    """Register the options shared by every subcommand on *parser*.

    Adds ``--owner``, ``--repos``, ``--layout`` and ``--github-token``.
    """
    parser.add_argument("--owner", default=DEFAULT_OWNER)
    parser.add_argument(
        "--repos",
        nargs="+",
        default=DEFAULT_REPOS,
        help="Repository names under --owner.",
    )
    parser.add_argument(
        "--layout",
        choices=["LR", "TB"],
        default="LR",
        # The help text must name the actual default, which is LR.
        help="Graph layout direction: LR (left-right, default) or TB (top-down).",
    )
    parser.add_argument("--github-token")
def _run_dot_command(args: argparse.Namespace) -> None:
    """Handle the ``dot`` subcommand: write the DOT file and optionally an SVG.

    The SVG goes to ``--svg-output`` when given, otherwise next to the DOT
    file with a ``.svg`` suffix. Parent directories are created as needed.
    """
    token = _resolve_token(args.github_token)
    workflow_issues, blocks_edges = _fetch_workflow_data(args.owner, args.repos, token)
    dot_source = build_dot(workflow_issues, blocks_edges, owner=args.owner, rankdir=args.layout)

    dot_target = args.output
    dot_target.parent.mkdir(parents=True, exist_ok=True)
    dot_target.write_text(dot_source, encoding="utf-8")
    print(f"Wrote DOT graph to {dot_target}")
    print(f"Issues included: {len(workflow_issues)}")
    print(f"Blocks edges included: {len(blocks_edges)}")

    if not args.render_svg:
        return
    svg_target = args.svg_output
    if svg_target is None:
        svg_target = dot_target.with_suffix(".svg")
    svg_target.parent.mkdir(parents=True, exist_ok=True)
    render_dot_to_svg(dot_path=dot_target, svg_path=svg_target)
    print(f"Rendered SVG graph to {svg_target}")
def _run_cytoscape_command(args: argparse.Namespace) -> None:
    """Handle the ``cytoscape`` subcommand: write ``workflow.json`` and ``index.html``.

    The DOT graph is laid out with Graphviz (``--dot-bin`` or ``dot``), turned
    into a Cytoscape preset payload, and written under ``--docs-dir`` together
    with the HTML page that loads it.
    """
    token = _resolve_token(args.github_token)
    workflow_issues, blocks_edges = _fetch_workflow_data(args.owner, args.repos, token)
    dot_text = build_dot(workflow_issues, blocks_edges, owner=args.owner, rankdir=args.layout)
    graphviz_json = _parse_graphviz_json(dot_text, dot_bin=args.dot_bin or "dot")
    payload = build_cytoscape_preset_payload(
        owner=args.owner,
        workflow_issues=workflow_issues,
        graphviz_json=graphviz_json,
    )

    json_output = args.docs_dir / "workflow.json"
    html_output = args.docs_dir / "index.html"
    json_output.parent.mkdir(parents=True, exist_ok=True)
    json_output.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    html_output.parent.mkdir(parents=True, exist_ok=True)
    html_output.write_text(build_index_html(json_filename=json_output.name), encoding="utf-8")

    meta = payload["meta"]
    report = (
        f"Wrote workflow JSON to {json_output}",
        f"Wrote workflow HTML to {html_output}",
        f"Issues included: {meta['issue_count']}",
        f"Clusters included: {meta['cluster_count']}",
        f"Blocks edges included: {meta['edge_count']}",
    )
    for line in report:
        print(line)
def main() -> None:
    """Parse the command line and run the selected subcommand.

    Subcommands are ``dot`` (DOT file, optionally SVG) and ``cytoscape`` (JSON
    plus HTML page). Exits with a usage error when no owner or no repository
    is configured, either on the command line or through the module defaults.
    """
    parser = argparse.ArgumentParser(
        description="Build workflow graphs from open GitHub issues and labels."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    dot_parser = subparsers.add_parser("dot", help="Write a Graphviz DOT file (optionally SVG).")
    _add_shared_args(dot_parser)
    dot_parser.add_argument(
        "--output",
        type=Path,
        default=Path("artifacts/workflow.dot"),
    )
    dot_parser.add_argument(
        "--render-svg",
        action="store_true",
        help="Render SVG from generated DOT using Graphviz `dot`.",
    )
    dot_parser.add_argument(
        "--svg-output",
        type=Path,
        help="Path for rendered SVG output (default: <output with .svg suffix>).",
    )
    dot_parser.set_defaults(handler=_run_dot_command)

    cytoscape_parser = subparsers.add_parser(
        "cytoscape",
        help="Write Cytoscape preset JSON and HTML under docs/.",
    )
    _add_shared_args(cytoscape_parser)
    cytoscape_parser.add_argument("--docs-dir", type=Path, default=Path("docs"))
    cytoscape_parser.add_argument(
        "--dot-bin",
        default=None,
        help="Path to Graphviz dot executable.",
    )
    cytoscape_parser.set_defaults(handler=_run_cytoscape_command)

    args = parser.parse_args()
    # The shipped defaults are placeholders (no owner, no repos). Fail early
    # with a usage error instead of querying `/repos/None/...` or silently
    # writing an empty graph.
    if not args.owner:
        parser.error("--owner is required (or set DEFAULT_OWNER in this script)")
    if not args.repos:
        parser.error("--repos is required (or set DEFAULT_REPOS in this script)")
    args.handler(args)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
@solar-flare99

Copy link
Copy Markdown

This is great! We would love for you to checkout immunity-agent, security layer for coding agents to nudge them and write secure code (like secret management, prompt injection defense and choosing right open source packages)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment