Skip to content

Instantly share code, notes, and snippets.

@justinmklam
Last active August 28, 2025 02:23
Show Gist options
  • Save justinmklam/59a6c72d98ffc0a67948e254286114ae to your computer and use it in GitHub Desktop.
Save justinmklam/59a6c72d98ffc0a67948e254286114ae to your computer and use it in GitHub Desktop.
Script to migrate disqus comments to giscus. For more details, see https://www.justinmklam.com/posts/2025/08/replacing-disqus-with-giscus/
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "requests",
# "PyJWT",
# "cryptography"
# ]
# ///
#
"""
Disqus to GitHub Discussions Migration Tool
This script converts Disqus XML exports to GitHub Discussions while:
- Maintaining conversation threading (flattened to GitHub's 2-level limit)
- Grouping comments by blog post
- Using configurable discussion categories
- Supporting dry-run mode for testing with same formatting as real migration
- Providing idempotent operations with local state tracking for safe re-runs
- Detailed progress reporting and resumption after failures
- Supporting both personal access token and GitHub App authentication
"""
import xml.etree.ElementTree as ET
import requests
import json
from datetime import datetime, timezone, timedelta
import argparse
import os
import sys
import time
from typing import Dict, List, Optional, Any
from urllib.parse import urlparse
import re
from pathlib import Path
import jwt
import base64
class GitHubAppAuth:
    """Handles GitHub App authentication and token management.

    A short-lived JWT signed with the app's private key is exchanged for an
    installation access token. The token is cached on the instance and
    refreshed shortly before it expires.
    """

    # Seconds to wait for the token-exchange request before giving up, so a
    # stalled connection cannot hang the migration indefinitely.
    REQUEST_TIMEOUT = 30
    # A cached token is refreshed once it is within this margin of expiring.
    REFRESH_MARGIN = timedelta(minutes=5)

    def __init__(self, app_id: str, private_key_path: str, installation_id: str):
        self.app_id = app_id
        self.private_key_path = private_key_path
        self.installation_id = installation_id
        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None

    def _load_private_key(self) -> bytes:
        """Read the private key file and return its raw bytes.

        Raises:
            ValueError: if the file does not exist or cannot be read. The
                underlying OS error is attached as ``__cause__``.
        """
        try:
            with open(self.private_key_path, "rb") as f:
                return f.read()
        except FileNotFoundError as e:
            raise ValueError(
                f"Private key file not found: {self.private_key_path}"
            ) from e
        except IOError as e:
            raise ValueError(f"Failed to read private key file: {e}") from e

    def _generate_jwt_token(self) -> str:
        """Build and sign the RS256 JWT used to authenticate as the app."""
        private_key = self._load_private_key()
        now = datetime.now(timezone.utc)
        payload = {
            # Back-date "issued at" and stay under the 10 minute ceiling so
            # moderate clock drift between this machine and GitHub does not
            # get the token rejected.
            "iat": now - timedelta(seconds=60),
            "exp": now + timedelta(minutes=9),
            "iss": str(self.app_id),
        }
        return jwt.encode(payload, private_key, algorithm="RS256")

    def _get_installation_token(self) -> tuple[str, datetime]:
        """Exchange a fresh JWT for an installation access token.

        Returns:
            ``(token, expires_at)`` where ``expires_at`` is parsed from the
            ``expires_at`` field of the response.

        Raises:
            Exception: if the API does not answer with HTTP 201.
        """
        jwt_token = self._generate_jwt_token()
        headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        url = f"https://api.github.com/app/installations/{self.installation_id}/access_tokens"
        response = requests.post(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 201:
            raise Exception(
                f"Failed to get installation token: {response.status_code} {response.text}"
            )
        data = response.json()
        token = data["token"]
        # fromisoformat() on Python < 3.11 does not accept a trailing "Z".
        expires_at = datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00"))
        return token, expires_at

    def get_access_token(self) -> str:
        """Return a valid access token, fetching a new one when needed."""
        now = datetime.now(timezone.utc)
        needs_refresh = (
            self._access_token is None
            or self._token_expires_at is None
            or now >= (self._token_expires_at - self.REFRESH_MARGIN)
        )
        if needs_refresh:
            self._access_token, self._token_expires_at = self._get_installation_token()
        return self._access_token
# Migration state tracking and configuration
class MigrationState:
    """Tracks migration state for idempotent operations.

    The state is a JSON document with three top-level keys:

    - ``discussions``: thread_id -> {"discussion_id", "title", "created"}
    - ``comments``: disqus_comment_id -> {"github_id", "thread_id", "created"}
    - ``migration_info``: metadata about the last run

    Every ``record_*`` / ``update_*`` call persists the state immediately.
    """

    def __init__(self, state_file: str = "migration_state.json"):
        self.state_file = state_file
        self.state = self._load_state()

    @staticmethod
    def _empty_state() -> Dict[str, Any]:
        """Return the structure used when no state file can be loaded."""
        return {
            "discussions": {},
            "comments": {},
            "migration_info": {
                "last_run": None,
                "repo_owner": None,
                "repo_name": None,
                "category_name": None,
            },
        }

    def _load_state(self) -> Dict[str, Any]:
        """Load existing state from file, or start empty if unavailable."""
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                print(f"⚠️ Warning: Could not load state file {self.state_file}: {e}")
                print("⚠️ Starting with empty state")
        return self._empty_state()

    def save_state(self):
        """Save current state to file.

        The JSON is written to a sibling temporary file and then moved over
        the real file with ``os.replace``. The previous state file therefore
        stays intact if serialisation or writing fails part-way. I/O errors
        are reported as a warning rather than raised.
        """
        tmp_file = f"{self.state_file}.tmp"
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(self.state, f, indent=2, ensure_ascii=False)
            os.replace(tmp_file, self.state_file)
        except IOError as e:
            print(f"⚠️ Warning: Could not save state file: {e}")
        finally:
            # Only present if the replace did not happen; never leave it behind.
            if os.path.exists(tmp_file):
                try:
                    os.remove(tmp_file)
                except OSError:
                    pass

    def is_discussion_created(self, thread_id: str) -> bool:
        """Check if discussion for thread already exists."""
        return thread_id in self.state.get("discussions", {})

    def get_discussion_id(self, thread_id: str) -> Optional[str]:
        """Get GitHub discussion ID for thread, or None if not recorded."""
        discussion_info = self.state.get("discussions", {}).get(thread_id)
        return discussion_info.get("discussion_id") if discussion_info else None

    def record_discussion(self, thread_id: str, discussion_id: str, title: str):
        """Record that a discussion was created and persist the state."""
        self.state.setdefault("discussions", {})[thread_id] = {
            "discussion_id": discussion_id,
            "title": title,
            "created": datetime.now().isoformat(),
        }
        self.save_state()

    def is_comment_created(self, disqus_comment_id: str) -> bool:
        """Check if comment was already posted."""
        return disqus_comment_id in self.state.get("comments", {})

    def get_github_comment_id(self, disqus_comment_id: str) -> Optional[str]:
        """Get GitHub comment ID for a Disqus comment, or None if not recorded."""
        comment_info = self.state.get("comments", {}).get(disqus_comment_id)
        return comment_info.get("github_id") if comment_info else None

    def record_comment(
        self, disqus_comment_id: str, github_comment_id: str, thread_id: str
    ):
        """Record that a comment was created and persist the state."""
        self.state.setdefault("comments", {})[disqus_comment_id] = {
            "github_id": github_comment_id,
            "thread_id": thread_id,
            "created": datetime.now().isoformat(),
        }
        self.save_state()

    def update_migration_info(
        self, repo_owner: str, repo_name: str, category_name: str
    ):
        """Update migration metadata and persist the state."""
        self.state["migration_info"] = {
            "last_run": datetime.now().isoformat(),
            "repo_owner": repo_owner,
            "repo_name": repo_name,
            "category_name": category_name,
        }
        self.save_state()

    def get_stats(self) -> Dict[str, int]:
        """Return counts of recorded discussions and comments."""
        return {
            "discussions_created": len(self.state.get("discussions", {})),
            "comments_created": len(self.state.get("comments", {})),
        }
class Config:
    """Runtime configuration: target repository, category and credentials.

    Authentication is chosen at construction time. GitHub App credentials
    are used when all three of ``app_id``, ``private_key_path`` and
    ``installation_id`` are given; otherwise the ``GITHUB_TOKEN`` environment
    variable is used if it is set. ``auth_method`` stays ``None`` when
    neither is available.
    """

    def __init__(
        self,
        repo_owner: str = "",
        repo_name: str = "",
        discussion_category_name: str = "Announcements",
        # GitHub App authentication parameters
        app_id: Optional[str] = None,
        private_key_path: Optional[str] = None,
        installation_id: Optional[str] = None,
    ):
        # Authentication - try GitHub App first, then personal token
        self.auth_method = None
        self.github_app_auth = None
        self.github_token = None

        if app_id and private_key_path and installation_id:
            try:
                self.github_app_auth = GitHubAppAuth(
                    app_id, private_key_path, installation_id
                )
                self.auth_method = "github_app"
                print("🔑 Using GitHub App authentication")
            except Exception as e:
                print(f"⚠️ GitHub App authentication failed: {e}")
                print("🔄 Falling back to personal access token")

        # Fallback to personal access token if GitHub App auth not available/working
        if self.auth_method != "github_app":
            self.github_token = os.getenv("GITHUB_TOKEN")
            if self.github_token:
                self.auth_method = "personal_token"
                print("🔑 Using personal access token authentication")

        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.discussion_category_name = discussion_category_name
        self.discussion_category_id = ""  # Will be resolved from name during migration

        # GitHub API settings
        self.api_base = "https://api.github.com"

        # Rate limiting
        self.rate_limit_delay = 1.0  # seconds between API calls

    def get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers for API requests.

        Raises:
            ValueError: if no authentication method is configured.
        """
        headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        if self.auth_method == "github_app" and self.github_app_auth:
            token = self.github_app_auth.get_access_token()
            headers["Authorization"] = f"Bearer {token}"
        elif self.auth_method == "personal_token" and self.github_token:
            headers["Authorization"] = f"Bearer {self.github_token}"
        else:
            # This should not happen if validate() passed
            raise ValueError("No valid authentication method available")
        return headers

    def validate(self, dry_run: bool = False):
        """Validate configuration.

        Nothing is checked in dry-run mode. Otherwise credentials and the
        repository owner/name must be present; for GitHub App auth a token
        is requested to prove the credentials work.

        Returns:
            True when the configuration is usable.

        Raises:
            ValueError: describing the first problem found.
        """
        if dry_run:
            return True

        if self.auth_method == "github_app":
            if not self.github_app_auth:
                raise ValueError(
                    "GitHub App authentication is not properly configured"
                )
            # Test that we can get a token
            try:
                self.github_app_auth.get_access_token()
            except Exception as e:
                raise ValueError(
                    f"Failed to authenticate with GitHub App: {e}"
                ) from e
        elif self.auth_method == "personal_token":
            if not self.github_token:
                raise ValueError("GITHUB_TOKEN environment variable is required")
        else:
            raise ValueError(
                "No valid authentication method configured. Either set up GitHub App credentials or GITHUB_TOKEN environment variable."
            )

        if not self.repo_owner:
            raise ValueError("Repository owner is required (use --repo-owner)")
        if not self.repo_name:
            raise ValueError("Repository name is required (use --repo-name)")
        return True
class DisqusComment:
    """One Disqus post together with the replies attached to it."""

    # Attributes copied verbatim into to_dict(), in output order.
    _PLAIN_FIELDS = ("id", "parent_id", "author", "created", "message", "is_deleted")

    def __init__(
        self,
        comment_id: str,
        parent_id: Optional[str],
        author: str,
        created: str,
        message: str,
        is_deleted: bool = False,
    ):
        self.id, self.parent_id = comment_id, parent_id
        self.author, self.created = author, created
        self.message, self.is_deleted = message, is_deleted
        # Direct children only; filled in through add_reply().
        self.replies: List["DisqusComment"] = []

    def add_reply(self, reply: "DisqusComment"):
        """Attach ``reply`` as a direct child of this comment."""
        self.replies.append(reply)

    def to_dict(self):
        """Return a nested dict of this comment and, recursively, its replies."""
        as_dict = {field: getattr(self, field) for field in self._PLAIN_FIELDS}
        as_dict["replies"] = [child.to_dict() for child in self.replies]
        return as_dict
class DisqusThread:
    """A Disqus thread (one per page/post) and the comments made on it."""

    def __init__(self, thread_id: str, title: str, link: str, created: str):
        self.id = thread_id
        self.title = title
        self.link = link
        self.created = created
        # Flat list in insertion order; nesting is derived by build_comment_tree().
        self.comments: List["DisqusComment"] = []

    def add_comment(self, comment: "DisqusComment"):
        """Append ``comment`` to the thread's flat comment list."""
        self.comments.append(comment)

    def build_comment_tree(self) -> List["DisqusComment"]:
        """Build threaded comment structure and return the root comments.

        Each comment whose ``parent_id`` names another comment in this thread
        is attached to that parent; every other comment is returned as a
        root. Reply lists are rebuilt from scratch on each call, so calling
        this repeatedly (for example via ``to_dict``) does not duplicate
        replies.
        """
        comment_map = {c.id: c for c in self.comments}

        # Start from a clean slate so the method is safe to call repeatedly.
        for comment in self.comments:
            comment.replies = []

        root_comments = []
        for comment in self.comments:
            parent = comment_map.get(comment.parent_id) if comment.parent_id else None
            # A comment naming itself as parent is treated as a root; attaching
            # it to itself would create a cycle for recursive consumers.
            if parent is not None and parent is not comment:
                parent.add_reply(comment)
            else:
                root_comments.append(comment)
        return root_comments

    def to_dict(self):
        """Return the thread and its nested comment tree as plain dicts."""
        return {
            "id": self.id,
            "title": self.title,
            "link": self.link,
            "created": self.created,
            "comments": [c.to_dict() for c in self.build_comment_tree()],
        }
class DisqusParser:
    """Parses a Disqus XML export into DisqusThread / DisqusComment objects."""

    # Fully-qualified name of the dsq:id attribute carried by thread, post
    # and parent elements.
    _DSQ_ID = "{http://disqus.com/disqus-internals}id"

    def __init__(self, xml_path: str):
        self.xml_path = xml_path
        self.namespace = {
            "d": "http://disqus.com",
            "dsq": "http://disqus.com/disqus-internals",
        }

    def parse(self) -> Dict[str, "DisqusThread"]:
        """Parse Disqus XML export.

        Returns:
            Mapping of thread id -> thread, each populated with its
            non-deleted, non-spam comments.

        Raises:
            ValueError: if the file is not well-formed XML.
        """
        print(f"📖 Parsing Disqus XML: {self.xml_path}")
        try:
            tree = ET.parse(self.xml_path)
            root = tree.getroot()
        except ET.ParseError as e:
            raise ValueError(f"Failed to parse XML: {e}") from e
        threads = self._parse_threads(root)
        self._parse_posts(root, threads)
        print(f"✅ Parsed {len(threads)} threads with comments")
        return threads

    def _child_text(self, elem, path: str, default: str) -> str:
        """Return the text of child ``path``, or ``default`` if missing/empty."""
        child = elem.find(path, self.namespace)
        if child is None:
            return default
        # An element that is present but empty has text None.
        return child.text or default

    def _is_flagged(self, post_elem, tag: str) -> bool:
        """Return True when child ``tag`` of the post has the text "true"."""
        flag = post_elem.find(tag, self.namespace)
        return flag is not None and flag.text == "true"

    def _parse_threads(self, root) -> Dict[str, "DisqusThread"]:
        """Parse thread elements, skipping any without id, title or link."""
        threads = {}
        for thread_elem in root.findall("d:thread", self.namespace):
            thread_id = thread_elem.get(self._DSQ_ID)
            if not thread_id:
                continue
            title_elem = thread_elem.find("d:title", self.namespace)
            link_elem = thread_elem.find("d:link", self.namespace)
            # Skip threads without essential data
            if title_elem is None or link_elem is None:
                continue
            title = title_elem.text or "Untitled"
            link = link_elem.text or ""
            created = self._child_text(thread_elem, "d:createdAt", "")
            threads[thread_id] = DisqusThread(thread_id, title, link, created)
        return threads

    def _parse_posts(self, root, threads: Dict[str, "DisqusThread"]):
        """Parse post/comment elements and attach them to their threads.

        Deleted posts, spam, posts without an id and posts whose thread is
        unknown are skipped.
        """
        for post_elem in root.findall("d:post", self.namespace):
            if self._is_flagged(post_elem, "d:isDeleted"):
                continue
            if self._is_flagged(post_elem, "d:isSpam"):
                continue

            post_id = post_elem.get(self._DSQ_ID)
            if not post_id:
                continue

            thread_elem = post_elem.find("d:thread", self.namespace)
            if thread_elem is None:
                continue
            thread_id = thread_elem.get(self._DSQ_ID)
            if thread_id not in threads:
                continue

            # Parent ID (for threading); absent on top-level comments.
            parent_elem = post_elem.find("d:parent", self.namespace)
            parent_id = parent_elem.get(self._DSQ_ID) if parent_elem is not None else None

            author = self._child_text(post_elem, "d:author/d:name", "Anonymous")
            created = self._child_text(post_elem, "d:createdAt", "")
            message = self._clean_html(self._child_text(post_elem, "d:message", ""))

            comment = DisqusComment(post_id, parent_id, author, created, message)
            threads[thread_id].add_comment(comment)

    def _clean_html(self, html_content: str) -> str:
        """Convert HTML content to Markdown-friendly format.

        Handles paragraphs, line breaks, bold, italic, inline code and
        links; any other tag is dropped while its text is kept. Returns an
        empty string for empty or None input.
        """
        if not html_content:
            return ""
        # Remove CDATA wrapper
        content = re.sub(r"<!\[CDATA\[(.*?)\]\]>", r"\1", html_content, flags=re.DOTALL)
        # Convert basic HTML tags to Markdown
        content = re.sub(r"<p>(.*?)</p>", r"\1\n\n", content, flags=re.DOTALL)
        content = re.sub(r"<br\s*/?>", "\n", content)
        content = re.sub(r"<strong>(.*?)</strong>", r"**\1**", content)
        content = re.sub(r"<b>(.*?)</b>", r"**\1**", content)
        content = re.sub(r"<em>(.*?)</em>", r"*\1*", content)
        content = re.sub(r"<i>(.*?)</i>", r"*\1*", content)
        content = re.sub(r"<code>(.*?)</code>", r"`\1`", content)
        # Handle anchor tags with various attributes - extract href and text content only
        content = re.sub(
            r'<a\s+[^>]*href="([^"]*)"[^>]*>(.*?)</a>',
            r"[\2](\1)",
            content,
            flags=re.IGNORECASE,
        )
        # Remove remaining HTML tags
        content = re.sub(r"<[^>]+>", "", content)
        # Collapse runs of blank lines into a single blank line
        content = re.sub(r"\n\s*\n\s*\n", "\n\n", content)
        return content.strip()
class GitHubDiscussionsAPI:
    """Thin wrapper around the GitHub GraphQL calls used by the migration."""

    # Seconds to wait for a GraphQL response; without a timeout a stalled
    # connection would block the migration forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, config: "Config"):
        self.config = config
        self.graphql_url = "https://api.github.com/graphql"

    def _graphql_request(self, query: str, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Make a GraphQL request to GitHub API and return its ``data`` object.

        Raises:
            Exception: on a non-200 response or when the response body
                carries an ``errors`` key.
        """
        payload = {"query": query, "variables": variables}
        headers = self.config.get_auth_headers()
        headers["Content-Type"] = "application/json"
        response = requests.post(
            self.graphql_url,
            headers=headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(
                f"GraphQL request failed: {response.status_code} {response.text}"
            )
        result = response.json()
        if "errors" in result:
            raise Exception(f"GraphQL errors: {result['errors']}")
        # "data" may be present but null; normalise that to an empty dict.
        return result.get("data") or {}

    def get_repository_info(self) -> Dict[str, Any]:
        """Get repository ID and discussion categories.

        Returns:
            ``{"repo_id": <id or None>, "categories": [{"id", "name"}, ...]}``.
            Only the first 20 categories are requested.
        """
        query = """
        query($owner: String!, $name: String!) {
          repository(owner: $owner, name: $name) {
            id
            discussionCategories(first: 20) {
              nodes {
                id
                name
              }
            }
          }
        }
        """
        variables = {"owner": self.config.repo_owner, "name": self.config.repo_name}
        data = self._graphql_request(query, variables)
        # Each level can be JSON null, so `or` is used instead of .get defaults.
        repo_data = data.get("repository") or {}
        repo_id = repo_data.get("id")
        categories = (repo_data.get("discussionCategories") or {}).get("nodes") or []
        return {"repo_id": repo_id, "categories": categories}

    def create_discussion(
        self, repo_id: str, category_id: str, title: str, body: str
    ) -> str:
        """Create a new discussion and return its ID.

        The return value is None if the response carries no discussion ID.
        """
        query = """
        mutation($repoId: ID!, $categoryId: ID!, $title: String!, $body: String!) {
          createDiscussion(input: {repositoryId: $repoId, categoryId: $categoryId, title: $title, body: $body}) {
            discussion {
              id
              number
            }
          }
        }
        """
        variables = {
            "repoId": repo_id,
            "categoryId": category_id,
            "title": title,
            "body": body,
        }
        data = self._graphql_request(query, variables)
        discussion = (data.get("createDiscussion") or {}).get("discussion") or {}
        return discussion.get("id")

    def add_discussion_comment(
        self, discussion_id: str, body: str, reply_to_id: Optional[str] = None
    ) -> str:
        """Add a comment to a discussion and return the new comment's ID.

        When ``reply_to_id`` is given the comment is posted as a reply to
        that comment. The return value is None if the response carries no
        comment ID.
        """
        if reply_to_id:
            query = """
            mutation($discussionId: ID!, $body: String!, $replyToId: ID!) {
              addDiscussionComment(input: {discussionId: $discussionId, body: $body, replyToId: $replyToId}) {
                comment {
                  id
                }
              }
            }
            """
            variables = {
                "discussionId": discussion_id,
                "body": body,
                "replyToId": reply_to_id,
            }
        else:
            query = """
            mutation($discussionId: ID!, $body: String!) {
              addDiscussionComment(input: {discussionId: $discussionId, body: $body}) {
                comment {
                  id
                }
              }
            }
            """
            variables = {"discussionId": discussion_id, "body": body}
        data = self._graphql_request(query, variables)
        comment = (data.get("addDiscussionComment") or {}).get("comment") or {}
        return comment.get("id")
class MigrationFormatter:
    """Builds the titles and Markdown bodies posted to GitHub Discussions."""

    @staticmethod
    def _parse_iso(value: str) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp, returning None if it is not valid.

        A trailing "Z" is rewritten to "+00:00" because
        ``datetime.fromisoformat`` on Python < 3.11 does not accept it.
        """
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            return None

    @staticmethod
    def format_discussion_title(thread: "DisqusThread") -> str:
        """Format discussion title from URL path.

        Returns the path of ``thread.link`` without leading/trailing
        slashes. Falls back to ``thread.title`` (or "Untitled") when there
        is no link, the link has no path, or the link cannot be parsed.
        """
        fallback = thread.title or "Untitled"
        if not thread.link:
            return fallback
        try:
            path = urlparse(thread.link).path.strip("/")
        except ValueError:
            # urlparse rejects some malformed URLs (e.g. bad IPv6 literals).
            return fallback
        return path or fallback

    @staticmethod
    def format_discussion_body(thread: "DisqusThread") -> str:
        """Format main discussion body: header, original link, creation date."""
        body = "**Comments migrated from Disqus**\n\n"
        body += f"Original post: {thread.link}\n\n"
        if thread.created:
            created_date = MigrationFormatter._parse_iso(thread.created)
            # An unparseable date is simply left out of the body.
            if created_date is not None:
                body += f"Thread created: {created_date.strftime('%B %d, %Y')}\n\n"
        body += "---\n\n"
        return body

    @staticmethod
    def format_comment_for_github(
        comment: "DisqusComment", reply_context: str = ""
    ) -> str:
        """Format a single comment for GitHub (without threading info).

        The result is an attribution line (author, timestamp and optional
        ``reply_context``) followed by a blank line and the message. A
        timestamp that cannot be parsed is shown verbatim.
        """
        timestamp = ""
        if comment.created:
            created_date = MigrationFormatter._parse_iso(comment.created)
            if created_date is None:
                timestamp = comment.created
            else:
                # The label says UTC, so convert timestamps that carry another
                # offset. Naive timestamps are shown unchanged.
                if created_date.tzinfo is not None:
                    created_date = created_date.astimezone(timezone.utc)
                # Unpadded day/hour are built by hand: the "%-d" / "%-I"
                # strftime codes are not available on every platform.
                hour12 = created_date.hour % 12 or 12
                timestamp = (
                    f"{created_date:%B} {created_date.day}, {created_date.year}"
                    f" at {hour12}:{created_date:%M %p} UTC"
                )

        formatted = f"*Originally posted by* **{comment.author}**"
        if timestamp:
            formatted += f" • *{timestamp}*"
        # Add reply context if this is a nested reply
        if reply_context:
            formatted += f" • *{reply_context}*"
        formatted += "\n\n"
        if comment.message:
            formatted += comment.message + "\n"
        else:
            formatted += "*No message content*\n"
        return formatted
class DisqusToGiscusMigrator:
    """Drives the migration: parse the export, then preview or post it.

    Replies at any depth are flattened under their root comment. Progress
    is recorded through ``MigrationState`` so a re-run skips what was
    already created.
    """

    def __init__(self, config: "Config", state_file: str = "migration_state.json"):
        self.config = config
        self.api = GitHubDiscussionsAPI(config)
        self.state = MigrationState(state_file)

    def migrate(
        self,
        xml_path: str,
        dry_run: bool = True,
        output_file: str = "migration_preview.md",
    ):
        """Main migration function.

        Args:
            xml_path: path to the Disqus XML export.
            dry_run: when True, write a Markdown preview to ``output_file``
                instead of calling the GitHub API.
            output_file: preview destination, used in dry-run mode only.
        """
        print("🚀 Starting Disqus to GitHub Discussions migration")
        # Parse Disqus data
        parser = DisqusParser(xml_path)
        threads = parser.parse()
        if not threads:
            print("❌ No threads found to migrate")
            return
        if dry_run:
            self._dry_run(threads, output_file)
        else:
            self._real_migration(threads)

    @staticmethod
    def _threads_with_comments(
        threads: Dict[str, "DisqusThread"],
    ) -> Dict[str, "DisqusThread"]:
        """Return only the threads that have at least one comment."""
        return {tid: thread for tid, thread in threads.items() if thread.comments}

    def _reply_context(self, thread, root_comment, reply) -> str:
        """Return "Replying to <author>" for replies nested below the first level.

        Flattening loses who a nested reply was addressed to, so that is
        restored as text. Direct replies to the root, and replies whose
        original parent cannot be found, get an empty string.
        """
        if reply.parent_id == root_comment.id:
            return ""
        original_parent = (
            self._find_comment_by_id(thread.comments, reply.parent_id)
            if reply.parent_id
            else None
        )
        if original_parent:
            return f"Replying to {original_parent.author}"
        return ""

    def _dry_run(self, threads: Dict[str, "DisqusThread"], output_file: str):
        """Perform dry run and generate preview using same formatting as GitHub migration."""
        print(f"🔍 Performing dry run - generating preview to {output_file}")
        # Show existing state
        stats = self.state.get_stats()
        if stats["discussions_created"] > 0 or stats["comments_created"] > 0:
            print(
                f"📊 Existing state: {stats['discussions_created']} discussions, {stats['comments_created']} comments already created"
            )
        # Filter out threads with no comments
        threads_with_comments = self._threads_with_comments(threads)
        skipped_count = len(threads) - len(threads_with_comments)
        # Count what would be skipped vs created
        already_created = sum(
            1 for tid in threads_with_comments if self.state.is_discussion_created(tid)
        )
        to_create = len(threads_with_comments) - already_created

        with open(output_file, "w", encoding="utf-8") as f:
            f.write("# Disqus to GitHub Discussions Migration Preview\n\n")
            f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"Total threads found: {len(threads)}\n")
            f.write(f"Threads with comments: {len(threads_with_comments)}\n")
            f.write(f"Already migrated: {already_created}\n")
            f.write(f"Will be created: {to_create}\n")
            if skipped_count > 0:
                f.write(f"Threads skipped (no comments): {skipped_count}\n")
            f.write("\n---\n\n")
            for thread_id, thread in threads_with_comments.items():
                self._write_thread_preview(f, thread_id, thread)

        print(f"✅ Dry run complete! Preview saved to {output_file}")
        print(
            f"📊 Summary: {to_create} new threads ready for migration, {already_created} already exist"
        )
        if skipped_count > 0:
            print(f"🚫 Skipped {skipped_count} threads with no comments")

    def _write_thread_preview(self, f: Any, thread_id: str, thread) -> None:
        """Write the preview section for one thread to the open file ``f``."""
        title = MigrationFormatter.format_discussion_title(thread)
        body = MigrationFormatter.format_discussion_body(thread)
        # Mark if already created
        status = (
            "✅ ALREADY MIGRATED"
            if self.state.is_discussion_created(thread_id)
            else "🆕 TO BE CREATED"
        )
        f.write(f"## Discussion: {title} ({status})\n\n")
        f.write(body)

        # Flatten once and reuse for both the counts and the listing.
        roots_with_replies = [
            (root, self._flatten_replies(root))
            for root in thread.build_comment_tree()
        ]
        total_root_comments = len(roots_with_replies)
        total_replies = sum(len(replies) for _, replies in roots_with_replies)
        total_comments = total_root_comments + total_replies
        # Count already created comments for this thread
        created_comments = sum(
            1 for c in thread.comments if self.state.is_comment_created(c.id)
        )
        remaining_comments = total_comments - created_comments
        f.write(
            f"**{total_comments} comments total** ({total_root_comments} root comments, {total_replies} replies)\n"
        )
        if created_comments > 0:
            f.write(
                f"**{created_comments} already created, {remaining_comments} remaining**\n"
            )
        f.write("\n")

        # Show how comments would be formatted for GitHub
        for comment, all_replies in roots_with_replies:
            comment_status = (
                "✅ CREATED"
                if self.state.is_comment_created(comment.id)
                else "🆕 TO CREATE"
            )
            f.write(f"### Root Comment ({comment_status}):\n")
            f.write(MigrationFormatter.format_comment_for_github(comment))
            f.write("\n")
            # Show flattened replies
            for reply in all_replies:
                reply_context = self._reply_context(thread, comment, reply)
                reply_status = (
                    "✅ CREATED"
                    if self.state.is_comment_created(reply.id)
                    else "🆕 TO CREATE"
                )
                f.write(f"### Reply ({reply_status}):\n")
                f.write(
                    MigrationFormatter.format_comment_for_github(reply, reply_context)
                )
                f.write("\n")
            f.write("---\n\n")
        f.write("=" * 80 + "\n\n")

    def _resolve_repository(self) -> Optional[str]:
        """Look up the repository ID and the configured discussion category.

        On success the category ID is stored on the config, the migration
        metadata is saved, and the repository ID is returned. On any failure
        the reason is printed and None is returned.
        """
        print("🔍 Fetching repository information...")
        try:
            repo_info = self.api.get_repository_info()
            repo_id = repo_info.get("repo_id")
            categories = repo_info.get("categories", [])
            if not repo_id:
                print("❌ Repository not found or access denied")
                return None

            # Find the discussion category by name (case-insensitive)
            wanted = self.config.discussion_category_name.lower()
            category_id = None
            for cat in categories:
                if cat.get("name", "").lower() == wanted:
                    category_id = cat.get("id")
                    print(
                        f"✅ Found '{self.config.discussion_category_name}' category: {category_id}"
                    )
                    break
            if not category_id:
                print(
                    f"❌ No '{self.config.discussion_category_name}' category found. Available categories:"
                )
                for cat in categories:
                    print(f" - {cat.get('name')} (ID: {cat.get('id')})")
                return None

            # Store the resolved category ID and update migration info
            self.config.discussion_category_id = category_id
            self.state.update_migration_info(
                self.config.repo_owner,
                self.config.repo_name,
                self.config.discussion_category_name,
            )
            return repo_id
        except Exception as e:
            print(f"❌ Failed to fetch repository information: {e}")
            return None

    def _post_comment(
        self,
        discussion_id: str,
        body: str,
        reply_to_id: Optional[str],
        disqus_id: str,
        thread_id: str,
    ) -> str:
        """Post one comment, pause for rate limiting, and record it in state.

        Raises:
            RuntimeError: if the API call returns no comment ID. Nothing is
                recorded in that case, so a later run retries the comment
                instead of treating it as done.
        """
        github_id = self.api.add_discussion_comment(
            discussion_id=discussion_id,
            body=body,
            reply_to_id=reply_to_id,
        )
        time.sleep(self.config.rate_limit_delay)
        if not github_id:
            raise RuntimeError("GitHub API returned no comment ID")
        self.state.record_comment(disqus_id, github_id, thread_id)
        return github_id

    def _migrate_comments(self, thread_id: str, thread, discussion_id: str) -> None:
        """Post every not-yet-migrated comment of ``thread`` to the discussion.

        Failures on individual comments are reported and skipped so the rest
        of the thread is still processed.
        """
        comment_count = 0
        skipped_comments = 0
        for root_comment in thread.build_comment_tree():
            try:
                if self.state.is_comment_created(root_comment.id):
                    print(
                        f"⏭️ Skipping root comment by {root_comment.author} - already exists"
                    )
                    skipped_comments += 1
                else:
                    root_body = MigrationFormatter.format_comment_for_github(
                        root_comment
                    )
                    self._post_comment(
                        discussion_id, root_body, None, root_comment.id, thread_id
                    )
                    comment_count += 1
                    print(f"✅ Added root comment by {root_comment.author}")

                # Get the GitHub ID for replies (whether just created or from state)
                root_github_id = self.state.get_github_comment_id(root_comment.id)
                if not root_github_id:
                    print(
                        f"⚠️ No GitHub ID found for root comment by {root_comment.author}, skipping replies"
                    )
                    continue

                # Add all replies as direct replies to the root comment
                for reply in self._flatten_replies(root_comment):
                    try:
                        if self.state.is_comment_created(reply.id):
                            print(
                                f"⏭️ Skipping reply by {reply.author} - already exists"
                            )
                            skipped_comments += 1
                            continue
                        reply_body = MigrationFormatter.format_comment_for_github(
                            reply, self._reply_context(thread, root_comment, reply)
                        )
                        self._post_comment(
                            discussion_id,
                            reply_body,
                            root_github_id,
                            reply.id,
                            thread_id,
                        )
                        comment_count += 1
                        print(f"✅ Added reply by {reply.author}")
                    except Exception as e:
                        print(f"⚠️ Failed to add reply by {reply.author}: {e}")
            except Exception as e:
                print(f"⚠️ Failed to add root comment by {root_comment.author}: {e}")

        if comment_count > 0:
            print(f"✅ Added {comment_count} new comments")
        if skipped_comments > 0:
            print(f"⏭️ Skipped {skipped_comments} existing comments")

    def _real_migration(self, threads: Dict[str, "DisqusThread"]):
        """Perform actual migration to GitHub with state tracking."""
        print("🚨 Starting REAL migration to GitHub Discussions")
        # Filter out threads with no comments
        threads_with_comments = self._threads_with_comments(threads)
        skipped_count = len(threads) - len(threads_with_comments)
        if skipped_count > 0:
            print(f"🚫 Skipping {skipped_count} threads with no comments")
        if not threads_with_comments:
            print("❌ No threads with comments found to migrate")
            return

        # Show existing state
        stats = self.state.get_stats()
        already_created = sum(
            1 for tid in threads_with_comments if self.state.is_discussion_created(tid)
        )
        to_create = len(threads_with_comments) - already_created
        print(
            f"📊 Migration state: {already_created} discussions already exist, {to_create} to be created"
        )
        if stats["comments_created"] > 0:
            print(
                f"📊 {stats['comments_created']} comments already created from previous runs"
            )

        repo_id = self._resolve_repository()
        if not repo_id:
            return

        success_count = 0
        error_count = 0
        skipped_discussions = 0
        for thread_id, thread in threads_with_comments.items():
            try:
                title = MigrationFormatter.format_discussion_title(thread)
                # A recorded entry without a usable ID is treated as not
                # created, so the discussion is created again rather than
                # being skipped forever.
                discussion_id = (
                    self.state.get_discussion_id(thread_id)
                    if self.state.is_discussion_created(thread_id)
                    else None
                )
                if discussion_id:
                    print(
                        f"⏭️ Skipping discussion '{title}' - already exists (ID: {discussion_id})"
                    )
                    skipped_discussions += 1
                else:
                    print(f"\n📝 Creating discussion: {title}")
                    body = MigrationFormatter.format_discussion_body(thread)
                    discussion_id = self.api.create_discussion(
                        repo_id=repo_id,
                        category_id=self.config.discussion_category_id,
                        title=title,
                        body=body,
                    )
                    time.sleep(self.config.rate_limit_delay)
                    # Record only a real ID; recording None would make every
                    # later run skip this thread.
                    if discussion_id:
                        print(f"✅ Created discussion with ID: {discussion_id}")
                        self.state.record_discussion(thread_id, discussion_id, title)

                # Ensure we have a discussion_id before processing comments
                if not discussion_id:
                    print(f"❌ No discussion ID available for {title}")
                    error_count += 1
                    continue

                print(f"📝 Processing comments for: {title}")
                self._migrate_comments(thread_id, thread, discussion_id)
                success_count += 1
            except Exception as e:
                print(f"❌ Error migrating {thread.title}: {e}")
                error_count += 1

        print("\n🎉 Migration complete!")
        print(f"✅ Successful: {success_count}")
        print(f"⏭️ Skipped (already exist): {skipped_discussions}")
        print(f"❌ Errors: {error_count}")
        # Show final stats
        final_stats = self.state.get_stats()
        print(
            f"📊 Total state: {final_stats['discussions_created']} discussions, {final_stats['comments_created']} comments"
        )

    def _flatten_replies(self, comment: "DisqusComment") -> List["DisqusComment"]:
        """Flatten all replies recursively into a single list.

        Order is depth-first: each reply is followed by its own descendants
        before the next sibling.
        """
        replies = []
        for reply in comment.replies:
            replies.append(reply)
            replies.extend(self._flatten_replies(reply))
        return replies

    def _find_comment_by_id(
        self, comments: List["DisqusComment"], comment_id: str
    ) -> Optional["DisqusComment"]:
        """Return the first comment whose id equals ``comment_id``, else None."""
        return next((c for c in comments if c.id == comment_id), None)
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the migration script."""
    parser = argparse.ArgumentParser(
        description="Migrate Disqus comments to GitHub Discussions",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Environment Variables:
Authentication (choose one):
GITHUB_TOKEN Personal access token for GitHub API
OR for GitHub App authentication:
GITHUB_APP_ID GitHub App ID
GITHUB_APP_PRIVATE_KEY_PATH Path to GitHub App private key file
GITHUB_APP_INSTALLATION_ID GitHub App installation ID
State Tracking:
The script maintains a local state file (migration_state.json by default) to track
which discussions and comments have been successfully created. This makes the script
idempotent - you can safely re-run it after failures and it will resume where it
left off without creating duplicates.
Authentication Methods:
1. Personal Access Token (original method):
- Set GITHUB_TOKEN environment variable
- Token needs 'repo' and 'write:discussion' scopes
2. GitHub App (recommended for organizations):
- Create a GitHub App with discussions:write permission
- Install the app on your repository/organization
- Provide --app-id, --private-key-path, --installation-id
- Or set corresponding environment variables
Examples:
# Dry run (recommended first)
python disqus_to_giscus.py export.xml --dry-run
# Real migration with personal access token
export GITHUB_TOKEN="your_token_here"
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo
# Real migration with GitHub App
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo \\
--app-id 123456 --private-key-path /path/to/private-key.pem --installation-id 789012
# With custom category name
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo --category-name "General"
# Custom output file for dry run
python disqus_to_giscus.py export.xml --dry-run --output custom_preview.md
# Custom state file location
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo --state-file my_migration.json
""",
    )
    parser.add_argument("xml_file", help="Path to Disqus XML export file")
    parser.add_argument(
        "--repo-owner",
        help="GitHub repository owner/organization name (required for real migration)",
    )
    parser.add_argument(
        "--repo-name", help="GitHub repository name (required for real migration)"
    )
    parser.add_argument(
        "--category-name",
        default="Announcements",
        help="GitHub Discussion category name (default: Announcements)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview migration without posting to GitHub (recommended)",
    )
    parser.add_argument(
        "--output",
        default="migration_preview.md",
        help="Output file for dry run preview (default: migration_preview.md)",
    )
    parser.add_argument(
        "--state-file",
        default="migration_state.json",
        help="State file for tracking migration progress (default: migration_state.json)",
    )
    # GitHub App authentication arguments
    github_app_group = parser.add_argument_group(
        "GitHub App Authentication",
        "Use GitHub App for authentication instead of personal access token",
    )
    github_app_group.add_argument(
        "--app-id",
        help="GitHub App ID (can also be set via GITHUB_APP_ID environment variable)",
    )
    github_app_group.add_argument(
        "--private-key-path",
        help="Path to GitHub App private key file (can also be set via GITHUB_APP_PRIVATE_KEY_PATH environment variable)",
    )
    github_app_group.add_argument(
        "--installation-id",
        help="GitHub App installation ID (can also be set via GITHUB_APP_INSTALLATION_ID environment variable)",
    )
    return parser


def main():
    """Command-line entry point.

    Parses arguments, builds and validates the configuration, then runs the
    migration. Exits with status 1 if any step raises; argument errors exit
    through argparse with status 2.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # Validate required arguments for non-dry-run mode
    if not args.dry_run:
        if not args.repo_owner:
            parser.error("--repo-owner is required for real migration")
        if not args.repo_name:
            parser.error("--repo-name is required for real migration")

    try:
        # Get GitHub App credentials from args or environment variables
        app_id = args.app_id or os.getenv("GITHUB_APP_ID")
        private_key_path = args.private_key_path or os.getenv(
            "GITHUB_APP_PRIVATE_KEY_PATH"
        )
        installation_id = args.installation_id or os.getenv(
            "GITHUB_APP_INSTALLATION_ID"
        )
        # Initialize configuration
        config = Config(
            repo_owner=args.repo_owner or "",
            repo_name=args.repo_name or "",
            discussion_category_name=args.category_name,
            app_id=app_id,
            private_key_path=private_key_path,
            installation_id=installation_id,
        )
        config.validate(dry_run=args.dry_run)
        # Run migration
        migrator = DisqusToGiscusMigrator(config, state_file=args.state_file)
        migrator.migrate(args.xml_file, dry_run=args.dry_run, output_file=args.output)
    except Exception as e:
        # Top-level boundary: report the problem and signal failure to the shell.
        print(f"❌ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment