Skip to content

Instantly share code, notes, and snippets.

@cleverdevil
Created March 5, 2025 18:02
Show Gist options
  • Save cleverdevil/5e5809884d365d427f27cfd15be37526 to your computer and use it in GitHub Desktop.
Save cleverdevil/5e5809884d365d427f27cfd15be37526 to your computer and use it in GitHub Desktop.
Script to replicate collections and poster art from Plex to Jellyfin
import requests
import json
import logging
from base64 import b64encode
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class JellyfinClient:
def __init__(self, url, api_key):
"""
Initialize the Jellyfin client.
Args:
url (str): Base URL of the Jellyfin server
api_key (str): API key for authentication
"""
self.url = url.rstrip('/')
self.api_key = api_key
self._headers = {
'X-Emby-Token': api_key, # Note the X-Emby-Token instead of Authorization header
'Content-Type': 'application/json'
}
# Get user ID
users = self._get('/users')
if not users:
raise ValueError("Failed to retrieve Jellyfin users")
self.user_id = users[0]['Id']
print(f"Connected to Jellyfin as user ID: {self.user_id}")
def _request(self, verb, path, params=None, data=None, headers=None, binary_data=None):
"""
Make an HTTP request to the Jellyfin API.
Args:
verb (str): HTTP method (get, post, delete)
path (str): API endpoint path
params (dict): Query parameters
data (dict): Request body data for POST requests (JSON)
headers (dict): Optional headers to override defaults
binary_data (bytes): Raw binary data for POST requests
Returns:
dict or bytes: JSON response from the API or raw bytes for binary data
"""
# Make sure path starts with a slash
if not path.startswith('/'):
path = '/' + path
# Build the full URL
full_url = f'{self.url}{path}'
params = params or {}
# Use provided headers or default headers
request_headers = headers or self._headers.copy()
try:
if verb.lower() == 'get':
r = requests.get(full_url, headers=request_headers, params=params)
elif verb.lower() == 'post':
if binary_data:
r = requests.post(full_url, headers=request_headers, params=params, data=binary_data)
elif data:
r = requests.post(full_url, headers=request_headers, params=params, json=data)
else:
r = requests.post(full_url, headers=request_headers, params=params)
elif verb.lower() == 'delete':
r = requests.delete(full_url, headers=request_headers, params=params)
else:
raise ValueError(f"Unsupported HTTP method: {verb}")
r.raise_for_status()
# If the response is JSON, parse it; otherwise return raw content
if 'application/json' in r.headers.get('Content-Type', ''):
return r.json() if r.content else {}
return r.content
except requests.exceptions.RequestException as e:
logger.error(f"Request error ({verb.upper()} {path}): {e}")
if hasattr(e, 'response') and e.response is not None:
try:
if 'application/json' in e.response.headers.get('Content-Type', ''):
error_msg = e.response.json()
logger.error(f"Server response: {error_msg}")
else:
logger.error(f"Status code: {e.response.status_code}")
logger.error(f"Response text: {e.response.text[:200]}...")
except:
logger.error(f"Status code: {e.response.status_code}")
logger.error(f"Response text: {e.response.text[:200] if hasattr(e.response, 'text') else 'No text'}...")
raise
def _get(self, path, params=None):
"""Make a GET request to the Jellyfin API."""
return self._request('get', path, params)
def _post(self, path, params=None, data=None, headers=None, binary_data=None):
"""Make a POST request to the Jellyfin API."""
return self._request('post', path, params, data, headers, binary_data)
def _delete(self, path, params=None):
"""Make a DELETE request to the Jellyfin API."""
return self._request('delete', path, params)
def items_list(self, parentId=None, limit=None, recursive=False, include_fields=False):
"""
Get a list of items from Jellyfin.
Args:
parentId (str): ID of the parent folder
limit (int): Maximum number of items to return
recursive (bool): Whether to include items in subfolders
include_fields (bool): Whether to include additional fields
Returns:
dict: JSON response with items
"""
params = {}
if parentId:
params['parentId'] = parentId
if limit:
params['limit'] = limit
if recursive:
params['recursive'] = 'true'
# Request provider IDs and other fields to avoid additional API calls
if include_fields:
params['fields'] = 'ProviderIds,Path'
try:
return self._get('/Items', params)
except Exception as e:
logger.error(f"Error listing items: {e}")
return {"Items": []}
def item(self, item_id):
"""
Get details for a specific item.
Args:
item_id (str): Item ID
Returns:
dict: Item details
"""
try:
return self._get(f'/Users/{self.user_id}/Items/{item_id}')
except Exception as e:
logger.error(f"Error getting item {item_id}: {e}")
return {}
def create_collection(self, name, ids):
"""
Create a new collection with the given items, handling large collections by creating
them in chunks to avoid URL length limitations.
Args:
name (str): Collection name
ids (list): List of item IDs to include
Returns:
dict: Collection details
"""
if not ids:
return {"Status": "No items to add to collection"}
# First, create an empty collection
logger.info(f"Creating collection '{name}'...")
result = self._post('/Collections', {'Name': name, 'Ids': ''})
if not result or 'Id' not in result:
raise Exception(f"Failed to create collection: {result}")
collection_id = result['Id']
# Then add items in chunks to avoid URL length limitations
CHUNK_SIZE = 50 # Adjust based on your server's limitations
total_chunks = (len(ids) + CHUNK_SIZE - 1) // CHUNK_SIZE
for i in range(0, len(ids), CHUNK_SIZE):
chunk = ids[i:i + CHUNK_SIZE]
chunk_num = (i // CHUNK_SIZE) + 1
logger.info(f"Adding chunk {chunk_num}/{total_chunks} ({len(chunk)} items) to collection '{name}'...")
try:
# Add items to the collection
self._post(f'/Collections/{collection_id}/Items', {'Ids': ','.join(chunk)})
except Exception as e:
logger.error(f"Error adding chunk {chunk_num} to collection '{name}': {e}")
# Continue with next chunk even if this one fails
return {"Status": f"Collection '{name}' created with {len(ids)} items in {total_chunks} chunks", "Id": collection_id}
def add_to_collection(self, collection_id, ids):
"""
Add items to an existing collection.
Args:
collection_id (str): Collection ID
ids (list): List of item IDs to add
Returns:
dict: Response from the server
"""
if not ids:
return {"Status": "No items to add"}
# Add items in chunks to avoid URL length limitations
CHUNK_SIZE = 50 # Adjust based on your server's limitations
total_chunks = (len(ids) + CHUNK_SIZE - 1) // CHUNK_SIZE
for i in range(0, len(ids), CHUNK_SIZE):
chunk = ids[i:i + CHUNK_SIZE]
chunk_num = (i // CHUNK_SIZE) + 1
logger.info(f"Adding chunk {chunk_num}/{total_chunks} ({len(chunk)} items) to collection...")
try:
# Add items to the collection
self._post(f'/Collections/{collection_id}/Items', {'Ids': ','.join(chunk)})
except Exception as e:
logger.error(f"Error adding chunk {chunk_num} to collection: {e}")
# Continue with next chunk even if this one fails
return {"Status": f"Added {len(ids)} items to collection in {total_chunks} chunks"}
def remove_collection(self, collection_id):
"""
Delete a collection.
Args:
collection_id (str): Collection ID
Returns:
dict: Response from the server
"""
return self._delete(f'/Items/{collection_id}')
def collections_list(self):
"""
Get all collections.
Returns:
dict: List of collections
"""
return self._get(f"/Users/{self.user_id}/Items", {
'IncludeItemTypes': 'BoxSet',
'Recursive': 'true'
})
@staticmethod
def get_content_type(file_path: str) -> str:
"""
Get content type based on file extension.
Args:
file_path (str): Path to the file
Returns:
str: Content type
"""
ext = file_path.split('.')[-1].lower()
return {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'webp': 'image/webp'
}.get(ext, 'application/octet-stream')
def upload_image(self, item_id, image_type, image_data, image_ext='jpg'):
"""
Upload an image to an item in Jellyfin using base64 encoding.
Args:
item_id (str): ID of the item
image_type (str): Type of image ('Primary', 'Backdrop', etc.)
image_data (bytes): Raw image data
image_ext (str): Image extension to determine content type
Returns:
bool: True if successful, False otherwise
"""
try:
# Determine content type based on extension
content_type = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'webp': 'image/webp'
}.get(image_ext.lower(), 'image/jpeg')
# Base64 encode the image data
encoded_data = b64encode(image_data)
# Set up the URL and headers
url = f"/Items/{item_id}/Images/{image_type}/0"
headers = {
'X-Emby-Token': self.api_key,
'Content-Type': content_type
}
# Make the request
self._post(url, headers=headers, binary_data=encoded_data)
logger.info(f"Successfully uploaded {image_type} image for item {item_id}")
return True
except Exception as e:
logger.error(f"Failed to upload image for item {item_id}: {e}")
return False
import time
import logging
import requests
import config
from plexapi.server import PlexServer
from jellyfin import JellyfinClient
from collections import defaultdict
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize Plex
logger.info(f"Connecting to Plex at {config.PLEX_URL}...")
plex = PlexServer(config.PLEX_URL, config.PLEX_TOKEN)
logger.info("Successfully connected to Plex")
# Initialize Jellyfin
logger.info(f"Connecting to Jellyfin at {config.JELLYFIN_URL}...")
jellyfin = JellyfinClient(config.JELLYFIN_URL, config.JELLYFIN_API_KEY)
logger.info("Successfully connected to Jellyfin")
def get_plex_collections():
"""
Gather all Plex collections and their media with IMDB IDs.
Returns:
dict: Mapping of collection names to sets of IMDB IDs
dict: Mapping of collection names to Plex collection objects (for images)
"""
collections = defaultdict(set)
collection_objects = {}
media_to_imdb = {}
# Get all media items and their IMDB IDs
logger.info(f"Scanning Plex libraries {config.PLEX_LIBRARIES} for media and their IMDB IDs...")
for library in plex.library.sections():
if library.title in config.PLEX_LIBRARIES:
logger.info(f"Processing library: {library.title} (type: {library.type})")
for media in library.all():
imdb_id = None
for guid in media.guids:
if guid.id.startswith('imdb'):
imdb_id = guid.id.replace('imdb://', '')
media_to_imdb[media] = imdb_id
break
if not imdb_id:
logger.warning(f"No IMDB ID found for {library.type}: {media.title}")
# Process all collections
logger.info("Processing Plex collections...")
for library in plex.library.sections():
if library.title in config.PLEX_LIBRARIES:
logger.info(f"Processing collections in library: {library.title}")
for collection in library.collections():
collection_objects[collection.title] = collection
for media in collection.items():
if media in media_to_imdb: # Only include media with IMDB IDs
collections[collection.title].add(media_to_imdb[media])
# Remove empty collections
collections = {name: imdb_ids for name, imdb_ids in collections.items() if imdb_ids}
logger.info(f"Found {len(collections)} collections across all libraries")
return collections, collection_objects
def get_jellyfin_media():
"""
Pre-fetch all media in Jellyfin from the configured libraries and map them by IMDB ID.
Returns:
dict: Mapping of IMDB IDs to Jellyfin media objects
"""
jf_media = {}
# Process each library type configured in PLEX_LIBRARIES
for library_name in config.PLEX_LIBRARIES:
logger.info(f"Fetching Jellyfin {library_name} library...")
items = jellyfin.items_list()
library_items = [item for item in items['Items'] if item['Name'] == library_name]
if not library_items:
logger.warning(f"No '{library_name}' library found in Jellyfin")
continue
library_id = library_items[0]['Id']
# Loop through the items in the Jellyfin library
logger.info(f"Mapping Jellyfin {library_name} by IMDB ID...")
item_count = 0
items_with_imdb = 0
# Use recursive parameter to get all items at once with all fields
library_contents = jellyfin.items_list(
parentId=library_id,
recursive=True,
include_fields=True
)
# Process each item, looking for IMDB IDs directly in the item data
for item in library_contents.get('Items', []):
# Skip folders, collections, and other non-media items
item_type = item.get('Type', '')
if item_type in ['Folder', 'CollectionFolder', 'UserView', 'BoxSet']:
continue
item_count += 1
if item_count % 100 == 0:
logger.info(f"Processed {item_count} items in {library_name}...")
# Extract IMDB ID directly from the item data
provider_ids = item.get('ProviderIds', {})
for provider, provider_id in provider_ids.items():
if provider.lower() == 'imdb':
jf_media[provider_id] = item
items_with_imdb += 1
break
logger.info(f"Found {item_count} items in {library_name}, of which {items_with_imdb} have IMDB IDs")
logger.info(f"Total: Found {len(jf_media)} media items with IMDB IDs across all Jellyfin libraries")
return jf_media
def clean_jellyfin_collections():
"""
Remove all existing collections from Jellyfin.
"""
logger.info("Removing existing Jellyfin collections...")
existing_collections = jellyfin.collections_list()
removed_count = 0
for collection in existing_collections.get('Items', []):
logger.info(f"Removing collection: {collection['Name']}")
try:
jellyfin.remove_collection(collection['Id'])
removed_count += 1
except Exception as e:
logger.error(f"Error removing collection {collection['Name']}: {e}")
logger.info(f"Removed {removed_count} collections from Jellyfin")
def get_plex_image_data(plex_obj, image_type="thumb"):
"""
Get image data from a Plex object.
Args:
plex_obj: Plex object (movie, show, collection, etc.)
image_type (str): Type of image ("thumb", "art", "poster", etc.)
Returns:
tuple: (image_data, extension) or (None, None) if image doesn't exist
"""
try:
image_url = None
# Get the appropriate image URL
if image_type == "thumb" or image_type == "poster":
if hasattr(plex_obj, 'thumb') and plex_obj.thumb:
image_url = plex_obj.thumb
elif image_type == "art":
if hasattr(plex_obj, 'art') and plex_obj.art:
image_url = plex_obj.art
elif image_type == "banner":
if hasattr(plex_obj, 'banner') and plex_obj.banner:
image_url = plex_obj.banner
if not image_url:
return None, None
# Construct full URL with token
full_url = f"{config.PLEX_URL}{image_url}?X-Plex-Token={config.PLEX_TOKEN}"
# Get the image data
response = requests.get(full_url)
response.raise_for_status()
# Determine image extension from content type
content_type = response.headers.get('Content-Type', '')
extension = {
'image/jpeg': 'jpg',
'image/png': 'png',
'image/webp': 'webp'
}.get(content_type, 'jpg')
return response.content, extension
except Exception as e:
logger.error(f"Error getting image for {plex_obj.title if hasattr(plex_obj, 'title') else 'unknown item'}: {e}")
return None, None
def sync_collection_images(plex_collection, jellyfin_collection_id):
"""
Sync images from a Plex collection to a Jellyfin collection.
Args:
plex_collection: Plex collection object
jellyfin_collection_id (str): Jellyfin collection ID
Returns:
bool: True if any images were synced, False otherwise
"""
images_synced = False
# Try to sync the poster image (thumb in Plex)
try:
image_data, extension = get_plex_image_data(plex_collection, "thumb")
if image_data:
logger.info(f"Uploading poster image for collection '{plex_collection.title}'...")
if jellyfin.upload_image(jellyfin_collection_id, "Primary", image_data, extension):
images_synced = True
logger.info(f"Successfully uploaded poster image for collection '{plex_collection.title}'")
except Exception as e:
logger.error(f"Error syncing image for collection '{plex_collection.title}': {e}")
return images_synced
def build_plex_media_map():
"""
Build a mapping of IMDB IDs to Plex media objects.
Returns:
dict: Mapping of IMDB IDs to Plex media objects
"""
plex_media_map = {}
for library in plex.library.sections():
if library.title in config.PLEX_LIBRARIES:
for media in library.all():
imdb_id = None
for guid in media.guids:
if guid.id.startswith('imdb'):
imdb_id = guid.id.replace('imdb://', '')
plex_media_map[imdb_id] = media
break
return plex_media_map
def sync_media_images(plex_media_map, jellyfin_media_map):
"""
Sync images from Plex media items to Jellyfin media items using base64 encoding.
Args:
plex_media_map (dict): Mapping of IMDB IDs to Plex media objects
jellyfin_media_map (dict): Mapping of IMDB IDs to Jellyfin media objects
Returns:
int: Number of items with synced images
"""
synced_count = 0
common_imdb_ids = set(plex_media_map.keys()) & set(jellyfin_media_map.keys())
total_items = len(common_imdb_ids)
logger.info(f"Syncing images for {total_items} media items...")
# Process in batches to show progress
processed = 0
for imdb_id in common_imdb_ids:
processed += 1
plex_obj = plex_media_map[imdb_id]
jellyfin_id = jellyfin_media_map[imdb_id]['Id']
# Just try to sync the primary/poster image
try:
image_data, extension = get_plex_image_data(plex_obj, "thumb")
if image_data and jellyfin.upload_image(jellyfin_id, "Primary", image_data, extension):
synced_count += 1
except Exception as e:
logger.error(f"Error syncing image for {plex_obj.title}: {e}")
# Show progress periodically
if processed % 20 == 0 or processed == total_items:
logger.info(f"Processed {processed}/{total_items} items, synced images for {synced_count} items")
return synced_count
def sync_collections():
"""
Main function to synchronize collections from Plex to Jellyfin.
"""
logger.info("\n=== Starting Plex to Jellyfin Collection Synchronization ===\n")
start_time = time.time()
# Step 1: Get Plex collections data
logger.info("\n--- Gathering Plex Data ---")
plex_collections, plex_collection_objects = get_plex_collections()
# Step 2: Get Jellyfin media data
logger.info("\n--- Gathering Jellyfin Data ---")
jellyfin_media = get_jellyfin_media()
# Step 3: Clean existing Jellyfin collections
logger.info("\n--- Cleaning Jellyfin Collections ---")
clean_jellyfin_collections()
# Step 4: Create new collections in Jellyfin
logger.info("\n--- Creating Jellyfin Collections ---")
collections_created = 0
collections_failed = 0
collections_with_images = 0
for collection_name, imdb_ids in plex_collections.items():
# Convert IMDB IDs to Jellyfin media IDs
jellyfin_item_ids = []
for imdb_id in imdb_ids:
if imdb_id in jellyfin_media:
jellyfin_item_ids.append(jellyfin_media[imdb_id]['Id'])
if not jellyfin_item_ids:
logger.info(f"Skipping collection '{collection_name}' - no matching items in Jellyfin")
continue
logger.info(f"Creating collection '{collection_name}' with {len(jellyfin_item_ids)} items")
try:
result = jellyfin.create_collection(collection_name, jellyfin_item_ids)
collections_created += 1
# Sync collection images if enabled
if 'Id' in result:
plex_collection = plex_collection_objects[collection_name]
if sync_collection_images(plex_collection, result['Id']):
collections_with_images += 1
except Exception as e:
logger.error(f"Error creating collection '{collection_name}': {e}")
collections_failed += 1
# Step 5: Sync media images
media_with_images = 0
logger.info("\n--- Syncing Media Images ---")
plex_media_map = build_plex_media_map()
media_with_images = sync_media_images(plex_media_map, jellyfin_media)
# Summary
elapsed_time = time.time() - start_time
logger.info(f"\n=== Synchronization Complete ===")
logger.info(f"Time elapsed: {elapsed_time:.2f} seconds")
logger.info(f"Collections created: {collections_created}")
logger.info(f"Collections failed: {collections_failed}")
logger.info(f"Collections with images: {collections_with_images}")
logger.info(f"Media items with images: {media_with_images}")
if __name__ == "__main__":
sync_collections()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment