|
import os
import csv
import json
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from datetime import datetime
import time
import webbrowser
import dotenv
import warnings

# Suppress warnings
warnings.filterwarnings("ignore")

# Set better visual style
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (12, 8)
plt.rcParams["font.size"] = 12

# Load environment variables from .env file
dotenv.load_dotenv()
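# The main block reads the key via os.getenv("TMDB_API_KEY"), so the .env file is
# expected to contain a line such as: TMDB_API_KEY=<your TMDB API key>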
|
|
|
|
|
class TMDBAnalyzer: |
|
def __init__( |
|
self, |
|
api_key, |
|
base_url, |
|
output_dir, |
|
max_requests, |
|
request_window, |
|
top_n_companies, |
|
top_n_directors, |
|
top_n_actors, |
|
high_rating_threshold, |
|
embeddings_model, |
|
): |
|
# API Configuration |
|
self.api_key = api_key |
|
self.base_url = base_url |
|
|
|
# Rate limiting configuration |
|
self.max_requests = max_requests |
|
self.request_window = request_window |
|
self.request_timestamps = [] |
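        # Timestamps of recent requests; make_api_request prunes entries older than
        # request_window and waits once max_requests requests fall inside the window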
|
|
|
# Directory structure |
|
self.output_dir = output_dir |
|
self.dl_dir = os.path.join(output_dir, "deep_learning") |
|
os.makedirs(output_dir, exist_ok=True) |
|
os.makedirs(self.dl_dir, exist_ok=True) |
|
|
|
# File paths |
|
self.ratings_csv = os.path.join(output_dir, "tmdb_ratings_with_metadata.csv") |
|
self.stats_csv = os.path.join(output_dir, "tmdb_stats.csv") |
|
self.auth_file = os.path.join(output_dir, "auth_data.json") |
|
|
|
# Analysis parameters |
|
self.top_n_companies = top_n_companies |
|
self.top_n_directors = top_n_directors |
|
self.top_n_actors = top_n_actors |
|
self.high_rating_threshold = high_rating_threshold |
|
self.embeddings_model = embeddings_model |
|
|
|
def make_api_request(self, url, params=None, method="GET", data=None): |
|
"""Make an API request with rate limiting.""" |
|
# Check if we need to throttle requests |
|
current_time = time.time() |
|
self.request_timestamps = [ |
|
t for t in self.request_timestamps if current_time - t < self.request_window |
|
] |
|
|
|
if len(self.request_timestamps) >= self.max_requests: |
|
# We've hit the rate limit, wait until the oldest request falls out of the window |
|
sleep_time = ( |
|
self.request_window - (current_time - self.request_timestamps[0]) + 0.1 |
|
) |
|
print(f"Rate limit reached. Waiting {sleep_time:.2f} seconds...") |
|
time.sleep(sleep_time) |
|
# Update current time after waiting |
|
current_time = time.time() |
|
# Clean up timestamps again |
|
self.request_timestamps = [ |
|
t |
|
for t in self.request_timestamps |
|
if current_time - t < self.request_window |
|
] |
|
|
|
# Add current request timestamp |
|
self.request_timestamps.append(current_time) |
|
|
|
# Make the request |
|
if method.upper() == "GET": |
|
response = requests.get(url, params=params) |
|
elif method.upper() == "POST": |
|
response = requests.post(url, params=params, json=data) |
|
else: |
|
raise ValueError(f"Unsupported HTTP method: {method}") |
|
|
|
# Handle potential 429 Too Many Requests |
|
if response.status_code == 429: |
|
retry_after = int(response.headers.get("Retry-After", 1)) |
|
print(f"Rate limit exceeded. Retrying after {retry_after} seconds...") |
|
time.sleep(retry_after) |
|
# Try the request again |
|
return self.make_api_request(url, params, method, data) |
|
|
|
return response |
|
|
|
def setup_authentication(self): |
|
"""Set up authentication and get session ID and account ID.""" |
|
# Check if we already have auth data |
|
if os.path.exists(self.auth_file): |
|
try: |
|
with open(self.auth_file, "r") as f: |
|
auth_data = json.load(f) |
|
print("Found existing authentication data.") |
|
|
|
# Verify the session is still valid |
|
account_url = f"{self.base_url}/account" |
|
params = { |
|
"api_key": self.api_key, |
|
"session_id": auth_data["session_id"], |
|
} |
|
response = self.make_api_request(account_url, params) |
|
|
|
if response.status_code == 200: |
|
print("Session is valid.") |
|
return auth_data["session_id"], auth_data["account_id"] |
|
else: |
|
print("Session has expired. Creating a new one...") |
|
except Exception as e: |
|
print(f"Error reading auth file: {e}") |
|
print("Creating new authentication...") |
|
|
|
# Step 1: Create a request token |
|
token_url = f"{self.base_url}/authentication/token/new" |
|
params = {"api_key": self.api_key} |
|
response = self.make_api_request(token_url, params) |
|
|
|
if response.status_code != 200: |
|
raise Exception( |
|
f"Failed to get request token: {response.status_code} - {response.text}" |
|
) |
|
|
|
token_data = response.json() |
|
request_token = token_data["request_token"] |
|
|
|
# Step 2: Get the user to approve the request token |
|
auth_url = f"https://www.themoviedb.org/authenticate/{request_token}" |
|
print(f"\nPlease open this URL in your browser and approve the request:") |
|
print(auth_url) |
|
|
|
# Try to open the browser automatically |
|
webbrowser.open(auth_url) |
|
|
|
input("\nAfter approving, press Enter to continue...\n") |
|
|
|
# Step 3: Create a session ID with the approved request token |
|
session_url = f"{self.base_url}/authentication/session/new" |
|
params = {"api_key": self.api_key} |
|
data = {"request_token": request_token} |
|
response = self.make_api_request(session_url, params, method="POST", data=data) |
|
|
|
if response.status_code != 200: |
|
raise Exception( |
|
f"Failed to create session: {response.status_code} - {response.text}" |
|
) |
|
|
|
session_data = response.json() |
|
session_id = session_data["session_id"] |
|
|
|
# Step 4: Get account details |
|
account_url = f"{self.base_url}/account" |
|
params = {"api_key": self.api_key, "session_id": session_id} |
|
response = self.make_api_request(account_url, params) |
|
|
|
if response.status_code != 200: |
|
raise Exception( |
|
f"Failed to get account details: {response.status_code} - {response.text}" |
|
) |
|
|
|
account_data = response.json() |
|
account_id = account_data["id"] |
|
|
|
# Save the authentication data |
|
auth_data = {"session_id": session_id, "account_id": account_id} |
|
|
|
with open(self.auth_file, "w") as f: |
|
json.dump(auth_data, f) |
|
|
|
print(f"Authentication successful! Account ID: {account_id}") |
|
return session_id, account_id |
|
|
|
def get_ratings(self, media_type, account_id, session_id): |
|
"""Get rated movies or TV shows.""" |
|
all_ratings = [] |
|
page = 1 |
|
total_pages = 1 |
|
|
|
while page <= total_pages: |
|
url = f"{self.base_url}/account/{account_id}/rated/{media_type}" |
|
params = { |
|
"api_key": self.api_key, |
|
"session_id": session_id, |
|
"language": "en-US", |
|
"sort_by": "created_at.desc", |
|
"page": page, |
|
} |
|
|
|
response = self.make_api_request(url, params) |
|
if response.status_code == 200: |
|
data = response.json() |
|
all_ratings.extend(data["results"]) |
|
total_pages = data["total_pages"] |
|
page += 1 |
|
else: |
|
print( |
|
f"Error fetching {media_type} ratings page {page}: {response.status_code}" |
|
) |
|
break |
|
|
|
return all_ratings |
|
|
|
def get_watchlist(self, account_id, session_id): |
|
"""Get movies and TV shows from user's watchlist.""" |
|
movie_watchlist = [] |
|
tv_watchlist = [] |
|
|
|
# Fetch movies in watchlist |
|
print("Fetching movies in watchlist...") |
|
page = 1 |
|
total_pages = 1 |
|
|
|
while page <= total_pages: |
|
url = f"{self.base_url}/account/{account_id}/watchlist/movies" |
|
params = { |
|
"api_key": self.api_key, |
|
"session_id": session_id, |
|
"language": "en-US", |
|
"sort_by": "created_at.desc", |
|
"page": page, |
|
} |
|
|
|
response = self.make_api_request(url, params) |
|
if response.status_code == 200: |
|
data = response.json() |
|
movie_watchlist.extend(data["results"]) |
|
total_pages = data["total_pages"] |
|
page += 1 |
|
else: |
|
print( |
|
f"Error fetching movie watchlist page {page}: {response.status_code}" |
|
) |
|
break |
|
|
|
# Fetch TV shows in watchlist |
|
print("Fetching TV shows in watchlist...") |
|
page = 1 |
|
total_pages = 1 |
|
|
|
while page <= total_pages: |
|
url = f"{self.base_url}/account/{account_id}/watchlist/tv" |
|
params = { |
|
"api_key": self.api_key, |
|
"session_id": session_id, |
|
"language": "en-US", |
|
"sort_by": "created_at.desc", |
|
"page": page, |
|
} |
|
|
|
response = self.make_api_request(url, params) |
|
if response.status_code == 200: |
|
data = response.json() |
|
tv_watchlist.extend(data["results"]) |
|
total_pages = data["total_pages"] |
|
page += 1 |
|
else: |
|
print( |
|
f"Error fetching TV watchlist page {page}: {response.status_code}" |
|
) |
|
break |
|
|
|
return {"movies": movie_watchlist, "tv": tv_watchlist} |
|
|
|
def get_details(self, media_type, item_id): |
|
"""Get additional details for a movie or TV show.""" |
|
url = f"{self.base_url}/{media_type}/{item_id}" |
|
params = { |
|
"api_key": self.api_key, |
|
"language": "en-US", |
|
"append_to_response": "credits,keywords,release_dates", |
|
} |
|
|
|
response = self.make_api_request(url, params) |
|
if response.status_code == 200: |
|
return response.json() |
|
else: |
|
print( |
|
f"Error fetching details for {media_type} ID {item_id}: {response.status_code}" |
|
) |
|
return None |
|
|
|
def extract_metadata(self, item, details, media_type): |
|
"""Extract relevant metadata from API response.""" |
|
metadata = { |
|
"id": item["id"], |
|
"title": item.get("title", item.get("name", "Unknown")), |
|
"media_type": media_type, |
|
"rating": item["rating"], |
|
"rated_at": item.get("rated_at", "Unknown"), |
|
"release_date": details.get( |
|
"release_date", details.get("first_air_date", "Unknown") |
|
), |
|
"genres": ", ".join([genre["name"] for genre in details.get("genres", [])]), |
|
"runtime": details.get( |
|
"runtime", |
|
( |
|
details.get("episode_run_time", [0])[0] |
|
if details.get("episode_run_time") |
|
else 0 |
|
), |
|
), |
|
"vote_average": details.get("vote_average", 0), |
|
"vote_count": details.get("vote_count", 0), |
|
"popularity": details.get("popularity", 0), |
|
"production_companies": ", ".join( |
|
[company["name"] for company in details.get("production_companies", [])] |
|
), |
|
"original_language": details.get("original_language", "Unknown"), |
|
"overview": details.get("overview", ""), |
|
} |
|
|
|
# Get director(s) for movies or creators for TV shows |
|
if media_type == "movies": |
|
directors = [ |
|
crew["name"] |
|
for crew in details.get("credits", {}).get("crew", []) |
|
if crew["job"] == "Director" |
|
] |
|
metadata["director"] = ", ".join(directors) |
|
else: |
|
creators = [creator["name"] for creator in details.get("created_by", [])] |
|
metadata["creator"] = ", ".join(creators) |
|
|
|
# Get top cast |
|
cast = details.get("credits", {}).get("cast", []) |
|
top_cast = [actor["name"] for actor in cast[:20]] # Increased from 5 to 20 |
|
metadata["top_cast"] = ", ".join(top_cast) |
|
|
|
# Get keywords |
|
keywords = ( |
|
details.get("keywords", {}).get("keywords", []) |
|
if media_type == "movies" |
|
else details.get("keywords", {}).get("results", []) |
|
) |
|
metadata["keywords"] = ", ".join([kw["name"] for kw in keywords]) |
|
|
|
return metadata |
|
|
|
def extract_watchlist_metadata(self, item, details, media_type): |
|
"""Extract metadata for watchlist items.""" |
|
metadata = { |
|
"id": item["id"], |
|
"title": item.get("title", item.get("name", "Unknown")), |
|
"media_type": media_type, |
|
"in_watchlist": True, |
|
"release_date": details.get( |
|
"release_date", details.get("first_air_date", "Unknown") |
|
), |
|
"genres": ", ".join([genre["name"] for genre in details.get("genres", [])]), |
|
"runtime": details.get( |
|
"runtime", |
|
( |
|
details.get("episode_run_time", [0])[0] |
|
if details.get("episode_run_time") |
|
else 0 |
|
), |
|
), |
|
"vote_average": details.get("vote_average", 0), |
|
"vote_count": details.get("vote_count", 0), |
|
"popularity": details.get("popularity", 0), |
|
"production_companies": ", ".join( |
|
[company["name"] for company in details.get("production_companies", [])] |
|
), |
|
"original_language": details.get("original_language", "Unknown"), |
|
"overview": details.get("overview", ""), |
|
} |
|
|
|
# Get director(s) for movies or creators for TV shows |
|
if media_type == "movies": |
|
directors = [ |
|
crew["name"] |
|
for crew in details.get("credits", {}).get("crew", []) |
|
if crew["job"] == "Director" |
|
] |
|
metadata["director"] = ", ".join(directors) |
|
else: |
|
creators = [creator["name"] for creator in details.get("created_by", [])] |
|
metadata["creator"] = ", ".join(creators) |
|
|
|
# Get top cast |
|
cast = details.get("credits", {}).get("cast", []) |
|
top_cast = [actor["name"] for actor in cast[:20]] # Increased from 5 to 20 |
|
metadata["top_cast"] = ", ".join(top_cast) |
|
|
|
# Get keywords |
|
keywords = ( |
|
details.get("keywords", {}).get("keywords", []) |
|
if media_type == "movies" |
|
else details.get("keywords", {}).get("results", []) |
|
) |
|
metadata["keywords"] = ", ".join([kw["name"] for kw in keywords]) |
|
|
|
return metadata |
|
|
|
def process_watchlist(self, watchlist): |
|
"""Process watchlist items to get full details.""" |
|
watchlist_metadata = [] |
|
|
|
print("Fetching additional metadata for movies in watchlist...") |
|
for i, movie in enumerate(watchlist["movies"]): |
|
print( |
|
f"Processing watchlist movie {i+1}/{len(watchlist['movies'])}: {movie.get('title', movie.get('name', 'Unknown'))}" |
|
) |
|
details = self.get_details("movie", movie["id"]) |
|
if details: |
|
metadata = self.extract_watchlist_metadata(movie, details, "movies") |
|
watchlist_metadata.append(metadata) |
|
|
|
print("Fetching additional metadata for TV shows in watchlist...") |
|
for i, tv in enumerate(watchlist["tv"]): |
|
print( |
|
f"Processing watchlist TV show {i+1}/{len(watchlist['tv'])}: {tv.get('name', tv.get('title', 'Unknown'))}" |
|
) |
|
details = self.get_details("tv", tv["id"]) |
|
if details: |
|
metadata = self.extract_watchlist_metadata(tv, details, "tv") |
|
watchlist_metadata.append(metadata) |
|
|
|
return watchlist_metadata |
|
|
|
def generate_statistics(self, data_df): |
|
"""Generate statistics from the data.""" |
|
stats = [] |
|
|
|
# Overall statistics |
|
stats.append(("Total Rated Items", len(data_df))) |
|
stats.append(("Average Rating", round(data_df["rating"].mean(), 2))) |
|
stats.append(("Median Rating", data_df["rating"].median())) |
|
|
|
# Ratings distribution |
|
rating_counts = data_df["rating"].value_counts().sort_index() |
|
stats.append(("Rating Distribution", dict(rating_counts))) |
|
|
|
# Media type breakdown |
|
media_counts = data_df["media_type"].value_counts() |
|
stats.append(("Media Type Counts", dict(media_counts))) |
|
|
|
# Top genres |
|
        all_genres = []
        for genres_str in data_df["genres"].dropna():
            genres = [genre.strip() for genre in genres_str.split(",")]
            all_genres.extend(genres)
|
|
|
genre_counts = Counter(all_genres) |
|
top_genres = dict(genre_counts.most_common(15)) # Increased from 10 to 15 |
|
stats.append(("Top 15 Genres", top_genres)) |
|
|
|
# Average rating by genre |
|
genre_ratings = {} |
|
for genre in set(all_genres): |
|
genre_mask = data_df["genres"].str.contains(genre) |
|
avg_rating = round(data_df.loc[genre_mask, "rating"].mean(), 2) |
|
genre_ratings[genre] = avg_rating |
|
|
|
stats.append( |
|
( |
|
"Average Rating by Genre", |
|
dict(sorted(genre_ratings.items(), key=lambda x: x[1], reverse=True)), |
|
) |
|
) |
|
|
|
# Top directors/creators |
|
director_col = "director" if "director" in data_df.columns else "creator" |
|
all_directors = [] |
|
for directors_str in data_df[director_col].dropna(): |
|
directors = [director.strip() for director in directors_str.split(",")] |
|
all_directors.extend(directors) |
|
|
|
director_counts = Counter(all_directors) |
|
top_directors = dict( |
|
director_counts.most_common(self.top_n_directors) |
|
) # Using parameter |
|
stats.append( |
|
(f"Top {self.top_n_directors} {director_col.capitalize()}s", top_directors) |
|
) |
|
|
|
# Top production companies |
|
all_companies = [] |
|
for companies_str in data_df["production_companies"].dropna(): |
|
companies = [company.strip() for company in companies_str.split(",")] |
|
all_companies.extend(companies) |
|
|
|
company_counts = Counter(all_companies) |
|
top_companies = dict( |
|
company_counts.most_common(self.top_n_companies) |
|
) # Using parameter |
|
stats.append( |
|
(f"Top {self.top_n_companies} Production Companies", top_companies) |
|
) |
|
|
|
# Average rating by production company |
|
company_ratings = {} |
|
for company in dict(company_counts.most_common(self.top_n_companies)): |
|
company_mask = data_df["production_companies"].str.contains( |
|
company, na=False |
|
) |
|
if company_mask.sum() > 0: # Make sure we have matches |
|
avg_rating = round(data_df.loc[company_mask, "rating"].mean(), 2) |
|
company_ratings[company] = avg_rating |
|
|
|
stats.append( |
|
( |
|
"Average Rating by Top Production Companies", |
|
dict(sorted(company_ratings.items(), key=lambda x: x[1], reverse=True)), |
|
) |
|
) |
|
|
|
# Top actors |
|
all_actors = [] |
|
for cast_str in data_df["top_cast"].dropna(): |
|
actors = [actor.strip() for actor in cast_str.split(",")] |
|
all_actors.extend(actors) |
|
|
|
actor_counts = Counter(all_actors) |
|
top_actors = dict( |
|
actor_counts.most_common(self.top_n_actors) |
|
) # Using parameter |
|
stats.append((f"Top {self.top_n_actors} Actors", top_actors)) |
|
|
|
# Average rating by top actors |
|
actor_ratings = {} |
|
for actor in dict(actor_counts.most_common(self.top_n_actors)): |
|
actor_mask = data_df["top_cast"].str.contains(actor, na=False) |
|
if actor_mask.sum() > 0: # Make sure we have matches |
|
avg_rating = round(data_df.loc[actor_mask, "rating"].mean(), 2) |
|
num_appearances = actor_mask.sum() |
|
actor_ratings[f"{actor} ({num_appearances})"] = avg_rating |
|
|
|
stats.append( |
|
( |
|
"Average Rating by Top Actors", |
|
dict(sorted(actor_ratings.items(), key=lambda x: x[1], reverse=True)), |
|
) |
|
) |
|
|
|
# Ratings by year |
|
data_df["year"] = pd.to_datetime( |
|
data_df["release_date"], errors="coerce" |
|
).dt.year |
|
year_ratings = data_df.groupby("year")["rating"].mean().round(2) |
|
stats.append(("Average Rating by Year", dict(year_ratings.sort_index()))) |
|
|
|
# Your rating vs TMDB rating |
|
data_df["rating_diff"] = data_df["rating"] - data_df["vote_average"] |
|
avg_diff = round(data_df["rating_diff"].mean(), 2) |
|
stats.append(("Average Difference from TMDB Rating", avg_diff)) |
|
|
|
return stats |
|
|
|
def display_statistics(self, stats_list, df): |
|
"""Display statistics and create visualizations.""" |
|
print("\n===== TMDB RATINGS ANALYSIS =====\n") |
|
|
|
for title, data in stats_list: |
|
print(f"\n----- {title} -----") |
|
if isinstance(data, dict): |
|
for key, value in data.items(): |
|
print(f"{key}: {value}") |
|
else: |
|
print(data) |
|
|
|
# Create visualizations |
|
|
|
# 1. Rating distribution |
|
plt.figure(figsize=(10, 6)) |
|
sns.countplot(x="rating", data=df, palette="viridis") |
|
plt.title("Distribution of Your Ratings", fontsize=16) |
|
plt.xlabel("Rating", fontsize=14) |
|
plt.ylabel("Number of Items", fontsize=14) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "rating_distribution.png")) |
|
|
|
# 2. Rating by media type |
|
plt.figure(figsize=(10, 6)) |
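        # sns.barplot aggregates with the mean by default, matching the "average rating" title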
|
sns.barplot(x="media_type", y="rating", data=df, palette="Set2") |
|
plt.title("Average Rating by Media Type", fontsize=16) |
|
plt.xlabel("Media Type", fontsize=14) |
|
plt.ylabel("Average Rating", fontsize=14) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "rating_by_media_type.png")) |
|
|
|
# 3. Your ratings vs TMDB ratings |
|
plt.figure(figsize=(10, 6)) |
|
plt.scatter(df["vote_average"], df["rating"], alpha=0.6, s=50) |
|
plt.plot([0, 10], [0, 10], "r--") # Diagonal line for reference |
|
plt.title("Your Ratings vs TMDB Ratings", fontsize=16) |
|
plt.xlabel("TMDB Rating", fontsize=14) |
|
plt.ylabel("Your Rating", fontsize=14) |
|
plt.grid(True, alpha=0.3) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "your_vs_tmdb_ratings.png")) |
|
|
|
# 4. Top production companies |
|
plt.figure(figsize=(12, 8)) |
|
all_companies = [] |
|
for companies_str in df["production_companies"].dropna(): |
|
companies = [company.strip() for company in companies_str.split(",")] |
|
all_companies.extend(companies) |
|
|
|
company_counts = Counter(all_companies) |
|
top_companies = dict(company_counts.most_common(self.top_n_companies)) |
|
|
|
company_df = pd.DataFrame( |
|
{ |
|
"company": list(top_companies.keys()), |
|
"count": list(top_companies.values()), |
|
} |
|
) |
|
|
|
sns.barplot(x="count", y="company", data=company_df, palette="Reds_r") |
|
plt.title(f"Top {self.top_n_companies} Production Companies", fontsize=16) |
|
plt.xlabel("Number of Rated Titles", fontsize=14) |
|
plt.ylabel("Production Company", fontsize=14) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "top_production_companies.png")) |
|
|
|
# 5. Top actors |
|
plt.figure(figsize=(12, 8)) |
|
all_actors = [] |
|
for cast_str in df["top_cast"].dropna(): |
|
actors = [actor.strip() for actor in cast_str.split(",")] |
|
all_actors.extend(actors) |
|
|
|
actor_counts = Counter(all_actors) |
|
top_actors = dict(actor_counts.most_common(self.top_n_actors)) |
|
|
|
actor_df = pd.DataFrame( |
|
{"actor": list(top_actors.keys()), "count": list(top_actors.values())} |
|
) |
|
|
|
sns.barplot(x="count", y="actor", data=actor_df, palette="Blues_r") |
|
plt.title(f"Top {self.top_n_actors} Actors in Your Rated Titles", fontsize=16) |
|
plt.xlabel("Number of Appearances", fontsize=14) |
|
plt.ylabel("Actor", fontsize=14) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "top_actors.png")) |
|
|
|
print(f"\nVisualizations saved to {self.output_dir} directory") |
|
|
|
def save_statistics_to_csv(self, stats_list, filename): |
|
"""Save statistics to CSV.""" |
|
with open(filename, "w", newline="", encoding="utf-8") as csvfile: |
|
writer = csv.writer(csvfile) |
|
writer.writerow(["Statistic", "Value"]) |
|
|
|
for title, data in stats_list: |
|
if isinstance(data, dict): |
|
for key, value in data.items(): |
|
writer.writerow([f"{title} - {key}", value]) |
|
else: |
|
writer.writerow([title, data]) |
|
|
|
print(f"Statistics saved to {filename}") |
|
|
|
def find_rating_anomalies(self, df): |
|
"""Find titles that you rated very differently from the TMDB average.""" |
|
print("Identifying rating anomalies...") |
|
|
|
# Calculate the difference between your rating and TMDB rating |
|
df["rating_diff"] = df["rating"] - df["vote_average"] |
|
|
|
# Find titles you liked much more than average (positive surprise) |
|
positive_anomalies = df[df["rating_diff"] > 2].sort_values( |
|
"rating_diff", ascending=False |
|
) |
|
|
|
# Find titles you liked much less than average (negative surprise) |
|
negative_anomalies = df[df["rating_diff"] < -2].sort_values("rating_diff") |
|
|
|
# Combine anomalies |
|
anomalies = pd.concat( |
|
[ |
|
positive_anomalies[ |
|
["title", "rating", "vote_average", "rating_diff", "genres"] |
|
].head(10), |
|
negative_anomalies[ |
|
["title", "rating", "vote_average", "rating_diff", "genres"] |
|
].head(10), |
|
] |
|
) |
|
|
|
# Save anomalies to CSV |
|
anomalies.to_csv( |
|
os.path.join(self.output_dir, "rating_anomalies.csv"), index=False |
|
) |
|
|
|
# Visualize rating anomalies |
|
plt.figure(figsize=(12, 10)) |
|
|
|
anomalies_plot = pd.concat( |
|
[ |
|
positive_anomalies[["title", "rating_diff"]].head(7), |
|
negative_anomalies[["title", "rating_diff"]].head(7), |
|
] |
|
) |
|
|
|
# Shorten long titles for display |
|
anomalies_plot["title"] = anomalies_plot["title"].apply( |
|
lambda x: x[:30] + "..." if len(x) > 30 else x |
|
) |
|
|
|
# Sort by rating difference for better visualization |
|
anomalies_plot = anomalies_plot.sort_values("rating_diff") |
|
|
|
# Plot with a colormap based on the rating difference |
|
bars = sns.barplot( |
|
x="rating_diff", |
|
y="title", |
|
data=anomalies_plot, |
|
palette=sns.color_palette("RdBu_r", len(anomalies_plot)), |
|
) |
|
|
|
# Add labels to bars |
|
for i, p in enumerate(bars.patches): |
|
diff = anomalies_plot.iloc[i]["rating_diff"] |
|
if p.get_width() < 0: |
|
bars.text( |
|
p.get_width() - 0.5, |
|
p.get_y() + p.get_height() / 2, |
|
f"{diff:.1f}", |
|
ha="right", |
|
va="center", |
|
color="white", |
|
fontweight="bold", |
|
) |
|
else: |
|
bars.text( |
|
p.get_width() + 0.1, |
|
p.get_y() + p.get_height() / 2, |
|
f"+{diff:.1f}", |
|
ha="left", |
|
va="center", |
|
fontweight="bold", |
|
) |
|
|
|
plt.axvline(x=0, color="black", linestyle="-", alpha=0.3) |
|
plt.title( |
|
"Movies/Shows You Rated Very Differently Than TMDB Average", fontsize=16 |
|
) |
|
plt.xlabel("Your Rating - TMDB Rating", fontsize=14) |
|
plt.ylabel("Title", fontsize=14) |
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "rating_anomalies.png")) |
|
|
|
return anomalies |
|
|
|
def analyze_rating_trends(self, df): |
|
"""Analyze how your ratings have changed over time.""" |
|
print("Analyzing your rating trends over time...") |
|
|
|
# Make sure we have rated_at as datetime |
|
if "rated_at" in df.columns: |
|
df["rated_at"] = pd.to_datetime(df["rated_at"], errors="coerce") |
|
|
|
# Extract components from the rating date |
|
df["rate_year"] = df["rated_at"].dt.year |
|
df["rate_month"] = df["rated_at"].dt.month |
|
|
|
# Calculate average rating by month |
|
monthly_ratings = ( |
|
df.groupby(["rate_year", "rate_month"])["rating"] |
|
.agg(["mean", "count"]) |
|
.reset_index() |
|
) |
|
monthly_ratings["date"] = pd.to_datetime( |
|
monthly_ratings["rate_year"].astype(str) |
|
+ "-" |
|
+ monthly_ratings["rate_month"].astype(str) |
|
+ "-01" |
|
) |
|
monthly_ratings = monthly_ratings.sort_values("date") |
|
|
|
# Calculate rolling average for smoother trend |
|
monthly_ratings["rolling_avg"] = ( |
|
monthly_ratings["mean"].rolling(window=3, min_periods=1).mean() |
|
) |
|
|
|
# Save to CSV |
|
monthly_ratings.to_csv( |
|
os.path.join(self.output_dir, "rating_trends.csv"), index=False |
|
) |
|
|
|
# Plot rating trends |
|
plt.figure(figsize=(14, 8)) |
|
|
|
# Plot average rating by month |
|
ax1 = plt.subplot(111) |
|
ax1.plot( |
|
monthly_ratings["date"], |
|
monthly_ratings["mean"], |
|
marker="o", |
|
linestyle="-", |
|
color="#3498db", |
|
alpha=0.7, |
|
label="Monthly Average", |
|
) |
|
ax1.plot( |
|
monthly_ratings["date"], |
|
monthly_ratings["rolling_avg"], |
|
linestyle="-", |
|
color="#e74c3c", |
|
linewidth=3, |
|
label="3-Month Rolling Average", |
|
) |
|
|
|
# Plot number of ratings as bars |
|
ax2 = ax1.twinx() |
|
ax2.bar( |
|
monthly_ratings["date"], |
|
monthly_ratings["count"], |
|
alpha=0.2, |
|
color="gray", |
|
label="Number of Ratings", |
|
) |
|
|
|
# Set labels and title |
|
ax1.set_xlabel("Date", fontsize=14) |
|
ax1.set_ylabel("Average Rating", fontsize=14) |
|
ax2.set_ylabel("Number of Ratings", fontsize=14) |
|
plt.title("Your Rating Trends Over Time", fontsize=16) |
|
|
|
# Add legends |
|
lines1, labels1 = ax1.get_legend_handles_labels() |
|
lines2, labels2 = ax2.get_legend_handles_labels() |
|
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") |
|
|
|
plt.tight_layout() |
|
plt.savefig(os.path.join(self.output_dir, "rating_trends.png")) |
|
|
|
return monthly_ratings |
|
else: |
|
print("Rating date information not available. Skipping trend analysis.") |
|
return None |
|
|
|
def prepare_content_text_with_description(self, data_df): |
|
"""Prepare content text with heavy description weighting for semantic analysis.""" |
|
content_texts = [] |
|
|
|
for _, row in data_df.iterrows(): |
|
# Start with base text |
|
text_parts = [] |
|
|
|
# Add title |
|
if pd.notna(row.get("title")): |
|
text_parts.append(row["title"]) |
|
|
|
# Add overview/description with emphasis (repeat 3x for higher weight) |
|
if pd.notna(row.get("overview")): |
|
# Add description 3 times to increase its influence |
|
text_parts.append(row["overview"]) |
|
text_parts.append(row["overview"]) |
|
text_parts.append(row["overview"]) |
|
|
|
# Add genres |
|
if pd.notna(row.get("genres")): |
|
text_parts.append(row["genres"]) |
|
|
|
# Add keywords |
|
if pd.notna(row.get("keywords")): |
|
text_parts.append(row["keywords"]) |
|
|
|
# Add director/creator |
|
if "director" in row and pd.notna(row.get("director")): |
|
text_parts.append(row["director"]) |
|
elif "creator" in row and pd.notna(row.get("creator")): |
|
text_parts.append(row["creator"]) |
|
|
|
# Add cast |
|
if pd.notna(row.get("top_cast")): |
|
text_parts.append(row["top_cast"]) |
|
|
|
# Combine all parts |
|
content_texts.append(" ".join(text_parts)) |
|
|
|
return content_texts |
|
|
|
def run_deep_learning_analyses(self, df, watchlist_df=None, feature_weights=None): |
|
"""Run deep learning based analyses.""" |
|
print("\n===== DEEP LEARNING ANALYSIS =====\n") |
|
|
|
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError:
            print("Installing Sentence Transformers...")
            import subprocess
            import sys

            # Install into the interpreter that is running this script
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "sentence-transformers"]
            )
            from sentence_transformers import SentenceTransformer
|
|
|
try: |
|
from bertopic import BERTopic |
|
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS |
|
import nltk |
|
import re |
|
|
|
# Download NLTK stop words if not already available |
|
try: |
|
nltk.data.find("corpora/stopwords") |
|
except LookupError: |
|
nltk.download("stopwords") |
|
|
|
from nltk.corpus import stopwords |
|
|
|
# Create a comprehensive stop words list |
|
            custom_stop_words = set(
                list(ENGLISH_STOP_WORDS)
                + list(stopwords.words("english"))
                + [
                    "movie", "film", "show", "character", "story", "watch", "like",
                    "good", "great", "bad", "best", "better", "worse", "she", "he",
                    "they", "it", "this", "that", "these", "those", "a", "an", "the",
                    "and", "but", "if", "or", "because", "as", "until", "while",
                    "of", "at", "by", "for", "with", "about", "against", "between",
                    "into", "through", "during", "before", "after", "above", "below",
                    "to", "from", "up", "down", "in", "out", "on", "off", "over",
                    "under", "again", "further", "then", "once", "here", "there",
                    "when", "where", "why", "how", "all", "any", "both", "each",
                    "few", "more", "most", "other", "some", "such", "no", "nor",
                    "not", "only", "own", "same", "so", "than", "too", "very",
                    "s", "t", "can", "will", "just", "don", "should", "now",
                    "m", "re", "ve", "y", "isn", "aren", "doesn", "didn", "hadn",
                    "hasn", "haven", "wasn", "weren", "won", "wouldn",
                ]
            )
|
|
|
print( |
|
"\nPerforming topic modeling on descriptions with manual stop words removal..." |
|
) |
|
|
|
# Get descriptions from rated items |
|
descriptions = df["overview"].fillna("").tolist() |
|
|
|
if any(desc.strip() != "" for desc in descriptions): |
|
# Preprocess descriptions to remove stop words |
|
print("Preprocessing descriptions to remove stop words...") |
|
|
|
# Create a function to clean text |
|
def clean_text(text): |
|
# Convert to lowercase |
|
text = text.lower() |
|
|
|
# Remove special characters and digits |
|
text = re.sub(r"[^\w\s]", " ", text) |
|
text = re.sub(r"\d+", " ", text) |
|
|
|
# Tokenize |
|
words = text.split() |
|
|
|
# Remove stop words |
|
words = [word for word in words if word not in custom_stop_words] |
|
|
|
# Rejoin |
|
return " ".join(words) |
|
|
|
# Apply cleaning to descriptions |
|
cleaned_descriptions = [clean_text(desc) for desc in descriptions] |
|
|
|
# Filter out empty descriptions after cleaning |
|
cleaned_descriptions = [ |
|
desc for desc in cleaned_descriptions if desc.strip() |
|
] |
|
|
|
if cleaned_descriptions: |
|
print( |
|
f"Pre-processing complete: {len(cleaned_descriptions)} descriptions with stop words removed" |
|
) |
|
|
|
# Configure BERTopic with simpler parameters since we already cleaned the text |
|
from sklearn.feature_extraction.text import CountVectorizer |
|
|
|
# Create vectorizer that doesn't need to handle stop words (we already did that) |
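                    # min_df=2 ignores words seen in fewer than 2 descriptions; max_df=0.85
                    # drops words that appear in more than 85% of them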
|
vectorizer = CountVectorizer(min_df=2, max_df=0.85) |
|
|
|
# Create the BERTopic model with more configuration to avoid visualization issues |
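                    # min_topic_size scales with the corpus but stays between 2 and 5 documents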
|
topic_model = BERTopic( |
|
min_topic_size=max(2, min(5, len(cleaned_descriptions) // 20)), |
|
verbose=True, |
|
vectorizer_model=vectorizer, |
|
nr_topics="auto", # Let BERTopic decide the number of topics |
|
) |
|
|
|
# Fit model on cleaned descriptions |
|
topics, probs = topic_model.fit_transform(cleaned_descriptions) |
|
|
|
# Get topic info |
|
topic_info = topic_model.get_topic_info() |
|
|
|
# Save topic info |
|
topic_info.to_csv( |
|
os.path.join(self.dl_dir, "description_topics.csv"), index=False |
|
) |
|
|
|
# Print top topics |
|
print("\nMost common themes in your content based on descriptions:") |
|
for idx, row in ( |
|
topic_info[topic_info["Topic"] != -1].head(5).iterrows() |
|
): |
|
topic_words = topic_model.get_topic(row["Topic"]) |
|
# Just take the top 5 words |
|
top_words = ", ".join([word for word, _ in topic_words[:5]]) |
|
print( |
|
f"Theme {row['Topic']}: {top_words} ({row['Count']} items)" |
|
) |
|
|
|
# Create visualization if possible - with better error handling |
|
try: |
|
# Check if we have topics to visualize |
|
if len(topic_info[topic_info["Topic"] != -1]) > 0: |
|
print("Creating topic visualizations...") |
|
|
|
# Use a simpler visualization approach first |
|
# Create a manual CSV visualization instead |
|
topics_readable = [] |
|
for topic_id in set(topics): |
|
if topic_id != -1: # Skip the outlier topic |
|
words = [ |
|
word |
|
for word, _ in topic_model.get_topic(topic_id)[ |
|
:10 |
|
] |
|
] |
|
topics_readable.append( |
|
{ |
|
"Topic": topic_id, |
|
"Words": ", ".join(words), |
|
"Count": topic_info[ |
|
topic_info["Topic"] == topic_id |
|
]["Count"].values[0], |
|
} |
|
) |
|
|
|
topics_df = pd.DataFrame(topics_readable) |
|
if not topics_df.empty: |
|
topics_df = topics_df.sort_values( |
|
"Count", ascending=False |
|
) |
|
topics_df.to_csv( |
|
os.path.join(self.dl_dir, "readable_topics.csv"), |
|
index=False, |
|
) |
|
print( |
|
f"Saved readable topic list to {os.path.join(self.dl_dir, 'readable_topics.csv')}" |
|
) |
|
|
|
# Try the interactive visualizations with safeguards |
|
try: |
|
# Try to create a simple bar chart instead of the complex visualization |
|
import matplotlib.pyplot as plt |
|
|
|
# Create a bar chart of topic frequencies |
|
plt.figure(figsize=(12, 8)) |
|
topic_counts = topics_df.sort_values( |
|
"Count", ascending=True |
|
) |
|
plt.barh( |
|
topic_counts["Topic"].astype(str), |
|
topic_counts["Count"], |
|
) |
|
plt.xlabel("Number of Documents") |
|
plt.ylabel("Topic") |
|
plt.title("Topic Distribution") |
|
plt.tight_layout() |
|
plt.savefig( |
|
os.path.join(self.dl_dir, "topic_distribution.png") |
|
) |
|
|
|
# Now try the BERTopic visualizations with explicit checks |
|
if hasattr( |
|
topic_model, "visualize_topics" |
|
) and callable(topic_model.visualize_topics): |
|
fig = topic_model.visualize_topics() |
|
if fig is not None: |
|
fig.write_html( |
|
os.path.join( |
|
self.dl_dir, "description_topics.html" |
|
) |
|
) |
|
print( |
|
f"Saved interactive topic visualization to {os.path.join(self.dl_dir, 'description_topics.html')}" |
|
) |
|
|
|
if hasattr( |
|
topic_model, "visualize_hierarchy" |
|
) and callable(topic_model.visualize_hierarchy): |
|
fig = topic_model.visualize_hierarchy() |
|
if fig is not None: |
|
fig.write_html( |
|
os.path.join( |
|
self.dl_dir, "topic_hierarchy.html" |
|
) |
|
) |
|
print( |
|
f"Saved topic hierarchy visualization to {os.path.join(self.dl_dir, 'topic_hierarchy.html')}" |
|
) |
|
|
|
except Exception as viz_error: |
|
print( |
|
f"Interactive visualizations failed: {str(viz_error)}" |
|
) |
|
print("Falling back to basic visualizations only.") |
|
else: |
|
print("No distinct topics found for visualization.") |
|
except Exception as e: |
|
print(f"Couldn't create topic visualizations: {str(e)}") |
|
print("This is non-critical - continuing with analysis.") |
|
else: |
|
print( |
|
"After stop word removal, no substantial description content remains for topic modeling." |
|
) |
|
else: |
|
print("No descriptions found for topic modeling.") |
|
|
|
except ImportError: |
|
print("BERTopic not available. Skipping description-based topic modeling.") |
|
print("You can install it with: pip install bertopic hdbscan") |
|
except Exception as e: |
|
print(f"Error during topic modeling: {str(e)}") |
|
|
|
# DESCRIPTION-ENHANCED DEEP CONTENT EMBEDDINGS |
|
print("\nGenerating deep content embeddings...") |
|
|
|
# Prepare content text with description emphasis |
|
print("Preparing content texts with description emphasis...") |
|
df_content_texts = self.prepare_content_text_with_description(df) |
|
|
|
# Create combined dataset with watchlist items |
|
combined_content_texts = df_content_texts |
|
|
|
if watchlist_df is not None and not watchlist_df.empty: |
|
# Prepare watchlist content text with description emphasis |
|
watchlist_content_texts = self.prepare_content_text_with_description( |
|
watchlist_df |
|
) |
|
|
|
# Combine with main content texts |
|
combined_content_texts = df_content_texts + watchlist_content_texts |
|
|
|
# Load pre-trained sentence transformer |
|
model_name = self.embeddings_model |
|
print(f"Loading Sentence Transformer model: {model_name}") |
|
st_model = SentenceTransformer(model_name) |
|
|
|
# Generate embeddings in batches |
|
batch_size = 32 |
|
all_embeddings = [] |
|
|
|
for i in range(0, len(combined_content_texts), batch_size): |
|
end_idx = min(i + batch_size, len(combined_content_texts)) |
|
print( |
|
f"Processing embeddings batch {i//batch_size + 1}/{len(combined_content_texts)//batch_size + 1}" |
|
) |
|
|
|
            batch_texts = combined_content_texts[i:end_idx]
            batch_embeddings = st_model.encode(
                batch_texts,
                show_progress_bar=False,
                convert_to_tensor=True,
                normalize_embeddings=True,  # recommended for multilingual-e5-large-instruct
            )
            # Move the batch to CPU and convert to NumPy so the batches can be stacked below
            all_embeddings.append(batch_embeddings.cpu().numpy())

        # Combine batches into a single (n_items, embedding_dim) array
        embeddings = np.vstack(all_embeddings)
|
|
|
# Save embeddings for future use |
|
np.save( |
|
os.path.join(self.dl_dir, "description_enhanced_embeddings.npy"), embeddings |
|
) |
|
|
|
# DESCRIPTION-ENHANCED RECOMMENDATION SYSTEM |
|
print("\nCreating description-enhanced recommendation system...") |
|
|
|
# Get indices for rated items and watchlist items |
|
n_rated = len(df) |
|
rated_indices = list(range(n_rated)) |
|
watchlist_indices = ( |
|
list(range(n_rated, len(combined_content_texts))) |
|
if watchlist_df is not None and not watchlist_df.empty |
|
else [] |
|
) |
|
|
|
# Create user preference profile from highly-rated items |
|
highly_rated = df[df["rating"] >= self.high_rating_threshold] |
|
if len(highly_rated) > 0: |
|
highly_rated_indices = highly_rated.index.tolist() |
|
user_profile = np.mean(embeddings[highly_rated_indices], axis=0) |
|
else: |
|
# Use all rated items as fallback |
|
user_profile = np.mean(embeddings[:n_rated], axis=0) |
|
|
|
# Compute similarity between user profile and all content |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
user_profile_reshaped = user_profile.reshape(1, -1) |
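        # cosine_similarity expects 2-D inputs, hence the reshape to (1, embedding_dim)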
|
similarities = cosine_similarity(user_profile_reshaped, embeddings).flatten() |
|
|
|
# Process watchlist items |
|
recommendations = [] |
|
rated_titles = set(df["title"]) |
|
|
|
if watchlist_df is not None and not watchlist_df.empty: |
|
for i, title in enumerate(watchlist_df["title"]): |
|
if title not in rated_titles: |
|
idx = watchlist_indices[i] |
|
|
|
# Base score from semantic similarity |
|
semantic_score = similarities[idx] |
|
|
|
# Initialize component scores |
|
component_scores = { |
|
"semantic_similarity": semantic_score |
|
* feature_weights["content"] |
|
} |
|
|
|
# TMDB rating component |
|
if "vote_average" in watchlist_df.columns and pd.notna( |
|
watchlist_df["vote_average"].iloc[i] |
|
): |
|
vote_score = watchlist_df["vote_average"].iloc[i] / 10 |
|
component_scores["vote_average"] = ( |
|
vote_score * feature_weights["vote_average"] |
|
) |
|
else: |
|
component_scores["vote_average"] = 0 |
|
|
|
# Year recency component |
|
year_score = 0 |
|
if "year" in watchlist_df.columns and pd.notna( |
|
watchlist_df["year"].iloc[i] |
|
): |
|
current_year = datetime.now().year |
|
years_old = current_year - watchlist_df["year"].iloc[i] |
|
year_score = max( |
|
0, 1 - (years_old / 50) |
|
) # Linear decay over 50 years |
|
component_scores["year"] = year_score * feature_weights["year"] |
|
elif "release_date" in watchlist_df.columns and pd.notna( |
|
watchlist_df["release_date"].iloc[i] |
|
): |
|
try: |
|
release_year = pd.to_datetime( |
|
watchlist_df["release_date"].iloc[i] |
|
).year |
|
current_year = datetime.now().year |
|
years_old = current_year - release_year |
|
year_score = max(0, 1 - (years_old / 50)) |
|
component_scores["year"] = ( |
|
year_score * feature_weights["year"] |
|
) |
|
                        except Exception:
                            component_scores["year"] = 0
|
else: |
|
component_scores["year"] = 0 |
|
|
|
# Popularity component |
|
if "popularity" in watchlist_df.columns and pd.notna( |
|
watchlist_df["popularity"].iloc[i] |
|
): |
|
# Normalize popularity (assuming max of 100, adjust as needed) |
|
pop_score = min(1, watchlist_df["popularity"].iloc[i] / 100) |
|
component_scores["popularity"] = ( |
|
pop_score * feature_weights["popularity"] |
|
) |
|
else: |
|
component_scores["popularity"] = 0 |
|
|
|
# Runtime component |
|
if "runtime" in watchlist_df.columns and pd.notna( |
|
watchlist_df["runtime"].iloc[i] |
|
): |
|
# Prefer mid-length content (90-150 minutes) |
|
runtime = watchlist_df["runtime"].iloc[i] |
|
if 90 <= runtime <= 150: |
|
runtime_score = 1.0 |
|
elif runtime < 90: |
|
runtime_score = runtime / 90 |
|
else: # runtime > 150 |
|
runtime_score = max( |
|
0, 1 - (runtime - 150) / 120 |
|
) # Linear decay after 150 mins |
|
component_scores["runtime"] = ( |
|
runtime_score * feature_weights["runtime"] |
|
) |
|
else: |
|
component_scores["runtime"] = 0 |
|
|
|
# Get description for this item |
|
description = ( |
|
watchlist_df["overview"].iloc[i] |
|
if "overview" in watchlist_df.columns |
|
and pd.notna(watchlist_df["overview"].iloc[i]) |
|
else "" |
|
) |
|
|
|
# Calculate final score |
|
final_score = sum(component_scores.values()) |
|
|
|
# Add to recommendations |
|
recommendations.append( |
|
{ |
|
"title": title, |
|
"score": final_score, |
|
"status": "In your watchlist", |
|
"components": component_scores, |
|
"description": ( |
|
description[:200] + "..." |
|
if len(description) > 200 |
|
else description |
|
), |
|
} |
|
) |
|
|
|
# Sort recommendations by score |
|
recommendations.sort(key=lambda x: x["score"], reverse=True) |
|
|
|
# Create recommendation DataFrame |
|
if recommendations: |
|
rec_data = [] |
|
for rec in recommendations: |
|
rec_data.append( |
|
[ |
|
rec["title"], |
|
rec["score"], |
|
rec["status"], |
|
rec["components"].get("semantic_similarity", 0), |
|
rec["components"].get("vote_average", 0), |
|
rec["components"].get("year", 0), |
|
rec["components"].get("popularity", 0), |
|
rec["components"].get("runtime", 0), |
|
rec["description"], |
|
] |
|
) |
|
|
|
rec_df = pd.DataFrame( |
|
rec_data, |
|
columns=[ |
|
"Title", |
|
"Score", |
|
"Status", |
|
"Semantic Similarity", |
|
"TMDB Rating", |
|
"Year Recency", |
|
"Popularity", |
|
"Runtime", |
|
"Description", |
|
], |
|
) |
|
|
|
# Save recommendations with descriptions |
|
rec_df.to_csv( |
|
os.path.join( |
|
self.dl_dir, "deep_recommendations_with_descriptions.csv" |
|
), |
|
index=False, |
|
) |
|
|
|
# Visualize top recommendations |
|
top_n = min(10, len(rec_df)) |
|
top_recs = rec_df.head(top_n) |
|
|
|
plt.figure(figsize=(14, 10)) |
|
|
|
# Create stacked bar chart for component visualization |
|
components = top_recs[ |
|
[ |
|
"Semantic Similarity", |
|
"TMDB Rating", |
|
"Year Recency", |
|
"Popularity", |
|
"Runtime", |
|
] |
|
] |
|
components = components.set_index(top_recs["Title"]) |
|
|
|
ax = components.plot( |
|
kind="barh", |
|
stacked=True, |
|
figsize=(14, 10), |
|
colormap="viridis", |
|
width=0.7, |
|
) |
|
|
|
plt.title( |
|
"Description-Enhanced Recommendations - Score Components", |
|
fontsize=16, |
|
) |
|
plt.xlabel("Score Contribution", fontsize=14) |
|
plt.ylabel("Title", fontsize=14) |
|
plt.legend(title="Components", title_fontsize=12) |
|
plt.tight_layout() |
|
plt.savefig( |
|
os.path.join(self.dl_dir, "deep_recommendations_components.png") |
|
) |
|
|
|
# Print top recommendations with descriptions |
|
print("\nTop 5 Deep Learning Recommendations:") |
|
for i, (_, row) in enumerate(top_recs.head(5).iterrows()): |
|
print(f"{i+1}. {row['Title']} (Score: {row['Score']:.2f})") |
|
print( |
|
f" Semantic Match: {row['Semantic Similarity']:.2f}, TMDB: {row['TMDB Rating']:.2f}, Year: {row['Year Recency']:.2f}" |
|
) |
|
if row["Description"]: |
|
print(f" Description: {row['Description']}") |
|
|
|
# Similarity matrix between recommendations |
|
if len(top_recs) > 1: |
|
print("\nGenerating similarity matrix between recommendations...") |
|
                    # Map each recommended title back to its embedding index, keeping the
                    # order of top_recs so the heatmap labels line up with the rows
                    title_to_index = {
                        watchlist_df["title"].iloc[i]: watchlist_indices[i]
                        for i in range(len(watchlist_df))
                    }
                    top_indices = [
                        title_to_index[t]
                        for t in top_recs["Title"]
                        if t in title_to_index
                    ]
|
|
|
if top_indices: |
|
# Extract embeddings for top recommendations |
|
top_embeddings = embeddings[top_indices] |
|
|
|
# Calculate similarity matrix |
|
rec_similarities = cosine_similarity(top_embeddings) |
|
|
|
# Create heatmap of recommendation similarities |
|
plt.figure(figsize=(12, 10)) |
|
sns.heatmap( |
|
rec_similarities, |
|
annot=True, |
|
fmt=".2f", |
|
cmap="YlGnBu", |
|
xticklabels=top_recs["Title"][: len(top_indices)], |
|
yticklabels=top_recs["Title"][: len(top_indices)], |
|
) |
|
plt.title( |
|
"Content Similarity Between Recommendations", fontsize=16 |
|
) |
|
plt.tight_layout() |
|
plt.savefig( |
|
os.path.join( |
|
self.dl_dir, "recommendation_similarity_matrix.png" |
|
) |
|
) |
|
else: |
|
print("No recommendations generated from watchlist.") |
|
rec_df = pd.DataFrame() |
|
else: |
|
print("No watchlist items found for recommendations.") |
|
rec_df = pd.DataFrame() |
|
|
|
# Visualize content embeddings with dimensionality reduction |
|
print("\nVisualizing content embeddings...") |
|
|
|
# Use t-SNE for dimensionality reduction |
|
from sklearn.manifold import TSNE |
|
|
|
# Apply t-SNE to rated content only |
|
rated_embeddings = embeddings[:n_rated] |
|
|
|
# Determine good perplexity value based on data size |
|
perplexity = min(30, max(5, len(rated_embeddings) // 10)) |
|
|
|
tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity) |
|
embeddings_2d = tsne.fit_transform(rated_embeddings) |
|
|
|
# Create scatter plot colored by rating |
|
plt.figure(figsize=(12, 10)) |
|
scatter = plt.scatter( |
|
embeddings_2d[:, 0], |
|
embeddings_2d[:, 1], |
|
c=df["rating"], |
|
cmap="viridis", |
|
s=100, |
|
alpha=0.7, |
|
) |
|
|
|
# Add color bar |
|
cbar = plt.colorbar(scatter) |
|
cbar.set_label("Your Rating", fontsize=12) |
|
|
|
# Label some points for context |
|
np.random.seed(42) |
|
indices_to_label = np.random.choice( |
|
range(len(df)), size=min(10, len(df)), replace=False |
|
) |
|
|
|
for idx in indices_to_label: |
|
plt.annotate( |
|
df["title"].iloc[idx], |
|
(embeddings_2d[idx, 0], embeddings_2d[idx, 1]), |
|
fontsize=9, |
|
) |
|
|
|
plt.title("Description-Enhanced Content Embeddings (t-SNE)", fontsize=16) |
|
plt.savefig( |
|
os.path.join(self.dl_dir, "description_enhanced_embeddings_tsne.png") |
|
) |
|
|
|
print(f"\nDeep learning analysis completed. Results saved to {self.dl_dir}") |
|
return rec_df |
|
|
|
def run_analysis( |
|
self, |
|
use_existing_data=False, |
|
use_existing_watchlist=False, |
|
feature_weights=None, |
|
): |
|
"""Run the complete analysis pipeline.""" |
|
watchlist_df = None |
|
|
|
# Check if we should use existing data |
|
if use_existing_data and os.path.exists(self.ratings_csv): |
|
print(f"Using existing data from {self.ratings_csv}") |
|
df = pd.read_csv(self.ratings_csv) |
|
|
|
# Check for existing watchlist data |
|
watchlist_path = os.path.join(self.output_dir, "tmdb_watchlist.csv") |
|
if use_existing_watchlist and os.path.exists(watchlist_path): |
|
print(f"Using existing watchlist data") |
|
watchlist_df = pd.read_csv(watchlist_path) |
|
print(f"Loaded {len(watchlist_df)} items from your watchlist") |
|
else: |
|
# Set up authentication |
|
print("Setting up TMDB authentication...") |
|
session_id, account_id = self.setup_authentication() |
|
|
|
# Get ratings |
|
print("Fetching your TMDB ratings...") |
|
movie_ratings = self.get_ratings("movies", account_id, session_id) |
|
tv_ratings = self.get_ratings("tv", account_id, session_id) |
|
|
|
print( |
|
f"Found {len(movie_ratings)} rated movies and {len(tv_ratings)} rated TV shows" |
|
) |
|
|
|
# Save raw ratings data in case of interruption |
|
with open(os.path.join(self.output_dir, "raw_ratings.json"), "w") as f: |
|
json.dump({"movies": movie_ratings, "tv": tv_ratings}, f) |
|
|
|
# Get watchlist |
|
print("\nFetching your TMDB watchlist...") |
|
watchlist = self.get_watchlist(account_id, session_id) |
|
|
|
print( |
|
f"Found {len(watchlist['movies'])} movies and {len(watchlist['tv'])} TV shows in your watchlist" |
|
) |
|
|
|
# Save raw watchlist data |
|
with open(os.path.join(self.output_dir, "raw_watchlist.json"), "w") as f: |
|
json.dump(watchlist, f) |
|
|
|
# Process watchlist items |
|
watchlist_metadata = self.process_watchlist(watchlist) |
|
|
|
# Create watchlist DataFrame and save to CSV |
|
if watchlist_metadata: |
|
watchlist_df = pd.DataFrame(watchlist_metadata) |
|
watchlist_df.to_csv( |
|
os.path.join(self.output_dir, "tmdb_watchlist.csv"), |
|
index=False, |
|
encoding="utf-8", |
|
) |
|
print( |
|
f"Watchlist with {len(watchlist_df)} items saved to {os.path.join(self.output_dir, 'tmdb_watchlist.csv')}" |
|
) |
|
else: |
|
watchlist_df = pd.DataFrame() |
|
print("No items found in your watchlist.") |
|
|
|
# Process all rated items |
|
all_metadata = [] |
|
|
|
print("Fetching additional metadata for movies...") |
|
for i, movie in enumerate(movie_ratings): |
|
print( |
|
f"Processing movie {i+1}/{len(movie_ratings)}: {movie.get('title', movie.get('name', 'Unknown'))}" |
|
) |
|
details = self.get_details("movie", movie["id"]) |
|
if details: |
|
metadata = self.extract_metadata(movie, details, "movies") |
|
all_metadata.append(metadata) |
|
# Save progress after each batch of 10 items |
|
if (i + 1) % 10 == 0: |
|
temp_df = pd.DataFrame(all_metadata) |
|
temp_df.to_csv( |
|
os.path.join(self.output_dir, "progress_data.csv"), |
|
index=False, |
|
encoding="utf-8", |
|
) |
|
|
|
print("Fetching additional metadata for TV shows...") |
|
for i, tv in enumerate(tv_ratings): |
|
print( |
|
f"Processing TV show {i+1}/{len(tv_ratings)}: {tv.get('name', tv.get('title', 'Unknown'))}" |
|
) |
|
details = self.get_details("tv", tv["id"]) |
|
if details: |
|
metadata = self.extract_metadata(tv, details, "tv") |
|
all_metadata.append(metadata) |
|
# Save progress after each batch of 10 items |
|
if (i + 1) % 10 == 0: |
|
temp_df = pd.DataFrame(all_metadata) |
|
temp_df.to_csv( |
|
os.path.join(self.output_dir, "progress_data.csv"), |
|
index=False, |
|
encoding="utf-8", |
|
) |
|
|
|
# Create DataFrame and save to CSV |
|
df = pd.DataFrame(all_metadata) |
|
df.to_csv(self.ratings_csv, index=False, encoding="utf-8") |
|
print(f"Ratings with metadata saved to {self.ratings_csv}") |
|
|
|
# Generate statistics |
|
print("Generating basic statistics...") |
|
stats = self.generate_statistics(df) |
|
|
|
# Display statistics |
|
self.display_statistics(stats, df) |
|
|
|
# Save statistics to CSV |
|
self.save_statistics_to_csv(stats, self.stats_csv) |
|
|
|
# Find rating anomalies |
|
anomalies = self.find_rating_anomalies(df) |
|
|
|
# Analyze rating trends over time |
|
trend_analysis = self.analyze_rating_trends(df) |
|
|
|
# Run deep learning analysis |
|
recommendations = self.run_deep_learning_analyses( |
|
df, watchlist_df, feature_weights |
|
) |
|
|
|
print("\nAnalysis complete!") |
|
return df, watchlist_df, recommendations |
|
|
|
|
|
if __name__ == "__main__": |
|
# Hard-coded configuration parameters |
|
config = { |
|
"api_key": os.getenv("TMDB_API_KEY"), # Use from .env file |
|
"base_url": "https://api.themoviedb.org/3", # TMDB API base URL |
|
"output_dir": "tmdb_analysis", # Output directory |
|
"max_requests": 50, # Rate limiting - requests per window |
|
"request_window": 10, # Rate limiting - window in seconds |
|
"top_n_companies": 25, # Top N production companies to analyze |
|
"top_n_directors": 25, # Top N directors to analyze |
|
"top_n_actors": 30, # Top N actors to analyze |
|
"high_rating_threshold": 8.0, # Threshold for high ratings |
|
"embeddings_model": "intfloat/multilingual-e5-large-instruct", # Sentence transformer model for embeddings |
|
} |
|
|
|
# ADJUSTED FEATURE WEIGHTS TO EMPHASIZE DESCRIPTION-BASED SIMILARITY |
|
feature_weights = { |
|
"vote_average": 0.20, # TMDB rating |
|
"year": 0.15, # Release year recency |
|
"popularity": 0.10, # Popularity |
|
"runtime": 0.05, # Runtime |
|
"content": 0.50, # Content similarity (significantly increased) |
|
} |
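    # These weights sum to 1.0; run_deep_learning_analyses multiplies each component
    # score (each roughly on a 0-1 scale) by its weight before summing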
|
|
|
# Initialize analyzer with our configuration |
|
analyzer = TMDBAnalyzer( |
|
api_key=config["api_key"], |
|
base_url=config["base_url"], |
|
output_dir=config["output_dir"], |
|
max_requests=config["max_requests"], |
|
request_window=config["request_window"], |
|
top_n_companies=config["top_n_companies"], |
|
top_n_directors=config["top_n_directors"], |
|
top_n_actors=config["top_n_actors"], |
|
high_rating_threshold=config["high_rating_threshold"], |
|
embeddings_model=config["embeddings_model"], |
|
) |
|
|
|
# Check for existing data |
|
data_exists = os.path.exists( |
|
os.path.join(config["output_dir"], "tmdb_ratings_with_metadata.csv") |
|
) |
|
watchlist_exists = os.path.exists( |
|
os.path.join(config["output_dir"], "tmdb_watchlist.csv") |
|
) |
|
|
|
if data_exists: |
|
print(f"Found existing data in {config['output_dir']}") |
|
use_existing = ( |
|
input("Would you like to use existing data? (y/n): ").strip().lower() |
|
) |
|
use_existing_data = use_existing == "y" |
|
|
|
if use_existing_data and watchlist_exists: |
|
use_existing_watchlist = ( |
|
input("Would you like to use existing watchlist data? (y/n): ") |
|
.strip() |
|
.lower() |
|
== "y" |
|
) |
|
else: |
|
use_existing_watchlist = False |
|
else: |
|
use_existing_data = False |
|
use_existing_watchlist = False |
|
|
|
# Run the analysis |
|
df, watchlist_df, recommendations = analyzer.run_analysis( |
|
use_existing_data=use_existing_data, |
|
use_existing_watchlist=use_existing_watchlist, |
|
feature_weights=feature_weights, |
|
) |