pr_capture.py - captures and converts GitHub PR data to comprehensive markdown
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
pr-capture: A CLI tool to capture GitHub PR data into a comprehensive markdown file.
"""
import argparse
from datetime import datetime
import json
import re
import subprocess
import sys

APP_VERSION = "1.7"
HEADING_MARKER_FORMAT = "§ {text}"
SECTIONS = [
    "overview",
    "description",
    "linked_issues",
    "files",
    "reviews",
    "comments",
    "commits",
]
FORMATTERS = {}


def create_heading(level, text):
    """Creates a specially formatted markdown heading using the global format."""
    markdown_prefix = "#" * level
    custom_marker = HEADING_MARKER_FORMAT.format(text=text)
    return f"{markdown_prefix} {custom_marker}"

def run_command(command, check=True):
    """Executes a shell command and returns its stdout."""
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False,
            encoding="utf-8",
        )
        if check and result.returncode != 0:
            print(
                f"Error: Command failed with exit code {result.returncode}",
                file=sys.stderr,
            )
            print(f"Command: {' '.join(command)}", file=sys.stderr)
            print(f"Stderr: {result.stderr.strip()}", file=sys.stderr)
            sys.exit(1)
        return result.stdout.strip()
    except FileNotFoundError:
        print(
            f"Error: Command '{command[0]}' not found. Please ensure 'gh' is installed.",
            file=sys.stderr,
        )
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)


def check_gh_cli():
    print("--> Checking for 'gh' CLI...")
    run_command(["gh", "--version"])
    print("--> Checking 'gh' authentication status...")
    run_command(["gh", "auth", "status"])
    print("'gh' is installed and authenticated.\n")


def parse_iso_date(date_str, fmt="%Y-%m-%d"):
    if not date_str:
        return "N/A"
    if date_str.endswith("Z"):
        date_str = date_str[:-1] + "+00:00"
    return datetime.fromisoformat(date_str).strftime(fmt)
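
# For reference (GitHub timestamps are ISO 8601 with a trailing "Z"):
#   parse_iso_date("2025-06-17T18:43:00Z")                   -> "2025-06-17"
#   parse_iso_date("2025-06-17T18:43:00Z", "%Y-%m-%d %H:%M") -> "2025-06-17 18:43"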

def fetch_pr_data(repo, pr_number):
    print(f"--> Fetching core data for PR #{pr_number} from {repo} via GraphQL...")
    owner, repo_name = repo.split("/")
    graphql_query = """
    query($owner: String!, $repo: String!, $pr: Int!) {
      repository(owner: $owner, name: $repo) {
        pullRequest(number: $pr) {
          title
          body
          author { login }
          assignees(first: 10) { nodes { login } }
          labels(first: 20) { nodes { name } }
          milestone { title }
          createdAt
          updatedAt
          mergedAt
          closedAt
          state
          baseRefName
          headRefName
          reviewRequests(first: 10) { nodes { requestedReviewer { ... on User { login } } } }
          comments(first: 100) {
            nodes {
              author { login }
              createdAt
              body
            }
          }
          reviews(first: 50) {
            nodes {
              author { login }
              state
              body
              comments(first: 50) {
                nodes {
                  path
                  position
                  originalPosition
                  diffHunk
                  body
                }
              }
            }
          }
          commits(first: 100) {
            nodes {
              commit {
                oid
                messageHeadline
              }
            }
          }
        }
      }
    }
    """
    # Use -f for String variables and -F for typed (e.g. Int) variables.
    command = [
        "gh",
        "api",
        "graphql",
        "-f",
        f"owner={owner}",  # -f passes a String
        "-f",
        f"repo={repo_name}",  # -f passes a String
        "-F",
        f"pr={pr_number}",  # -F passes the value as an Int, matching $pr: Int!
        "--raw-field",
        f"query={graphql_query}",
    ]
    response_json = run_command(command)
    data = json.loads(response_json)
    pr_data = data.get("data", {}).get("repository", {}).get("pullRequest", {})
    if not pr_data:
        print(
            f"Error: Could not find PR #{pr_number} in repo {repo}. Check for typos.",
            file=sys.stderr,
        )
        print(f"GraphQL Response: {response_json}", file=sys.stderr)
        sys.exit(1)
    return pr_data
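
# The returned dict mirrors the GraphQL selection above; an illustrative
# (hypothetical) shape:
#   {"title": "...", "author": {"login": "octocat"}, "state": "MERGED",
#    "comments": {"nodes": [...]}, "reviews": {"nodes": [...]}, ...}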

def fetch_pr_files(repo, pr_number):
    print(f"--> Fetching files changed for PR #{pr_number}...")
    command = ["gh", "pr", "diff", str(pr_number), "--repo", repo, "--name-only"]
    files_str = run_command(command)
    return files_str.splitlines() if files_str else []


def fetch_pr_diff(repo, pr_number):
    print(f"--> Fetching full PR diff for #{pr_number}...")
    command = ["gh", "pr", "diff", str(pr_number), "--repo", repo]
    return run_command(command)


def fetch_linked_issues_data(repo, pr_body, debug=False):
    print("--> Parsing and fetching linked issues...")
    if debug:
        print(
            "\n[DEBUG] Full PR Body for issue parsing:\n-----------------------------------------\n"
            + pr_body
            + "\n-----------------------------------------"
        )
    pattern_hash = r"#(\d+)"
    found_by_hash = re.findall(pattern_hash, pr_body)
    pattern_url = rf"https?://github\.com/{repo}/issues/(\d+)"
    found_by_url = re.findall(pattern_url, pr_body)
    issue_numbers = set(found_by_hash + found_by_url)
    if debug:
        print(f"[DEBUG] Found issue numbers: {issue_numbers}\n")
    if not issue_numbers:
        print(" - No linked issues found in PR body.")
        return []
    issues_data = []
    for number in issue_numbers:
        print(f" - Fetching issue #{number}...")
        command = [
            "gh",
            "issue",
            "view",
            number,
            "--repo",
            repo,
            "--json",
            "title,state,body",
        ]
        issue_json_str = run_command(command, check=False)
        if debug:
            print(f"[DEBUG] Raw JSON for issue #{number}: {issue_json_str}")
        if issue_json_str:
            try:
                issue_data = json.loads(issue_json_str)
                issue_data["number"] = number
                issues_data.append(issue_data)
            except json.JSONDecodeError:
                print(
                    f" - Warning: Could not parse JSON for issue #{number}",
                    file=sys.stderr,
                )
    return issues_data
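
# A quick illustration of the two reference patterns (hypothetical PR body):
#   re.findall(r"#(\d+)", "Fixes #12, closes #34")  -> ["12", "34"]
#   re.findall(r"https?://github\.com/o/r/issues/(\d+)",
#              "See https://github.com/o/r/issues/56")  -> ["56"]
# The "#" pattern is deliberately broad: any "#<digits>" becomes a candidate,
# and non-issue numbers simply fail the subsequent gh lookup and are skipped.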

def format_overview(data, **kwargs):
    lines = [
        f"# PR #{data['number']}: {data.get('title', 'N/A')}",
        "",
        create_heading(2, "Overview"),
    ]
    # author can be null for deleted accounts; fall back to "ghost"
    author_login = (data.get("author") or {}).get("login") or "ghost"
    lines.append(f"- **Author:** @{author_login}")
    lines.append(f"- **Status:** {data.get('state', 'N/A').lower()}")
    lines.append(f"- **Created:** {parse_iso_date(data.get('createdAt'))}")
    if data.get("mergedAt"):
        lines.append(f"- **Merged:** {parse_iso_date(data.get('mergedAt'))}")
    lines.append(
        f"- **Base:** {data.get('baseRefName', 'N/A')} ← **Head:** {data.get('headRefName', 'N/A')}"
    )
    return "\n".join(lines)
FORMATTERS["overview"] = format_overview | |
def format_description(data, **kwargs): | |
body = data.get("body") | |
if not body or not body.strip(): | |
return f"{create_heading(2, 'Description')}\n\n_No description provided._" | |
return f"{create_heading(2, 'Description')}\n\n{body}" | |
FORMATTERS["description"] = format_description | |
def format_linked_issues(issues_data, include_body=True, **kwargs): | |
if not issues_data: | |
return "" | |
lines = [create_heading(2, "Linked Issues")] | |
for issue in issues_data: | |
lines.append( | |
f"- **#{issue['number']}: {issue.get('title', 'N/A')}** ({issue.get('state', 'N/A')})" | |
) | |
if include_body: | |
issue_body = issue.get("body") | |
if issue_body and issue_body.strip(): | |
indented_body = "\n".join( | |
[f" {line}" for line in issue_body.splitlines()] | |
) | |
lines.append(indented_body) | |
lines.append("") | |
return "\n".join(lines) | |
FORMATTERS["linked_issues"] = format_linked_issues | |
def format_files(files_list, **kwargs): | |
if not files_list: | |
return "" | |
lines = [create_heading(2, f"Files Changed ({len(files_list)} files)")] | |
lines.extend([f"- `{file}`" for file in files_list]) | |
return "\n".join(lines) | |
FORMATTERS["files"] = format_files | |

def format_diff_snippet(diff_hunk, position, context_lines=2):
    """
    Parses a full diff hunk and extracts a small snippet around a commented line.

    Args:
        diff_hunk (str): The full diff hunk from the API.
        position (int): The line number in the file where the comment is.
        context_lines (int): Number of lines to show before and after the target line.

    Returns:
        str: A formatted, concise snippet of the diff.
    """
    lines = diff_hunk.split("\n")
    if not lines:
        return ""
    # The first line is the hunk header, e.g., "@@ -0,0 +1,236 @@"
    hunk_header = lines[0]
    # Extract the starting line number for the new file from the header
    match = re.search(r"\+([0-9]+)", hunk_header)
    if not match:
        # If we can't parse the header, just truncate the hunk as a fallback
        return "\n".join(lines[: context_lines * 2 + 1])
    new_start_line = int(match.group(1))
    # Find the index of the commented line within the hunk
    target_hunk_index = -1
    current_file_line = new_start_line - 1
    # We start searching from index 1 to skip the header
    for i, line in enumerate(lines[1:], 1):
        # Only count lines that are part of the file content (added or unchanged)
        if line.startswith("+") or line.startswith(" "):
            current_file_line += 1
            if current_file_line == position:
                target_hunk_index = i
                break
    if target_hunk_index == -1:
        # Fallback if we couldn't find the line (should be rare)
        return "(Could not locate the specific line in the diff hunk)\n" + "\n".join(
            lines[:5]
        )
    # Calculate the start and end of our snippet slice; start at index 1 so the
    # hunk header itself is never included in the snippet
    start = max(1, target_hunk_index - context_lines)
    end = min(len(lines), target_hunk_index + context_lines + 1)
    snippet_lines = lines[start:end]
    # Add an ellipsis if the snippet is not at the start of the hunk
    if start > 1:
        snippet_lines.insert(0, "...")
    # Add an ellipsis if the snippet is not at the end of the hunk
    if end < len(lines):
        snippet_lines.append("...")
    return "\n".join(snippet_lines)

def format_reviews(data, **kwargs):
    reviews_with_comments = [
        r
        for r in data.get("reviews", {}).get("nodes", [])
        if r.get("comments", {}).get("nodes")
    ]
    if not reviews_with_comments:
        return ""
    lines = [create_heading(2, "Code Review Comments")]
    for review in reviews_with_comments:
        state = review["state"].replace("_", " ").title()
        # author can be null for deleted accounts; fall back to "ghost"
        author_login = (review.get("author") or {}).get("login") or "ghost"
        lines.append(create_heading(3, f"Review by @{author_login} ({state})"))
        if review.get("body"):
            lines.append(f"> {review['body']}\n")
        for comment in review["comments"]["nodes"]:
            position = comment.get("position") or comment.get("originalPosition")
            # A true line-level comment must have a position AND a non-empty diff hunk.
            # We use .strip() to ensure a hunk with only whitespace is treated as empty.
            if position and comment.get("diffHunk", "").strip():
                # This is a comment on a specific line with context.
                lines.append(f"**File:** `{comment['path']}:{position}`")
                lines.append("**Context:**")
                snippet = format_diff_snippet(comment["diffHunk"], position)
                indented_diff = "```diff\n" + snippet + "\n```"
                lines.append(indented_diff)
            else:
                # This is a file-level comment (or one without context).
                lines.append(f"**File:** `{comment['path']}` (File-level comment)")
            lines.append(f"**Comment:** {comment['body']}\n")
        lines.append("")
    return "\n".join(lines)


FORMATTERS["reviews"] = format_reviews

def format_comments(data, **kwargs):
    # GraphQL nests the list in "nodes"
    comments = data.get("comments", {}).get("nodes", [])
    if not comments:
        return ""
    lines = [create_heading(2, "General Comments")]
    for comment in comments:
        # author can be null for deleted accounts; fall back to "ghost"
        author = (comment.get("author") or {}).get("login") or "ghost"
        timestamp = parse_iso_date(comment["createdAt"], "%Y-%m-%d %H:%M")
        lines.append(create_heading(3, f"@{author} - {timestamp}"))
        lines.append(f"{comment['body']}\n")
    return "\n".join(lines)


FORMATTERS["comments"] = format_comments

def format_commits(data, **kwargs):
    # GraphQL nests commits differently
    commits_data = data.get("commits", {}).get("nodes", [])
    if not commits_data:
        return ""
    lines = [create_heading(2, f"Commits ({len(commits_data)} commits)")]
    for node in commits_data:
        commit = node.get("commit", {})
        oid_short = commit.get("oid", "-------")[:7]
        message = commit.get("messageHeadline", "No commit message")
        lines.append(f"- `{oid_short}`: {message}")
    return "\n".join(lines)


FORMATTERS["commits"] = format_commits


def format_diff(diff_text, **kwargs):
    if not diff_text:
        return ""
    return f"{create_heading(2, 'PR Diff')}\n\n```diff\n{diff_text}\n```"

def main():
    parser = argparse.ArgumentParser(
        description="A CLI tool that captures and converts GitHub PR data to comprehensive markdown.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--repo", required=True, help="The repository in OWNER/REPO format."
    )
    parser.add_argument(
        "--pr",
        dest="pr_number",
        required=True,
        type=int,
        help="The pull request number.",
    )
    parser.add_argument(
        "--output", help="Output file path. Defaults to 'pr-{number}-summary.md'."
    )
    parser.add_argument(
        "--order",
        default=",".join(SECTIONS),
        help=f"Order of sections.\nAvailable sections: {', '.join(SECTIONS)}",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable verbose debugging output for troubleshooting.",
    )
    parser.add_argument(
        "--include-diff",
        action="store_true",
        help="Include the full PR diff at the end of the document.",
    )
    parser.add_argument(
        "--version", action="version", version=f"%(prog)s {APP_VERSION}"
    )
    for section in SECTIONS:
        help_text = f"Disable the '{section.replace('_', ' ')}' section."
        if section == "linked_issues":
            help_text = (
                "Omit the body/description from linked issues (shows title only)."
            )
        parser.add_argument(
            f"--no-{section.replace('_', '-')}",
            dest=section,
            action="store_false",
            help=help_text,
        )
    args = parser.parse_args()

    ordered_sections = [s.strip() for s in args.order.split(",")]
    invalid_sections = [s for s in ordered_sections if s not in SECTIONS]
    if invalid_sections:
        print(
            f"Error: Invalid section(s) in --order flag: {', '.join(invalid_sections)}",
            file=sys.stderr,
        )
        print(f"Available sections are: {', '.join(SECTIONS)}", file=sys.stderr)
        sys.exit(1)

    check_gh_cli()
    pr_data = fetch_pr_data(args.repo, args.pr_number)
    pr_data["number"] = args.pr_number
    files_list = fetch_pr_files(args.repo, args.pr_number)
    linked_issues_data = fetch_linked_issues_data(
        args.repo, pr_data.get("body", ""), args.debug
    )
    print("\nAll data fetched successfully.\n")
    print("--> Assembling Markdown document...")

    markdown_parts = []
    all_data = {
        "data": pr_data,
        "files_list": files_list,
        "issues_data": linked_issues_data,
    }
    for section_name in ordered_sections:
        formatter_func = FORMATTERS.get(section_name)
        if not formatter_func:
            continue
        part = None
        if section_name == "linked_issues":
            print(f" - Formatting '{section_name}' section...")
            part = formatter_func(
                issues_data=linked_issues_data, include_body=args.linked_issues
            )
        elif getattr(args, section_name):
            print(f" - Formatting '{section_name}' section...")
            part = formatter_func(**all_data)
        if part:
            markdown_parts.append(part)

    if args.include_diff:
        diff_text = fetch_pr_diff(args.repo, args.pr_number)
        diff_part = format_diff(diff_text)
        if diff_part:
            print(" - Formatting 'diff' section...")
            markdown_parts.append(diff_part)

    final_markdown = "\n\n".join(markdown_parts)
    output_file = args.output or f"pr-{args.pr_number}-summary.md"
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(final_markdown)
        print(f"\n✅ Success! PR summary saved to '{output_file}'")
    except IOError as e:
        print(f"\nError: Could not write to file '{output_file}': {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
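
# Example invocations (hypothetical repository and PR number; flags as defined above):
#   python pr_capture.py --repo owner/repo --pr 123 --include-diff --no-commits
#   python pr_capture.py --repo owner/repo --pr 123 --order overview,files,commits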