Skip to content

Instantly share code, notes, and snippets.

@logickoder
Created August 30, 2025 18:55
Show Gist options
  • Select an option

  • Save logickoder/8c696bbb990bad6ad28db3d5ea18bd25 to your computer and use it in GitHub Desktop.

Select an option

Save logickoder/8c696bbb990bad6ad28db3d5ea18bd25 to your computer and use it in GitHub Desktop.
Transfer issues from one repo to another
import requests
import json
import os
from datetime import datetime
# Configuration
SOURCE_REPO = "logickoder/source"
DEST_REPO = "logickoder/destination"
GITHUB_TOKEN = "{GITHUB_TOKEN}"
HEADERS = {
"Authorization": f"token {GITHUB_TOKEN}",
"Accept": "application/vnd.github+json"
}
# State tracking
STATE_FILE = "transfer_state.json"
def load_state():
"""Load existing transfer state or create new one"""
if os.path.exists(STATE_FILE):
with open(STATE_FILE, 'r') as f:
return json.load(f)
return {
"completed_issues": {}, # original_number -> new_number
"completed_prs": {}, # original_number -> new_number
"failed_items": [], # list of items that failed
"last_run": None,
"source_repo": SOURCE_REPO,
"dest_repo": DEST_REPO
}
def save_state(state):
"""Save current state to file"""
state["last_run"] = datetime.now().isoformat()
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
def is_already_processed(item_number, item_type, state):
"""Check if item was already processed"""
if item_type == "issue":
return str(item_number) in state["completed_issues"]
else: # PR
return str(item_number) in state["completed_prs"]
def mark_completed(item_number, new_number, item_type, state):
"""Mark item as completed"""
if item_type == "issue":
state["completed_issues"][str(item_number)] = new_number
else: # PR
state["completed_prs"][str(item_number)] = new_number
save_state(state)
def mark_failed(item_number, item_type, error, state):
"""Mark item as failed"""
state["failed_items"].append({
"number": item_number,
"type": item_type,
"error": str(error),
"timestamp": datetime.now().isoformat()
})
save_state(state)
def get_issues():
issues = []
page = 1
while True:
url = f"https://api.github.com/repos/{SOURCE_REPO}/issues?state=all&per_page=100&page={page}"
response = requests.get(url, headers=HEADERS)
if response.status_code != 200:
print(f"Error fetching issues: {response.status_code} {response.text}")
break
page_issues = response.json()
issues.extend(page_issues)
if len(page_issues) < 100:
break
page += 1
return issues
def truncate_body_if_needed(body, max_length=65000):
"""Truncate body if it exceeds GitHub's limit, preserving metadata header"""
if len(body) <= max_length:
return body
# Find the metadata section end (look for the --- separator)
metadata_end = body.find('\n---\n')
if metadata_end == -1:
# No metadata section found, just truncate
return body[:max_length-100] + "\n\n**[Content truncated due to length limit]**"
metadata_section = body[:metadata_end + 5] # Include the --- separator
content_section = body[metadata_end + 5:]
# Calculate remaining space for content
remaining_space = max_length - len(metadata_section) - 100 # Reserve space for truncation message
if remaining_space <= 0:
# Metadata itself is too long, truncate it
return body[:max_length-100] + "\n\n**[Content truncated due to length limit]**"
if len(content_section) > remaining_space:
truncated_content = content_section[:remaining_space]
return metadata_section + truncated_content + "\n\n**[Content truncated due to length limit]**"
return body
def filter_valid_assignees(assignees, dest_repo):
"""Filter out assignees that may not be valid in the destination repository"""
valid_assignees = []
for assignee in assignees:
# Skip organization names that might not be valid individual assignees
if assignee.lower() in ['some-random-assignee-in-old-repo-not-in-new']:
continue
valid_assignees.append(assignee)
return valid_assignees
def create_issue(issue, state):
url = f"https://api.github.com/repos/{DEST_REPO}/issues"
# Create enhanced body with issue metadata
original_body = issue["body"] or ""
# Safely extract issue metadata with fallbacks
author = "Unknown"
created_date = "Unknown"
try:
if issue.get('user') and issue['user'].get('login'):
author = issue['user']['login']
except (KeyError, TypeError):
pass
try:
if issue.get('created_at'):
created_date = issue['created_at']
except (KeyError, TypeError):
pass
issue_info = f"""**[TRANSFERRED FROM ORIGINAL REPOSITORY]**
Original Issue: #{issue['number']}
Author: @{author}
Created: {created_date}
Status: {issue['state'].upper()}
---
{original_body}"""
data = {
"title": issue["title"],
"body": truncate_body_if_needed(issue_info),
"labels": [label["name"] for label in issue.get("labels", [])],
"assignees": filter_valid_assignees([assignee["login"] for assignee in issue.get("assignees", [])], DEST_REPO)
}
if issue.get("milestone"):
data["milestone"] = issue["milestone"]["number"]
try:
response = requests.post(url, headers=HEADERS, json=data)
if response.status_code == 201:
created_issue = response.json()
# Close the issue if original was closed
if issue["state"] == "closed":
close_url = f"https://api.github.com/repos/{DEST_REPO}/issues/{created_issue['number']}"
close_data = {"state": "closed"}
requests.patch(close_url, headers=HEADERS, json=close_data)
return created_issue["number"]
else:
error_msg = f"HTTP {response.status_code}: {response.text}"
mark_failed(issue['number'], "issue", error_msg, state)
print(f"Error creating issue {issue['number']}: {error_msg}")
return None
except Exception as e:
mark_failed(issue['number'], "issue", str(e), state)
print(f"Exception creating issue {issue['number']}: {e}")
return None
def transfer_comments(issue_number, new_issue_number):
url = f"https://api.github.com/repos/{SOURCE_REPO}/issues/{issue_number}/comments"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
for comment in response.json():
comment_data = {"body": comment["body"]}
comment_response = requests.post(
f"https://api.github.com/repos/{DEST_REPO}/issues/{new_issue_number}/comments",
headers=HEADERS,
json=comment_data
)
if comment_response.status_code != 201:
print(f"Error creating comment: {comment_response.status_code}")
def create_issue_from_pr(pr, state):
"""Convert a pull request to an issue with additional context"""
url = f"https://api.github.com/repos/{DEST_REPO}/issues"
# Create enhanced body with PR context
original_body = pr["body"] or ""
# Safely extract PR metadata with fallbacks
source_branch = "Unknown"
target_branch = "Unknown"
author = "Unknown"
try:
if pr.get('head') and pr['head'].get('ref'):
source_branch = pr['head']['ref']
except (KeyError, TypeError):
pass
try:
if pr.get('base') and pr['base'].get('ref'):
target_branch = pr['base']['ref']
except (KeyError, TypeError):
pass
try:
if pr.get('user') and pr['user'].get('login'):
author = pr['user']['login']
except (KeyError, TypeError):
pass
pr_info = f"""**[CONVERTED FROM PULL REQUEST]**
Original PR: #{pr['number']}
Source Branch: `{source_branch}`
Target Branch: `{target_branch}`
Author: @{author}
Status: {pr['state'].upper()}
---
{original_body}"""
data = {
"title": f"[PR] {pr['title']}",
"body": truncate_body_if_needed(pr_info),
"labels": [label["name"] for label in pr.get("labels", [])] + ["converted-from-pr"],
"assignees": filter_valid_assignees([assignee["login"] for assignee in pr.get("assignees", [])], DEST_REPO)
}
try:
response = requests.post(url, headers=HEADERS, json=data)
if response.status_code == 201:
created_issue = response.json()
# Close if original PR was closed/merged
if pr["state"] in ["closed", "merged"]:
close_url = f"https://api.github.com/repos/{DEST_REPO}/issues/{created_issue['number']}"
close_data = {"state": "closed"}
requests.patch(close_url, headers=HEADERS, json=close_data)
return created_issue["number"]
else:
error_msg = f"HTTP {response.status_code}: {response.text}"
mark_failed(pr['number'], "pr", error_msg, state)
print(f"Error creating issue from PR {pr['number']}: {error_msg}")
return None
except Exception as e:
mark_failed(pr['number'], "pr", str(e), state)
print(f"Exception creating issue from PR {pr['number']}: {e}")
return None
def main():
# Load existing state
state = load_state()
# Validate we're working with the same repositories
if (state.get("source_repo") != SOURCE_REPO or
state.get("dest_repo") != DEST_REPO):
print("Warning: Repository configuration has changed!")
print(f"Previous: {state.get('source_repo')} -> {state.get('dest_repo')}")
print(f"Current: {SOURCE_REPO} -> {DEST_REPO}")
response = input("Continue anyway? (y/N): ")
if response.lower() != 'y':
return
issues = get_issues()
print(f"Found {len(issues)} issues/PRs to transfer")
# Show previous progress
completed_issues = len(state["completed_issues"])
completed_prs = len(state["completed_prs"])
failed_items = len(state["failed_items"])
if state["last_run"]:
print(f"Previous run: {state['last_run']}")
print(f"Already completed: {completed_issues} issues, {completed_prs} PRs")
if failed_items > 0:
print(f"Previous failures: {failed_items}")
regular_issues = []
pull_requests = []
for issue in issues:
if 'pull_request' in issue:
pull_requests.append(issue)
else:
regular_issues.append(issue)
print(f"Total to process: {len(regular_issues)} issues, {len(pull_requests)} PRs")
# Filter out already completed items
pending_issues = [i for i in regular_issues
if not is_already_processed(i['number'], "issue", state)]
pending_prs = [pr for pr in pull_requests
if not is_already_processed(pr['number'], "pr", state)]
print(f"Remaining: {len(pending_issues)} issues, {len(pending_prs)} PRs")
if len(pending_issues) == 0 and len(pending_prs) == 0:
print("All items already processed!")
return
# Process remaining regular issues
for i, issue in enumerate(pending_issues, 1):
print(f"[{i}/{len(pending_issues)}] Transferring issue #{issue['number']}: {issue['title']}")
new_issue_number = create_issue(issue, state)
if new_issue_number:
transfer_comments(issue["number"], new_issue_number)
mark_completed(issue['number'], new_issue_number, "issue", state)
print(f" -> Created as issue #{new_issue_number}")
# Process remaining PRs
for i, pr in enumerate(pending_prs, 1):
print(f"[{i}/{len(pending_prs)}] Converting PR #{pr['number']}: {pr['title']}")
new_issue_number = create_issue_from_pr(pr, state)
if new_issue_number:
transfer_comments(pr["number"], new_issue_number)
mark_completed(pr['number'], new_issue_number, "pr", state)
print(f" -> Created as issue #{new_issue_number} (converted from PR)")
# Final summary
print(f"\n=== Transfer Complete ===")
print(f"Total completed: {len(state['completed_issues'])} issues, {len(state['completed_prs'])} PRs")
if state["failed_items"]:
print(f"Failed items: {len(state['failed_items'])}")
print("Check transfer_state.json for failure details")
print(f"State saved to: {STATE_FILE}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment