Created
August 30, 2025 18:55
-
-
Save logickoder/8c696bbb990bad6ad28db3d5ea18bd25 to your computer and use it in GitHub Desktop.
Transfer issues from one repo to another
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import json | |
| import os | |
| from datetime import datetime | |
| # Configuration | |
| SOURCE_REPO = "logickoder/source" | |
| DEST_REPO = "logickoder/destination" | |
| GITHUB_TOKEN = "{GITHUB_TOKEN}" | |
| HEADERS = { | |
| "Authorization": f"token {GITHUB_TOKEN}", | |
| "Accept": "application/vnd.github+json" | |
| } | |
| # State tracking | |
| STATE_FILE = "transfer_state.json" | |
| def load_state(): | |
| """Load existing transfer state or create new one""" | |
| if os.path.exists(STATE_FILE): | |
| with open(STATE_FILE, 'r') as f: | |
| return json.load(f) | |
| return { | |
| "completed_issues": {}, # original_number -> new_number | |
| "completed_prs": {}, # original_number -> new_number | |
| "failed_items": [], # list of items that failed | |
| "last_run": None, | |
| "source_repo": SOURCE_REPO, | |
| "dest_repo": DEST_REPO | |
| } | |
| def save_state(state): | |
| """Save current state to file""" | |
| state["last_run"] = datetime.now().isoformat() | |
| with open(STATE_FILE, 'w') as f: | |
| json.dump(state, f, indent=2) | |
| def is_already_processed(item_number, item_type, state): | |
| """Check if item was already processed""" | |
| if item_type == "issue": | |
| return str(item_number) in state["completed_issues"] | |
| else: # PR | |
| return str(item_number) in state["completed_prs"] | |
| def mark_completed(item_number, new_number, item_type, state): | |
| """Mark item as completed""" | |
| if item_type == "issue": | |
| state["completed_issues"][str(item_number)] = new_number | |
| else: # PR | |
| state["completed_prs"][str(item_number)] = new_number | |
| save_state(state) | |
| def mark_failed(item_number, item_type, error, state): | |
| """Mark item as failed""" | |
| state["failed_items"].append({ | |
| "number": item_number, | |
| "type": item_type, | |
| "error": str(error), | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| save_state(state) | |
| def get_issues(): | |
| issues = [] | |
| page = 1 | |
| while True: | |
| url = f"https://api.github.com/repos/{SOURCE_REPO}/issues?state=all&per_page=100&page={page}" | |
| response = requests.get(url, headers=HEADERS) | |
| if response.status_code != 200: | |
| print(f"Error fetching issues: {response.status_code} {response.text}") | |
| break | |
| page_issues = response.json() | |
| issues.extend(page_issues) | |
| if len(page_issues) < 100: | |
| break | |
| page += 1 | |
| return issues | |
| def truncate_body_if_needed(body, max_length=65000): | |
| """Truncate body if it exceeds GitHub's limit, preserving metadata header""" | |
| if len(body) <= max_length: | |
| return body | |
| # Find the metadata section end (look for the --- separator) | |
| metadata_end = body.find('\n---\n') | |
| if metadata_end == -1: | |
| # No metadata section found, just truncate | |
| return body[:max_length-100] + "\n\n**[Content truncated due to length limit]**" | |
| metadata_section = body[:metadata_end + 5] # Include the --- separator | |
| content_section = body[metadata_end + 5:] | |
| # Calculate remaining space for content | |
| remaining_space = max_length - len(metadata_section) - 100 # Reserve space for truncation message | |
| if remaining_space <= 0: | |
| # Metadata itself is too long, truncate it | |
| return body[:max_length-100] + "\n\n**[Content truncated due to length limit]**" | |
| if len(content_section) > remaining_space: | |
| truncated_content = content_section[:remaining_space] | |
| return metadata_section + truncated_content + "\n\n**[Content truncated due to length limit]**" | |
| return body | |
| def filter_valid_assignees(assignees, dest_repo): | |
| """Filter out assignees that may not be valid in the destination repository""" | |
| valid_assignees = [] | |
| for assignee in assignees: | |
| # Skip organization names that might not be valid individual assignees | |
| if assignee.lower() in ['some-random-assignee-in-old-repo-not-in-new']: | |
| continue | |
| valid_assignees.append(assignee) | |
| return valid_assignees | |
| def create_issue(issue, state): | |
| url = f"https://api.github.com/repos/{DEST_REPO}/issues" | |
| # Create enhanced body with issue metadata | |
| original_body = issue["body"] or "" | |
| # Safely extract issue metadata with fallbacks | |
| author = "Unknown" | |
| created_date = "Unknown" | |
| try: | |
| if issue.get('user') and issue['user'].get('login'): | |
| author = issue['user']['login'] | |
| except (KeyError, TypeError): | |
| pass | |
| try: | |
| if issue.get('created_at'): | |
| created_date = issue['created_at'] | |
| except (KeyError, TypeError): | |
| pass | |
| issue_info = f"""**[TRANSFERRED FROM ORIGINAL REPOSITORY]** | |
| Original Issue: #{issue['number']} | |
| Author: @{author} | |
| Created: {created_date} | |
| Status: {issue['state'].upper()} | |
| --- | |
| {original_body}""" | |
| data = { | |
| "title": issue["title"], | |
| "body": truncate_body_if_needed(issue_info), | |
| "labels": [label["name"] for label in issue.get("labels", [])], | |
| "assignees": filter_valid_assignees([assignee["login"] for assignee in issue.get("assignees", [])], DEST_REPO) | |
| } | |
| if issue.get("milestone"): | |
| data["milestone"] = issue["milestone"]["number"] | |
| try: | |
| response = requests.post(url, headers=HEADERS, json=data) | |
| if response.status_code == 201: | |
| created_issue = response.json() | |
| # Close the issue if original was closed | |
| if issue["state"] == "closed": | |
| close_url = f"https://api.github.com/repos/{DEST_REPO}/issues/{created_issue['number']}" | |
| close_data = {"state": "closed"} | |
| requests.patch(close_url, headers=HEADERS, json=close_data) | |
| return created_issue["number"] | |
| else: | |
| error_msg = f"HTTP {response.status_code}: {response.text}" | |
| mark_failed(issue['number'], "issue", error_msg, state) | |
| print(f"Error creating issue {issue['number']}: {error_msg}") | |
| return None | |
| except Exception as e: | |
| mark_failed(issue['number'], "issue", str(e), state) | |
| print(f"Exception creating issue {issue['number']}: {e}") | |
| return None | |
| def transfer_comments(issue_number, new_issue_number): | |
| url = f"https://api.github.com/repos/{SOURCE_REPO}/issues/{issue_number}/comments" | |
| response = requests.get(url, headers=HEADERS) | |
| if response.status_code == 200: | |
| for comment in response.json(): | |
| comment_data = {"body": comment["body"]} | |
| comment_response = requests.post( | |
| f"https://api.github.com/repos/{DEST_REPO}/issues/{new_issue_number}/comments", | |
| headers=HEADERS, | |
| json=comment_data | |
| ) | |
| if comment_response.status_code != 201: | |
| print(f"Error creating comment: {comment_response.status_code}") | |
| def create_issue_from_pr(pr, state): | |
| """Convert a pull request to an issue with additional context""" | |
| url = f"https://api.github.com/repos/{DEST_REPO}/issues" | |
| # Create enhanced body with PR context | |
| original_body = pr["body"] or "" | |
| # Safely extract PR metadata with fallbacks | |
| source_branch = "Unknown" | |
| target_branch = "Unknown" | |
| author = "Unknown" | |
| try: | |
| if pr.get('head') and pr['head'].get('ref'): | |
| source_branch = pr['head']['ref'] | |
| except (KeyError, TypeError): | |
| pass | |
| try: | |
| if pr.get('base') and pr['base'].get('ref'): | |
| target_branch = pr['base']['ref'] | |
| except (KeyError, TypeError): | |
| pass | |
| try: | |
| if pr.get('user') and pr['user'].get('login'): | |
| author = pr['user']['login'] | |
| except (KeyError, TypeError): | |
| pass | |
| pr_info = f"""**[CONVERTED FROM PULL REQUEST]** | |
| Original PR: #{pr['number']} | |
| Source Branch: `{source_branch}` | |
| Target Branch: `{target_branch}` | |
| Author: @{author} | |
| Status: {pr['state'].upper()} | |
| --- | |
| {original_body}""" | |
| data = { | |
| "title": f"[PR] {pr['title']}", | |
| "body": truncate_body_if_needed(pr_info), | |
| "labels": [label["name"] for label in pr.get("labels", [])] + ["converted-from-pr"], | |
| "assignees": filter_valid_assignees([assignee["login"] for assignee in pr.get("assignees", [])], DEST_REPO) | |
| } | |
| try: | |
| response = requests.post(url, headers=HEADERS, json=data) | |
| if response.status_code == 201: | |
| created_issue = response.json() | |
| # Close if original PR was closed/merged | |
| if pr["state"] in ["closed", "merged"]: | |
| close_url = f"https://api.github.com/repos/{DEST_REPO}/issues/{created_issue['number']}" | |
| close_data = {"state": "closed"} | |
| requests.patch(close_url, headers=HEADERS, json=close_data) | |
| return created_issue["number"] | |
| else: | |
| error_msg = f"HTTP {response.status_code}: {response.text}" | |
| mark_failed(pr['number'], "pr", error_msg, state) | |
| print(f"Error creating issue from PR {pr['number']}: {error_msg}") | |
| return None | |
| except Exception as e: | |
| mark_failed(pr['number'], "pr", str(e), state) | |
| print(f"Exception creating issue from PR {pr['number']}: {e}") | |
| return None | |
| def main(): | |
| # Load existing state | |
| state = load_state() | |
| # Validate we're working with the same repositories | |
| if (state.get("source_repo") != SOURCE_REPO or | |
| state.get("dest_repo") != DEST_REPO): | |
| print("Warning: Repository configuration has changed!") | |
| print(f"Previous: {state.get('source_repo')} -> {state.get('dest_repo')}") | |
| print(f"Current: {SOURCE_REPO} -> {DEST_REPO}") | |
| response = input("Continue anyway? (y/N): ") | |
| if response.lower() != 'y': | |
| return | |
| issues = get_issues() | |
| print(f"Found {len(issues)} issues/PRs to transfer") | |
| # Show previous progress | |
| completed_issues = len(state["completed_issues"]) | |
| completed_prs = len(state["completed_prs"]) | |
| failed_items = len(state["failed_items"]) | |
| if state["last_run"]: | |
| print(f"Previous run: {state['last_run']}") | |
| print(f"Already completed: {completed_issues} issues, {completed_prs} PRs") | |
| if failed_items > 0: | |
| print(f"Previous failures: {failed_items}") | |
| regular_issues = [] | |
| pull_requests = [] | |
| for issue in issues: | |
| if 'pull_request' in issue: | |
| pull_requests.append(issue) | |
| else: | |
| regular_issues.append(issue) | |
| print(f"Total to process: {len(regular_issues)} issues, {len(pull_requests)} PRs") | |
| # Filter out already completed items | |
| pending_issues = [i for i in regular_issues | |
| if not is_already_processed(i['number'], "issue", state)] | |
| pending_prs = [pr for pr in pull_requests | |
| if not is_already_processed(pr['number'], "pr", state)] | |
| print(f"Remaining: {len(pending_issues)} issues, {len(pending_prs)} PRs") | |
| if len(pending_issues) == 0 and len(pending_prs) == 0: | |
| print("All items already processed!") | |
| return | |
| # Process remaining regular issues | |
| for i, issue in enumerate(pending_issues, 1): | |
| print(f"[{i}/{len(pending_issues)}] Transferring issue #{issue['number']}: {issue['title']}") | |
| new_issue_number = create_issue(issue, state) | |
| if new_issue_number: | |
| transfer_comments(issue["number"], new_issue_number) | |
| mark_completed(issue['number'], new_issue_number, "issue", state) | |
| print(f" -> Created as issue #{new_issue_number}") | |
| # Process remaining PRs | |
| for i, pr in enumerate(pending_prs, 1): | |
| print(f"[{i}/{len(pending_prs)}] Converting PR #{pr['number']}: {pr['title']}") | |
| new_issue_number = create_issue_from_pr(pr, state) | |
| if new_issue_number: | |
| transfer_comments(pr["number"], new_issue_number) | |
| mark_completed(pr['number'], new_issue_number, "pr", state) | |
| print(f" -> Created as issue #{new_issue_number} (converted from PR)") | |
| # Final summary | |
| print(f"\n=== Transfer Complete ===") | |
| print(f"Total completed: {len(state['completed_issues'])} issues, {len(state['completed_prs'])} PRs") | |
| if state["failed_items"]: | |
| print(f"Failed items: {len(state['failed_items'])}") | |
| print("Check transfer_state.json for failure details") | |
| print(f"State saved to: {STATE_FILE}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment