Created
April 28, 2026 21:49
-
-
Save gartnera/2f57658d1c9f7729ff09ec475d2408e7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "PyGithub>=2.3", | |
| # ] | |
| # /// | |
| """Summarize your GitHub activity over a configurable time window. | |
| Auth: set GITHUB_TOKEN (a PAT with repo + read:user scopes) in the environment, | |
| or pass --token. Defaults to the previous 24 hours. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| from collections import defaultdict | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timedelta, timezone | |
| from github import Auth, Github | |
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``.

    Returns:
        Namespace with ``since``, ``until``, ``hours``, ``days``, ``user``,
        ``token``, ``stale_lookback_days``, ``stale_threshold_days`` and
        ``no_stale``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Activity window.
    add("--since", help="ISO timestamp (UTC) for window start. Overrides --hours/--days.")
    add("--until", help="ISO timestamp (UTC) for window end. Defaults to now.")
    add("--hours", type=float, help="Window size in hours, ending at --until/now.")
    add("--days", type=float, default=1.0, help="Window size in days (default: 1).")

    # Identity / authentication.
    add("--user", help="GitHub username (defaults to authenticated user).")
    add("--token", help="GitHub token (defaults to $GITHUB_TOKEN).")

    # Stale-PR report.
    add(
        "--stale-lookback-days",
        type=int,
        default=14,
        help="Look back this far for stale-PR detection (default: 14).",
    )
    add(
        "--stale-threshold-days",
        type=float,
        default=2.0,
        help="Open PR with no update in this many days is flagged stale (default: 2).",
    )
    add("--no-stale", action="store_true", help="Skip the stale-PR section.")

    return parser.parse_args()
def resolve_window(args: argparse.Namespace) -> tuple[datetime, datetime]:
    """Compute the ``(since, until)`` activity window as timezone-aware UTC datetimes.

    ``until`` is ``args.until`` if given, otherwise the current time. ``since``
    is chosen by precedence: ``args.since``, then ``until - args.hours``, then
    ``until - args.days``.

    Timestamps without an explicit UTC offset are interpreted as UTC, matching
    the ``--since``/``--until`` help text. Timestamps that carry an offset are
    converted to UTC.

    Raises:
        ValueError: if ``args.since`` or ``args.until`` is not a valid ISO timestamp.
    """

    def parse_utc(text: str) -> datetime:
        stamp = datetime.fromisoformat(text)
        if stamp.tzinfo is None:
            # astimezone() on a naive value would assume *local* time; the CLI
            # documents these flags as UTC, so attach UTC directly instead.
            return stamp.replace(tzinfo=timezone.utc)
        return stamp.astimezone(timezone.utc)

    until = parse_utc(args.until) if args.until else datetime.now(timezone.utc)
    if args.since:
        since = parse_utc(args.since)
    elif args.hours is not None:
        since = until - timedelta(hours=args.hours)
    else:
        since = until - timedelta(days=args.days)
    return since, until
def fmt_repo(event) -> str:
    """Return the name of the event's repository, or ``"?"`` when it has none."""
    if not event.repo:
        return "?"
    return event.repo.name
def g(obj, key, default=None):
    """Look up ``key`` on a payload value that is either a mapping or an object.

    Dicts are read with ``dict.get``; anything else is read with ``getattr``.
    ``None`` always yields ``default``.
    """
    if isinstance(obj, dict):
        return obj.get(key, default)
    if obj is None:
        return default
    return getattr(obj, key, default)
def trunc(s: str, n: int = 100) -> str:
    """Return the first line of ``s``, shortened to at most ``n`` characters.

    Surrounding whitespace is removed. ``None``, empty and whitespace-only
    input all yield ``""``. When the first line is longer than ``n`` it is cut
    to ``n - 1`` characters and an ellipsis ("…") is appended.
    """
    # splitlines() on an empty string returns [], so guard before indexing:
    # whitespace-only input is truthy but strips down to "".
    lines = (s or "").strip().splitlines()
    first = lines[0].rstrip() if lines else ""
    return first if len(first) <= n else first[: n - 1] + "…"
| def summarize(events, since: datetime, until: datetime, gh: Github | None = None, me: str | None = None) -> dict: | |
| # Per-repo, per-PR/issue threads with collected activity. | |
| threads: dict[tuple[str, int], dict] = {} | |
| pushes: dict[tuple[str, str], list[str]] = defaultdict(list) # (repo, branch) -> commit msgs | |
| misc: list[str] = [] | |
| total = 0 | |
| def thread(repo: str, num: int, title: str | None = None, is_pr: bool = False) -> dict: | |
| key = (repo, num) | |
| t = threads.get(key) | |
| if t is None: | |
| t = { | |
| "repo": repo, "num": num, "title": title or "", | |
| "actions": [], "comments": [], "reviews": [], "mine": False, | |
| "is_pr": is_pr, "state": "", "merged": False, "draft": False, | |
| "approvals": [], "changes_requested": [], "html_url": "", "author": "", | |
| } | |
| threads[key] = t | |
| if title and not t["title"]: | |
| t["title"] = title | |
| if is_pr: | |
| t["is_pr"] = True | |
| return t | |
| for e in events: | |
| created = e.created_at | |
| if created.tzinfo is None: | |
| created = created.replace(tzinfo=timezone.utc) | |
| if created < since: | |
| break | |
| if created > until: | |
| continue | |
| if e.type in ("CreateEvent", "DeleteEvent"): | |
| continue | |
| total += 1 | |
| repo = fmt_repo(e) | |
| try: | |
| payload = (e.raw_data or {}).get("payload") or {} | |
| except Exception: | |
| payload = e.payload or {} | |
| match e.type: | |
| case "PushEvent": | |
| ref = (g(payload, "ref") or "").replace("refs/heads/", "") | |
| for c in g(payload, "commits", []) or []: | |
| pushes[(repo, ref)].append(trunc(g(c, "message", ""))) | |
| case "PullRequestEvent": | |
| pr = g(payload, "pull_request") or {} | |
| action = g(payload, "action", "?") | |
| num = g(pr, "number") | |
| t = thread(repo, num, g(pr, "title"), is_pr=True) | |
| t["mine"] = True | |
| if action == "opened": | |
| t["actions"].append("opened") | |
| elif action == "closed" and g(pr, "merged"): | |
| t["actions"].append("merged") | |
| elif action == "closed": | |
| t["actions"].append("closed") | |
| elif action == "reopened": | |
| t["actions"].append("reopened") | |
| # synchronize / edited / etc. are noise — drop | |
| case "PullRequestReviewEvent": | |
| pr = g(payload, "pull_request") or {} | |
| review = g(payload, "review") or {} | |
| num = g(pr, "number") | |
| t = thread(repo, num, g(pr, "title"), is_pr=True) | |
| t["reviews"].append(g(review, "state") or "reviewed") | |
| case "PullRequestReviewCommentEvent" | "IssueCommentEvent": | |
| comment = g(payload, "comment") or {} | |
| issue = g(payload, "issue") or g(payload, "pull_request") or {} | |
| num = g(issue, "number") | |
| if num is None: | |
| continue | |
| # PR review comments are always PRs; issue comments are PRs iff issue.pull_request set. | |
| is_pr = e.type == "PullRequestReviewCommentEvent" or bool(g(issue, "pull_request")) | |
| t = thread(repo, num, g(issue, "title"), is_pr=is_pr) | |
| t["comments"].append(trunc(g(comment, "body", ""), 140)) | |
| case "IssuesEvent": | |
| issue = g(payload, "issue") or {} | |
| action = g(payload, "action", "?") | |
| num = g(issue, "number") | |
| t = thread(repo, num, g(issue, "title")) | |
| t["actions"].append(f"issue/{action}") | |
| case "ReleaseEvent": | |
| rel = g(payload, "release") or {} | |
| misc.append(f"release {repo} {g(rel, 'tag_name')}: {g(rel, 'name') or ''}") | |
| case "ForkEvent": | |
| misc.append(f"forked {repo}") | |
| case "WatchEvent": | |
| misc.append(f"starred {repo}") | |
| case other: | |
| misc.append(f"{other} {repo}") | |
| # Hydrate threads in parallel: titles + current state (open/closed/merged/draft) + outside review status. | |
| if gh is not None and threads: | |
| if me is None: | |
| me = gh.get_user().login | |
| def hydrate(t: dict) -> None: | |
| try: | |
| repo_obj = gh.get_repo(t["repo"]) | |
| if t["is_pr"]: | |
| pr = repo_obj.get_pull(t["num"]) | |
| t["title"] = t["title"] or pr.title or "" | |
| t["html_url"] = pr.html_url or "" | |
| t["draft"] = bool(pr.draft) | |
| if pr.user: | |
| t["author"] = pr.user.login | |
| if pr.user.login == me: | |
| t["mine"] = True | |
| if pr.merged: | |
| t["state"] = "merged" | |
| else: | |
| t["state"] = pr.state # "open" or "closed" | |
| t["merged"] = bool(pr.merged) | |
| # Latest review state per outside reviewer. | |
| latest: dict[str, str] = {} | |
| for r in pr.get_reviews(): | |
| user = r.user.login if r.user else None | |
| if not user or user == me: | |
| continue | |
| if r.state in ("APPROVED", "CHANGES_REQUESTED", "DISMISSED"): | |
| latest[user] = r.state | |
| t["approvals"] = [u for u, s in latest.items() if s == "APPROVED"] | |
| t["changes_requested"] = [u for u, s in latest.items() if s == "CHANGES_REQUESTED"] | |
| else: | |
| issue = repo_obj.get_issue(t["num"]) | |
| t["title"] = t["title"] or issue.title or "" | |
| t["html_url"] = issue.html_url or "" | |
| t["state"] = issue.state | |
| if issue.user: | |
| t["author"] = issue.user.login | |
| if issue.user.login == me: | |
| t["mine"] = True | |
| except Exception: | |
| pass | |
| with ThreadPoolExecutor(max_workers=16) as pool: | |
| list(pool.map(hydrate, threads.values())) | |
| return {"total": total, "threads": threads, "pushes": pushes, "misc": misc} | |
# ANSI styling helpers. Each one returns its input unchanged unless stdout is a TTY.
def _color(code: str):
    """Return a function that wraps text in the ANSI SGR sequence for ``code``.

    The TTY check happens on every call, not when the helper is created.
    """
    start = f"\033[{code}m"

    def paint(s: str) -> str:
        if sys.stdout.isatty():
            return f"{start}{s}\033[0m"
        return s

    return paint


bold = _color("1")
dim = _color("2")
red = _color("31")
green = _color("32")
yellow = _color("33")
magenta = _color("35")
cyan = _color("36")
def render(summary: dict) -> None:
    """Print the result of ``summarize`` to stdout.

    Sections appear in this order, each only when non-empty: "Pushes",
    "Authored PRs / Issues" (threads flagged ``mine``), "Reviewed" (all other
    threads) and "Other". If every section is empty a single
    "No activity in window." line is printed instead.
    """
    push_map = summary["pushes"]
    other_lines = summary["misc"]

    # Partition threads by ownership, preserving their original order.
    authored: list[dict] = []
    not_authored: list[dict] = []
    for entry in summary["threads"].values():
        (authored if entry["mine"] else not_authored).append(entry)

    state_color = {"merged": magenta, "closed": red, "open": green}
    action_color = {
        "opened": green, "merged": magenta, "closed": red, "reopened": yellow,
    }

    def counted(count: int, noun: str) -> str:
        # "1 comment", "2 comments", ...
        return f"{count} {noun}{'s' if count != 1 else ''}"

    def build_tags(t: dict) -> list[str]:
        tags: list[str] = []
        state = t["state"]
        if state:
            # An open draft is labelled "draft" but keeps the colour of its state.
            label = "draft" if t["draft"] and state == "open" else state
            tags.append(state_color.get(state, yellow)(label))
        if t["approvals"]:
            tags.append(green(f"approved by {', '.join(t['approvals'])}"))
        if t["changes_requested"]:
            tags.append(red(f"changes requested by {', '.join(t['changes_requested'])}"))
        for action in t["actions"]:
            # Don't repeat what state already shows.
            if state and action in ("merged", "closed", "opened"):
                continue
            tags.append(action_color.get(action, yellow)(action))
        review_count = len(t["reviews"])
        if review_count:
            states = ", ".join(sorted(set(t["reviews"])))
            tags.append(dim(f"{counted(review_count, 'review')} ({states})"))
        comment_count = len(t["comments"])
        if comment_count:
            tags.append(dim(counted(comment_count, "comment")))
        return tags

    def render_thread(t: dict) -> None:
        num = t["num"]
        title = t["title"] or dim("(no title)")
        tags = build_tags(t)
        tag_str = f" [{' · '.join(tags)}]" if tags else ""
        author = t["author"]
        # The author is shown only on threads that are not the user's own.
        if author and not t["mine"]:
            author_str = f" {dim(f'by @{author}')}"
        else:
            author_str = ""
        print(f" {bold(f'#{num}')} {title}{author_str}{tag_str}")
        for body in t["comments"]:
            if body:
                print(f" {dim('>')} {body}")

    def render_section(label, threads_):
        if not threads_:
            return
        print(bold(label))
        grouped: dict[str, list[dict]] = defaultdict(list)
        for t in threads_:
            grouped[t["repo"]].append(t)
        for repo in sorted(grouped):
            print(f" {cyan(repo)}")
            # Highest number first; a missing number sorts as 0.
            for t in sorted(grouped[repo], key=lambda x: x["num"] or 0, reverse=True):
                render_thread(t)
        print()

    if push_map:
        print(bold("Pushes"))
        branches_by_repo: dict[str, list[tuple[str, list[str]]]] = defaultdict(list)
        for (repo, branch), msgs in push_map.items():
            branches_by_repo[repo].append((branch, msgs))
        for repo in sorted(branches_by_repo):
            print(f" {cyan(repo)}")
            for branch, msgs in branches_by_repo[repo]:
                print(f" {yellow(branch or '?')} ({counted(len(msgs), 'commit')})")
                # Show at most five commit subjects per branch.
                for subject in msgs[:5]:
                    print(f" {dim('·')} {subject}")
                hidden = len(msgs) - 5
                if hidden > 0:
                    print(f" {dim(f'… and {hidden} more')}")
        print()

    render_section("Authored PRs / Issues", authored)
    render_section("Reviewed", not_authored)

    if other_lines:
        print(bold("Other"))
        for line in other_lines:
            print(f" {dim('·')} {line}")
        print()

    if not push_map and not authored and not not_authored and not other_lines:
        print(dim("No activity in window."))
def get_gh_cli_token() -> str | None:
    """Return the token printed by ``gh auth token``, or ``None`` if unavailable.

    ``None`` is returned when ``gh`` is not on ``PATH``, cannot be executed,
    exits non-zero, does not finish within the timeout, or prints nothing.
    This is a best-effort fallback, so it never raises for those cases.
    """
    if not shutil.which("gh"):
        return None
    try:
        out = subprocess.run(
            ["gh", "auth", "token"],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,  # never hang the script on a stuck gh process
        )
    except (subprocess.SubprocessError, OSError):
        # SubprocessError covers a non-zero exit and the timeout; OSError
        # covers a gh that exists on PATH but cannot actually be executed.
        return None
    return out.stdout.strip() or None
def main() -> int:
    """Run the CLI: resolve credentials and window, then print the report.

    Returns:
        ``0`` on success, ``2`` when no GitHub token could be found.
    """
    args = parse_args()

    # Token precedence: --token, then $GITHUB_TOKEN, then the gh CLI.
    token = args.token or os.environ.get("GITHUB_TOKEN") or get_gh_cli_token()
    if not token:
        print("error: set GITHUB_TOKEN, pass --token, or run `gh auth login`", file=sys.stderr)
        return 2

    since, until = resolve_window(args)
    gh = Github(auth=Auth.Token(token))

    login = args.user if args.user else gh.get_user().login
    # Fetch by login to get a NamedUser, whose .get_events() lists that user's own events.
    account = gh.get_user(login)

    stamp = "%Y-%m-%d %H:%M %Z"
    print(bold(f"GitHub activity for @{login}"))
    print(dim(f"{since.strftime(stamp)} → {until.strftime(stamp)} ({until - since})"))
    print()

    render(summarize(account.get_events(), since, until, gh=gh, me=login))

    if args.no_stale:
        return 0
    render_stale(gh, login, args.stale_lookback_days, args.stale_threshold_days)
    return 0
def render_stale(gh: Github, login: str, lookback_days: int, threshold_days: float) -> None:
    """Print ``login``'s open PRs that were created recently but have gone quiet.

    Searches for open PRs authored by ``login`` that were created within the
    last ``lookback_days`` days and last updated more than ``threshold_days``
    days ago, then prints them grouped by repository, least recently updated
    first. Prints nothing when there are no matches, and a single dimmed
    notice when the search itself fails.
    """
    now = datetime.now(timezone.utc)
    created_floor = (now - timedelta(days=lookback_days)).strftime('%Y-%m-%d')
    updated_ceiling = (now - timedelta(days=threshold_days)).strftime('%Y-%m-%dT%H:%M:%SZ')

    # Search: my open PRs created in lookback window, last updated before threshold.
    query = f"is:pr is:open author:{login} created:>={created_floor} updated:<{updated_ceiling}"
    try:
        stale = list(gh.search_issues(query, sort="updated", order="asc"))
    except Exception as exc:
        print(dim(f"(stale-PR search failed: {exc})"))
        return
    if not stale:
        return

    print(bold(f"Stale PRs (open, no activity in {threshold_days:g}d, created in last {lookback_days}d)"))

    # Group by the full name of each result's repository ("?" when it has none).
    grouped: dict[str, list] = {}
    for item in stale:
        repo_name = issue_repo.full_name if (issue_repo := item.repository) else "?"
        grouped.setdefault(repo_name, []).append(item)

    for repo_name in sorted(grouped):
        print(f" {cyan(repo_name)}")
        for item in grouped[repo_name]:
            last_touch = item.updated_at
            if last_touch.tzinfo is None:
                # Naive timestamps are taken to be UTC so the subtraction works.
                last_touch = last_touch.replace(tzinfo=timezone.utc)
            idle = now - last_touch
            # Whole days when at least one has passed, otherwise whole hours.
            if idle.days:
                age_str = f"{idle.days}d"
            else:
                age_str = f"{idle.seconds // 3600}h"
            # NOTE(review): relies on the search result exposing a `draft`
            # attribute; without one every PR is labelled "open" — confirm.
            if getattr(item, "draft", False):
                label, color = "draft", yellow
            else:
                label, color = "open", green
            print(f" {bold(f'#{item.number}')} {item.title} [{color(label)} · {dim(f'idle {age_str}')}]")
    print()
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment