Last active
June 22, 2023 17:49
-
-
Save d8ahazard/27caeb96449eb74db902d3ef9f4c0d2f to your computer and use it in GitHub Desktop.
A utility for cleaning Plex collections and identifying orphaned media.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pathlib | |
import sqlite3 | |
# Plex data directory chosen by find_db(); stays "" until a candidate path exists.
target_path = ""
# Dicts with "file", "title" and "id" keys, filled by find_orphans() from
# database rows whose metadata_type is 1.
movie_list = []
# Same shape as movie_list, for database rows whose metadata_type is 4.
tv_list = []
# Not referenced by any function visible in this file.
collection_list = []
def find_db():
    """Locate the Plex Media Server data directory.

    A fixed list of well-known install locations is tried first, followed by
    locations derived from the LOCALAPPDATA, PLEX_HOME and JAIL_ROOT
    environment variables when they are set.  The first candidate that exists
    on disk is stored in the module-level ``target_path``; if none exists,
    ``target_path`` is left untouched.

    The stored path always ends with a path separator, because callers build
    the database path by plain string concatenation.
    """
    global target_path
    candidates = [
        "E:/PMSData/Plex Media Server/",
        "~/Library/Application Support/Plex Media Server/",
        "/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/",
        "/volume1/Plex/Library",
        "/Volume1/Plex/Library/Application Support/Plex Media Server/",
        "/usr/local/plexdata/Plex Media Server/",
        "/Internal/Android/data/com.plexapp.mediaserver.smb/Plex Media Server/",
        "/home/plex/Library/Application Support/Plex Media Server",
        "/raid/data/module/Plex/sys/Plex Media Server/",
        "/mnt/HD/HD_a2/plex_conf/Plex Media Server/",
        "/shares/Storage/.wdcache/.plexmediaserver/Application Support/Plex Media Server/",
    ]
    # (environment variable, suffix appended to its value)
    env_candidates = (
        ("LOCALAPPDATA", "/Plex Media Server/"),
        ("PLEX_HOME", "/Library/Application Support/Plex Media Server/"),
        ("JAIL_ROOT", "/var/db/plexdata/Plex Media Server/"),
    )
    for variable, suffix in env_candidates:
        base = os.getenv(variable)
        if base is not None:
            candidates.append(base + suffix)

    for candidate in candidates:
        # os.path.exists() does not expand "~", and joining with "" appends a
        # trailing separator only when one is missing.
        db_path = os.path.join(os.path.expanduser(candidate), "")
        pprint("")
        pprint("Checking path {}".format(db_path))
        if os.path.exists(db_path):
            target_path = db_path
            pprint("We have the path! " + db_path)
            pprint("")
            break
        else:
            pprint("No path found.")
            pprint("")
def _normalize_path(path):
    """Return *path* in a canonical form so DB and on-disk paths compare equal."""
    return os.path.normcase(os.path.normpath(path))


def _scan_section(section, file_extensions, extra_media_tags):
    """Scan one library root and return ``(media_files, empty_dirs)``.

    ``media_files`` lists every file under *section* whose extension is in
    *file_extensions* and whose name contains none of *extra_media_tags*.
    ``empty_dirs`` lists the first-level sub-directories of *section* that
    contain no such file at any depth.
    """
    media_files = []
    empty_dirs = []
    try:
        root, children, root_files = next(os.walk(section))
    except StopIteration:
        # os.walk() yields nothing when the root is missing or unreadable.
        return media_files, empty_dirs

    def is_media(name):
        if any(tag in name for tag in extra_media_tags):
            return False
        return pathlib.Path(name).suffix.lower() in file_extensions

    # Files stored directly in the library root count as media too.
    media_files.extend(os.path.join(root, name) for name in root_files if is_media(name))
    for child in children:
        child_path = os.path.join(root, child)
        found = [
            # Join with the directory actually being walked so files in nested
            # folders get their real path.
            os.path.join(dirpath, name)
            for dirpath, _dirnames, filenames in os.walk(child_path)
            for name in filenames
            if is_media(name)
        ]
        if found:
            media_files.extend(found)
        else:
            empty_dirs.append(child_path)
    return media_files, empty_dirs


def find_orphans():
    """Compare the files recorded in the Plex database with the files on disk.

    Rebuilds the module-level ``movie_list`` and ``tv_list`` from the database,
    switches any library section whose settings contain ``collectionMode=2``
    to ``collectionMode=0``, scans every section root on disk, and prints the
    movie rows whose file was not found on disk plus the first-level
    directories that hold no media.  Waits for RETURN and clears the screen
    before returning to the caller.
    """
    global movie_list
    global tv_list
    find_db()
    file_extensions = frozenset(
        [".avi", ".mkv", ".mp4", ".m2ts", ".ts", ".mov", ".wmv", ".3gpp", ".flv", ".mpeg", ".mpg"]
    )
    extra_media_tags = ("-behindthescenes", "-deleted", "-featurette", "-interview", "-scene",
                        "-short", "-trailer", "-other")
    db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
    # sqlite3.connect() would silently create an empty database for a missing
    # file, so check for the file first.
    if target_path == "" or not os.path.isfile(db_path):
        pprint("DB Path not found!")
    else:
        movie_list = []
        tv_list = []
        section_ids = set()
        sections = []
        conn = sqlite3.connect(db_path)
        try:
            c = conn.cursor()
            c.execute(
                "select media_parts.file, metadata_items.title, metadata_items.metadata_type, "
                "metadata_items.id, metadata_items.library_section_id "
                "from media_parts "
                "inner join media_items on media_parts.media_item_id=media_items.id "
                "inner join metadata_items on media_items.metadata_item_id=metadata_items.id;"
            )
            for file_path, title, metadata_type, meta_id, section_id in c.fetchall():
                if metadata_type == 4:
                    tv_list.append({"file": file_path, "title": title, "id": meta_id})
                if metadata_type == 1:
                    movie_list.append({"file": file_path, "title": title, "id": meta_id})
                section_ids.add(section_id)
            c.execute(
                "select sl.library_section_id, sl.root_path, ls.user_fields "
                "from section_locations as sl "
                "inner join library_sections as ls on ls.id = sl.library_section_id"
            )
            for section_id, location, settings in c.fetchall():
                if section_id not in section_ids:
                    continue
                # user_fields may be NULL; str.replace returns a new string, so
                # the result has to be kept for the UPDATE to change anything.
                if settings and "collectionMode=2" in settings:
                    settings = settings.replace("collectionMode=2", "collectionMode=0")
                    pprint("Updating settings for section {} to disable collection recreation."
                           .format(section_id))
                    c.execute("UPDATE library_sections SET user_fields=? WHERE id=?",
                              (settings, section_id))
                    conn.commit()
                sections.append(location)
        finally:
            conn.close()

        if sections:
            media_files = []
            empty_dirs = []
            for section in sections:
                pprint("Checking for files in {}".format(section))
                found, empty = _scan_section(section, file_extensions, extra_media_tags)
                media_files.extend(found)
                empty_dirs.extend(empty)

            on_disk = {_normalize_path(path) for path in media_files}
            movie_orphans = [
                movie for movie in movie_list
                if not movie["file"] or _normalize_path(movie["file"]) not in on_disk
            ]

            # NOTE(review): the rows listed here come from the database and were
            # not found on disk, which is the opposite of what this heading
            # says - confirm the intended wording.
            pprint("")
            pprint("Movies not in Plex Database:\n")
            pprint("Path, title")
            for plex_orphan in movie_orphans:
                pprint("{}, {}".format(plex_orphan["file"], plex_orphan["title"]))
            pprint("")
            pprint("Empty directories:\n")
            pprint("Path")
            for empty_dir in empty_dirs:
                pprint(empty_dir)
            pprint("")
    input(" PRESS [RETURN] TO CONTINUE.")
    # Return to the caller's menu loop instead of starting a nested menu, which
    # would deepen the call stack on every use.
    clear()
def main_menu():
    """Show the interactive menu and run the chosen action.

    The menu is shown again after every action.  Choosing option 4 exits the
    interpreter by raising ``SystemExit``; submitting an empty line ends the
    loop and returns to the caller.
    """
    prompt = (
        "\n"
        "GREETINGS PROFESSOR FALKEN, SHALL WE PLAY A GAME?\n"
        "1. FIND MISSING MEDIA\n"
        "2. COLLECTION CLEANUP\n"
        "3. GLOBAL THERMONUCLEAR WAR\n"
        "4. TERMINATE APPLICATION\n"
    )
    clear()
    ans = True
    while ans:
        ans = input(prompt)
        if ans == "1":
            clear()
            find_orphans()
        elif ans == "2":
            clear()
            clean_collections()
        elif ans == "3":
            clear()
            pprint("\nAre you out of your damned mind?")
            input("\nPRESS [RETURN] TO CONTINUE")
            # The loop shows the menu again; no nested main_menu() call needed.
            clear()
        elif ans == "4":
            clear()
            # quit() is injected by the site module and may be absent.
            raise SystemExit(0)
        elif ans != "":
            pprint("\nINVALID ENTRY.")
def clean_collections():
    """Ask for the minimum collection size and run the collection cleanup.

    An empty answer means the default of 2.  Answers that are not whole
    numbers are rejected and the question is asked again.  A minimum below 2
    leaves nothing to clean, so only a notice is printed.
    """
    while True:
        answer = input("\n What is the minimum number of items to have in a collection? (2)").strip()
        if answer == "":
            answer = "2"
        try:
            # Compare as a number: as strings, "10" sorts before "2".
            minimum = int(answer)
        except ValueError:
            pprint("\nINVALID ENTRY.")
            continue
        break

    if minimum >= 2:
        clear()
        pprint("\nCleaning useless collections.\n")
        clean_query(minimum)
        # Hand a clean screen back to the caller's menu loop.
        clear()
    else:
        pprint("\nNothing to do.")
def _collection_child_count(extra):
    """Return the integer value of the first ``key=value`` pair in *extra*.

    Returns ``None`` when *extra* is missing or does not have that shape.
    """
    try:
        return int(extra.split("&")[0].split("=")[1])
    except (AttributeError, IndexError, ValueError):
        return None


def _update_collection_visibility(conn, count):
    """List collections to hide or show and, after confirmation, update them.

    Collections (``metadata_type=18``) with fewer than *count* items that do
    not yet carry ``collectionMode=0`` get it appended; collections with at
    least *count* items that carry it get it removed.
    """
    c = conn.cursor()
    c.execute("select title, extra_data, id from metadata_items where metadata_type=18 "
              "order by title asc")
    items_to_remove = []
    items_to_show = []
    for title, extra, meta_id in c.fetchall():
        child_count = _collection_child_count(extra)
        if child_count is None:
            pprint("Skipping '{}': could not read its item count.".format(title))
            continue
        child_item = {"title": title, "extra": extra, "id": meta_id, "count": child_count}
        hidden = "collectionMode=0" in extra
        if child_count < count and not hidden:
            items_to_remove.append(child_item)
        elif child_count >= count and hidden:
            items_to_show.append(child_item)

    if items_to_remove:
        pprint("Items to hide:\n")
        pprint("title, count")
        for remove in items_to_remove:
            pprint("{}, {}".format(remove["title"], remove["count"]))
    if items_to_show:
        pprint("\n\nItems to show:\n")
        pprint("title, count")
        for show in items_to_show:
            pprint("{}, {}".format(show["title"], show["count"]))

    if not (items_to_remove or items_to_show):
        pprint("No collections found with less than {} items.".format(count))
        return

    confirmation = input("About to show/hide the following collections. Press y to continue (y/n)")
    if confirmation.strip().lower() != "y":
        pprint("Cleanup canceled.")
        return

    # NOTE(review): presumably this index depends on a Plex-specific collation
    # that the stock sqlite3 module lacks, which would block the UPDATEs below
    # - confirm. The closing message asks the user to restart Plex to rebuild it.
    try:
        c.execute("DROP index 'index_title_sort_naturalsort'")
        c.execute("DELETE from schema_migrations where version='20180501000000'")
    except sqlite3.OperationalError:
        pprint("Nothing to see here...")
    conn.commit()

    pprint("Updating collections to show.")
    updates = [(item["extra"] + "&pr%3AcollectionMode=0", item["id"]) for item in items_to_remove]
    updates += [(item["extra"].replace("&pr%3AcollectionMode=0", ""), item["id"])
                for item in items_to_show]
    try:
        c.executemany("UPDATE metadata_items SET extra_data=? WHERE id=?", updates)
    except sqlite3.OperationalError as msg:
        # Do not keep a half-applied change set, and do not report success.
        conn.rollback()
        pprint("Error executing query, try repairing your DB.")
        pprint(str(msg))
        return
    conn.commit()
    pprint("Cleanup completed.\n")
    pprint("Please restart Plex to rebuild the 'natural sort' table.\n")


def clean_query(count):
    """Hide collections smaller than *count* items and show the others again.

    Locates the Plex database through ``find_db()``, applies the visibility
    changes after the user confirms them, and waits for RETURN before
    returning.

    Args:
        count: minimum number of items a collection needs to stay visible.
    """
    find_db()
    db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
    # sqlite3.connect() would silently create an empty database for a missing
    # file, so check for the file first.
    if target_path == "" or not os.path.isfile(db_path):
        pprint("DB Path not found!")
    else:
        conn = sqlite3.connect(db_path)
        try:
            _update_collection_visibility(conn, count)
        finally:
            conn.close()
    input(" PRESS [RETURN] TO CONTINUE.")
def clear():
    """Clear the terminal: runs ``cls`` when ``os.name`` is ``nt``, else ``clear``."""
    command = "cls" if os.name == "nt" else "clear"
    os.system(command)
def pprint(str_input):
    """Print *str_input* prefixed with a single space.

    Values that are not strings are converted with ``str()`` first, so numbers
    and other objects can be passed directly.
    """
    print(" " + str(str_input))
# Start the interactive menu only when run as a script, so importing this
# module has no side effects.
if __name__ == "__main__":
    main_menu()
Thanks d8ahazard, for showing your solution(s).
As an addition to your script: for the latest PMS (Plex Media Server) on Synology, the path is probably "/volume1/PlexMediaServer/AppData/Plex Media Server/".
Furthermore, if you want more users to use the script, a bit more explanation might be required. Option 1 is to find stuff on your drive that might not be in PMS; Option 2 is... eh? Don't know ;-)
Since PMS is using a lot of disk space (ca. 35GB, of which ca. 5GB is the database), I wanted to find out whether all that disk space is really being used (so that I can remove unused data from the folders). But as this wasn't one of the problems you set out to solve, I need to search a bit more. Of course, I'm thankful for any hints.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Written in Python 3.7, tested with Windows Server 2019.
Designed to be as cross-platform friendly as possible, but any changes/fixes are welcome.