Skip to content

Instantly share code, notes, and snippets.

@stepbrobd
Created August 17, 2025 15:06
Show Gist options
  • Save stepbrobd/52b6586ee88461bc3248157b555470d1 to your computer and use it in GitHub Desktop.
Save stepbrobd/52b6586ee88461bc3248157b555470d1 to your computer and use it in GitHub Desktop.
import crunchyroll watchlist to anilist
import requests
import json
import time
import argparse
import sys
import webbrowser
import urllib.parse
from difflib import SequenceMatcher
from typing import Dict, Optional
class CrunchyrollAniListMatcher:
def __init__(self, access_token: Optional[str] = None):
self.anilist_url = "https://graphql.anilist.co"
self.auth_url = "https://anilist.co/api/v2/oauth/authorize"
self.token_url = "https://anilist.co/api/v2/oauth/token"
self.access_token = access_token
self.matched_data = []
self.failed_matches = []
self.request_count = 0
self.last_request_time = 0
def get_oauth_token(
self, client_id: str, client_secret: str, redirect_uri: str
) -> Optional[str]:
"""Get OAuth access token through authorization code flow"""
# Step 1: Generate authorization URL
auth_params = {
"client_id": client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
}
auth_url = f"{self.auth_url}?{urllib.parse.urlencode(auth_params)}"
print("\n1. Please visit this URL to authorize the application:")
print(f"{auth_url}")
print("\n2. After authorization, you'll be redirected to your redirect URI.")
print("3. Copy the 'code' parameter from the redirect URL.")
# Open browser automatically
try:
webbrowser.open(auth_url)
print(" (Browser should open automatically)")
except:
print(" (Please copy and paste the URL into your browser)")
# Get authorization code from user
auth_code = input("\n4. Enter the authorization code: ").strip()
if not auth_code:
print("No authorization code provided.")
return None
# Step 2: Exchange authorization code for access token
token_data = {
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"code": auth_code,
}
try:
response = requests.post(self.token_url, json=token_data)
if response.status_code == 200:
token_response = response.json()
access_token = token_response.get("access_token")
if access_token:
print("✓ Successfully obtained access token!")
# Save token to file for future use
with open(".anilist_token", "w") as f:
f.write(access_token)
print("✓ Token saved to .anilist_token file")
return access_token
else:
print("Error: No access token in response")
return None
else:
print(f"Error getting token: {response.status_code}")
print(response.text)
return None
except Exception as e:
print(f"Error during token exchange: {e}")
return None
def load_token_from_file(self, token_file: str = ".anilist_token") -> Optional[str]:
"""Load access token from file"""
try:
with open(token_file, "r") as f:
return f.read().strip()
except FileNotFoundError:
return None
def rate_limit_sleep(self):
"""Enhanced rate limiting with dynamic sleep"""
current_time = time.time()
# Ensure at least 2.5 seconds between requests (24 requests per minute max)
# This is more conservative than the 30/minute limit to account for variations
min_interval = 2.5
if self.last_request_time > 0:
time_since_last = current_time - self.last_request_time
if time_since_last < min_interval:
sleep_time = min_interval - time_since_last
print(f" Rate limiting: sleeping {sleep_time:.1f}s")
time.sleep(sleep_time)
self.last_request_time = time.time()
self.request_count += 1
def handle_rate_limit_error(self, response) -> int:
"""Handle 429 rate limit error and return retry delay"""
retry_after = 60 # Default 1 minute
# Try to get Retry-After header
if "Retry-After" in response.headers:
try:
retry_after = int(response.headers["Retry-After"])
except ValueError:
pass
# Try to get X-RateLimit-Reset header
elif "X-RateLimit-Reset" in response.headers:
try:
reset_time = int(response.headers["X-RateLimit-Reset"])
retry_after = max(reset_time - int(time.time()), 10)
except ValueError:
pass
print(f" Rate limit exceeded. Waiting {retry_after} seconds...")
return retry_after
def similarity(self, a: str, b: str) -> float:
"""Calculate similarity between two strings"""
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def clean_title(self, title: str) -> str:
"""Clean title for better matching"""
cleaners = [
" (TV)",
" (OVA)",
" (Movie)",
" the Movie",
" Season 1",
" Season 2",
" Season 3",
" Season 4",
" S1",
" S2",
" S3",
" S4",
]
cleaned = title
for cleaner in cleaners:
cleaned = cleaned.replace(cleaner, "")
return cleaned.strip()
def make_request(
self, query: str, variables: Dict, max_retries: int = 3
) -> Optional[Dict]:
"""Make GraphQL request to AniList API with retry logic"""
headers = {"Content-Type": "application/json", "Accept": "application/json"}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
for attempt in range(max_retries + 1):
# Rate limiting before each request
self.rate_limit_sleep()
try:
response = requests.post(
self.anilist_url,
json={"query": query, "variables": variables},
headers=headers,
timeout=15,
)
if response.status_code == 200:
data = response.json()
if "errors" in data:
print(f"GraphQL errors: {data['errors']}")
return None
return data.get("data")
elif response.status_code == 429:
# Rate limit exceeded
if attempt < max_retries:
retry_delay = self.handle_rate_limit_error(response)
time.sleep(retry_delay)
continue
else:
print(f"HTTP error 429: {response.text}")
return None
elif response.status_code == 500:
# Server error - log and continue
print(f"HTTP error 500: {response.text}")
if attempt < max_retries:
print(
f" Retrying in 5 seconds... (attempt {attempt + 1}/{max_retries})"
)
time.sleep(5)
continue
else:
print(" Max retries reached for 500 error")
return None
else:
print(f"HTTP error {response.status_code}: {response.text}")
return None
except Exception as e:
print(f"Request error: {e}")
if attempt < max_retries:
print(
f" Retrying in 3 seconds... (attempt {attempt + 1}/{max_retries})"
)
time.sleep(3)
continue
else:
return None
return None
def search_anilist(self, title: str) -> Optional[Dict]:
"""Search AniList for anime by title"""
query = """
query ($search: String) {
Media (search: $search, type: ANIME) {
id
title {
romaji
english
native
}
synonyms
}
}
"""
variables = {"search": title}
data = self.make_request(query, variables)
if data and data.get("Media"):
return data["Media"]
return None
def add_to_anilist(self, media_id: int, status: str = "PLANNING") -> bool:
"""Add anime to user's AniList"""
if not self.access_token:
print("Access token required for updating AniList")
return False
mutation = """
mutation ($mediaId: Int, $status: MediaListStatus) {
SaveMediaListEntry (mediaId: $mediaId, status: $status) {
id
status
media {
id
title {
romaji
english
}
}
}
}
"""
variables = {"mediaId": media_id, "status": status}
data = self.make_request(mutation, variables)
if data and data.get("SaveMediaListEntry"):
return True
return False
def find_best_match(self, cr_title: str) -> Optional[Dict]:
"""Find best AniList match for Crunchyroll title"""
if not cr_title.strip():
return None
# Try exact search first
anilist_data = self.search_anilist(cr_title)
if anilist_data:
return anilist_data
# Try cleaned title
cleaned_title = self.clean_title(cr_title)
if cleaned_title != cr_title:
anilist_data = self.search_anilist(cleaned_title)
if anilist_data:
return anilist_data
# Try alternative searches for common variations
variations = [
cr_title.replace("!", ""),
cr_title.replace("?", ""),
cr_title.replace(":", ""),
cr_title.split(":")[0] if ":" in cr_title else cr_title,
cr_title.split(" -")[0] if " -" in cr_title else cr_title,
]
for variation in variations:
if (
variation != cr_title
and variation != cleaned_title
and variation.strip()
):
anilist_data = self.search_anilist(variation)
if anilist_data:
return anilist_data
return None
def process_crunchyroll_data(
self, cr_data: Dict, update_anilist: bool = False, list_status: str = "PLANNING"
) -> Dict:
"""Process and match Crunchyroll watchlist data"""
results = {
"matched": [],
"failed": [],
"updated": [],
"update_failed": [],
"total": len(cr_data.get("data", [])),
}
print(f"Processing {results['total']} entries with enhanced rate limiting...")
print("Note: AniList API is currently limited to 30 requests/minute")
for i, item in enumerate(cr_data.get("data", []), 1):
panel = item.get("panel", {})
cr_title = panel.get("title", "")
panel.get("id", "")
print(f"[{i}/{results['total']}] Processing: {cr_title}")
# Find AniList match
anilist_data = self.find_best_match(cr_title)
if anilist_data:
matched_entry = {
"crunchyroll_title": cr_title,
"anilist_id": anilist_data.get("id"),
"anilist_english_name": anilist_data.get("title", {}).get("english")
or anilist_data.get("title", {}).get("romaji"),
}
results["matched"].append(matched_entry)
print(
f" ✓ Matched: {cr_title} -> {matched_entry['anilist_english_name']}"
)
# Update AniList if requested
if update_anilist and self.access_token:
if self.add_to_anilist(anilist_data.get("id"), list_status):
results["updated"].append(matched_entry)
print(f" ✓ Added to AniList with status: {list_status}")
else:
results["update_failed"].append(matched_entry)
print(" ✗ Failed to add to AniList")
else:
failed_entry = {"crunchyroll_title": cr_title}
results["failed"].append(failed_entry)
print(f" ✗ Failed to match: {cr_title}")
return results
def save_results(
self, results: Dict, filename: str = "crunchyroll_anilist_matches.json"
):
"""Save results to JSON file"""
with open(filename, "w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"\nResults saved to {filename}")
print(f"Total processed: {results['total']}")
print(f"Successfully matched: {len(results['matched'])}")
print(f"Failed to match: {len(results['failed'])}")
if "updated" in results:
print(f"Added to AniList: {len(results['updated'])}")
print(f"Failed to add to AniList: {len(results['update_failed'])}")
success_rate = (
len(results["matched"]) / results["total"] * 100
if results["total"] > 0
else 0
)
print(f"Match success rate: {success_rate:.1f}%")
print(f"Total API requests made: {self.request_count}")
def load_crunchyroll_data(file_path: str) -> Dict:
"""Load Crunchyroll watchlist data from JSON file"""
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data
except FileNotFoundError:
print(f"Error: File '{file_path}' not found.")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in file '{file_path}': {e}")
sys.exit(1)
except Exception as e:
print(f"Error loading file '{file_path}': {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(
description="Match Crunchyroll watchlist with AniList API data and optionally update AniList",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
OAuth Setup:
1. Go to https://anilist.co/settings/developer
2. Create a new client application
3. Note your Client ID, Client Secret, and Redirect URI
Examples:
# Just match and save results
python matcher.py watchlist.json
# Match and update AniList (OAuth flow)
python matcher.py watchlist.json --update --client-id YOUR_CLIENT_ID --client-secret YOUR_SECRET --redirect-uri YOUR_REDIRECT_URI
# Use saved token from previous OAuth
python matcher.py watchlist.json --update --use-saved-token
Note: AniList API is currently rate-limited to 30 requests/minute.
Large lists will take significant time to process.
""",
)
parser.add_argument("input_file", help="Path to Crunchyroll watchlist JSON file")
parser.add_argument(
"-o",
"--output",
default="crunchyroll_anilist_matches.json",
help="Output file for results (default: crunchyroll_anilist_matches.json)",
)
parser.add_argument(
"--matched", help="Optional: Save matched entries to separate file"
)
parser.add_argument(
"--failed", help="Optional: Save failed matches to separate file"
)
parser.add_argument(
"--update",
action="store_true",
help="Update AniList with matched entries (requires OAuth)",
)
parser.add_argument("--client-id", help="AniList OAuth client ID")
parser.add_argument("--client-secret", help="AniList OAuth client secret")
parser.add_argument("--redirect-uri", help="AniList OAuth redirect URI")
parser.add_argument(
"--use-saved-token",
action="store_true",
help="Use previously saved access token from .anilist_token file",
)
parser.add_argument(
"--status",
choices=["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"],
default="PLANNING",
help="Status to set for added entries (default: PLANNING)",
)
args = parser.parse_args()
# Initialize matcher
matcher = CrunchyrollAniListMatcher()
# Handle authentication if updating
if args.update:
access_token = None
# Try to use saved token first
if args.use_saved_token:
access_token = matcher.load_token_from_file()
if access_token:
print("✓ Using saved access token")
else:
print("No saved token found")
# If no saved token, do OAuth flow
if not access_token:
if not all([args.client_id, args.client_secret, args.redirect_uri]):
print("Error: OAuth credentials required for updating AniList.")
print("Provide --client-id, --client-secret, and --redirect-uri")
print("Or use --use-saved-token if you have a saved token")
print(
"\nGet your OAuth credentials at: https://anilist.co/settings/developer"
)
sys.exit(1)
print("Starting OAuth authentication flow...")
access_token = matcher.get_oauth_token(
args.client_id, args.client_secret, args.redirect_uri
)
if not access_token:
print("Failed to obtain access token")
sys.exit(1)
matcher.access_token = access_token
# Load Crunchyroll data
print(f"\nLoading Crunchyroll data from: {args.input_file}")
cr_watchlist = load_crunchyroll_data(args.input_file)
# Process data
results = matcher.process_crunchyroll_data(
cr_watchlist, update_anilist=args.update, list_status=args.status
)
# Save main results
matcher.save_results(results, args.output)
# Save separate files if requested
if args.matched:
with open(args.matched, "w", encoding="utf-8") as f:
json.dump(results["matched"], f, indent=2, ensure_ascii=False)
print(f"Matched entries saved to: {args.matched}")
if args.failed:
with open(args.failed, "w", encoding="utf-8") as f:
json.dump(results["failed"], f, indent=2, ensure_ascii=False)
print(f"Failed matches saved to: {args.failed}")
if __name__ == "__main__":
main()
@stepbrobd
Copy link
Author

Warning

This code was generated using LLM.
I probably only spent ~10min vibe-coding this to migrate my Crunchyroll watchlist to AniList.
Use at your own risk.

  1. Export your Crunchyroll watchlist in JSON format (you can do this through the
    browser console or by getting the UUID and saving it locally).

  2. Create an AniList OAuth application:

  3. Download and prepare the script:

    • Download above script and review for security concerns
    • Install dependency: pip install requests
  4. Run the migration: If you already have an OAuth token, you can manually
    create a .anilist_token file with your token to skip the OAuth setup
    entirely. Go to step 5 directly.

    python anilist_crunchylist_import.py watchlist.json --update --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET --redirect-uri http://localhost:3000 --status COMPLETED
  5. For subsequent runs: Use the --use-saved-token flag to skip the OAuth flow
    since your token will be saved to .anilist_token after the first successful
    authentication.

    python anilist_crunchylist_import.py watchlist.json --update --use-saved-token --status COMPLETED
  6. Check the code for other usages, like import to another AniList list, etc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment