Created
February 21, 2025 15:32
-
-
Save ccerv1/690acbc5c7eadbaff93891290512fd4f to your computer and use it in GitHub Desktop.
Identify all developers who have contributed to a set of repos
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import json
import os
import re
from datetime import datetime, timedelta

from githubkit import GitHub
from githubkit.exception import RequestFailed
GITHUB_TOKEN = ""  # add your token here
github = GitHub(GITHUB_TOKEN)

# add your list of repos here, as full "https://github.com/owner/repo" URLs
# (see parse_repo_url below for the accepted format)
repos = []

# Path to the JSON file used to checkpoint progress between runs.
progress_file = "progress.json"

# We'll store progress as:
# {
#   "processed_repos": ["owner1/repo1", "owner2/repo2", ...],
#   "user_contributions": {
#     "username": {
#       "commit_count": int,
#       "repos": [list of "owner/repo"],
#       "last_commit": "YYYY-MM-DDTHH:MM:SSZ"
#     },
#     ...
#   }
# }
# In memory, "repos" is a set and "last_commit" a datetime; save_progress
# converts them back to JSON-friendly forms.
user_contributions = {}
processed_repos = []
# Load previous progress if available, so a long run can resume after a
# crash or interruption instead of re-fetching every repo.
if os.path.exists(progress_file):
    print("Loading progress from JSON...")
    with open(progress_file, "r", encoding="utf-8") as f:
        progress_data = json.load(f)
    processed_repos = progress_data.get("processed_repos", [])
    loaded_contrib = progress_data.get("user_contributions", {})
    # Convert loaded data back to the in-memory forms: repos lists -> sets,
    # last_commit ISO-8601 strings -> naive datetimes (the format written
    # by save_progress).
    for username, data in loaded_contrib.items():
        try:
            last_commit_dt = datetime.strptime(data["last_commit"], "%Y-%m-%dT%H:%M:%SZ")
        except (KeyError, TypeError, ValueError):
            # Missing or malformed timestamp: treat as "never committed" so
            # the user is simply filtered out by the recency check later.
            last_commit_dt = datetime.min
        user_contributions[username] = {
            "commit_count": data["commit_count"],
            "repos": set(data["repos"]),
            "last_commit": last_commit_dt,
        }
    print(f"Resuming. {len(processed_repos)} repos were already processed.")

# Define the cutoff date (eg, 6 months ago). Kept naive-UTC to match the
# naive datetimes parsed above and from commit timestamps.
cutoff_date = datetime.utcnow() - timedelta(days=180)
def parse_repo_url(url):
    """Extract (owner, repo) from a GitHub repository URL.

    A trailing ".git" suffix on the repo segment is stripped, since repo
    lists commonly contain clone URLs and the REST API expects the bare
    repository name (``owner/repo.git`` would 404).

    Returns:
        (owner, repo) on success, or (None, None) if the URL does not
        look like ``https://github.com/<owner>/<repo>``.
    """
    match = re.match(r'https://github\.com/([^/]+)/([^/]+)', url)
    if not match:
        return None, None
    owner, repo_name = match.groups()
    # Normalize clone-style URLs: "repo.git" -> "repo".
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-4]
    return owner, repo_name
def save_progress():
    """Checkpoint processed repos and per-user stats to the progress JSON.

    In-memory sets become lists and datetimes become ISO-8601 strings so
    the data round-trips through json.dump (and back via the loader at
    startup).
    """
    contrib_json = {
        user: {
            "commit_count": stats["commit_count"],
            "repos": list(stats["repos"]),
            "last_commit": stats["last_commit"].strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        for user, stats in user_contributions.items()
    }
    with open(progress_file, "w", encoding="utf-8") as f:
        json.dump(
            {
                "processed_repos": processed_repos,
                "user_contributions": contrib_json,
            },
            f,
            indent=2,
        )
    print("Progress saved to JSON.")
# Process each repository: fetch its full commit history, tally commits per
# GitHub user, and checkpoint after every repo so the run can resume.
for repo_url in repos:
    owner, repo_name = parse_repo_url(repo_url)
    if not owner:
        print(f"Could not parse repo URL: {repo_url}")
        continue
    repo_identifier = f"{owner}/{repo_name}"
    # Skip repos already handled in a previous (resumed) run.
    if repo_identifier in processed_repos:
        print(f"Skipping {repo_identifier} (already processed).")
        continue
    print(f"Processing repository: {repo_identifier}")
    try:
        # Retrieve all commits using GitHubKit's pagination.
        commits = list(github.paginate(github.rest.repos.list_commits, owner=owner, repo=repo_name))
    except RequestFailed as exception:
        # 404 (deleted/renamed/private repo): mark it processed so we don't
        # retry, checkpoint, and move on. Any other failure is fatal.
        if exception.response.status_code == 404:
            print(f"Repository {repo_identifier} not found, skipping.")
            processed_repos.append(repo_identifier)
            save_progress()
            continue
        else:
            raise exception
    print(f"Fetched a total of {len(commits)} commits for {repo_identifier}.")
    # Process each commit in the repository
    for commit in commits:
        # Check if there's an associated GitHub user; `commit.author` can be
        # None (presumably when the author email isn't linked to an account
        # — TODO confirm), in which case the commit can't be attributed.
        if commit.author is None or commit.author.login is None:
            # Log a warning and skip the commit
            print(f"Could not find username associated with commit: url='{commit.commit.url}' sha='{commit.commit.tree.sha}'")
            continue
        username = commit.author.login
        # Get the commit date using attribute access.
        commit_date_str = commit.commit.author.date  # e.g., "2023-07-15T12:34:56Z"
        try:
            commit_date = datetime.strptime(commit_date_str, "%Y-%m-%dT%H:%M:%SZ")
        except Exception as e:
            print(f"Error parsing date '{commit_date_str}' for commit in {repo_identifier}: {e}")
            continue
        # Initialize data for new users.
        if username not in user_contributions:
            user_contributions[username] = {
                'commit_count': 0,
                'repos': set(),
                'last_commit': commit_date
            }
        # Increment the commit count and record the repository.
        user_contributions[username]['commit_count'] += 1
        user_contributions[username]['repos'].add(repo_identifier)
        # Update the last commit date if this commit is more recent.
        if commit_date > user_contributions[username]['last_commit']:
            user_contributions[username]['last_commit'] = commit_date
    # Mark this repo as processed and save progress (checkpoint per repo).
    processed_repos.append(repo_identifier)
    save_progress()
# After processing all repos, keep only "qualified" developers: more than
# 20 commits in total and at least one commit on or after the cutoff date.
qualified_devs = {}
for username, data in user_contributions.items():
    if data['commit_count'] > 20 and data['last_commit'] >= cutoff_date:
        qualified_devs[username] = data
print(f"Found {len(qualified_devs)} qualified developers.")

# Write results to a CSV file.
output_file = "qualified_developers.csv"
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["GitHub Username", "Repos Contributed"])
    for username, data in qualified_devs.items():
        # Repos as a sorted, comma-separated string for a stable column value.
        writer.writerow([username, ",".join(sorted(data['repos']))])
print(f"CSV file '{output_file}' has been created.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment