Last active
August 28, 2025 02:23
-
-
Save justinmklam/59a6c72d98ffc0a67948e254286114ae to your computer and use it in GitHub Desktop.
Script to migrate disqus comments to giscus. For more details, see https://www.justinmklam.com/posts/2025/08/replacing-disqus-with-giscus/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.9" | |
| # dependencies = [ | |
| # "requests", | |
| # "PyJWT", | |
| # "cryptography" | |
| # ] | |
| # /// | |
| # | |
| """ | |
| Disqus to GitHub Discussions Migration Tool | |
| This script converts Disqus XML exports to GitHub Discussions while: | |
| - Maintaining conversation threading (flattened to GitHub's 2-level limit) | |
| - Grouping comments by blog post | |
| - Using configurable discussion categories | |
| - Supporting dry-run mode for testing with same formatting as real migration | |
| - Providing idempotent operations with local state tracking for safe re-runs | |
| - Detailed progress reporting and resumption after failures | |
| - Supporting both personal access token and GitHub App authentication | |
| """ | |
| import xml.etree.ElementTree as ET | |
| import requests | |
| import json | |
| from datetime import datetime, timezone, timedelta | |
| import argparse | |
| import os | |
| import sys | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from urllib.parse import urlparse | |
| import re | |
| from pathlib import Path | |
| import jwt | |
| import base64 | |
class GitHubAppAuth:
    """Authenticate as a GitHub App installation and cache the access token.

    A short-lived JWT signed with the app's private key is exchanged for an
    installation access token. The token is cached on the instance and
    refreshed when it is missing or within five minutes of expiring.
    """

    # Seconds to wait for GitHub before giving up on the token request, so a
    # stalled connection cannot hang the migration indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, app_id: str, private_key_path: str, installation_id: str):
        self.app_id = app_id
        self.private_key_path = private_key_path
        self.installation_id = installation_id
        self._access_token = None
        self._token_expires_at = None

    def _load_private_key(self) -> bytes:
        """Return the raw bytes of the private key file.

        Raises:
            ValueError: if the file is missing or cannot be read. The
                underlying OS error is chained as ``__cause__``.
        """
        try:
            with open(self.private_key_path, "rb") as f:
                return f.read()
        except FileNotFoundError as e:
            raise ValueError(
                f"Private key file not found: {self.private_key_path}"
            ) from e
        except IOError as e:
            raise ValueError(f"Failed to read private key file: {e}") from e

    def _build_jwt_payload(self) -> Dict[str, Any]:
        """Return the JWT claims used to authenticate as the app.

        ``iat`` is back-dated by 60 seconds and ``exp`` is set 9 minutes
        ahead, so the token stays inside GitHub's 10-minute maximum lifetime
        even when the local clock runs slightly fast or slow.
        """
        now = datetime.now(timezone.utc)
        return {
            "iat": now - timedelta(seconds=60),
            "exp": now + timedelta(minutes=9),
            # The issuer claim is sent as a string even if an int was passed.
            "iss": str(self.app_id),
        }

    def _generate_jwt_token(self) -> str:
        """Generate an RS256-signed JWT for GitHub App authentication."""
        private_key = self._load_private_key()
        return jwt.encode(self._build_jwt_payload(), private_key, algorithm="RS256")

    def _get_installation_token(self) -> tuple[str, datetime]:
        """Request a new installation access token.

        Returns:
            ``(token, expires_at)`` where ``expires_at`` is timezone-aware.

        Raises:
            Exception: if GitHub answers with anything other than 201.
        """
        jwt_token = self._generate_jwt_token()
        headers = {
            "Authorization": f"Bearer {jwt_token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        url = f"https://api.github.com/app/installations/{self.installation_id}/access_tokens"
        response = requests.post(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 201:
            raise Exception(
                f"Failed to get installation token: {response.status_code} {response.text}"
            )
        data = response.json()
        token = data["token"]
        # fromisoformat() on Python < 3.11 does not accept a trailing "Z".
        expires_at = datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00"))
        return token, expires_at

    def get_access_token(self) -> str:
        """Return a valid access token, refreshing it if necessary."""
        now = datetime.now(timezone.utc)
        # Refresh when there is no token yet or it expires within 5 minutes.
        if (
            self._access_token is None
            or self._token_expires_at is None
            or now >= (self._token_expires_at - timedelta(minutes=5))
        ):
            self._access_token, self._token_expires_at = self._get_installation_token()
        return self._access_token
| # Migration state tracking and configuration | |
class MigrationState:
    """Tracks migration state for idempotent operations.

    The state is a JSON document on disk that records which Disqus threads
    and comments already have a GitHub counterpart, so a re-run can skip them.
    Every ``record_*`` / ``update_*`` call persists the state immediately.
    """

    def __init__(self, state_file: str = "migration_state.json"):
        self.state_file = state_file
        self.state = self._load_state()

    def _load_state(self) -> Dict[str, Any]:
        """Load existing state from file, or return a fresh empty state.

        An unreadable or corrupt file only produces a warning; the migration
        then starts from an empty state.
        """
        if os.path.exists(self.state_file):
            try:
                with open(self.state_file, "r", encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                print(f"β οΈ Warning: Could not load state file {self.state_file}: {e}")
                print("β οΈ Starting with empty state")
        return {
            "discussions": {},  # thread_id -> {"discussion_id": "", "title": "", "created": ""}
            "comments": {},  # disqus_comment_id -> {"github_id": "", "thread_id": "", "created": ""}
            "migration_info": {
                "last_run": None,
                "repo_owner": None,
                "repo_name": None,
                "category_name": None,
            },
        }

    def save_state(self):
        """Save current state to file without ever destroying the old one.

        The state is serialized in memory first, written to a sibling
        temporary file and then moved over the real file with ``os.replace``.
        If serialization or writing fails, the previously saved state file is
        left untouched, so an interrupted run can still be resumed.
        """
        # Serialize before touching the disk: a non-serializable value raises
        # here, while the existing state file is still intact.
        serialized = json.dumps(self.state, indent=2, ensure_ascii=False)
        tmp_file = f"{self.state_file}.tmp"
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                f.write(serialized)
            os.replace(tmp_file, self.state_file)
        except OSError as e:
            print(f"β οΈ Warning: Could not save state file: {e}")
            # Best-effort cleanup of the partial temporary file.
            try:
                os.remove(tmp_file)
            except OSError:
                pass

    def is_discussion_created(self, thread_id: str) -> bool:
        """Check if discussion for thread already exists"""
        return thread_id in self.state.get("discussions", {})

    def get_discussion_id(self, thread_id: str) -> Optional[str]:
        """Get GitHub discussion ID for thread, or None if not recorded"""
        discussion_info = self.state.get("discussions", {}).get(thread_id)
        return discussion_info.get("discussion_id") if discussion_info else None

    def record_discussion(self, thread_id: str, discussion_id: str, title: str):
        """Record that a discussion was created and persist the state"""
        self.state.setdefault("discussions", {})[thread_id] = {
            "discussion_id": discussion_id,
            "title": title,
            "created": datetime.now().isoformat(),
        }
        self.save_state()

    def is_comment_created(self, disqus_comment_id: str) -> bool:
        """Check if comment was already posted"""
        return disqus_comment_id in self.state.get("comments", {})

    def get_github_comment_id(self, disqus_comment_id: str) -> Optional[str]:
        """Get GitHub comment ID for a Disqus comment, or None if not recorded"""
        comment_info = self.state.get("comments", {}).get(disqus_comment_id)
        return comment_info.get("github_id") if comment_info else None

    def record_comment(
        self, disqus_comment_id: str, github_comment_id: str, thread_id: str
    ):
        """Record that a comment was created and persist the state"""
        self.state.setdefault("comments", {})[disqus_comment_id] = {
            "github_id": github_comment_id,
            "thread_id": thread_id,
            "created": datetime.now().isoformat(),
        }
        self.save_state()

    def update_migration_info(
        self, repo_owner: str, repo_name: str, category_name: str
    ):
        """Update migration metadata (target repo, category, last run time)"""
        self.state["migration_info"] = {
            "last_run": datetime.now().isoformat(),
            "repo_owner": repo_owner,
            "repo_name": repo_name,
            "category_name": category_name,
        }
        self.save_state()

    def get_stats(self) -> Dict[str, int]:
        """Get migration statistics: number of recorded discussions/comments"""
        return {
            "discussions_created": len(self.state.get("discussions", {})),
            "comments_created": len(self.state.get("comments", {})),
        }
| class Config: | |
| # Holds everything the migration needs to talk to GitHub: the selected | |
| # authentication method, the target repository and the discussion category. | |
| # | |
| # Attributes set by __init__: | |
| #   auth_method: "github_app", "personal_token", or None when no credentials were found | |
| #   github_app_auth: GitHubAppAuth instance when all three app parameters were given | |
| #   github_token: value of the GITHUB_TOKEN environment variable (only read as fallback) | |
| #   discussion_category_id: empty until the category name is resolved during migration | |
| def __init__( | |
| self, | |
| repo_owner: str = "", | |
| repo_name: str = "", | |
| discussion_category_name: str = "Announcements", | |
| # GitHub App authentication parameters | |
| app_id: Optional[str] = None, | |
| private_key_path: Optional[str] = None, | |
| installation_id: Optional[str] = None, | |
| ): | |
| # Authentication - try GitHub App first, then personal token | |
| self.auth_method = None | |
| self.github_app_auth = None | |
| self.github_token = None | |
| # Check for GitHub App authentication | |
| # (all three values are required; a partial set falls through to the token path) | |
| if app_id and private_key_path and installation_id: | |
| try: | |
| self.github_app_auth = GitHubAppAuth( | |
| app_id, private_key_path, installation_id | |
| ) | |
| self.auth_method = "github_app" | |
| print("π Using GitHub App authentication") | |
| except Exception as e: | |
| print(f"β οΈ GitHub App authentication failed: {e}") | |
| print("π Falling back to personal access token") | |
| # Fallback to personal access token if GitHub App auth not available/working | |
| if self.auth_method != "github_app": | |
| self.github_token = os.getenv("GITHUB_TOKEN") | |
| if self.github_token: | |
| self.auth_method = "personal_token" | |
| print("π Using personal access token authentication") | |
| self.repo_owner = repo_owner | |
| self.repo_name = repo_name | |
| self.discussion_category_name = discussion_category_name | |
| self.discussion_category_id = "" # Will be resolved from name during migration | |
| # GitHub API settings | |
| self.api_base = "https://api.github.com" | |
| # Rate limiting | |
| self.rate_limit_delay = 1.0 # seconds between API calls | |
| def get_auth_headers(self) -> Dict[str, str]: | |
| """Get authentication headers for API requests""" | |
| # Returns the Accept / API-version headers plus a Bearer Authorization header. | |
| # Raises ValueError when neither authentication method is usable. | |
| headers = { | |
| "Accept": "application/vnd.github+json", | |
| "X-GitHub-Api-Version": "2022-11-28", | |
| } | |
| if self.auth_method == "github_app" and self.github_app_auth: | |
| # get_access_token() may refresh the installation token before returning it | |
| token = self.github_app_auth.get_access_token() | |
| headers["Authorization"] = f"Bearer {token}" | |
| elif self.auth_method == "personal_token" and self.github_token: | |
| headers["Authorization"] = f"Bearer {self.github_token}" | |
| else: | |
| # This should not happen if validate() passed | |
| raise ValueError("No valid authentication method available") | |
| return headers | |
| def validate(self, dry_run: bool = False): | |
| """Validate configuration""" | |
| # Raises ValueError describing the first problem found; returns True otherwise. | |
| # Authentication is only checked when dry_run is False. | |
| # NOTE(review): indentation is lost in this view, so it cannot be confirmed whether | |
| # the repo owner/name checks below are also skipped for dry runs - verify against the original. | |
| if not dry_run: | |
| # Check authentication | |
| if self.auth_method == "github_app": | |
| if not self.github_app_auth: | |
| raise ValueError( | |
| "GitHub App authentication is not properly configured" | |
| ) | |
| # Test that we can get a token | |
| try: | |
| self.github_app_auth.get_access_token() | |
| except Exception as e: | |
| raise ValueError(f"Failed to authenticate with GitHub App: {e}") | |
| elif self.auth_method == "personal_token": | |
| if not self.github_token: | |
| raise ValueError("GITHUB_TOKEN environment variable is required") | |
| else: | |
| raise ValueError( | |
| "No valid authentication method configured. Either set up GitHub App credentials or GITHUB_TOKEN environment variable." | |
| ) | |
| if not self.repo_owner: | |
| raise ValueError("Repository owner is required (use --repo-owner)") | |
| if not self.repo_name: | |
| raise ValueError("Repository name is required (use --repo-name)") | |
| return True | |
class DisqusComment:
    """One Disqus post plus the replies attached to it.

    ``parent_id`` is the id of the comment this one answers (``None`` for a
    top-level comment). ``replies`` starts empty and is filled via
    ``add_reply``.
    """

    # Scalar attributes exported by to_dict(), in output order.
    _EXPORTED_FIELDS = ("id", "parent_id", "author", "created", "message", "is_deleted")

    def __init__(
        self,
        comment_id: str,
        parent_id: Optional[str],
        author: str,
        created: str,
        message: str,
        is_deleted: bool = False,
    ):
        self.replies: List["DisqusComment"] = []
        self.id, self.parent_id = comment_id, parent_id
        self.author, self.created = author, created
        self.message, self.is_deleted = message, is_deleted

    def add_reply(self, reply: "DisqusComment"):
        """Attach ``reply`` as a direct child of this comment."""
        self.replies.append(reply)

    def to_dict(self):
        """Return a plain-dict view of this comment and, recursively, its replies."""
        exported = {name: getattr(self, name) for name in self._EXPORTED_FIELDS}
        exported["replies"] = [child.to_dict() for child in self.replies]
        return exported
class DisqusThread:
    """A Disqus thread (one blog post) and the flat list of its comments."""

    def __init__(self, thread_id: str, title: str, link: str, created: str):
        self.id = thread_id
        self.title = title
        self.link = link
        self.created = created
        self.comments: List["DisqusComment"] = []

    def add_comment(self, comment: "DisqusComment"):
        """Append a comment to the thread's flat comment list."""
        self.comments.append(comment)

    def build_comment_tree(self) -> List["DisqusComment"]:
        """Build threaded comment structure and return the root comments.

        Each comment whose ``parent_id`` refers to another comment of this
        thread is attached to that parent's ``replies``; all others
        (no parent, unknown parent, or a comment naming itself as parent)
        are returned as roots, in their original order.

        The method is safe to call repeatedly: ``replies`` of every comment
        is rebuilt from scratch on each call, so replies are never
        duplicated.
        """
        comment_map = {c.id: c for c in self.comments}
        # Start from a clean slate so a second call does not append the same
        # replies again.
        for comment in self.comments:
            comment.replies = []
        root_comments = []
        for comment in self.comments:
            parent = comment_map.get(comment.parent_id) if comment.parent_id else None
            # A comment that lists itself as parent would otherwise disappear
            # from the tree (and recurse forever in to_dict()).
            if parent is not None and parent is not comment:
                parent.add_reply(comment)
            else:
                root_comments.append(comment)
        return root_comments

    def to_dict(self):
        """Return a plain-dict view of the thread with its nested comment tree."""
        return {
            "id": self.id,
            "title": self.title,
            "link": self.link,
            "created": self.created,
            "comments": [c.to_dict() for c in self.build_comment_tree()],
        }
class DisqusParser:
    """Parse a Disqus XML export into ``DisqusThread`` objects with comments."""

    # Clark-notation name of the dsq:id attribute carried by threads and posts.
    _DSQ_ID = "{http://disqus.com/disqus-internals}id"

    # Inline HTML tags and the Markdown marker that replaces them, applied in
    # this order.
    _INLINE_TAGS = (
        ("strong", "**"),
        ("b", "**"),
        ("em", "*"),
        ("i", "*"),
        ("code", "`"),
    )

    def __init__(self, xml_path: str):
        self.xml_path = xml_path
        self.namespace = {
            "d": "http://disqus.com",
            "dsq": "http://disqus.com/disqus-internals",
        }

    def parse(self) -> Dict[str, "DisqusThread"]:
        """Parse Disqus XML export.

        Returns:
            Mapping of Disqus thread id to thread, each populated with its
            non-deleted, non-spam comments.

        Raises:
            ValueError: if the file is not well-formed XML.
        """
        print(f"π Parsing Disqus XML: {self.xml_path}")
        try:
            tree = ET.parse(self.xml_path)
            root = tree.getroot()
        except ET.ParseError as e:
            raise ValueError(f"Failed to parse XML: {e}") from e
        threads = self._parse_threads(root)
        self._parse_posts(root, threads)
        print(f"β Parsed {len(threads)} threads with comments")
        return threads

    def _parse_threads(self, root) -> Dict[str, "DisqusThread"]:
        """Parse thread elements, skipping those without id, title or link."""
        threads = {}
        for thread_elem in root.findall("d:thread", self.namespace):
            thread_id = thread_elem.get(self._DSQ_ID)
            if not thread_id:
                continue
            title_elem = thread_elem.find("d:title", self.namespace)
            link_elem = thread_elem.find("d:link", self.namespace)
            created_elem = thread_elem.find("d:createdAt", self.namespace)
            # Skip threads without essential data
            if title_elem is None or link_elem is None:
                continue
            title = title_elem.text or "Untitled"
            link = link_elem.text or ""
            # An empty <createdAt/> has text None; normalize to "".
            created = (created_elem.text if created_elem is not None else "") or ""
            threads[thread_id] = DisqusThread(thread_id, title, link, created)
        return threads

    def _parse_posts(self, root, threads: Dict[str, "DisqusThread"]):
        """Parse post/comment elements and attach them to their thread.

        Deleted posts, spam, posts without an id and posts belonging to an
        unknown thread are ignored.
        """
        for post_elem in root.findall("d:post", self.namespace):
            # Skip deleted posts
            is_deleted_elem = post_elem.find("d:isDeleted", self.namespace)
            if is_deleted_elem is not None and is_deleted_elem.text == "true":
                continue
            # Skip spam
            is_spam_elem = post_elem.find("d:isSpam", self.namespace)
            if is_spam_elem is not None and is_spam_elem.text == "true":
                continue
            post_id = post_elem.get(self._DSQ_ID)
            if not post_id:
                continue
            # Get thread ID
            thread_elem = post_elem.find("d:thread", self.namespace)
            if thread_elem is None:
                continue
            thread_id = thread_elem.get(self._DSQ_ID)
            if thread_id not in threads:
                continue
            # Get parent ID (for threading)
            parent_elem = post_elem.find("d:parent", self.namespace)
            parent_id = parent_elem.get(self._DSQ_ID) if parent_elem is not None else None
            # Get comment data
            author_elem = post_elem.find("d:author/d:name", self.namespace)
            created_elem = post_elem.find("d:createdAt", self.namespace)
            message_elem = post_elem.find("d:message", self.namespace)
            # An element that exists but is empty has text None; fall back to
            # the same defaults used when the element is missing so the
            # author never renders as "None".
            author = (author_elem.text if author_elem is not None else None) or "Anonymous"
            created = (created_elem.text if created_elem is not None else None) or ""
            message = (message_elem.text if message_elem is not None else None) or ""
            # Clean up HTML in message
            message = self._clean_html(message)
            comment = DisqusComment(post_id, parent_id, author, created, message)
            threads[thread_id].add_comment(comment)

    def _clean_html(self, html_content: str) -> str:
        """Convert HTML content to Markdown-friendly format.

        Paragraphs, line breaks, bold/italic/code spans and links are turned
        into Markdown; every other tag is dropped. Tag matching is
        case-insensitive, tolerates attributes and spans line breaks, and
        ``href`` may be single- or double-quoted. Returns "" for empty or
        ``None`` input.
        """
        if not html_content:
            return ""
        flags = re.IGNORECASE | re.DOTALL
        # Remove CDATA wrapper
        content = re.sub(r"<!\[CDATA\[(.*?)\]\]>", r"\1", html_content, flags=re.DOTALL)
        # Convert basic HTML tags to Markdown
        content = re.sub(r"<p\b[^>]*>(.*?)</p\s*>", r"\1\n\n", content, flags=flags)
        content = re.sub(r"<br\s*/?>", "\n", content, flags=re.IGNORECASE)
        for tag, marker in self._INLINE_TAGS:
            # \b keeps <b> from matching <br>/<blockquote>, <i> from <img>, etc.
            content = re.sub(
                rf"<{tag}\b[^>]*>(.*?)</{tag}\s*>",
                rf"{marker}\1{marker}",
                content,
                flags=flags,
            )
        # Handle anchor tags with various attributes - extract href and text content only
        content = re.sub(
            r'<a\s+[^>]*?href=(["\'])(.*?)\1[^>]*>(.*?)</a\s*>',
            r"[\3](\2)",
            content,
            flags=flags,
        )
        # Remove remaining HTML tags
        content = re.sub(r"<[^>]+>", "", content)
        # Clean up whitespace
        content = re.sub(r"\n\s*\n\s*\n", "\n\n", content)
        return content.strip()
class GitHubDiscussionsAPI:
    """Thin wrapper around the GitHub GraphQL API calls used by the migration."""

    # Seconds to wait for GitHub before giving up on a single request, so a
    # stalled connection cannot hang the migration indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, config: "Config"):
        self.config = config
        self.graphql_url = "https://api.github.com/graphql"

    def _graphql_request(self, query: str, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Make a GraphQL request to GitHub API.

        Returns:
            The ``data`` object of the response (``{}`` when absent or null).

        Raises:
            Exception: on a non-200 HTTP status or when the response carries
                a GraphQL ``errors`` entry.
        """
        payload = {"query": query, "variables": variables}
        headers = self.config.get_auth_headers()
        headers["Content-Type"] = "application/json"
        response = requests.post(
            self.graphql_url,
            headers=headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(
                f"GraphQL request failed: {response.status_code} {response.text}"
            )
        result = response.json()
        if "errors" in result:
            raise Exception(f"GraphQL errors: {result['errors']}")
        # "data" may be present but null; normalize to an empty dict.
        return result.get("data") or {}

    def get_repository_info(self) -> Dict[str, Any]:
        """Get repository ID and discussion categories.

        Returns:
            ``{"repo_id": <id or None>, "categories": [{"id", "name"}, ...]}``.
            ``repo_id`` is None and ``categories`` empty when the repository
            is not returned by the API.
        """
        # Ask for a full page of categories so none are silently missed.
        query = """
        query($owner: String!, $name: String!) {
          repository(owner: $owner, name: $name) {
            id
            discussionCategories(first: 100) {
              nodes {
                id
                name
              }
            }
          }
        }
        """
        variables = {"owner": self.config.repo_owner, "name": self.config.repo_name}
        data = self._graphql_request(query, variables)
        # Any level may be null in a GraphQL response; treat null as empty.
        repo_data = data.get("repository") or {}
        repo_id = repo_data.get("id")
        categories = (repo_data.get("discussionCategories") or {}).get("nodes") or []
        return {"repo_id": repo_id, "categories": categories}

    def create_discussion(
        self, repo_id: str, category_id: str, title: str, body: str
    ) -> str:
        """Create a new discussion and return its ID (None if not reported)."""
        query = """
        mutation($repoId: ID!, $categoryId: ID!, $title: String!, $body: String!) {
          createDiscussion(input: {repositoryId: $repoId, categoryId: $categoryId, title: $title, body: $body}) {
            discussion {
              id
              number
            }
          }
        }
        """
        variables = {
            "repoId": repo_id,
            "categoryId": category_id,
            "title": title,
            "body": body,
        }
        data = self._graphql_request(query, variables)
        discussion = (data.get("createDiscussion") or {}).get("discussion") or {}
        return discussion.get("id")

    def add_discussion_comment(
        self, discussion_id: str, body: str, reply_to_id: Optional[str] = None
    ) -> str:
        """Add a comment to a discussion and return the new comment's ID.

        When ``reply_to_id`` is given the comment is posted as a reply to
        that comment; otherwise it is a top-level comment and ``replyToId``
        is not sent at all.
        """
        variables = {"discussionId": discussion_id, "body": body}
        if reply_to_id:
            variables["replyToId"] = reply_to_id
            query = """
            mutation($discussionId: ID!, $body: String!, $replyToId: ID!) {
              addDiscussionComment(input: {discussionId: $discussionId, body: $body, replyToId: $replyToId}) {
                comment {
                  id
                }
              }
            }
            """
        else:
            query = """
            mutation($discussionId: ID!, $body: String!) {
              addDiscussionComment(input: {discussionId: $discussionId, body: $body}) {
                comment {
                  id
                }
              }
            }
            """
        data = self._graphql_request(query, variables)
        comment = (data.get("addDiscussionComment") or {}).get("comment") or {}
        return comment.get("id")
class MigrationFormatter:
    """Static helpers that render Disqus threads/comments as GitHub Markdown."""

    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp (a trailing "Z" is accepted).

        Returns None when ``value`` is empty or not a valid ISO timestamp.
        """
        if not value:
            return None
        try:
            # fromisoformat() on Python < 3.11 does not accept a trailing "Z".
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (ValueError, AttributeError, TypeError):
            return None

    @staticmethod
    def format_discussion_title(thread: "DisqusThread") -> str:
        """Format discussion title from URL path.

        The title is the path of the thread's link without leading/trailing
        slashes. When there is no link, the path is empty, or the link is not
        a parseable URL, the thread title (or "Untitled") is used instead.
        """
        fallback = thread.title or "Untitled"
        if not thread.link:
            return fallback
        try:
            # Extract path from URL and remove leading/trailing slashes
            path = urlparse(thread.link).path.strip("/")
        except (ValueError, AttributeError, TypeError):
            # If URL parsing fails, use original title
            return fallback
        return path or fallback

    @staticmethod
    def format_discussion_body(thread: "DisqusThread") -> str:
        """Format main discussion body.

        The body names the original post and, when the thread's creation
        timestamp can be parsed, the creation date, followed by a rule.
        """
        body = "**Comments migrated from Disqus**\n\n"
        body += f"Original post: {thread.link}\n\n"
        created_date = MigrationFormatter._parse_timestamp(thread.created)
        if created_date is not None:
            body += f"Thread created: {created_date.strftime('%B %d, %Y')}\n\n"
        body += "---\n\n"
        return body

    @staticmethod
    def format_comment_for_github(
        comment: "DisqusComment", reply_context: str = ""
    ) -> str:
        """Format a single comment for GitHub (without threading info).

        The header line names the original author, the creation time
        converted to UTC (or the raw ``created`` string when it cannot be
        parsed) and the optional ``reply_context``; the message follows
        after a blank line.
        """
        # Format timestamp
        timestamp = ""
        if comment.created:
            created_date = MigrationFormatter._parse_timestamp(comment.created)
            if created_date is None:
                timestamp = comment.created
            else:
                # The label says UTC, so actually convert; a timestamp
                # without offset is taken to be UTC already.
                if created_date.tzinfo is None:
                    created_date = created_date.replace(tzinfo=timezone.utc)
                utc = created_date.astimezone(timezone.utc)
                # "%-d" / "%-I" are not portable strftime codes, so the
                # unpadded day and 12-hour clock are computed by hand.
                hour12 = utc.hour % 12 or 12
                timestamp = (
                    f"{utc:%B} {utc.day}, {utc:%Y} at {hour12}:{utc:%M} {utc:%p} UTC"
                )
        # Build comment
        formatted = f"*Originally posted by* **{comment.author}**"
        if timestamp:
            formatted += f" β’ *{timestamp}*"
        # Add reply context if this is a nested reply
        if reply_context:
            formatted += f" β’ *{reply_context}*"
        formatted += "\n\n"
        if comment.message:
            formatted += comment.message + "\n"
        else:
            formatted += "*No message content*\n"
        return formatted
| class DisqusToGiscusMigrator: | |
def __init__(self, config: Config, state_file: str = "migration_state.json"):
    """Wire up the API client and the persisted migration state.

    ``config`` is kept and also handed to the GitHub API wrapper;
    ``state_file`` is the path of the JSON file used for state tracking.
    """
    self.config, self.api = config, GitHubDiscussionsAPI(config)
    self.state = MigrationState(state_file)
def migrate(
    self,
    xml_path: str,
    dry_run: bool = True,
    output_file: str = "migration_preview.md",
):
    """Entry point: parse the Disqus export, then preview or migrate.

    With ``dry_run`` true a preview is generated into ``output_file``;
    otherwise the real migration is performed. Nothing happens (beyond a
    message) when the export contains no threads.
    """
    print("π Starting Disqus to GitHub Discussions migration")

    # Parse Disqus data
    threads = DisqusParser(xml_path).parse()
    if not threads:
        print("β No threads found to migrate")
        return

    if not dry_run:
        self._real_migration(threads)
        return
    self._dry_run(threads, output_file)
| def _dry_run(self, threads: Dict[str, DisqusThread], output_file: str): | |
| """Perform dry run and generate preview using same formatting as GitHub migration""" | |
| # threads: parsed threads keyed by Disqus thread id. | |
| # output_file: path of the Markdown preview; it is opened with "w" and overwritten. | |
| # Only reads self.state (is_*_created / get_stats); nothing is recorded here. | |
| print(f"π Performing dry run - generating preview to {output_file}") | |
| # Show existing state | |
| stats = self.state.get_stats() | |
| if stats["discussions_created"] > 0 or stats["comments_created"] > 0: | |
| print( | |
| f"π Existing state: {stats['discussions_created']} discussions, {stats['comments_created']} comments already created" | |
| ) | |
| # Filter out threads with no comments | |
| threads_with_comments = { | |
| tid: thread for tid, thread in threads.items() if thread.comments | |
| } | |
| skipped_count = len(threads) - len(threads_with_comments) | |
| # Count what would be skipped vs created | |
| already_created = sum( | |
| 1 | |
| for tid in threads_with_comments.keys() | |
| if self.state.is_discussion_created(tid) | |
| ) | |
| to_create = len(threads_with_comments) - already_created | |
| with open(output_file, "w", encoding="utf-8") as f: | |
| # Preview header: generation time and thread counts | |
| f.write("# Disqus to GitHub Discussions Migration Preview\n\n") | |
| f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") | |
| f.write(f"Total threads found: {len(threads)}\n") | |
| f.write(f"Threads with comments: {len(threads_with_comments)}\n") | |
| f.write(f"Already migrated: {already_created}\n") | |
| f.write(f"Will be created: {to_create}\n") | |
| if skipped_count > 0: | |
| f.write(f"Threads skipped (no comments): {skipped_count}\n") | |
| f.write("\n---\n\n") | |
| # One section per thread that has comments | |
| for thread_id, thread in threads_with_comments.items(): | |
| title = MigrationFormatter.format_discussion_title(thread) | |
| body = MigrationFormatter.format_discussion_body(thread) | |
| # Mark if already created | |
| status = ( | |
| "β ALREADY MIGRATED" | |
| if self.state.is_discussion_created(thread_id) | |
| else "π TO BE CREATED" | |
| ) | |
| f.write(f"## Discussion: {title} ({status})\n\n") | |
| f.write(body) | |
| # Process comments using same logic as real migration | |
| root_comments = thread.build_comment_tree() | |
| total_root_comments = len(root_comments) | |
| # _flatten_replies is defined outside this view; its result is used with len() and iterated below | |
| total_replies = sum( | |
| len(self._flatten_replies(c)) for c in root_comments | |
| ) | |
| total_comments = total_root_comments + total_replies | |
| # Count already created comments for this thread | |
| created_comments = sum( | |
| 1 for c in thread.comments if self.state.is_comment_created(c.id) | |
| ) | |
| remaining_comments = total_comments - created_comments | |
| f.write( | |
| f"**{total_comments} comments total** ({total_root_comments} root comments, {total_replies} replies)\n" | |
| ) | |
| if created_comments > 0: | |
| f.write( | |
| f"**{created_comments} already created, {remaining_comments} remaining**\n" | |
| ) | |
| f.write("\n") | |
| # Show how comments would be formatted for GitHub | |
| for comment in root_comments: | |
| comment_status = ( | |
| "β CREATED" | |
| if self.state.is_comment_created(comment.id) | |
| else "π TO CREATE" | |
| ) | |
| f.write(f"### Root Comment ({comment_status}):\n") | |
| f.write(MigrationFormatter.format_comment_for_github(comment)) | |
| f.write("\n") | |
| # Show flattened replies | |
| all_replies = self._flatten_replies(comment) | |
| for reply in all_replies: | |
| reply_context = "" | |
| # A reply whose parent is not the root comment gets a "Replying to <author>" note | |
| if reply.parent_id != comment.id: | |
| # This was originally a nested reply, add context | |
| original_parent = ( | |
| self._find_comment_by_id( | |
| thread.comments, reply.parent_id | |
| ) | |
| if reply.parent_id | |
| else None | |
| ) | |
| if original_parent: | |
| reply_context = f"Replying to {original_parent.author}" | |
| reply_status = ( | |
| "β CREATED" | |
| if self.state.is_comment_created(reply.id) | |
| else "π TO CREATE" | |
| ) | |
| f.write(f"### Reply ({reply_status}):\n") | |
| f.write( | |
| MigrationFormatter.format_comment_for_github( | |
| reply, reply_context | |
| ) | |
| ) | |
| f.write("\n") | |
| # NOTE(review): indentation is lost in this view - presumably "---" closes each root | |
| # comment and the "=" rule closes each thread; confirm nesting against the original. | |
| f.write("---\n\n") | |
| f.write("=" * 80 + "\n\n") | |
| # Console summary after the preview file has been written | |
| print(f"β Dry run complete! Preview saved to {output_file}") | |
| print( | |
| f"π Summary: {to_create} new threads ready for migration, {already_created} already exist" | |
| ) | |
| if skipped_count > 0: | |
| print(f"π« Skipped {skipped_count} threads with no comments") | |
| def _real_migration(self, threads: Dict[str, DisqusThread]): | |
| """Perform actual migration to GitHub with state tracking""" | |
| print("π¨ Starting REAL migration to GitHub Discussions") | |
| # Filter out threads with no comments | |
| threads_with_comments = { | |
| tid: thread for tid, thread in threads.items() if thread.comments | |
| } | |
| skipped_count = len(threads) - len(threads_with_comments) | |
| if skipped_count > 0: | |
| print(f"π« Skipping {skipped_count} threads with no comments") | |
| if not threads_with_comments: | |
| print("β No threads with comments found to migrate") | |
| return | |
| # Show existing state | |
| stats = self.state.get_stats() | |
| already_created = sum( | |
| 1 | |
| for tid in threads_with_comments.keys() | |
| if self.state.is_discussion_created(tid) | |
| ) | |
| to_create = len(threads_with_comments) - already_created | |
| print( | |
| f"π Migration state: {already_created} discussions already exist, {to_create} to be created" | |
| ) | |
| if stats["comments_created"] > 0: | |
| print( | |
| f"π {stats['comments_created']} comments already created from previous runs" | |
| ) | |
| # Get repository info and categories | |
| print("π Fetching repository information...") | |
| try: | |
| repo_info = self.api.get_repository_info() | |
| repo_id = repo_info.get("repo_id") | |
| categories = repo_info.get("categories", []) | |
| if not repo_id: | |
| print("β Repository not found or access denied") | |
| return | |
| # Find the discussion category by name | |
| category_id = None | |
| for cat in categories: | |
| if ( | |
| cat.get("name", "").lower() | |
| == self.config.discussion_category_name.lower() | |
| ): | |
| category_id = cat.get("id") | |
| print( | |
| f"β Found '{self.config.discussion_category_name}' category: {category_id}" | |
| ) | |
| break | |
| if not category_id: | |
| print( | |
| f"β No '{self.config.discussion_category_name}' category found. Available categories:" | |
| ) | |
| for cat in categories: | |
| print(f" - {cat.get('name')} (ID: {cat.get('id')})") | |
| return | |
| # Store the resolved category ID and update migration info | |
| self.config.discussion_category_id = category_id | |
| self.state.update_migration_info( | |
| self.config.repo_owner, | |
| self.config.repo_name, | |
| self.config.discussion_category_name, | |
| ) | |
| except Exception as e: | |
| print(f"β Failed to fetch repository information: {e}") | |
| return | |
| success_count = 0 | |
| error_count = 0 | |
| skipped_discussions = 0 | |
| for thread_id, thread in threads_with_comments.items(): | |
| try: | |
| title = MigrationFormatter.format_discussion_title(thread) | |
| # Check if discussion already exists | |
| discussion_id = None | |
| if self.state.is_discussion_created(thread_id): | |
| discussion_id = self.state.get_discussion_id(thread_id) | |
| print( | |
| f"βοΈ Skipping discussion '{title}' - already exists (ID: {discussion_id})" | |
| ) | |
| skipped_discussions += 1 | |
| else: | |
| print(f"\nπ Creating discussion: {title}") | |
| # Create discussion | |
| body = MigrationFormatter.format_discussion_body(thread) | |
| discussion_id = self.api.create_discussion( | |
| repo_id=repo_id, | |
| category_id=self.config.discussion_category_id, | |
| title=title, | |
| body=body, | |
| ) | |
| print(f"β Created discussion with ID: {discussion_id}") | |
| # Record the discussion creation | |
| self.state.record_discussion(thread_id, discussion_id, title) | |
| # Ensure we have a discussion_id before processing comments | |
| if not discussion_id: | |
| print(f"β No discussion ID available for {title}") | |
| continue | |
| # Process comments for this discussion | |
| print(f"π Processing comments for: {title}") | |
| comment_count = 0 | |
| skipped_comments = 0 | |
| root_comments = thread.build_comment_tree() | |
| for root_comment in root_comments: | |
| try: | |
| # Check if root comment already exists | |
| if self.state.is_comment_created(root_comment.id): | |
| print( | |
| f"βοΈ Skipping root comment by {root_comment.author} - already exists" | |
| ) | |
| skipped_comments += 1 | |
| else: | |
| # Add the root comment | |
| root_body = MigrationFormatter.format_comment_for_github( | |
| root_comment | |
| ) | |
| root_github_id = self.api.add_discussion_comment( | |
| discussion_id=discussion_id, | |
| body=root_body, | |
| reply_to_id=None, | |
| ) | |
| comment_count += 1 | |
| time.sleep(self.config.rate_limit_delay) | |
| # Record the comment creation | |
| self.state.record_comment( | |
| root_comment.id, root_github_id, thread_id | |
| ) | |
| print(f"β Added root comment by {root_comment.author}") | |
| # Get the GitHub ID for replies (whether just created or from state) | |
| root_github_id = self.state.get_github_comment_id( | |
| root_comment.id | |
| ) | |
| if root_github_id: | |
| # We have the GitHub ID (either just created or from previous run) | |
| # Add all replies as direct replies to the root comment | |
| all_replies = self._flatten_replies(root_comment) | |
| for reply in all_replies: | |
| try: | |
| if self.state.is_comment_created(reply.id): | |
| print( | |
| f"βοΈ Skipping reply by {reply.author} - already exists" | |
| ) | |
| skipped_comments += 1 | |
| continue | |
| reply_context = "" | |
| if reply.parent_id != root_comment.id: | |
| # This was originally a nested reply, add context | |
| original_parent = ( | |
| self._find_comment_by_id( | |
| thread.comments, reply.parent_id | |
| ) | |
| if reply.parent_id | |
| else None | |
| ) | |
| if original_parent: | |
| reply_context = ( | |
| f"Replying to {original_parent.author}" | |
| ) | |
| reply_body = ( | |
| MigrationFormatter.format_comment_for_github( | |
| reply, reply_context | |
| ) | |
| ) | |
| reply_github_id = self.api.add_discussion_comment( | |
| discussion_id=discussion_id, | |
| body=reply_body, | |
| reply_to_id=root_github_id, | |
| ) | |
| comment_count += 1 | |
| time.sleep(self.config.rate_limit_delay) | |
| # Record the reply creation | |
| self.state.record_comment( | |
| reply.id, reply_github_id, thread_id | |
| ) | |
| print(f"β Added reply by {reply.author}") | |
| except Exception as e: | |
| print( | |
| f"β οΈ Failed to add reply by {reply.author}: {e}" | |
| ) | |
| continue | |
| else: | |
| print( | |
| f"β οΈ No GitHub ID found for root comment by {root_comment.author}, skipping replies" | |
| ) | |
| except Exception as e: | |
| print( | |
| f"β οΈ Failed to add root comment by {root_comment.author}: {e}" | |
| ) | |
| continue | |
| if comment_count > 0: | |
| print(f"β Added {comment_count} new comments") | |
| if skipped_comments > 0: | |
| print(f"βοΈ Skipped {skipped_comments} existing comments") | |
| success_count += 1 | |
| except Exception as e: | |
| print(f"β Error migrating {thread.title}: {e}") | |
| error_count += 1 | |
| print(f"\nπ Migration complete!") | |
| print(f"β Successful: {success_count}") | |
| print(f"βοΈ Skipped (already exist): {skipped_discussions}") | |
| print(f"β Errors: {error_count}") | |
| # Show final stats | |
| final_stats = self.state.get_stats() | |
| print( | |
| f"π Total state: {final_stats['discussions_created']} discussions, {final_stats['comments_created']} comments" | |
| ) | |
def _flatten_replies(self, comment: DisqusComment) -> List[DisqusComment]:
    """Collect every descendant reply of ``comment`` into one flat list.

    The traversal is depth-first: each reply is emitted immediately before
    its own nested replies. ``comment`` itself is not part of the result.
    """

    def walk(node):
        # Emit the child first, then descend, so a reply always precedes
        # the replies that were made to it.
        for child in node.replies:
            yield child
            yield from walk(child)

    return list(walk(comment))
def _find_comment_by_id(
    self, comments: List[DisqusComment], comment_id: str
) -> Optional[DisqusComment]:
    """Return the first entry of ``comments`` whose ``id`` equals ``comment_id``.

    Returns ``None`` when no entry matches.
    """
    matches = (candidate for candidate in comments if candidate.id == comment_id)
    # next() with a default stops at the first hit and yields None on exhaustion.
    return next(matches, None)
def _build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line parser for the migration tool."""
    # RawDescriptionHelpFormatter prints the epilog verbatim, so the text is
    # kept flush-left here instead of following the code's indentation.
    usage_notes = """
Environment Variables:
Authentication (choose one):
GITHUB_TOKEN Personal access token for GitHub API
OR for GitHub App authentication:
GITHUB_APP_ID GitHub App ID
GITHUB_APP_PRIVATE_KEY_PATH Path to GitHub App private key file
GITHUB_APP_INSTALLATION_ID GitHub App installation ID
State Tracking:
The script maintains a local state file (migration_state.json by default) to track
which discussions and comments have been successfully created. This makes the script
idempotent - you can safely re-run it after failures and it will resume where it
left off without creating duplicates.
Authentication Methods:
1. Personal Access Token (original method):
- Set GITHUB_TOKEN environment variable
- Token needs 'repo' and 'write:discussion' scopes
2. GitHub App (recommended for organizations):
- Create a GitHub App with discussions:write permission
- Install the app on your repository/organization
- Provide --app-id, --private-key-path, --installation-id
- Or set corresponding environment variables
Examples:
# Dry run (recommended first)
python disqus_to_giscus.py export.xml --dry-run
# Real migration with personal access token
export GITHUB_TOKEN="your_token_here"
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo
# Real migration with GitHub App
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo \\
--app-id 123456 --private-key-path /path/to/private-key.pem --installation-id 789012
# With custom category name
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo --category-name "General"
# Custom output file for dry run
python disqus_to_giscus.py export.xml --dry-run --output custom_preview.md
# Custom state file location
python disqus_to_giscus.py export.xml --repo-owner myusername --repo-name myrepo --state-file my_migration.json
"""
    cli = argparse.ArgumentParser(
        description="Migrate Disqus comments to GitHub Discussions",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # Declared as data and registered in order, so --help lists the options
    # in exactly this sequence.
    general_options = (
        ("xml_file", {"help": "Path to Disqus XML export file"}),
        (
            "--repo-owner",
            {
                "help": "GitHub repository owner/organization name (required for real migration)"
            },
        ),
        (
            "--repo-name",
            {"help": "GitHub repository name (required for real migration)"},
        ),
        (
            "--category-name",
            {
                "default": "Announcements",
                "help": "GitHub Discussion category name (default: Announcements)",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Preview migration without posting to GitHub (recommended)",
            },
        ),
        (
            "--output",
            {
                "default": "migration_preview.md",
                "help": "Output file for dry run preview (default: migration_preview.md)",
            },
        ),
        (
            "--state-file",
            {
                "default": "migration_state.json",
                "help": "State file for tracking migration progress (default: migration_state.json)",
            },
        ),
    )
    for flag, settings in general_options:
        cli.add_argument(flag, **settings)

    # GitHub App credentials get their own section in the help output.
    app_section = cli.add_argument_group(
        "GitHub App Authentication",
        "Use GitHub App for authentication instead of personal access token",
    )
    app_options = (
        (
            "--app-id",
            "GitHub App ID (can also be set via GITHUB_APP_ID environment variable)",
        ),
        (
            "--private-key-path",
            "Path to GitHub App private key file (can also be set via GITHUB_APP_PRIVATE_KEY_PATH environment variable)",
        ),
        (
            "--installation-id",
            "GitHub App installation ID (can also be set via GITHUB_APP_INSTALLATION_ID environment variable)",
        ),
    )
    for flag, help_text in app_options:
        app_section.add_argument(flag, help=help_text)

    return cli


def main():
    """Command-line entry point: parse arguments, build the config, run the migration.

    A real (non dry-run) migration requires both ``--repo-owner`` and
    ``--repo-name``; a missing one is reported through ``parser.error``.
    GitHub App credentials are read from the command line first and from the
    ``GITHUB_APP_*`` environment variables otherwise. Any exception raised
    while configuring or migrating is printed and the process exits with
    status 1.
    """
    cli = _build_parser()
    options = cli.parse_args()

    # Repository coordinates are only mandatory when actually posting to GitHub.
    if not options.dry_run:
        required_for_real_run = (
            (options.repo_owner, "--repo-owner"),
            (options.repo_name, "--repo-name"),
        )
        for value, flag in required_for_real_run:
            if not value:
                cli.error(f"{flag} is required for real migration")

    try:
        # An explicit command-line value wins; the environment is the fallback.
        app_credentials = {
            "app_id": options.app_id or os.getenv("GITHUB_APP_ID"),
            "private_key_path": options.private_key_path
            or os.getenv("GITHUB_APP_PRIVATE_KEY_PATH"),
            "installation_id": options.installation_id
            or os.getenv("GITHUB_APP_INSTALLATION_ID"),
        }

        config = Config(
            repo_owner=options.repo_owner or "",
            repo_name=options.repo_name or "",
            discussion_category_name=options.category_name,
            **app_credentials,
        )
        config.validate(dry_run=options.dry_run)

        migrator = DisqusToGiscusMigrator(config, state_file=options.state_file)
        migrator.migrate(
            options.xml_file, dry_run=options.dry_run, output_file=options.output
        )
    except Exception as e:
        # Top-level boundary: report the failure and signal it via the exit code.
        print(f"β Error: {e}")
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment