TMDB Profile Analysis and Recommendations

Hello World,

This is a huge Python script that:

  1. imports your ratings and your watchlist from The Movie Database,
  2. generates a few simple statistics, and
  3. uses some deep learning to recommend movies and TV shows from your watchlist, based on your previous ratings and media metadata (e.g. director, actors, production company).

The script requires a TMDB API key stored in a .env file as TMDB_API_KEY. Overall, I would say the recommendations and the generated visualizations are pretty decent.
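
For reference, here is a minimal setup sketch; the dependency list is my guess based on the imports used in the script, and the snippet simply fails fast if the key is missing:

# .env file placed next to the script (assumed layout):
#   TMDB_API_KEY=your_api_key_here
#
# Likely dependencies, judging from the imports in the script:
#   pip install requests pandas numpy matplotlib seaborn python-dotenv
#   pip install scikit-learn sentence-transformers bertopic nltk
import os
import dotenv

dotenv.load_dotenv()
if not os.getenv("TMDB_API_KEY"):
    raise SystemExit("TMDB_API_KEY not found - add it to your .env file first.")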

I should also mention that I generated this behemoth of a script cooperatively with Claude 3.7 Sonnet. It took a few hours...

The reason I am publishing this script, and created it in the first place, is that I couldn't find anything similar online.

Anyway, have fun with this script. Do with it what you want; I don't need any attribution. And please be careful with your TMDB API key and your data.

Bye ^^

import os
import csv
import json
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from datetime import datetime
import time
import webbrowser
import dotenv
import warnings
# Suppress warnings
warnings.filterwarnings("ignore")
# Set better visual style
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (12, 8)
plt.rcParams["font.size"] = 12
# Load environment variables from .env file
dotenv.load_dotenv()
class TMDBAnalyzer:
def __init__(
self,
api_key,
base_url,
output_dir,
max_requests,
request_window,
top_n_companies,
top_n_directors,
top_n_actors,
high_rating_threshold,
embeddings_model,
):
# API Configuration
self.api_key = api_key
self.base_url = base_url
# Rate limiting configuration
self.max_requests = max_requests
self.request_window = request_window
self.request_timestamps = []
# Directory structure
self.output_dir = output_dir
self.dl_dir = os.path.join(output_dir, "deep_learning")
os.makedirs(output_dir, exist_ok=True)
os.makedirs(self.dl_dir, exist_ok=True)
# File paths
self.ratings_csv = os.path.join(output_dir, "tmdb_ratings_with_metadata.csv")
self.stats_csv = os.path.join(output_dir, "tmdb_stats.csv")
self.auth_file = os.path.join(output_dir, "auth_data.json")
# Analysis parameters
self.top_n_companies = top_n_companies
self.top_n_directors = top_n_directors
self.top_n_actors = top_n_actors
self.high_rating_threshold = high_rating_threshold
self.embeddings_model = embeddings_model
def make_api_request(self, url, params=None, method="GET", data=None):
"""Make an API request with rate limiting."""
# Check if we need to throttle requests
current_time = time.time()
self.request_timestamps = [
t for t in self.request_timestamps if current_time - t < self.request_window
]
if len(self.request_timestamps) >= self.max_requests:
# We've hit the rate limit, wait until the oldest request falls out of the window
sleep_time = (
self.request_window - (current_time - self.request_timestamps[0]) + 0.1
)
print(f"Rate limit reached. Waiting {sleep_time:.2f} seconds...")
time.sleep(sleep_time)
# Update current time after waiting
current_time = time.time()
# Clean up timestamps again
self.request_timestamps = [
t
for t in self.request_timestamps
if current_time - t < self.request_window
]
# Add current request timestamp
self.request_timestamps.append(current_time)
# Make the request
if method.upper() == "GET":
response = requests.get(url, params=params)
elif method.upper() == "POST":
response = requests.post(url, params=params, json=data)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
# Handle potential 429 Too Many Requests
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
print(f"Rate limit exceeded. Retrying after {retry_after} seconds...")
time.sleep(retry_after)
# Try the request again
return self.make_api_request(url, params, method, data)
return response
def setup_authentication(self):
"""Set up authentication and get session ID and account ID."""
# Check if we already have auth data
if os.path.exists(self.auth_file):
try:
with open(self.auth_file, "r") as f:
auth_data = json.load(f)
print("Found existing authentication data.")
# Verify the session is still valid
account_url = f"{self.base_url}/account"
params = {
"api_key": self.api_key,
"session_id": auth_data["session_id"],
}
response = self.make_api_request(account_url, params)
if response.status_code == 200:
print("Session is valid.")
return auth_data["session_id"], auth_data["account_id"]
else:
print("Session has expired. Creating a new one...")
except Exception as e:
print(f"Error reading auth file: {e}")
print("Creating new authentication...")
# Step 1: Create a request token
token_url = f"{self.base_url}/authentication/token/new"
params = {"api_key": self.api_key}
response = self.make_api_request(token_url, params)
if response.status_code != 200:
raise Exception(
f"Failed to get request token: {response.status_code} - {response.text}"
)
token_data = response.json()
request_token = token_data["request_token"]
# Step 2: Get the user to approve the request token
auth_url = f"https://www.themoviedb.org/authenticate/{request_token}"
print(f"\nPlease open this URL in your browser and approve the request:")
print(auth_url)
# Try to open the browser automatically
webbrowser.open(auth_url)
input("\nAfter approving, press Enter to continue...\n")
# Step 3: Create a session ID with the approved request token
session_url = f"{self.base_url}/authentication/session/new"
params = {"api_key": self.api_key}
data = {"request_token": request_token}
response = self.make_api_request(session_url, params, method="POST", data=data)
if response.status_code != 200:
raise Exception(
f"Failed to create session: {response.status_code} - {response.text}"
)
session_data = response.json()
session_id = session_data["session_id"]
# Step 4: Get account details
account_url = f"{self.base_url}/account"
params = {"api_key": self.api_key, "session_id": session_id}
response = self.make_api_request(account_url, params)
if response.status_code != 200:
raise Exception(
f"Failed to get account details: {response.status_code} - {response.text}"
)
account_data = response.json()
account_id = account_data["id"]
# Save the authentication data
auth_data = {"session_id": session_id, "account_id": account_id}
with open(self.auth_file, "w") as f:
json.dump(auth_data, f)
print(f"Authentication successful! Account ID: {account_id}")
return session_id, account_id
def get_ratings(self, media_type, account_id, session_id):
"""Get rated movies or TV shows."""
all_ratings = []
page = 1
total_pages = 1
while page <= total_pages:
url = f"{self.base_url}/account/{account_id}/rated/{media_type}"
params = {
"api_key": self.api_key,
"session_id": session_id,
"language": "en-US",
"sort_by": "created_at.desc",
"page": page,
}
response = self.make_api_request(url, params)
if response.status_code == 200:
data = response.json()
all_ratings.extend(data["results"])
total_pages = data["total_pages"]
page += 1
else:
print(
f"Error fetching {media_type} ratings page {page}: {response.status_code}"
)
break
return all_ratings
def get_watchlist(self, account_id, session_id):
"""Get movies and TV shows from user's watchlist."""
movie_watchlist = []
tv_watchlist = []
# Fetch movies in watchlist
print("Fetching movies in watchlist...")
page = 1
total_pages = 1
while page <= total_pages:
url = f"{self.base_url}/account/{account_id}/watchlist/movies"
params = {
"api_key": self.api_key,
"session_id": session_id,
"language": "en-US",
"sort_by": "created_at.desc",
"page": page,
}
response = self.make_api_request(url, params)
if response.status_code == 200:
data = response.json()
movie_watchlist.extend(data["results"])
total_pages = data["total_pages"]
page += 1
else:
print(
f"Error fetching movie watchlist page {page}: {response.status_code}"
)
break
# Fetch TV shows in watchlist
print("Fetching TV shows in watchlist...")
page = 1
total_pages = 1
while page <= total_pages:
url = f"{self.base_url}/account/{account_id}/watchlist/tv"
params = {
"api_key": self.api_key,
"session_id": session_id,
"language": "en-US",
"sort_by": "created_at.desc",
"page": page,
}
response = self.make_api_request(url, params)
if response.status_code == 200:
data = response.json()
tv_watchlist.extend(data["results"])
total_pages = data["total_pages"]
page += 1
else:
print(
f"Error fetching TV watchlist page {page}: {response.status_code}"
)
break
return {"movies": movie_watchlist, "tv": tv_watchlist}
def get_details(self, media_type, item_id):
"""Get additional details for a movie or TV show."""
url = f"{self.base_url}/{media_type}/{item_id}"
params = {
"api_key": self.api_key,
"language": "en-US",
"append_to_response": "credits,keywords,release_dates",
}
response = self.make_api_request(url, params)
if response.status_code == 200:
return response.json()
else:
print(
f"Error fetching details for {media_type} ID {item_id}: {response.status_code}"
)
return None
def extract_metadata(self, item, details, media_type):
"""Extract relevant metadata from API response."""
metadata = {
"id": item["id"],
"title": item.get("title", item.get("name", "Unknown")),
"media_type": media_type,
"rating": item["rating"],
"rated_at": item.get("rated_at", "Unknown"),
"release_date": details.get(
"release_date", details.get("first_air_date", "Unknown")
),
"genres": ", ".join([genre["name"] for genre in details.get("genres", [])]),
"runtime": details.get(
"runtime",
(
details.get("episode_run_time", [0])[0]
if details.get("episode_run_time")
else 0
),
),
"vote_average": details.get("vote_average", 0),
"vote_count": details.get("vote_count", 0),
"popularity": details.get("popularity", 0),
"production_companies": ", ".join(
[company["name"] for company in details.get("production_companies", [])]
),
"original_language": details.get("original_language", "Unknown"),
"overview": details.get("overview", ""),
}
# Get director(s) for movies or creators for TV shows
if media_type == "movies":
directors = [
crew["name"]
for crew in details.get("credits", {}).get("crew", [])
if crew["job"] == "Director"
]
metadata["director"] = ", ".join(directors)
else:
creators = [creator["name"] for creator in details.get("created_by", [])]
metadata["creator"] = ", ".join(creators)
# Get top cast
cast = details.get("credits", {}).get("cast", [])
top_cast = [actor["name"] for actor in cast[:20]] # Increased from 5 to 20
metadata["top_cast"] = ", ".join(top_cast)
# Get keywords
keywords = (
details.get("keywords", {}).get("keywords", [])
if media_type == "movies"
else details.get("keywords", {}).get("results", [])
)
metadata["keywords"] = ", ".join([kw["name"] for kw in keywords])
return metadata
def extract_watchlist_metadata(self, item, details, media_type):
"""Extract metadata for watchlist items."""
metadata = {
"id": item["id"],
"title": item.get("title", item.get("name", "Unknown")),
"media_type": media_type,
"in_watchlist": True,
"release_date": details.get(
"release_date", details.get("first_air_date", "Unknown")
),
"genres": ", ".join([genre["name"] for genre in details.get("genres", [])]),
"runtime": details.get(
"runtime",
(
details.get("episode_run_time", [0])[0]
if details.get("episode_run_time")
else 0
),
),
"vote_average": details.get("vote_average", 0),
"vote_count": details.get("vote_count", 0),
"popularity": details.get("popularity", 0),
"production_companies": ", ".join(
[company["name"] for company in details.get("production_companies", [])]
),
"original_language": details.get("original_language", "Unknown"),
"overview": details.get("overview", ""),
}
# Get director(s) for movies or creators for TV shows
if media_type == "movies":
directors = [
crew["name"]
for crew in details.get("credits", {}).get("crew", [])
if crew["job"] == "Director"
]
metadata["director"] = ", ".join(directors)
else:
creators = [creator["name"] for creator in details.get("created_by", [])]
metadata["creator"] = ", ".join(creators)
# Get top cast
cast = details.get("credits", {}).get("cast", [])
top_cast = [actor["name"] for actor in cast[:20]] # Increased from 5 to 20
metadata["top_cast"] = ", ".join(top_cast)
# Get keywords
keywords = (
details.get("keywords", {}).get("keywords", [])
if media_type == "movies"
else details.get("keywords", {}).get("results", [])
)
metadata["keywords"] = ", ".join([kw["name"] for kw in keywords])
return metadata
def process_watchlist(self, watchlist):
"""Process watchlist items to get full details."""
watchlist_metadata = []
print("Fetching additional metadata for movies in watchlist...")
for i, movie in enumerate(watchlist["movies"]):
print(
f"Processing watchlist movie {i+1}/{len(watchlist['movies'])}: {movie.get('title', movie.get('name', 'Unknown'))}"
)
details = self.get_details("movie", movie["id"])
if details:
metadata = self.extract_watchlist_metadata(movie, details, "movies")
watchlist_metadata.append(metadata)
print("Fetching additional metadata for TV shows in watchlist...")
for i, tv in enumerate(watchlist["tv"]):
print(
f"Processing watchlist TV show {i+1}/{len(watchlist['tv'])}: {tv.get('name', tv.get('title', 'Unknown'))}"
)
details = self.get_details("tv", tv["id"])
if details:
metadata = self.extract_watchlist_metadata(tv, details, "tv")
watchlist_metadata.append(metadata)
return watchlist_metadata
def generate_statistics(self, data_df):
"""Generate statistics from the data."""
stats = []
# Overall statistics
stats.append(("Total Rated Items", len(data_df)))
stats.append(("Average Rating", round(data_df["rating"].mean(), 2)))
stats.append(("Median Rating", data_df["rating"].median()))
# Ratings distribution
rating_counts = data_df["rating"].value_counts().sort_index()
stats.append(("Rating Distribution", dict(rating_counts)))
# Media type breakdown
media_counts = data_df["media_type"].value_counts()
stats.append(("Media Type Counts", dict(media_counts)))
# Top genres
all_genres = []
for genres_str in data_df["genres"]:
genres = [genre.strip() for genre in genres_str.split(",")]
all_genres.extend(genres)
genre_counts = Counter(all_genres)
top_genres = dict(genre_counts.most_common(15)) # Increased from 10 to 15
stats.append(("Top 15 Genres", top_genres))
# Average rating by genre
genre_ratings = {}
for genre in set(all_genres):
genre_mask = data_df["genres"].str.contains(genre, regex=False, na=False)
avg_rating = round(data_df.loc[genre_mask, "rating"].mean(), 2)
genre_ratings[genre] = avg_rating
stats.append(
(
"Average Rating by Genre",
dict(sorted(genre_ratings.items(), key=lambda x: x[1], reverse=True)),
)
)
# Top directors/creators
director_col = "director" if "director" in data_df.columns else "creator"
all_directors = []
for directors_str in data_df[director_col].dropna():
directors = [director.strip() for director in directors_str.split(",")]
all_directors.extend(directors)
director_counts = Counter(all_directors)
top_directors = dict(
director_counts.most_common(self.top_n_directors)
) # Using parameter
stats.append(
(f"Top {self.top_n_directors} {director_col.capitalize()}s", top_directors)
)
# Top production companies
all_companies = []
for companies_str in data_df["production_companies"].dropna():
companies = [company.strip() for company in companies_str.split(",")]
all_companies.extend(companies)
company_counts = Counter(all_companies)
top_companies = dict(
company_counts.most_common(self.top_n_companies)
) # Using parameter
stats.append(
(f"Top {self.top_n_companies} Production Companies", top_companies)
)
# Average rating by production company
company_ratings = {}
for company in dict(company_counts.most_common(self.top_n_companies)):
company_mask = data_df["production_companies"].str.contains(
company, regex=False, na=False
)
if company_mask.sum() > 0: # Make sure we have matches
avg_rating = round(data_df.loc[company_mask, "rating"].mean(), 2)
company_ratings[company] = avg_rating
stats.append(
(
"Average Rating by Top Production Companies",
dict(sorted(company_ratings.items(), key=lambda x: x[1], reverse=True)),
)
)
# Top actors
all_actors = []
for cast_str in data_df["top_cast"].dropna():
actors = [actor.strip() for actor in cast_str.split(",")]
all_actors.extend(actors)
actor_counts = Counter(all_actors)
top_actors = dict(
actor_counts.most_common(self.top_n_actors)
) # Using parameter
stats.append((f"Top {self.top_n_actors} Actors", top_actors))
# Average rating by top actors
actor_ratings = {}
for actor in dict(actor_counts.most_common(self.top_n_actors)):
actor_mask = data_df["top_cast"].str.contains(actor, regex=False, na=False)
if actor_mask.sum() > 0: # Make sure we have matches
avg_rating = round(data_df.loc[actor_mask, "rating"].mean(), 2)
num_appearances = actor_mask.sum()
actor_ratings[f"{actor} ({num_appearances})"] = avg_rating
stats.append(
(
"Average Rating by Top Actors",
dict(sorted(actor_ratings.items(), key=lambda x: x[1], reverse=True)),
)
)
# Ratings by year
data_df["year"] = pd.to_datetime(
data_df["release_date"], errors="coerce"
).dt.year
year_ratings = data_df.groupby("year")["rating"].mean().round(2)
stats.append(("Average Rating by Year", dict(year_ratings.sort_index())))
# Your rating vs TMDB rating
data_df["rating_diff"] = data_df["rating"] - data_df["vote_average"]
avg_diff = round(data_df["rating_diff"].mean(), 2)
stats.append(("Average Difference from TMDB Rating", avg_diff))
return stats
def display_statistics(self, stats_list, df):
"""Display statistics and create visualizations."""
print("\n===== TMDB RATINGS ANALYSIS =====\n")
for title, data in stats_list:
print(f"\n----- {title} -----")
if isinstance(data, dict):
for key, value in data.items():
print(f"{key}: {value}")
else:
print(data)
# Create visualizations
# 1. Rating distribution
plt.figure(figsize=(10, 6))
sns.countplot(x="rating", data=df, palette="viridis")
plt.title("Distribution of Your Ratings", fontsize=16)
plt.xlabel("Rating", fontsize=14)
plt.ylabel("Number of Items", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "rating_distribution.png"))
# 2. Rating by media type
plt.figure(figsize=(10, 6))
sns.barplot(x="media_type", y="rating", data=df, palette="Set2")
plt.title("Average Rating by Media Type", fontsize=16)
plt.xlabel("Media Type", fontsize=14)
plt.ylabel("Average Rating", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "rating_by_media_type.png"))
# 3. Your ratings vs TMDB ratings
plt.figure(figsize=(10, 6))
plt.scatter(df["vote_average"], df["rating"], alpha=0.6, s=50)
plt.plot([0, 10], [0, 10], "r--") # Diagonal line for reference
plt.title("Your Ratings vs TMDB Ratings", fontsize=16)
plt.xlabel("TMDB Rating", fontsize=14)
plt.ylabel("Your Rating", fontsize=14)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "your_vs_tmdb_ratings.png"))
# 4. Top production companies
plt.figure(figsize=(12, 8))
all_companies = []
for companies_str in df["production_companies"].dropna():
companies = [company.strip() for company in companies_str.split(",")]
all_companies.extend(companies)
company_counts = Counter(all_companies)
top_companies = dict(company_counts.most_common(self.top_n_companies))
company_df = pd.DataFrame(
{
"company": list(top_companies.keys()),
"count": list(top_companies.values()),
}
)
sns.barplot(x="count", y="company", data=company_df, palette="Reds_r")
plt.title(f"Top {self.top_n_companies} Production Companies", fontsize=16)
plt.xlabel("Number of Rated Titles", fontsize=14)
plt.ylabel("Production Company", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "top_production_companies.png"))
# 5. Top actors
plt.figure(figsize=(12, 8))
all_actors = []
for cast_str in df["top_cast"].dropna():
actors = [actor.strip() for actor in cast_str.split(",")]
all_actors.extend(actors)
actor_counts = Counter(all_actors)
top_actors = dict(actor_counts.most_common(self.top_n_actors))
actor_df = pd.DataFrame(
{"actor": list(top_actors.keys()), "count": list(top_actors.values())}
)
sns.barplot(x="count", y="actor", data=actor_df, palette="Blues_r")
plt.title(f"Top {self.top_n_actors} Actors in Your Rated Titles", fontsize=16)
plt.xlabel("Number of Appearances", fontsize=14)
plt.ylabel("Actor", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "top_actors.png"))
print(f"\nVisualizations saved to {self.output_dir} directory")
def save_statistics_to_csv(self, stats_list, filename):
"""Save statistics to CSV."""
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Statistic", "Value"])
for title, data in stats_list:
if isinstance(data, dict):
for key, value in data.items():
writer.writerow([f"{title} - {key}", value])
else:
writer.writerow([title, data])
print(f"Statistics saved to {filename}")
def find_rating_anomalies(self, df):
"""Find titles that you rated very differently from the TMDB average."""
print("Identifying rating anomalies...")
# Calculate the difference between your rating and TMDB rating
df["rating_diff"] = df["rating"] - df["vote_average"]
# Find titles you liked much more than average (positive surprise)
positive_anomalies = df[df["rating_diff"] > 2].sort_values(
"rating_diff", ascending=False
)
# Find titles you liked much less than average (negative surprise)
negative_anomalies = df[df["rating_diff"] < -2].sort_values("rating_diff")
# Combine anomalies
anomalies = pd.concat(
[
positive_anomalies[
["title", "rating", "vote_average", "rating_diff", "genres"]
].head(10),
negative_anomalies[
["title", "rating", "vote_average", "rating_diff", "genres"]
].head(10),
]
)
# Save anomalies to CSV
anomalies.to_csv(
os.path.join(self.output_dir, "rating_anomalies.csv"), index=False
)
# Visualize rating anomalies
plt.figure(figsize=(12, 10))
anomalies_plot = pd.concat(
[
positive_anomalies[["title", "rating_diff"]].head(7),
negative_anomalies[["title", "rating_diff"]].head(7),
]
)
# Shorten long titles for display
anomalies_plot["title"] = anomalies_plot["title"].apply(
lambda x: x[:30] + "..." if len(x) > 30 else x
)
# Sort by rating difference for better visualization
anomalies_plot = anomalies_plot.sort_values("rating_diff")
# Plot with a colormap based on the rating difference
bars = sns.barplot(
x="rating_diff",
y="title",
data=anomalies_plot,
palette=sns.color_palette("RdBu_r", len(anomalies_plot)),
)
# Add labels to bars
for i, p in enumerate(bars.patches):
diff = anomalies_plot.iloc[i]["rating_diff"]
if p.get_width() < 0:
bars.text(
p.get_width() - 0.5,
p.get_y() + p.get_height() / 2,
f"{diff:.1f}",
ha="right",
va="center",
color="white",
fontweight="bold",
)
else:
bars.text(
p.get_width() + 0.1,
p.get_y() + p.get_height() / 2,
f"+{diff:.1f}",
ha="left",
va="center",
fontweight="bold",
)
plt.axvline(x=0, color="black", linestyle="-", alpha=0.3)
plt.title(
"Movies/Shows You Rated Very Differently Than TMDB Average", fontsize=16
)
plt.xlabel("Your Rating - TMDB Rating", fontsize=14)
plt.ylabel("Title", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "rating_anomalies.png"))
return anomalies
def analyze_rating_trends(self, df):
"""Analyze how your ratings have changed over time."""
print("Analyzing your rating trends over time...")
# Make sure we have rated_at as datetime
if "rated_at" in df.columns:
df["rated_at"] = pd.to_datetime(df["rated_at"], errors="coerce")
# Extract components from the rating date
df["rate_year"] = df["rated_at"].dt.year
df["rate_month"] = df["rated_at"].dt.month
# Calculate average rating by month
monthly_ratings = (
df.groupby(["rate_year", "rate_month"])["rating"]
.agg(["mean", "count"])
.reset_index()
)
monthly_ratings["date"] = pd.to_datetime(
monthly_ratings["rate_year"].astype(str)
+ "-"
+ monthly_ratings["rate_month"].astype(str)
+ "-01"
)
monthly_ratings = monthly_ratings.sort_values("date")
# Calculate rolling average for smoother trend
monthly_ratings["rolling_avg"] = (
monthly_ratings["mean"].rolling(window=3, min_periods=1).mean()
)
# Save to CSV
monthly_ratings.to_csv(
os.path.join(self.output_dir, "rating_trends.csv"), index=False
)
# Plot rating trends
plt.figure(figsize=(14, 8))
# Plot average rating by month
ax1 = plt.subplot(111)
ax1.plot(
monthly_ratings["date"],
monthly_ratings["mean"],
marker="o",
linestyle="-",
color="#3498db",
alpha=0.7,
label="Monthly Average",
)
ax1.plot(
monthly_ratings["date"],
monthly_ratings["rolling_avg"],
linestyle="-",
color="#e74c3c",
linewidth=3,
label="3-Month Rolling Average",
)
# Plot number of ratings as bars
ax2 = ax1.twinx()
ax2.bar(
monthly_ratings["date"],
monthly_ratings["count"],
alpha=0.2,
color="gray",
label="Number of Ratings",
)
# Set labels and title
ax1.set_xlabel("Date", fontsize=14)
ax1.set_ylabel("Average Rating", fontsize=14)
ax2.set_ylabel("Number of Ratings", fontsize=14)
plt.title("Your Rating Trends Over Time", fontsize=16)
# Add legends
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
plt.tight_layout()
plt.savefig(os.path.join(self.output_dir, "rating_trends.png"))
return monthly_ratings
else:
print("Rating date information not available. Skipping trend analysis.")
return None
def prepare_content_text_with_description(self, data_df):
"""Prepare content text with heavy description weighting for semantic analysis."""
content_texts = []
for _, row in data_df.iterrows():
# Start with base text
text_parts = []
# Add title
if pd.notna(row.get("title")):
text_parts.append(row["title"])
# Add overview/description with emphasis (repeat 3x for higher weight)
if pd.notna(row.get("overview")):
# Add description 3 times to increase its influence
text_parts.append(row["overview"])
text_parts.append(row["overview"])
text_parts.append(row["overview"])
# Add genres
if pd.notna(row.get("genres")):
text_parts.append(row["genres"])
# Add keywords
if pd.notna(row.get("keywords")):
text_parts.append(row["keywords"])
# Add director/creator
if "director" in row and pd.notna(row.get("director")):
text_parts.append(row["director"])
elif "creator" in row and pd.notna(row.get("creator")):
text_parts.append(row["creator"])
# Add cast
if pd.notna(row.get("top_cast")):
text_parts.append(row["top_cast"])
# Combine all parts
content_texts.append(" ".join(text_parts))
return content_texts
def run_deep_learning_analyses(self, df, watchlist_df=None, feature_weights=None):
"""Run deep learning based analyses."""
print("\n===== DEEP LEARNING ANALYSIS =====\n")
try:
from sentence_transformers import SentenceTransformer
except ImportError:
print("Installing Sentence Transformers...")
import subprocess
subprocess.check_call(["pip", "install", "sentence-transformers"])
from sentence_transformers import SentenceTransformer
try:
from bertopic import BERTopic
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
import nltk
import re
# Download NLTK stop words if not already available
try:
nltk.data.find("corpora/stopwords")
except LookupError:
nltk.download("stopwords")
from nltk.corpus import stopwords
# Create a comprehensive stop words list
custom_stop_words = set(
list(ENGLISH_STOP_WORDS)
+ list(stopwords.words("english"))
+ [
"movie",
"film",
"show",
"character",
"story",
"watch",
"like",
"good",
"great",
"bad",
"best",
"better",
"worse",
"she",
"he",
"they",
"it",
"this",
"that",
"these",
"those",
"a",
"an",
"the",
"and",
"but",
"if",
"or",
"because",
"as",
"until",
"while",
"of",
"at",
"by",
"for",
"with",
"about",
"against",
"between",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"to",
"from",
"up",
"down",
"in",
"out",
"on",
"off",
"over",
"under",
"again",
"further",
"then",
"once",
"here",
"there",
"when",
"where",
"why",
"how",
"all",
"any",
"both",
"each",
"few",
"more",
"most",
"other",
"some",
"such",
"no",
"nor",
"not",
"only",
"own",
"same",
"so",
"than",
"too",
"very",
"s",
"t",
"can",
"will",
"just",
"don",
"should",
"now",
"m",
"re",
"ve",
"y",
"isn",
"aren",
"doesn",
"didn",
"hadn",
"hasn",
"haven",
"isn",
"wasn",
"weren",
"won",
"wouldn",
]
)
print(
"\nPerforming topic modeling on descriptions with manual stop words removal..."
)
# Get descriptions from rated items
descriptions = df["overview"].fillna("").tolist()
if any(desc.strip() != "" for desc in descriptions):
# Preprocess descriptions to remove stop words
print("Preprocessing descriptions to remove stop words...")
# Create a function to clean text
def clean_text(text):
# Convert to lowercase
text = text.lower()
# Remove special characters and digits
text = re.sub(r"[^\w\s]", " ", text)
text = re.sub(r"\d+", " ", text)
# Tokenize
words = text.split()
# Remove stop words
words = [word for word in words if word not in custom_stop_words]
# Rejoin
return " ".join(words)
# Apply cleaning to descriptions
cleaned_descriptions = [clean_text(desc) for desc in descriptions]
# Filter out empty descriptions after cleaning
cleaned_descriptions = [
desc for desc in cleaned_descriptions if desc.strip()
]
if cleaned_descriptions:
print(
f"Pre-processing complete: {len(cleaned_descriptions)} descriptions with stop words removed"
)
# Configure BERTopic with simpler parameters since we already cleaned the text
from sklearn.feature_extraction.text import CountVectorizer
# Create vectorizer that doesn't need to handle stop words (we already did that)
vectorizer = CountVectorizer(min_df=2, max_df=0.85)
# Create the BERTopic model with more configuration to avoid visualization issues
topic_model = BERTopic(
min_topic_size=max(2, min(5, len(cleaned_descriptions) // 20)),
verbose=True,
vectorizer_model=vectorizer,
nr_topics="auto", # Let BERTopic decide the number of topics
)
# Fit model on cleaned descriptions
topics, probs = topic_model.fit_transform(cleaned_descriptions)
# Get topic info
topic_info = topic_model.get_topic_info()
# Save topic info
topic_info.to_csv(
os.path.join(self.dl_dir, "description_topics.csv"), index=False
)
# Print top topics
print("\nMost common themes in your content based on descriptions:")
for idx, row in (
topic_info[topic_info["Topic"] != -1].head(5).iterrows()
):
topic_words = topic_model.get_topic(row["Topic"])
# Just take the top 5 words
top_words = ", ".join([word for word, _ in topic_words[:5]])
print(
f"Theme {row['Topic']}: {top_words} ({row['Count']} items)"
)
# Create visualization if possible - with better error handling
try:
# Check if we have topics to visualize
if len(topic_info[topic_info["Topic"] != -1]) > 0:
print("Creating topic visualizations...")
# Use a simpler visualization approach first
# Create a manual CSV visualization instead
topics_readable = []
for topic_id in set(topics):
if topic_id != -1: # Skip the outlier topic
words = [
word
for word, _ in topic_model.get_topic(topic_id)[
:10
]
]
topics_readable.append(
{
"Topic": topic_id,
"Words": ", ".join(words),
"Count": topic_info[
topic_info["Topic"] == topic_id
]["Count"].values[0],
}
)
topics_df = pd.DataFrame(topics_readable)
if not topics_df.empty:
topics_df = topics_df.sort_values(
"Count", ascending=False
)
topics_df.to_csv(
os.path.join(self.dl_dir, "readable_topics.csv"),
index=False,
)
print(
f"Saved readable topic list to {os.path.join(self.dl_dir, 'readable_topics.csv')}"
)
# Try the interactive visualizations with safeguards
try:
# Try to create a simple bar chart instead of the complex visualization
import matplotlib.pyplot as plt
# Create a bar chart of topic frequencies
plt.figure(figsize=(12, 8))
topic_counts = topics_df.sort_values(
"Count", ascending=True
)
plt.barh(
topic_counts["Topic"].astype(str),
topic_counts["Count"],
)
plt.xlabel("Number of Documents")
plt.ylabel("Topic")
plt.title("Topic Distribution")
plt.tight_layout()
plt.savefig(
os.path.join(self.dl_dir, "topic_distribution.png")
)
# Now try the BERTopic visualizations with explicit checks
if hasattr(
topic_model, "visualize_topics"
) and callable(topic_model.visualize_topics):
fig = topic_model.visualize_topics()
if fig is not None:
fig.write_html(
os.path.join(
self.dl_dir, "description_topics.html"
)
)
print(
f"Saved interactive topic visualization to {os.path.join(self.dl_dir, 'description_topics.html')}"
)
if hasattr(
topic_model, "visualize_hierarchy"
) and callable(topic_model.visualize_hierarchy):
fig = topic_model.visualize_hierarchy()
if fig is not None:
fig.write_html(
os.path.join(
self.dl_dir, "topic_hierarchy.html"
)
)
print(
f"Saved topic hierarchy visualization to {os.path.join(self.dl_dir, 'topic_hierarchy.html')}"
)
except Exception as viz_error:
print(
f"Interactive visualizations failed: {str(viz_error)}"
)
print("Falling back to basic visualizations only.")
else:
print("No distinct topics found for visualization.")
except Exception as e:
print(f"Couldn't create topic visualizations: {str(e)}")
print("This is non-critical - continuing with analysis.")
else:
print(
"After stop word removal, no substantial description content remains for topic modeling."
)
else:
print("No descriptions found for topic modeling.")
except ImportError:
print("BERTopic not available. Skipping description-based topic modeling.")
print("You can install it with: pip install bertopic hdbscan")
except Exception as e:
print(f"Error during topic modeling: {str(e)}")
# DESCRIPTION-ENHANCED DEEP CONTENT EMBEDDINGS
print("\nGenerating deep content embeddings...")
# Prepare content text with description emphasis
print("Preparing content texts with description emphasis...")
df_content_texts = self.prepare_content_text_with_description(df)
# Create combined dataset with watchlist items
combined_content_texts = df_content_texts
if watchlist_df is not None and not watchlist_df.empty:
# Prepare watchlist content text with description emphasis
watchlist_content_texts = self.prepare_content_text_with_description(
watchlist_df
)
# Combine with main content texts
combined_content_texts = df_content_texts + watchlist_content_texts
# Load pre-trained sentence transformer
model_name = self.embeddings_model
print(f"Loading Sentence Transformer model: {model_name}")
st_model = SentenceTransformer(model_name)
# Generate embeddings in batches
batch_size = 32
all_embeddings = []
for i in range(0, len(combined_content_texts), batch_size):
end_idx = min(i + batch_size, len(combined_content_texts))
print(
f"Processing embeddings batch {i//batch_size + 1}/{len(combined_content_texts)//batch_size + 1}"
)
batch_texts = combined_content_texts[i:end_idx]
batch_embeddings = st_model.encode(
batch_texts,
show_progress_bar=False,
convert_to_tensor=True,
normalize_embeddings=True,
)  # convert_to_tensor / normalize_embeddings are recommended for multilingual-e5-large-instruct
# Move the batch to CPU and convert to NumPy so np.vstack below also works when encoding runs on a GPU
all_embeddings.append(batch_embeddings.cpu().numpy())
# Combine batches
embeddings = np.vstack(all_embeddings)
# Save embeddings for future use
np.save(
os.path.join(self.dl_dir, "description_enhanced_embeddings.npy"), embeddings
)
# DESCRIPTION-ENHANCED RECOMMENDATION SYSTEM
print("\nCreating description-enhanced recommendation system...")
# Get indices for rated items and watchlist items
n_rated = len(df)
rated_indices = list(range(n_rated))
watchlist_indices = (
list(range(n_rated, len(combined_content_texts)))
if watchlist_df is not None and not watchlist_df.empty
else []
)
# Create user preference profile from highly-rated items
highly_rated = df[df["rating"] >= self.high_rating_threshold]
if len(highly_rated) > 0:
highly_rated_indices = highly_rated.index.tolist()
user_profile = np.mean(embeddings[highly_rated_indices], axis=0)
else:
# Use all rated items as fallback
user_profile = np.mean(embeddings[:n_rated], axis=0)
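# Note: the "user profile" above is simply the centroid (mean vector) of the embeddings
# of your highly-rated titles (or of all rated titles as a fallback). The cosine
# similarity of each item against this centroid becomes the "semantic_similarity"
# component of the recommendation score below.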
# Compute similarity between user profile and all content
from sklearn.metrics.pairwise import cosine_similarity
user_profile_reshaped = user_profile.reshape(1, -1)
similarities = cosine_similarity(user_profile_reshaped, embeddings).flatten()
# Process watchlist items
recommendations = []
rated_titles = set(df["title"])
if watchlist_df is not None and not watchlist_df.empty:
for i, title in enumerate(watchlist_df["title"]):
if title not in rated_titles:
idx = watchlist_indices[i]
# Base score from semantic similarity
semantic_score = similarities[idx]
# Initialize component scores
component_scores = {
"semantic_similarity": semantic_score
* feature_weights["content"]
}
# TMDB rating component
if "vote_average" in watchlist_df.columns and pd.notna(
watchlist_df["vote_average"].iloc[i]
):
vote_score = watchlist_df["vote_average"].iloc[i] / 10
component_scores["vote_average"] = (
vote_score * feature_weights["vote_average"]
)
else:
component_scores["vote_average"] = 0
# Year recency component
year_score = 0
if "year" in watchlist_df.columns and pd.notna(
watchlist_df["year"].iloc[i]
):
current_year = datetime.now().year
years_old = current_year - watchlist_df["year"].iloc[i]
year_score = max(
0, 1 - (years_old / 50)
) # Linear decay over 50 years
component_scores["year"] = year_score * feature_weights["year"]
elif "release_date" in watchlist_df.columns and pd.notna(
watchlist_df["release_date"].iloc[i]
):
try:
release_year = pd.to_datetime(
watchlist_df["release_date"].iloc[i]
).year
current_year = datetime.now().year
years_old = current_year - release_year
year_score = max(0, 1 - (years_old / 50))
component_scores["year"] = (
year_score * feature_weights["year"]
)
except Exception:
component_scores["year"] = 0
else:
component_scores["year"] = 0
# Popularity component
if "popularity" in watchlist_df.columns and pd.notna(
watchlist_df["popularity"].iloc[i]
):
# Normalize popularity (assuming max of 100, adjust as needed)
pop_score = min(1, watchlist_df["popularity"].iloc[i] / 100)
component_scores["popularity"] = (
pop_score * feature_weights["popularity"]
)
else:
component_scores["popularity"] = 0
# Runtime component
if "runtime" in watchlist_df.columns and pd.notna(
watchlist_df["runtime"].iloc[i]
):
# Prefer mid-length content (90-150 minutes)
runtime = watchlist_df["runtime"].iloc[i]
if 90 <= runtime <= 150:
runtime_score = 1.0
elif runtime < 90:
runtime_score = runtime / 90
else: # runtime > 150
runtime_score = max(
0, 1 - (runtime - 150) / 120
) # Linear decay after 150 mins
component_scores["runtime"] = (
runtime_score * feature_weights["runtime"]
)
else:
component_scores["runtime"] = 0
# Get description for this item
description = (
watchlist_df["overview"].iloc[i]
if "overview" in watchlist_df.columns
and pd.notna(watchlist_df["overview"].iloc[i])
else ""
)
# Calculate final score
final_score = sum(component_scores.values())
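# Worked example with the default weights from __main__ (content 0.50, vote_average 0.20,
# year 0.15, popularity 0.10, runtime 0.05): a watchlist title with semantic similarity 0.80,
# a TMDB rating of 7.5, released 5 years ago (year score 0.90), popularity 40 and a
# 120-minute runtime scores 0.50*0.80 + 0.20*0.75 + 0.15*0.90 + 0.10*0.40 + 0.05*1.00 = 0.775.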
# Add to recommendations
recommendations.append(
{
"title": title,
"score": final_score,
"status": "In your watchlist",
"components": component_scores,
"description": (
description[:200] + "..."
if len(description) > 200
else description
),
}
)
# Sort recommendations by score
recommendations.sort(key=lambda x: x["score"], reverse=True)
# Create recommendation DataFrame
if recommendations:
rec_data = []
for rec in recommendations:
rec_data.append(
[
rec["title"],
rec["score"],
rec["status"],
rec["components"].get("semantic_similarity", 0),
rec["components"].get("vote_average", 0),
rec["components"].get("year", 0),
rec["components"].get("popularity", 0),
rec["components"].get("runtime", 0),
rec["description"],
]
)
rec_df = pd.DataFrame(
rec_data,
columns=[
"Title",
"Score",
"Status",
"Semantic Similarity",
"TMDB Rating",
"Year Recency",
"Popularity",
"Runtime",
"Description",
],
)
# Save recommendations with descriptions
rec_df.to_csv(
os.path.join(
self.dl_dir, "deep_recommendations_with_descriptions.csv"
),
index=False,
)
# Visualize top recommendations
top_n = min(10, len(rec_df))
top_recs = rec_df.head(top_n)
plt.figure(figsize=(14, 10))
# Create stacked bar chart for component visualization
components = top_recs[
[
"Semantic Similarity",
"TMDB Rating",
"Year Recency",
"Popularity",
"Runtime",
]
]
components = components.set_index(top_recs["Title"])
ax = components.plot(
kind="barh",
stacked=True,
figsize=(14, 10),
colormap="viridis",
width=0.7,
)
plt.title(
"Description-Enhanced Recommendations - Score Components",
fontsize=16,
)
plt.xlabel("Score Contribution", fontsize=14)
plt.ylabel("Title", fontsize=14)
plt.legend(title="Components", title_fontsize=12)
plt.tight_layout()
plt.savefig(
os.path.join(self.dl_dir, "deep_recommendations_components.png")
)
# Print top recommendations with descriptions
print("\nTop 5 Deep Learning Recommendations:")
for i, (_, row) in enumerate(top_recs.head(5).iterrows()):
print(f"{i+1}. {row['Title']} (Score: {row['Score']:.2f})")
print(
f" Semantic Match: {row['Semantic Similarity']:.2f}, TMDB: {row['TMDB Rating']:.2f}, Year: {row['Year Recency']:.2f}"
)
if row["Description"]:
print(f" Description: {row['Description']}")
# Similarity matrix between recommendations
if len(top_recs) > 1:
print("\nGenerating similarity matrix between recommendations...")
# Map titles to embedding indices so the heatmap labels stay aligned with the matrix rows
title_to_index = {
watchlist_df["title"].iloc[i]: watchlist_indices[i]
for i in range(len(watchlist_df))
}
top_titles = [
t for t in top_recs["Title"].values if t in title_to_index
]
top_indices = [title_to_index[t] for t in top_titles]
if top_indices:
# Extract embeddings for top recommendations
top_embeddings = embeddings[top_indices]
# Calculate similarity matrix
rec_similarities = cosine_similarity(top_embeddings)
# Create heatmap of recommendation similarities
plt.figure(figsize=(12, 10))
sns.heatmap(
rec_similarities,
annot=True,
fmt=".2f",
cmap="YlGnBu",
xticklabels=top_titles,
yticklabels=top_titles,
)
plt.title(
"Content Similarity Between Recommendations", fontsize=16
)
plt.tight_layout()
plt.savefig(
os.path.join(
self.dl_dir, "recommendation_similarity_matrix.png"
)
)
else:
print("No recommendations generated from watchlist.")
rec_df = pd.DataFrame()
else:
print("No watchlist items found for recommendations.")
rec_df = pd.DataFrame()
# Visualize content embeddings with dimensionality reduction
print("\nVisualizing content embeddings...")
# Use t-SNE for dimensionality reduction
from sklearn.manifold import TSNE
# Apply t-SNE to rated content only
rated_embeddings = embeddings[:n_rated]
# Determine good perplexity value based on data size
perplexity = min(30, max(5, len(rated_embeddings) // 10))
tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
embeddings_2d = tsne.fit_transform(rated_embeddings)
# Create scatter plot colored by rating
plt.figure(figsize=(12, 10))
scatter = plt.scatter(
embeddings_2d[:, 0],
embeddings_2d[:, 1],
c=df["rating"],
cmap="viridis",
s=100,
alpha=0.7,
)
# Add color bar
cbar = plt.colorbar(scatter)
cbar.set_label("Your Rating", fontsize=12)
# Label some points for context
np.random.seed(42)
indices_to_label = np.random.choice(
range(len(df)), size=min(10, len(df)), replace=False
)
for idx in indices_to_label:
plt.annotate(
df["title"].iloc[idx],
(embeddings_2d[idx, 0], embeddings_2d[idx, 1]),
fontsize=9,
)
plt.title("Description-Enhanced Content Embeddings (t-SNE)", fontsize=16)
plt.savefig(
os.path.join(self.dl_dir, "description_enhanced_embeddings_tsne.png")
)
print(f"\nDeep learning analysis completed. Results saved to {self.dl_dir}")
return rec_df
def run_analysis(
self,
use_existing_data=False,
use_existing_watchlist=False,
feature_weights=None,
):
"""Run the complete analysis pipeline."""
watchlist_df = None
# Check if we should use existing data
if use_existing_data and os.path.exists(self.ratings_csv):
print(f"Using existing data from {self.ratings_csv}")
df = pd.read_csv(self.ratings_csv)
# Check for existing watchlist data
watchlist_path = os.path.join(self.output_dir, "tmdb_watchlist.csv")
if use_existing_watchlist and os.path.exists(watchlist_path):
print(f"Using existing watchlist data")
watchlist_df = pd.read_csv(watchlist_path)
print(f"Loaded {len(watchlist_df)} items from your watchlist")
else:
# Set up authentication
print("Setting up TMDB authentication...")
session_id, account_id = self.setup_authentication()
# Get ratings
print("Fetching your TMDB ratings...")
movie_ratings = self.get_ratings("movies", account_id, session_id)
tv_ratings = self.get_ratings("tv", account_id, session_id)
print(
f"Found {len(movie_ratings)} rated movies and {len(tv_ratings)} rated TV shows"
)
# Save raw ratings data in case of interruption
with open(os.path.join(self.output_dir, "raw_ratings.json"), "w") as f:
json.dump({"movies": movie_ratings, "tv": tv_ratings}, f)
# Get watchlist
print("\nFetching your TMDB watchlist...")
watchlist = self.get_watchlist(account_id, session_id)
print(
f"Found {len(watchlist['movies'])} movies and {len(watchlist['tv'])} TV shows in your watchlist"
)
# Save raw watchlist data
with open(os.path.join(self.output_dir, "raw_watchlist.json"), "w") as f:
json.dump(watchlist, f)
# Process watchlist items
watchlist_metadata = self.process_watchlist(watchlist)
# Create watchlist DataFrame and save to CSV
if watchlist_metadata:
watchlist_df = pd.DataFrame(watchlist_metadata)
watchlist_df.to_csv(
os.path.join(self.output_dir, "tmdb_watchlist.csv"),
index=False,
encoding="utf-8",
)
print(
f"Watchlist with {len(watchlist_df)} items saved to {os.path.join(self.output_dir, 'tmdb_watchlist.csv')}"
)
else:
watchlist_df = pd.DataFrame()
print("No items found in your watchlist.")
# Process all rated items
all_metadata = []
print("Fetching additional metadata for movies...")
for i, movie in enumerate(movie_ratings):
print(
f"Processing movie {i+1}/{len(movie_ratings)}: {movie.get('title', movie.get('name', 'Unknown'))}"
)
details = self.get_details("movie", movie["id"])
if details:
metadata = self.extract_metadata(movie, details, "movies")
all_metadata.append(metadata)
# Save progress after each batch of 10 items
if (i + 1) % 10 == 0:
temp_df = pd.DataFrame(all_metadata)
temp_df.to_csv(
os.path.join(self.output_dir, "progress_data.csv"),
index=False,
encoding="utf-8",
)
print("Fetching additional metadata for TV shows...")
for i, tv in enumerate(tv_ratings):
print(
f"Processing TV show {i+1}/{len(tv_ratings)}: {tv.get('name', tv.get('title', 'Unknown'))}"
)
details = self.get_details("tv", tv["id"])
if details:
metadata = self.extract_metadata(tv, details, "tv")
all_metadata.append(metadata)
# Save progress after each batch of 10 items
if (i + 1) % 10 == 0:
temp_df = pd.DataFrame(all_metadata)
temp_df.to_csv(
os.path.join(self.output_dir, "progress_data.csv"),
index=False,
encoding="utf-8",
)
# Create DataFrame and save to CSV
df = pd.DataFrame(all_metadata)
df.to_csv(self.ratings_csv, index=False, encoding="utf-8")
print(f"Ratings with metadata saved to {self.ratings_csv}")
# Generate statistics
print("Generating basic statistics...")
stats = self.generate_statistics(df)
# Display statistics
self.display_statistics(stats, df)
# Save statistics to CSV
self.save_statistics_to_csv(stats, self.stats_csv)
# Find rating anomalies
anomalies = self.find_rating_anomalies(df)
# Analyze rating trends over time
trend_analysis = self.analyze_rating_trends(df)
# Run deep learning analysis
recommendations = self.run_deep_learning_analyses(
df, watchlist_df, feature_weights
)
print("\nAnalysis complete!")
return df, watchlist_df, recommendations
if __name__ == "__main__":
# Hard-coded configuration parameters
config = {
"api_key": os.getenv("TMDB_API_KEY"), # Use from .env file
"base_url": "https://api.themoviedb.org/3", # TMDB API base URL
"output_dir": "tmdb_analysis", # Output directory
"max_requests": 50, # Rate limiting - requests per window
"request_window": 10, # Rate limiting - window in seconds
"top_n_companies": 25, # Top N production companies to analyze
"top_n_directors": 25, # Top N directors to analyze
"top_n_actors": 30, # Top N actors to analyze
"high_rating_threshold": 8.0, # Threshold for high ratings
"embeddings_model": "intfloat/multilingual-e5-large-instruct", # Sentence transformer model for embeddings
}
# ADJUSTED FEATURE WEIGHTS TO EMPHASIZE DESCRIPTION-BASED SIMILARITY
feature_weights = {
"vote_average": 0.20, # TMDB rating
"year": 0.15, # Release year recency
"popularity": 0.10, # Popularity
"runtime": 0.05, # Runtime
"content": 0.50, # Content similarity (significantly increased)
}
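# Note: these weights sum to 1.0 and every component score is normalized to roughly
# the 0-1 range, so the final recommendation score also stays in roughly 0-1.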
# Initialize analyzer with our configuration
analyzer = TMDBAnalyzer(
api_key=config["api_key"],
base_url=config["base_url"],
output_dir=config["output_dir"],
max_requests=config["max_requests"],
request_window=config["request_window"],
top_n_companies=config["top_n_companies"],
top_n_directors=config["top_n_directors"],
top_n_actors=config["top_n_actors"],
high_rating_threshold=config["high_rating_threshold"],
embeddings_model=config["embeddings_model"],
)
# Check for existing data
data_exists = os.path.exists(
os.path.join(config["output_dir"], "tmdb_ratings_with_metadata.csv")
)
watchlist_exists = os.path.exists(
os.path.join(config["output_dir"], "tmdb_watchlist.csv")
)
if data_exists:
print(f"Found existing data in {config['output_dir']}")
use_existing = (
input("Would you like to use existing data? (y/n): ").strip().lower()
)
use_existing_data = use_existing == "y"
if use_existing_data and watchlist_exists:
use_existing_watchlist = (
input("Would you like to use existing watchlist data? (y/n): ")
.strip()
.lower()
== "y"
)
else:
use_existing_watchlist = False
else:
use_existing_data = False
use_existing_watchlist = False
# Run the analysis
df, watchlist_df, recommendations = analyzer.run_analysis(
use_existing_data=use_existing_data,
use_existing_watchlist=use_existing_watchlist,
feature_weights=feature_weights,
)
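# Output artifacts (based on the paths used above) land in "tmdb_analysis/": the enriched
# ratings CSV (tmdb_ratings_with_metadata.csv), tmdb_stats.csv, tmdb_watchlist.csv,
# rating_anomalies.csv/.png, rating_trends.csv/.png, the PNG charts, and a
# deep_learning/ subfolder with the embeddings, topic files and
# deep_recommendations_with_descriptions.csv.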