Created
March 5, 2025 18:02
-
-
Save cleverdevil/5e5809884d365d427f27cfd15be37526 to your computer and use it in GitHub Desktop.
Script to replicate collections and poster art from Plex to Jellyfin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import logging | |
from base64 import b64encode | |
# Configure logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class JellyfinClient:
    """
    Small HTTP client for the Jellyfin REST endpoints used to manage
    collections and item images.

    Every request is authenticated with the ``X-Emby-Token`` header. On
    construction the client lists the server's users and remembers the ID of
    the first one for the user-scoped endpoints.
    """

    # Maximum number of item IDs sent per "add to collection" request. The IDs
    # travel as a query parameter, so large batches could exceed URL limits.
    CHUNK_SIZE = 50  # Adjust based on your server's limitations

    # Image file extension (lower case, no dot) -> MIME type.
    _IMAGE_CONTENT_TYPES = {
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'webp': 'image/webp',
    }

    def __init__(self, url, api_key):
        """
        Initialize the Jellyfin client.

        Args:
            url (str): Base URL of the Jellyfin server
            api_key (str): API key for authentication

        Raises:
            ValueError: If the server returns no users
        """
        self.url = url.rstrip('/')
        self.api_key = api_key
        self._headers = {
            'X-Emby-Token': api_key,  # Note the X-Emby-Token instead of Authorization header
            'Content-Type': 'application/json'
        }

        # Get user ID (the first user returned by the server is used)
        users = self._get('/users')
        if not users:
            raise ValueError("Failed to retrieve Jellyfin users")
        self.user_id = users[0]['Id']
        print(f"Connected to Jellyfin as user ID: {self.user_id}")

    def _request(self, verb, path, params=None, data=None, headers=None, binary_data=None):
        """
        Make an HTTP request to the Jellyfin API.

        Args:
            verb (str): HTTP method (get, post, delete)
            path (str): API endpoint path
            params (dict): Query parameters
            data (dict): Request body data for POST requests (JSON)
            headers (dict): Optional headers to override defaults
            binary_data (bytes): Raw binary data for POST requests

        Returns:
            dict or bytes: Parsed JSON when the response declares a JSON
            content type (``{}`` for an empty body), otherwise the raw bytes

        Raises:
            ValueError: If ``verb`` is not get, post or delete
            requests.exceptions.RequestException: On transport errors or
                non-2xx responses (logged, then re-raised)
        """
        # Make sure path starts with a slash
        if not path.startswith('/'):
            path = '/' + path

        # Build the full URL
        full_url = f'{self.url}{path}'
        params = params or {}

        # Use provided headers or default headers
        request_headers = headers or self._headers.copy()
        method = verb.lower()

        try:
            if method == 'get':
                r = requests.get(full_url, headers=request_headers, params=params)
            elif method == 'post':
                # Binary payload wins over JSON; with neither, POST has no body.
                if binary_data:
                    r = requests.post(full_url, headers=request_headers, params=params, data=binary_data)
                elif data:
                    r = requests.post(full_url, headers=request_headers, params=params, json=data)
                else:
                    r = requests.post(full_url, headers=request_headers, params=params)
            elif method == 'delete':
                r = requests.delete(full_url, headers=request_headers, params=params)
            else:
                raise ValueError(f"Unsupported HTTP method: {verb}")

            r.raise_for_status()

            # If the response is JSON, parse it; otherwise return raw content
            if 'application/json' in r.headers.get('Content-Type', ''):
                return r.json() if r.content else {}
            return r.content
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error ({verb.upper()} {path}): {e}")
            self._log_error_response(getattr(e, 'response', None))
            raise

    @staticmethod
    def _log_error_response(response):
        """Log the server's answer for a failed request, if there was one."""
        if response is None:
            return
        try:
            if 'application/json' in response.headers.get('Content-Type', ''):
                logger.error(f"Server response: {response.json()}")
                return
        except Exception:
            # The body was advertised as JSON but could not be parsed; fall
            # through to the plain status/text logging below.
            pass
        logger.error(f"Status code: {response.status_code}")
        logger.error(f"Response text: {response.text[:200]}...")

    def _get(self, path, params=None):
        """Make a GET request to the Jellyfin API."""
        return self._request('get', path, params)

    def _post(self, path, params=None, data=None, headers=None, binary_data=None):
        """Make a POST request to the Jellyfin API."""
        return self._request('post', path, params, data, headers, binary_data)

    def _delete(self, path, params=None):
        """Make a DELETE request to the Jellyfin API."""
        return self._request('delete', path, params)

    def items_list(self, parentId=None, limit=None, recursive=False, include_fields=False):
        """
        Get a list of items from Jellyfin.

        Args:
            parentId (str): ID of the parent folder
            limit (int): Maximum number of items to return
            recursive (bool): Whether to include items in subfolders
            include_fields (bool): Whether to request the ProviderIds and Path fields

        Returns:
            dict: JSON response with items; ``{"Items": []}`` if the request fails
        """
        params = {}
        if parentId:
            params['parentId'] = parentId
        if limit:
            params['limit'] = limit
        if recursive:
            params['recursive'] = 'true'
        # Request provider IDs and other fields to avoid additional API calls
        if include_fields:
            params['fields'] = 'ProviderIds,Path'

        try:
            return self._get('/Items', params)
        except Exception as e:
            logger.error(f"Error listing items: {e}")
            return {"Items": []}

    def item(self, item_id):
        """
        Get details for a specific item.

        Args:
            item_id (str): Item ID

        Returns:
            dict: Item details, or ``{}`` if the request fails
        """
        try:
            return self._get(f'/Users/{self.user_id}/Items/{item_id}')
        except Exception as e:
            logger.error(f"Error getting item {item_id}: {e}")
            return {}

    def _add_items_in_chunks(self, collection_id, ids, label=''):
        """
        Add items to a collection in batches of ``CHUNK_SIZE``.

        A failing batch is logged and skipped so the remaining batches are
        still attempted.

        Args:
            collection_id (str): Collection ID
            ids (list): List of item IDs to add
            label (str): Text appended after the word "collection" in log messages

        Returns:
            tuple: (total number of chunks, number of chunks that failed)
        """
        chunk_size = self.CHUNK_SIZE
        total_chunks = (len(ids) + chunk_size - 1) // chunk_size
        failed_chunks = 0
        for chunk_num, start in enumerate(range(0, len(ids), chunk_size), start=1):
            chunk = ids[start:start + chunk_size]
            logger.info(f"Adding chunk {chunk_num}/{total_chunks} ({len(chunk)} items) to collection{label}...")
            try:
                # The IDs are passed as a query parameter, not in the body
                self._post(f'/Collections/{collection_id}/Items', {'Ids': ','.join(chunk)})
            except Exception as e:
                logger.error(f"Error adding chunk {chunk_num} to collection{label}: {e}")
                failed_chunks += 1
                # Continue with next chunk even if this one fails
        return total_chunks, failed_chunks

    def create_collection(self, name, ids):
        """
        Create a new collection with the given items, handling large collections by creating
        them in chunks to avoid URL length limitations.

        Args:
            name (str): Collection name
            ids (list): List of item IDs to include

        Returns:
            dict: ``Status`` message; when a collection was created also its
            ``Id`` and ``FailedChunks`` (number of batches that could not be added)

        Raises:
            Exception: If the server response does not contain the new collection's ID
        """
        if not ids:
            return {"Status": "No items to add to collection"}

        # First, create an empty collection
        logger.info(f"Creating collection '{name}'...")
        result = self._post('/Collections', {'Name': name, 'Ids': ''})
        # A non-JSON answer comes back as bytes, so check the type before the key
        if not isinstance(result, dict) or 'Id' not in result:
            raise Exception(f"Failed to create collection: {result}")
        collection_id = result['Id']

        # Then add items in chunks to avoid URL length limitations
        total_chunks, failed_chunks = self._add_items_in_chunks(collection_id, ids, f" '{name}'")
        return {
            "Status": f"Collection '{name}' created with {len(ids)} items in {total_chunks} chunks",
            "Id": collection_id,
            "FailedChunks": failed_chunks,
        }

    def add_to_collection(self, collection_id, ids):
        """
        Add items to an existing collection.

        Args:
            collection_id (str): Collection ID
            ids (list): List of item IDs to add

        Returns:
            dict: ``Status`` message; when items were submitted also
            ``FailedChunks`` (number of batches that could not be added)
        """
        if not ids:
            return {"Status": "No items to add"}

        # Add items in chunks to avoid URL length limitations
        total_chunks, failed_chunks = self._add_items_in_chunks(collection_id, ids)
        return {
            "Status": f"Added {len(ids)} items to collection in {total_chunks} chunks",
            "FailedChunks": failed_chunks,
        }

    def remove_collection(self, collection_id):
        """
        Delete a collection.

        Args:
            collection_id (str): Collection ID

        Returns:
            dict: Response from the server
        """
        return self._delete(f'/Items/{collection_id}')

    def collections_list(self):
        """
        Get all collections.

        Returns:
            dict: Server response listing the BoxSet items visible to the user
        """
        return self._get(f"/Users/{self.user_id}/Items", {
            'IncludeItemTypes': 'BoxSet',
            'Recursive': 'true'
        })

    @staticmethod
    def get_content_type(file_path: str) -> str:
        """
        Get content type based on file extension.

        Args:
            file_path (str): Path to the file

        Returns:
            str: Content type; ``application/octet-stream`` when the path has
            no extension or an unrecognised one
        """
        # Without a dot there is no extension; a bare name such as "png" must
        # not be mistaken for one.
        _, dot, ext = file_path.rpartition('.')
        if not dot:
            return 'application/octet-stream'
        return JellyfinClient._IMAGE_CONTENT_TYPES.get(ext.lower(), 'application/octet-stream')

    def upload_image(self, item_id, image_type, image_data, image_ext='jpg'):
        """
        Upload an image to an item in Jellyfin using base64 encoding.

        Args:
            item_id (str): ID of the item
            image_type (str): Type of image ('Primary', 'Backdrop', etc.)
            image_data (bytes): Raw image data
            image_ext (str): Image extension to determine content type
                (unknown extensions are sent as image/jpeg)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Determine content type based on extension
            content_type = self._IMAGE_CONTENT_TYPES.get(image_ext.lower(), 'image/jpeg')

            # Base64 encode the image data
            encoded_data = b64encode(image_data)

            # Set up the URL and headers
            url = f"/Items/{item_id}/Images/{image_type}/0"
            headers = {
                'X-Emby-Token': self.api_key,
                'Content-Type': content_type
            }

            # Make the request
            self._post(url, headers=headers, binary_data=encoded_data)
            logger.info(f"Successfully uploaded {image_type} image for item {item_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to upload image for item {item_id}: {e}")
            return False
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import requests | |
import config | |
from plexapi.server import PlexServer | |
from jellyfin import JellyfinClient | |
from collections import defaultdict | |
# Configure logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Connect to Plex. This runs at import time, so both servers must be reachable
# before any function below is used.
logger.info(f"Connecting to Plex at {config.PLEX_URL}...")
plex = PlexServer(baseurl=config.PLEX_URL, token=config.PLEX_TOKEN)
logger.info("Successfully connected to Plex")

# Connect to Jellyfin.
logger.info(f"Connecting to Jellyfin at {config.JELLYFIN_URL}...")
jellyfin = JellyfinClient(url=config.JELLYFIN_URL, api_key=config.JELLYFIN_API_KEY)
logger.info("Successfully connected to Jellyfin")
def get_plex_collections():
    """
    Collect every Plex collection in the configured libraries together with
    the IMDB IDs of its members.

    Returns:
        dict: Collection title -> set of IMDB IDs (collections without any
            IMDB-identified member are left out)
        dict: Collection title -> Plex collection object (for images)
    """
    imdb_by_media = {}
    members_by_title = defaultdict(set)
    objects_by_title = {}

    # Pass 1: index every media item of the selected libraries by its IMDB ID
    logger.info(f"Scanning Plex libraries {config.PLEX_LIBRARIES} for media and their IMDB IDs...")
    for section in plex.library.sections():
        if section.title not in config.PLEX_LIBRARIES:
            continue
        logger.info(f"Processing library: {section.title} (type: {section.type})")
        for entry in section.all():
            # First guid of the form imdb://..., reduced to the bare ID
            found = next(
                (g.id.replace('imdb://', '') for g in entry.guids if g.id.startswith('imdb')),
                None,
            )
            if found is not None:
                imdb_by_media[entry] = found
            if not found:
                logger.warning(f"No IMDB ID found for {section.type}: {entry.title}")

    # Pass 2: walk the collections and translate their members to IMDB IDs
    logger.info("Processing Plex collections...")
    for section in plex.library.sections():
        if section.title not in config.PLEX_LIBRARIES:
            continue
        logger.info(f"Processing collections in library: {section.title}")
        for coll in section.collections():
            objects_by_title[coll.title] = coll
            for entry in coll.items():
                # Only include media with IMDB IDs
                if entry in imdb_by_media:
                    members_by_title[coll.title].add(imdb_by_media[entry])

    # Remove empty collections
    collections = {title: ids for title, ids in members_by_title.items() if ids}
    logger.info(f"Found {len(collections)} collections across all libraries")
    return collections, objects_by_title
def get_jellyfin_media():
    """
    Pre-fetch all media in Jellyfin from the configured libraries and map them by IMDB ID.

    Libraries are matched by name against ``config.PLEX_LIBRARIES``; a name
    with no Jellyfin counterpart is skipped with a warning.

    Returns:
        dict: Mapping of IMDB IDs to Jellyfin media objects
    """
    jf_media = {}

    # The top-level listing is the same for every library, so fetch it once
    # instead of once per configured library name.
    root_items = jellyfin.items_list().get('Items', [])

    # Process each library type configured in PLEX_LIBRARIES
    for library_name in config.PLEX_LIBRARIES:
        logger.info(f"Fetching Jellyfin {library_name} library...")
        library_items = [item for item in root_items if item.get('Name') == library_name]
        if not library_items:
            logger.warning(f"No '{library_name}' library found in Jellyfin")
            continue
        library_id = library_items[0]['Id']

        # Loop through the items in the Jellyfin library
        logger.info(f"Mapping Jellyfin {library_name} by IMDB ID...")
        item_count = 0
        items_with_imdb = 0

        # Use recursive parameter to get all items at once with all fields
        library_contents = jellyfin.items_list(
            parentId=library_id,
            recursive=True,
            include_fields=True
        )

        # Process each item, looking for IMDB IDs directly in the item data
        for item in library_contents.get('Items', []):
            # Skip folders, collections, and other non-media items
            if item.get('Type', '') in ('Folder', 'CollectionFolder', 'UserView', 'BoxSet'):
                continue
            item_count += 1
            if item_count % 100 == 0:
                logger.info(f"Processed {item_count} items in {library_name}...")

            # Extract IMDB ID directly from the item data. ``or {}`` also
            # covers a ProviderIds key that is present but null.
            provider_ids = item.get('ProviderIds') or {}
            for provider, provider_id in provider_ids.items():
                if provider.lower() == 'imdb':
                    jf_media[provider_id] = item
                    items_with_imdb += 1
                    break

        logger.info(f"Found {item_count} items in {library_name}, of which {items_with_imdb} have IMDB IDs")

    logger.info(f"Total: Found {len(jf_media)} media items with IMDB IDs across all Jellyfin libraries")
    return jf_media
def clean_jellyfin_collections():
    """
    Delete every collection currently present in Jellyfin.

    A failure to delete one collection is logged and does not stop the
    remaining deletions.
    """
    logger.info("Removing existing Jellyfin collections...")
    deleted = 0
    for entry in jellyfin.collections_list().get('Items', []):
        logger.info(f"Removing collection: {entry['Name']}")
        try:
            jellyfin.remove_collection(entry['Id'])
        except Exception as e:
            logger.error(f"Error removing collection {entry['Name']}: {e}")
        else:
            deleted += 1
    logger.info(f"Removed {deleted} collections from Jellyfin")
def get_plex_image_data(plex_obj, image_type="thumb"):
    """
    Get image data from a Plex object.

    Args:
        plex_obj: Plex object (movie, show, collection, etc.)
        image_type (str): Type of image ("thumb", "poster", "art" or "banner");
            "poster" is an alias for "thumb"

    Returns:
        tuple: (image_data, extension), or (None, None) if the object has no
        such image, the type is unknown, or the download fails
    """
    # Attribute on the Plex object that holds the image path for each type
    attr_by_type = {
        "thumb": "thumb",
        "poster": "thumb",
        "art": "art",
        "banner": "banner",
    }
    try:
        attr_name = attr_by_type.get(image_type)
        image_url = getattr(plex_obj, attr_name, None) if attr_name else None
        if not image_url:
            return None, None

        # Pass the token through ``params`` so it is joined with "?" or "&"
        # as appropriate; the image path may already carry a query string.
        response = requests.get(
            f"{config.PLEX_URL}{image_url}",
            params={"X-Plex-Token": config.PLEX_TOKEN},
        )
        response.raise_for_status()

        # Determine image extension from content type, ignoring parameters
        # such as "; charset=..." and letter case
        content_type = response.headers.get('Content-Type', '').split(';', 1)[0].strip().lower()
        extension = {
            'image/jpeg': 'jpg',
            'image/png': 'png',
            'image/webp': 'webp'
        }.get(content_type, 'jpg')
        return response.content, extension
    except Exception as e:
        logger.error(f"Error getting image for {getattr(plex_obj, 'title', 'unknown item')}: {e}")
        return None, None
def sync_collection_images(plex_collection, jellyfin_collection_id):
    """
    Copy the poster of a Plex collection onto a Jellyfin collection.

    Args:
        plex_collection: Plex collection object
        jellyfin_collection_id (str): Jellyfin collection ID

    Returns:
        bool: True if any images were synced, False otherwise
    """
    uploaded = False
    try:
        # Plex calls the poster "thumb"; Jellyfin calls it "Primary"
        poster, ext = get_plex_image_data(plex_collection, "thumb")
        if poster:
            logger.info(f"Uploading poster image for collection '{plex_collection.title}'...")
            if jellyfin.upload_image(jellyfin_collection_id, "Primary", poster, ext):
                uploaded = True
                logger.info(f"Successfully uploaded poster image for collection '{plex_collection.title}'")
    except Exception as e:
        logger.error(f"Error syncing image for collection '{plex_collection.title}': {e}")
    return uploaded
def build_plex_media_map():
    """
    Index the media of the configured Plex libraries by IMDB ID.

    Returns:
        dict: Mapping of IMDB IDs to Plex media objects
    """
    by_imdb = {}
    for section in plex.library.sections():
        if section.title not in config.PLEX_LIBRARIES:
            continue
        for entry in section.all():
            # Only the first imdb guid of an item is used
            imdb_guid = next((g for g in entry.guids if g.id.startswith('imdb')), None)
            if imdb_guid is not None:
                by_imdb[imdb_guid.id.replace('imdb://', '')] = entry
    return by_imdb
def sync_media_images(plex_media_map, jellyfin_media_map):
    """
    Copy the poster of every media item present on both servers from Plex to
    Jellyfin.

    Args:
        plex_media_map (dict): Mapping of IMDB IDs to Plex media objects
        jellyfin_media_map (dict): Mapping of IMDB IDs to Jellyfin media objects

    Returns:
        int: Number of items with synced images
    """
    shared_ids = set(plex_media_map) & set(jellyfin_media_map)
    total = len(shared_ids)
    logger.info(f"Syncing images for {total} media items...")

    uploaded = 0
    for done, imdb_id in enumerate(shared_ids, start=1):
        source = plex_media_map[imdb_id]
        target_id = jellyfin_media_map[imdb_id]['Id']

        # Just try to sync the primary/poster image
        try:
            poster, ext = get_plex_image_data(source, "thumb")
            if poster and jellyfin.upload_image(target_id, "Primary", poster, ext):
                uploaded += 1
        except Exception as e:
            logger.error(f"Error syncing image for {source.title}: {e}")

        # Report progress every 20 items and once more at the very end
        if done % 20 == 0 or done == total:
            logger.info(f"Processed {done}/{total} items, synced images for {uploaded} items")
    return uploaded
def sync_collections():
    """
    Run the full Plex -> Jellyfin synchronization: rebuild the collections,
    copy their posters, then copy the posters of the individual media items.

    All existing Jellyfin collections are removed before the new ones are
    created.
    """
    logger.info("\n=== Starting Plex to Jellyfin Collection Synchronization ===\n")
    started = time.time()

    # Step 1: Get Plex collections data
    logger.info("\n--- Gathering Plex Data ---")
    plex_collections, plex_collection_objects = get_plex_collections()

    # Step 2: Get Jellyfin media data
    logger.info("\n--- Gathering Jellyfin Data ---")
    jellyfin_media = get_jellyfin_media()

    # Step 3: Clean existing Jellyfin collections
    logger.info("\n--- Cleaning Jellyfin Collections ---")
    clean_jellyfin_collections()

    # Step 4: Create new collections in Jellyfin
    logger.info("\n--- Creating Jellyfin Collections ---")
    created = failed = with_images = 0
    for collection_name, imdb_ids in plex_collections.items():
        # Convert IMDB IDs to Jellyfin media IDs, dropping unknown ones
        jellyfin_item_ids = [
            jellyfin_media[imdb_id]['Id']
            for imdb_id in imdb_ids
            if imdb_id in jellyfin_media
        ]
        if not jellyfin_item_ids:
            logger.info(f"Skipping collection '{collection_name}' - no matching items in Jellyfin")
            continue

        logger.info(f"Creating collection '{collection_name}' with {len(jellyfin_item_ids)} items")
        try:
            result = jellyfin.create_collection(collection_name, jellyfin_item_ids)
            created += 1
            # Copy the collection poster when the new collection's ID is known
            if 'Id' in result:
                source = plex_collection_objects[collection_name]
                if sync_collection_images(source, result['Id']):
                    with_images += 1
        except Exception as e:
            logger.error(f"Error creating collection '{collection_name}': {e}")
            failed += 1

    # Step 5: Sync media images
    logger.info("\n--- Syncing Media Images ---")
    media_with_images = sync_media_images(build_plex_media_map(), jellyfin_media)

    # Summary
    elapsed_time = time.time() - started
    logger.info("\n=== Synchronization Complete ===")
    logger.info(f"Time elapsed: {elapsed_time:.2f} seconds")
    logger.info(f"Collections created: {created}")
    logger.info(f"Collections failed: {failed}")
    logger.info(f"Collections with images: {with_images}")
    logger.info(f"Media items with images: {media_with_images}")
# Run the synchronization only when executed as a script, not on import.
if __name__ == '__main__':
    sync_collections()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment