Created
August 17, 2025 15:06
-
-
Save stepbrobd/52b6586ee88461bc3248157b555470d1 to your computer and use it in GitHub Desktop.
import crunchyroll watchlist to anilist
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import json | |
| import time | |
| import argparse | |
| import sys | |
| import webbrowser | |
| import urllib.parse | |
| from difflib import SequenceMatcher | |
| from typing import Dict, Optional | |
| class CrunchyrollAniListMatcher: | |
| def __init__(self, access_token: Optional[str] = None): | |
| self.anilist_url = "https://graphql.anilist.co" | |
| self.auth_url = "https://anilist.co/api/v2/oauth/authorize" | |
| self.token_url = "https://anilist.co/api/v2/oauth/token" | |
| self.access_token = access_token | |
| self.matched_data = [] | |
| self.failed_matches = [] | |
| self.request_count = 0 | |
| self.last_request_time = 0 | |
| def get_oauth_token( | |
| self, client_id: str, client_secret: str, redirect_uri: str | |
| ) -> Optional[str]: | |
| """Get OAuth access token through authorization code flow""" | |
| # Step 1: Generate authorization URL | |
| auth_params = { | |
| "client_id": client_id, | |
| "redirect_uri": redirect_uri, | |
| "response_type": "code", | |
| } | |
| auth_url = f"{self.auth_url}?{urllib.parse.urlencode(auth_params)}" | |
| print("\n1. Please visit this URL to authorize the application:") | |
| print(f"{auth_url}") | |
| print("\n2. After authorization, you'll be redirected to your redirect URI.") | |
| print("3. Copy the 'code' parameter from the redirect URL.") | |
| # Open browser automatically | |
| try: | |
| webbrowser.open(auth_url) | |
| print(" (Browser should open automatically)") | |
| except: | |
| print(" (Please copy and paste the URL into your browser)") | |
| # Get authorization code from user | |
| auth_code = input("\n4. Enter the authorization code: ").strip() | |
| if not auth_code: | |
| print("No authorization code provided.") | |
| return None | |
| # Step 2: Exchange authorization code for access token | |
| token_data = { | |
| "grant_type": "authorization_code", | |
| "client_id": client_id, | |
| "client_secret": client_secret, | |
| "redirect_uri": redirect_uri, | |
| "code": auth_code, | |
| } | |
| try: | |
| response = requests.post(self.token_url, json=token_data) | |
| if response.status_code == 200: | |
| token_response = response.json() | |
| access_token = token_response.get("access_token") | |
| if access_token: | |
| print("✓ Successfully obtained access token!") | |
| # Save token to file for future use | |
| with open(".anilist_token", "w") as f: | |
| f.write(access_token) | |
| print("✓ Token saved to .anilist_token file") | |
| return access_token | |
| else: | |
| print("Error: No access token in response") | |
| return None | |
| else: | |
| print(f"Error getting token: {response.status_code}") | |
| print(response.text) | |
| return None | |
| except Exception as e: | |
| print(f"Error during token exchange: {e}") | |
| return None | |
| def load_token_from_file(self, token_file: str = ".anilist_token") -> Optional[str]: | |
| """Load access token from file""" | |
| try: | |
| with open(token_file, "r") as f: | |
| return f.read().strip() | |
| except FileNotFoundError: | |
| return None | |
| def rate_limit_sleep(self): | |
| """Enhanced rate limiting with dynamic sleep""" | |
| current_time = time.time() | |
| # Ensure at least 2.5 seconds between requests (24 requests per minute max) | |
| # This is more conservative than the 30/minute limit to account for variations | |
| min_interval = 2.5 | |
| if self.last_request_time > 0: | |
| time_since_last = current_time - self.last_request_time | |
| if time_since_last < min_interval: | |
| sleep_time = min_interval - time_since_last | |
| print(f" Rate limiting: sleeping {sleep_time:.1f}s") | |
| time.sleep(sleep_time) | |
| self.last_request_time = time.time() | |
| self.request_count += 1 | |
| def handle_rate_limit_error(self, response) -> int: | |
| """Handle 429 rate limit error and return retry delay""" | |
| retry_after = 60 # Default 1 minute | |
| # Try to get Retry-After header | |
| if "Retry-After" in response.headers: | |
| try: | |
| retry_after = int(response.headers["Retry-After"]) | |
| except ValueError: | |
| pass | |
| # Try to get X-RateLimit-Reset header | |
| elif "X-RateLimit-Reset" in response.headers: | |
| try: | |
| reset_time = int(response.headers["X-RateLimit-Reset"]) | |
| retry_after = max(reset_time - int(time.time()), 10) | |
| except ValueError: | |
| pass | |
| print(f" Rate limit exceeded. Waiting {retry_after} seconds...") | |
| return retry_after | |
| def similarity(self, a: str, b: str) -> float: | |
| """Calculate similarity between two strings""" | |
| return SequenceMatcher(None, a.lower(), b.lower()).ratio() | |
| def clean_title(self, title: str) -> str: | |
| """Clean title for better matching""" | |
| cleaners = [ | |
| " (TV)", | |
| " (OVA)", | |
| " (Movie)", | |
| " the Movie", | |
| " Season 1", | |
| " Season 2", | |
| " Season 3", | |
| " Season 4", | |
| " S1", | |
| " S2", | |
| " S3", | |
| " S4", | |
| ] | |
| cleaned = title | |
| for cleaner in cleaners: | |
| cleaned = cleaned.replace(cleaner, "") | |
| return cleaned.strip() | |
| def make_request( | |
| self, query: str, variables: Dict, max_retries: int = 3 | |
| ) -> Optional[Dict]: | |
| """Make GraphQL request to AniList API with retry logic""" | |
| headers = {"Content-Type": "application/json", "Accept": "application/json"} | |
| if self.access_token: | |
| headers["Authorization"] = f"Bearer {self.access_token}" | |
| for attempt in range(max_retries + 1): | |
| # Rate limiting before each request | |
| self.rate_limit_sleep() | |
| try: | |
| response = requests.post( | |
| self.anilist_url, | |
| json={"query": query, "variables": variables}, | |
| headers=headers, | |
| timeout=15, | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if "errors" in data: | |
| print(f"GraphQL errors: {data['errors']}") | |
| return None | |
| return data.get("data") | |
| elif response.status_code == 429: | |
| # Rate limit exceeded | |
| if attempt < max_retries: | |
| retry_delay = self.handle_rate_limit_error(response) | |
| time.sleep(retry_delay) | |
| continue | |
| else: | |
| print(f"HTTP error 429: {response.text}") | |
| return None | |
| elif response.status_code == 500: | |
| # Server error - log and continue | |
| print(f"HTTP error 500: {response.text}") | |
| if attempt < max_retries: | |
| print( | |
| f" Retrying in 5 seconds... (attempt {attempt + 1}/{max_retries})" | |
| ) | |
| time.sleep(5) | |
| continue | |
| else: | |
| print(" Max retries reached for 500 error") | |
| return None | |
| else: | |
| print(f"HTTP error {response.status_code}: {response.text}") | |
| return None | |
| except Exception as e: | |
| print(f"Request error: {e}") | |
| if attempt < max_retries: | |
| print( | |
| f" Retrying in 3 seconds... (attempt {attempt + 1}/{max_retries})" | |
| ) | |
| time.sleep(3) | |
| continue | |
| else: | |
| return None | |
| return None | |
| def search_anilist(self, title: str) -> Optional[Dict]: | |
| """Search AniList for anime by title""" | |
| query = """ | |
| query ($search: String) { | |
| Media (search: $search, type: ANIME) { | |
| id | |
| title { | |
| romaji | |
| english | |
| native | |
| } | |
| synonyms | |
| } | |
| } | |
| """ | |
| variables = {"search": title} | |
| data = self.make_request(query, variables) | |
| if data and data.get("Media"): | |
| return data["Media"] | |
| return None | |
| def add_to_anilist(self, media_id: int, status: str = "PLANNING") -> bool: | |
| """Add anime to user's AniList""" | |
| if not self.access_token: | |
| print("Access token required for updating AniList") | |
| return False | |
| mutation = """ | |
| mutation ($mediaId: Int, $status: MediaListStatus) { | |
| SaveMediaListEntry (mediaId: $mediaId, status: $status) { | |
| id | |
| status | |
| media { | |
| id | |
| title { | |
| romaji | |
| english | |
| } | |
| } | |
| } | |
| } | |
| """ | |
| variables = {"mediaId": media_id, "status": status} | |
| data = self.make_request(mutation, variables) | |
| if data and data.get("SaveMediaListEntry"): | |
| return True | |
| return False | |
| def find_best_match(self, cr_title: str) -> Optional[Dict]: | |
| """Find best AniList match for Crunchyroll title""" | |
| if not cr_title.strip(): | |
| return None | |
| # Try exact search first | |
| anilist_data = self.search_anilist(cr_title) | |
| if anilist_data: | |
| return anilist_data | |
| # Try cleaned title | |
| cleaned_title = self.clean_title(cr_title) | |
| if cleaned_title != cr_title: | |
| anilist_data = self.search_anilist(cleaned_title) | |
| if anilist_data: | |
| return anilist_data | |
| # Try alternative searches for common variations | |
| variations = [ | |
| cr_title.replace("!", ""), | |
| cr_title.replace("?", ""), | |
| cr_title.replace(":", ""), | |
| cr_title.split(":")[0] if ":" in cr_title else cr_title, | |
| cr_title.split(" -")[0] if " -" in cr_title else cr_title, | |
| ] | |
| for variation in variations: | |
| if ( | |
| variation != cr_title | |
| and variation != cleaned_title | |
| and variation.strip() | |
| ): | |
| anilist_data = self.search_anilist(variation) | |
| if anilist_data: | |
| return anilist_data | |
| return None | |
| def process_crunchyroll_data( | |
| self, cr_data: Dict, update_anilist: bool = False, list_status: str = "PLANNING" | |
| ) -> Dict: | |
| """Process and match Crunchyroll watchlist data""" | |
| results = { | |
| "matched": [], | |
| "failed": [], | |
| "updated": [], | |
| "update_failed": [], | |
| "total": len(cr_data.get("data", [])), | |
| } | |
| print(f"Processing {results['total']} entries with enhanced rate limiting...") | |
| print("Note: AniList API is currently limited to 30 requests/minute") | |
| for i, item in enumerate(cr_data.get("data", []), 1): | |
| panel = item.get("panel", {}) | |
| cr_title = panel.get("title", "") | |
| panel.get("id", "") | |
| print(f"[{i}/{results['total']}] Processing: {cr_title}") | |
| # Find AniList match | |
| anilist_data = self.find_best_match(cr_title) | |
| if anilist_data: | |
| matched_entry = { | |
| "crunchyroll_title": cr_title, | |
| "anilist_id": anilist_data.get("id"), | |
| "anilist_english_name": anilist_data.get("title", {}).get("english") | |
| or anilist_data.get("title", {}).get("romaji"), | |
| } | |
| results["matched"].append(matched_entry) | |
| print( | |
| f" ✓ Matched: {cr_title} -> {matched_entry['anilist_english_name']}" | |
| ) | |
| # Update AniList if requested | |
| if update_anilist and self.access_token: | |
| if self.add_to_anilist(anilist_data.get("id"), list_status): | |
| results["updated"].append(matched_entry) | |
| print(f" ✓ Added to AniList with status: {list_status}") | |
| else: | |
| results["update_failed"].append(matched_entry) | |
| print(" ✗ Failed to add to AniList") | |
| else: | |
| failed_entry = {"crunchyroll_title": cr_title} | |
| results["failed"].append(failed_entry) | |
| print(f" ✗ Failed to match: {cr_title}") | |
| return results | |
| def save_results( | |
| self, results: Dict, filename: str = "crunchyroll_anilist_matches.json" | |
| ): | |
| """Save results to JSON file""" | |
| with open(filename, "w", encoding="utf-8") as f: | |
| json.dump(results, f, indent=2, ensure_ascii=False) | |
| print(f"\nResults saved to {filename}") | |
| print(f"Total processed: {results['total']}") | |
| print(f"Successfully matched: {len(results['matched'])}") | |
| print(f"Failed to match: {len(results['failed'])}") | |
| if "updated" in results: | |
| print(f"Added to AniList: {len(results['updated'])}") | |
| print(f"Failed to add to AniList: {len(results['update_failed'])}") | |
| success_rate = ( | |
| len(results["matched"]) / results["total"] * 100 | |
| if results["total"] > 0 | |
| else 0 | |
| ) | |
| print(f"Match success rate: {success_rate:.1f}%") | |
| print(f"Total API requests made: {self.request_count}") | |
| def load_crunchyroll_data(file_path: str) -> Dict: | |
| """Load Crunchyroll watchlist data from JSON file""" | |
| try: | |
| with open(file_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return data | |
| except FileNotFoundError: | |
| print(f"Error: File '{file_path}' not found.") | |
| sys.exit(1) | |
| except json.JSONDecodeError as e: | |
| print(f"Error: Invalid JSON in file '{file_path}': {e}") | |
| sys.exit(1) | |
| except Exception as e: | |
| print(f"Error loading file '{file_path}': {e}") | |
| sys.exit(1) | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Match Crunchyroll watchlist with AniList API data and optionally update AniList", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| OAuth Setup: | |
| 1. Go to https://anilist.co/settings/developer | |
| 2. Create a new client application | |
| 3. Note your Client ID, Client Secret, and Redirect URI | |
| Examples: | |
| # Just match and save results | |
| python matcher.py watchlist.json | |
| # Match and update AniList (OAuth flow) | |
| python matcher.py watchlist.json --update --client-id YOUR_CLIENT_ID --client-secret YOUR_SECRET --redirect-uri YOUR_REDIRECT_URI | |
| # Use saved token from previous OAuth | |
| python matcher.py watchlist.json --update --use-saved-token | |
| Note: AniList API is currently rate-limited to 30 requests/minute. | |
| Large lists will take significant time to process. | |
| """, | |
| ) | |
| parser.add_argument("input_file", help="Path to Crunchyroll watchlist JSON file") | |
| parser.add_argument( | |
| "-o", | |
| "--output", | |
| default="crunchyroll_anilist_matches.json", | |
| help="Output file for results (default: crunchyroll_anilist_matches.json)", | |
| ) | |
| parser.add_argument( | |
| "--matched", help="Optional: Save matched entries to separate file" | |
| ) | |
| parser.add_argument( | |
| "--failed", help="Optional: Save failed matches to separate file" | |
| ) | |
| parser.add_argument( | |
| "--update", | |
| action="store_true", | |
| help="Update AniList with matched entries (requires OAuth)", | |
| ) | |
| parser.add_argument("--client-id", help="AniList OAuth client ID") | |
| parser.add_argument("--client-secret", help="AniList OAuth client secret") | |
| parser.add_argument("--redirect-uri", help="AniList OAuth redirect URI") | |
| parser.add_argument( | |
| "--use-saved-token", | |
| action="store_true", | |
| help="Use previously saved access token from .anilist_token file", | |
| ) | |
| parser.add_argument( | |
| "--status", | |
| choices=["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"], | |
| default="PLANNING", | |
| help="Status to set for added entries (default: PLANNING)", | |
| ) | |
| args = parser.parse_args() | |
| # Initialize matcher | |
| matcher = CrunchyrollAniListMatcher() | |
| # Handle authentication if updating | |
| if args.update: | |
| access_token = None | |
| # Try to use saved token first | |
| if args.use_saved_token: | |
| access_token = matcher.load_token_from_file() | |
| if access_token: | |
| print("✓ Using saved access token") | |
| else: | |
| print("No saved token found") | |
| # If no saved token, do OAuth flow | |
| if not access_token: | |
| if not all([args.client_id, args.client_secret, args.redirect_uri]): | |
| print("Error: OAuth credentials required for updating AniList.") | |
| print("Provide --client-id, --client-secret, and --redirect-uri") | |
| print("Or use --use-saved-token if you have a saved token") | |
| print( | |
| "\nGet your OAuth credentials at: https://anilist.co/settings/developer" | |
| ) | |
| sys.exit(1) | |
| print("Starting OAuth authentication flow...") | |
| access_token = matcher.get_oauth_token( | |
| args.client_id, args.client_secret, args.redirect_uri | |
| ) | |
| if not access_token: | |
| print("Failed to obtain access token") | |
| sys.exit(1) | |
| matcher.access_token = access_token | |
| # Load Crunchyroll data | |
| print(f"\nLoading Crunchyroll data from: {args.input_file}") | |
| cr_watchlist = load_crunchyroll_data(args.input_file) | |
| # Process data | |
| results = matcher.process_crunchyroll_data( | |
| cr_watchlist, update_anilist=args.update, list_status=args.status | |
| ) | |
| # Save main results | |
| matcher.save_results(results, args.output) | |
| # Save separate files if requested | |
| if args.matched: | |
| with open(args.matched, "w", encoding="utf-8") as f: | |
| json.dump(results["matched"], f, indent=2, ensure_ascii=False) | |
| print(f"Matched entries saved to: {args.matched}") | |
| if args.failed: | |
| with open(args.failed, "w", encoding="utf-8") as f: | |
| json.dump(results["failed"], f, indent=2, ensure_ascii=False) | |
| print(f"Failed matches saved to: {args.failed}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Warning
This code was generated using LLM.
I probably only spent ~10min vibe-coding this to migrate my Crunchyroll watchlist to AniList.
Use at your own risk.
Export your Crunchyroll watchlist in JSON format (you can do this through the
browser console or by getting the UUID and saving it locally).
Create an AniList OAuth application:
Download and prepare the script:
pip install requestsRun the migration: If you already have an OAuth token, you can manually
create a
.anilist_tokenfile with your token to skip the OAuth setupentirely. Go to step 5 directly.
For subsequent runs: Use the
--use-saved-tokenflag to skip the OAuth flowsince your token will be saved to .anilist_token after the first successful
authentication.
Check the code for other usages, like import to another AniList list, etc.