Created
          October 14, 2025 22:34 
        
      - 
      
 - 
        
Save CaselIT/4378c70cb6d03d69f6407f21b038d50f to your computer and use it in GitHub Desktop.  
    Get unresolved pr comments from github
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #!/usr/bin/env python3 | |
| """ | |
| Download unresolved comments from a GitHub PR and save them as JSON. | |
| Usage: | |
| pr_comments.py <pr_url> [--output PATH] | |
| Auth: | |
| Expects GITHUB_READ_TOKEN environment variable (recommended). If missing, | |
| the script will print an error suggesting creating a token with read access. | |
| This is a single-file, minimal implementation intended for small, ad-hoc use. | |
| NOTE: copyright copilot | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| GITHUB_TOKEN_ENV = "GITHUB_READ_TOKEN" | |
| GRAPHQL_URL = "https://api.github.com/graphql" | |
| REST_API_BASE = "https://api.github.com" | |
def parse_pr_url(url: str) -> Tuple[str, str, int]:
    """Extract ``(owner, repo, number)`` from a GitHub pull-request URL.

    Any host is accepted, e.g.:
        https://github.com/owner/repo/pull/123
        https://github.example.com/owner/repo/pull/123

    Raises:
        ValueError: if the URL contains no ``/owner/repo/pull/N`` path segment.
    """
    match = re.search(r"/([^/]+)/([^/]+)/pull/(\d+)", url)
    if match is None:
        raise ValueError(f"Could not parse PR URL: {url}")
    owner, repo, number = match.groups()
    return owner, repo, int(number)
def graphql_query(query: str, variables: dict, token: str) -> dict:
    """Execute a GraphQL query against the GitHub API and return its ``data`` payload.

    Args:
        query: GraphQL query document.
        variables: Variable values referenced by the query.
        token: GitHub token, sent as a ``bearer`` Authorization header.

    Raises:
        requests.HTTPError: on a non-2xx HTTP response.
        RuntimeError: if the response body carries GraphQL-level errors.
    """
    headers = {"Authorization": f"bearer {token}"}
    # Explicit timeout so a stalled connection cannot hang the script forever
    # (requests has no default timeout).
    resp = requests.post(
        GRAPHQL_URL,
        json={"query": query, "variables": variables},
        headers=headers,
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    if "errors" in data:
        raise RuntimeError(f"GraphQL errors: {data['errors']}")
    return data["data"]
def fetch_unresolved_threads_graphql(
    owner: str, repo: str, pr_number: int, token: str
) -> List[dict]:
    """Fetch all unresolved review threads of a PR via the GitHub GraphQL API.

    Pages through ``reviewThreads`` 50 at a time. Only the first 50 comments
    of each thread are requested (no inner comment pagination); longer threads
    are silently truncated.

    Returns:
        A list of thread dicts with keys: path, diff_hunk, original_line,
        original_start_line, original_end_line, permalink, is_resolved,
        comments, file_link.

    Raises:
        RuntimeError: if the PR cannot be read (wrong repo/number, or the
            token lacks access) or the API reports GraphQL errors.
    """
    # Page through reviewThreads and collect unresolved ones.
    query = """
query($owner: String!, $repo: String!, $number: Int!, $cursor: String) {
repository(owner: $owner, name: $repo) {
pullRequest(number: $number) {
reviewThreads(first: 50, after: $cursor) {
pageInfo { hasNextPage endCursor }
nodes {
id
isResolved
path
originalLine
comments(first: 50) {
nodes {
id
author { login }
body
createdAt
url
originalPosition
position
}
}
}
}
}
}
}
"""
    threads: List[dict] = []
    cursor: Optional[str] = None  # GraphQL pagination cursor; None = first page
    while True:
        variables = {
            "owner": owner,
            "repo": repo,
            "number": pr_number,
            "cursor": cursor,
        }
        data = graphql_query(query, variables, token)
        pr = data.get("repository", {}).get("pullRequest")
        if pr is None:
            raise RuntimeError("Pull request not found or insufficient permissions")
        rt = pr.get("reviewThreads")
        if not rt:
            # No reviewThreads payload at all: nothing more to page through.
            break
        nodes = rt.get("nodes") or []
        for n in nodes:
            # Only keep unresolved threads
            if n.get("isResolved"):
                continue
            comments: List[dict] = []
            for c in n.get("comments", {}).get("nodes") or []:
                # Normalize GraphQL camelCase fields to snake_case keys.
                comments.append(
                    {
                        "id": c.get("id"),
                        # author can be null for deleted accounts/bots.
                        "author": (c.get("author") or {}).get("login"),
                        "body": c.get("body"),
                        "created_at": c.get("createdAt"),
                        "url": c.get("url"),
                        "original_position": c.get("originalPosition"),
                        "position": c.get("position"),
                    }
                )
            threads.append(
                {
                    "path": n.get("path"),
                    # Fields below are not requested in this query; kept as
                    # placeholders so the shape matches the REST fallback.
                    "diff_hunk": None,
                    "original_line": n.get("originalLine"),
                    "original_start_line": None,
                    "original_end_line": None,
                    "permalink": None,
                    "is_resolved": False,
                    "comments": comments,
                    # "path:line" convenience string; falls back to bare path
                    # when the thread has no line anchor.
                    "file_link": (
                        f"{n.get('path')}:{n.get('originalLine')}"
                        if n.get('originalLine') is not None
                        else n.get('path')
                    ),
                }
            )
        page_info = rt.get("pageInfo") or {}
        if page_info.get("hasNextPage"):
            cursor = page_info.get("endCursor")
        else:
            break
    return threads
def fetch_review_comments_rest(
    owner: str, repo: str, pr_number: int, token: Optional[str]
) -> List[dict]:
    """Fallback: fetch all review comments of a PR via the GitHub REST API.

    The REST endpoint does not reliably expose thread-resolution state, so
    every comment is returned, grouped by file path into thread-like dicts
    whose ``is_resolved`` is ``None`` (unknown).

    Args:
        token: optional GitHub token; sent as a ``token`` Authorization header.

    Raises:
        requests.HTTPError: on a non-2xx HTTP response.
    """
    url: Optional[str] = (
        f"{REST_API_BASE}/repos/{owner}/{repo}/pulls/{pr_number}/comments"
    )
    headers = {}
    if token:
        headers["Authorization"] = f"token {token}"
    comments: List[dict] = []
    # Send per_page only on the first request: the "next" URL taken from the
    # Link header already carries its own query string, and re-sending params
    # would append duplicate parameters to it.
    params: Optional[dict] = {"per_page": 100}
    while url:
        resp = requests.get(url, headers=headers, params=params, timeout=30)
        params = None
        resp.raise_for_status()
        page = resp.json()
        for c in page:
            comments.append(
                {
                    "id": c.get("id"),
                    "path": c.get("path"),
                    "position": c.get("position"),
                    "original_position": c.get("original_position"),
                    "body": c.get("body"),
                    "user": (c.get("user") or {}).get("login"),
                    "created_at": c.get("created_at"),
                    "url": c.get("html_url"),
                }
            )
        # Link header pagination (naive parse of the rel="next" entry).
        link = resp.headers.get("Link")
        next_url = None
        if link:
            for part in link.split(","):
                if 'rel="next"' in part:
                    m = re.search(r"<([^>]+)>", part)
                    if m:
                        next_url = m.group(1)
                    break
        url = next_url
    # Group by path into a threads-like structure matching the GraphQL shape.
    threads_by_path: Dict[str, List[dict]] = {}
    for c in comments:
        threads_by_path.setdefault(c.get("path") or "", []).append(c)
    threads: List[dict] = []
    for path, items in threads_by_path.items():
        # Pick a sensible line from the first comment if present. NOTE: diff
        # positions start at 1, so `or` cannot accidentally skip a 0 here.
        first_pos = None
        if items:
            first = items[0]
            first_pos = first.get("original_position") or first.get("position")
        threads.append(
            {
                "path": path,
                "diff_hunk": None,
                "original_line": None,
                "permalink": None,
                "is_resolved": None,
                "comments": items,
                "file_link": f"{path}:{first_pos}" if first_pos is not None else None,
            }
        )
    return threads
def format_threads_text(result: dict) -> str:
    """Render the collected PR threads as a human-friendly text report.

    This is exported so tests can import it directly.
    """
    pr = result.get("pr", {})
    header = (
        f"PR: {pr.get('owner')}/{pr.get('repo')}#{pr.get('number')} - {pr.get('url')}"
    )
    out: List[str] = [header, "-" * len(header)]
    for thread in result.get("threads", []):
        path = thread.get("path") or "<unknown>"
        line_no = thread.get("original_line")
        if line_no is None:
            out.append(f"\nFile: {path}")
        else:
            out.append(f"\nFile: {path}:{line_no}")
        state = thread.get("is_resolved")
        if state is True:
            out.append(" Resolved: yes")
        elif state is False:
            out.append(" Resolved: no")
        else:
            out.append(" Resolved: unknown")
        for comment in thread.get("comments", []):
            # Field names differ between the GraphQL and REST shapes; accept both.
            who = comment.get("author") or comment.get("user") or "<unknown>"
            when = comment.get("created_at") or comment.get("createdAt") or ""
            where = comment.get("original_position") or comment.get("position")
            link = comment.get("url") or comment.get("html_url") or ""
            out.append(f" - {who} @ {when} (pos: {where})")
            # Indent each line of the comment body under its header line.
            text = (comment.get("body") or "").strip()
            out.extend(f" {body_line}" for body_line in text.splitlines())
            if link:
                out.append(f" link: {link}")
    return "\n".join(out)
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point: parse args, fetch threads, write or print the result.

    Exit codes:
        0 success; 2 unparsable PR URL; 3 output file already exists;
        4 missing GITHUB_READ_TOKEN; 5 both GraphQL and REST fetches failed.
    """
    epilog = (
        "Requires a GitHub read token in the environment: GITHUB_READ_TOKEN.\n"
        "Create a fine-grained personal access token (repo read access) and set it before running.\n\n"
        "Example (PowerShell):\n"
        "$env:GITHUB_READ_TOKEN = '<your-token-here>'\n"
        "python scripts/pr_comments.py https://github.com/owner/repo/pull/123 -o comments.json -f json\n"
    )
    parser = argparse.ArgumentParser(
        description="Download unresolved review threads from a GitHub PR and save them.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )
    parser.add_argument(
        "pr_url", help="Pull request URL (e.g. https://github.com/owner/repo/pull/123)"
    )
    parser.add_argument(
        "--output", "-o", help="Output file path. If omitted, prints to stdout."
    )
    parser.add_argument(
        "--format",
        "-f",
        choices=("json", "text"),
        default="json",
        help="Output format: json (default) or text (human-friendly)",
    )
    args = parser.parse_args(argv)
    try:
        owner, repo, pr_number = parse_pr_url(args.pr_url)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 2
    out_path = args.output
    if out_path:
        # Refuse to clobber an existing file.
        if os.path.exists(out_path):
            print(f"Error: output file already exists: {out_path}", file=sys.stderr)
            return 3
    token = os.environ.get(GITHUB_TOKEN_ENV)
    if not token:
        # Diagnostics go to stderr, consistent with the other failure paths.
        print("Error: GITHUB_READ_TOKEN environment variable not set.", file=sys.stderr)
        print(
            "Create a fine-grained personal access token with read access to the repository and set GITHUB_READ_TOKEN.",
            file=sys.stderr,
        )
        print(
            "See: https://github.com/settings/tokens (or use fine-grained tokens)",
            file=sys.stderr,
        )
        return 4
    # Try GraphQL first; it is the only API that exposes resolution state.
    try:
        threads = fetch_unresolved_threads_graphql(owner, repo, pr_number, token)
    except Exception as e:
        # Deliberate best-effort fallback: any GraphQL failure (network, auth
        # scope, schema) degrades to the REST listing rather than aborting.
        print(f"GraphQL fetch failed: {e}", file=sys.stderr)
        print(
            "Falling back to REST comments (unresolved state may be unavailable).",
            file=sys.stderr,
        )
        try:
            threads = fetch_review_comments_rest(owner, repo, pr_number, token)
        except Exception as e2:
            print(f"REST fallback failed: {e2}", file=sys.stderr)
            return 5
    result = {
        "pr": {"owner": owner, "repo": repo, "number": pr_number, "url": args.pr_url},
        "threads": threads,
    }
    if args.format == "json":
        out_text = json.dumps(result, indent=2, ensure_ascii=False)
    else:
        out_text = format_threads_text(result)
    if out_path:
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(out_text)
        print(f"Wrote {len(threads)} threads to {out_path} (format={args.format})")
    else:
        print(out_text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment