Skip to content

Instantly share code, notes, and snippets.

@DuaneR5280
Forked from d8ahazard/plex_cleanup.py
Created January 16, 2020 22:40
Show Gist options
  • Save DuaneR5280/4ccd7b527aa011ea7fa5545212925abc to your computer and use it in GitHub Desktop.
Save DuaneR5280/4ccd7b527aa011ea7fa5545212925abc to your computer and use it in GitHub Desktop.
A utility for cleaning Plex collections and identifying orphaned media.
import os
import pathlib
import sqlite3
target_path = ""
movie_list = []
tv_list = []
collection_list = []
def find_db():
global target_path
paths = [
"E:/PMSData/Plex Media Server/",
"~/Library/Application Support/Plex Media Server/",
"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/",
"/volume1/Plex/Library",
"/Volume1/Plex/Library/Application Support/Plex Media Server/",
"/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/",
"/usr/local/plexdata/Plex Media Server/",
"/Internal/Android/data/com.plexapp.mediaserver.smb/Plex Media Server/",
"/home/plex/Library/Application Support/Plex Media Server",
"/raid/data/module/Plex/sys/Plex Media Server/",
"/mnt/HD/HD_a2/plex_conf/Plex Media Server/",
"/shares/Storage/.wdcache/.plexmediaserver/Application Support/Plex Media Server/"
]
if os.getenv("LOCALAPPDATA") is not None:
paths.append(os.getenv("LOCALAPPDATA") + "/Plex Media Server/")
if os.getenv("PLEX_HOME") is not None:
paths.append(os.getenv("PLEX_HOME") + "/Library/Application Support/Plex Media Server/")
if os.getenv("JAIL_ROOT") is not None:
paths.append(os.getenv("JAIL_ROOT") + "/var/db/plexdata/Plex Media Server/")
for db_path in paths:
pprint("")
pprint("Checking path {}".format(db_path))
if os.path.exists(db_path):
target_path = db_path
pprint("We have the path! " + db_path)
pprint("")
break
else:
pprint("No path found.")
pprint("")
def find_orphans():
find_db()
global target_path
global movie_list
global tv_list
file_extensions = [".avi", ".mkv", ".mp4", ".m2ts", ".ts", ".mov", ".wmv", ".3gpp", ".flv", ".mpeg", ".mpg"]
extra_media_tags = ["-behindthescenes", "-deleted", "-featurette", "-interview", "-scene", "-short", "-trailer",
"-other"]
if target_path != "":
movie_list = []
tv_list = []
media_files = []
section_ids = []
sections = []
movie_orphans = []
# tv_orphans = []
empty_dirs = []
db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute("""select media_parts.file, metadata_items.title, metadata_items.metadata_type, metadata_items.id,
metadata_items.library_section_id from media_parts inner join media_items on
media_parts.media_item_id=media_items.id inner join metadata_items on
media_items.metadata_item_id=metadata_items.id;""")
for (file, title, metadata_type, meta_id, section_id) in c.fetchall():
if metadata_type == 4:
tv_list.append({"file": file, "title": title, "id": meta_id})
if metadata_type == 1:
movie_list.append({"file": file, "title": title, "id": meta_id})
if section_id not in section_ids:
section_ids.append(section_id)
c.execute("""select sl.library_section_id, sl.root_path, ls.user_fields from section_locations as sl
inner join library_sections as ls on ls.id = sl.library_section_id""")
for (section_id, location, settings) in c.fetchall():
if section_id in section_ids:
if "collectionMode=2" in settings:
settings.replace("collectionMode=2", "collectionMode=0")
pprint("Updating settings for section {} to disable collection recreation.".format(section_id))
c.execute("UPDATE library_sections SET user_fields=? WHERE id=?", (settings, section_id))
conn.commit()
sections.append(location)
conn.close()
if len(sections) > 0:
for section in sections:
pprint("Checking for files in {}".format(section))
i = 0
for media_dir in os.walk(section):
if i == 0:
children = media_dir[1]
for child in children:
has_media = False
full_path = os.path.join(section, child)
for media_child in os.walk(full_path):
media_items = media_child[2]
for media in media_items:
is_extra = False
for tag in extra_media_tags:
if tag in media:
is_extra = True
break
if is_extra is False:
ext = pathlib.Path(media).suffix.lower()
if ext in file_extensions:
has_media = True
full_path = os.path.join(full_path, media)
media_files.append(full_path)
if has_media is False:
empty_dirs.append(full_path)
i += 1
for movie in movie_list:
if movie["file"] not in media_files:
movie_orphans.append(movie)
# for show in tv_list:
# if show["file"] not in media_files:
# tv_orphans.append(show)
pprint("")
pprint("Movies not in Plex Database:\n")
pprint("Path, title")
for plex_orphan in movie_orphans:
pprint("{}, {}".format(plex_orphan["file"], plex_orphan["title"]))
# pprint("")
# pprint("Episodes not in Plex Database:")
# pprint("Path, title")
# for tv_orphan in tv_orphans:
# pprint("{}, {}".format(tv_orphan["file"], tv_orphan["title"]))
pprint("")
pprint("Empty directories:\n")
pprint("Path")
for empty_dir in empty_dirs:
pprint(empty_dir)
pprint("")
input(" PRESS [RETURN] TO CONTINUE.")
main_menu()
def main_menu():
clear()
ans = True
while ans:
ans = input("""
GREETINGS PROFESSOR FALKEN, SHALL WE PLAY A GAME?
1. FIND MISSING MEDIA
2. COLLECTION CLEANUP
3. GLOBAL THERMONUCLEAR WAR
4. TERMINATE APPLICATION
""")
if ans == "1":
clear()
find_orphans()
elif ans == "2":
clear()
clean_collections()
elif ans == "3":
clear()
pprint("\nAre you out of your damned mind?")
input("\nPRESS [RETURN] TO CONTINUE")
main_menu()
elif ans == "4":
clear()
quit()
elif ans != "":
pprint("\nINVALID ENTRY.")
def clean_collections():
ans = True
while ans:
ans = input("""\n What is the minimum number of items to have in a collection? (2)""")
if ans == "":
ans = "2"
if ans >= "2":
clear()
pprint("\nCleaning useless collections.\n")
clean_query(int(ans))
main_menu()
else:
pprint("\nNothing to do.")
main_menu()
main_menu()
def clean_query(count):
find_db()
items_to_remove = []
items_to_show = []
if target_path != "":
db_path = target_path + "Plug-in Support/Databases/com.plexapp.plugins.library.db"
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute(""" select title, extra_data, id from metadata_items where metadata_type=18 order by title asc""")
pprint("Items to remove:\n")
pprint("title, child_count")
for (title, extra, meta_id) in c.fetchall():
child_count = extra.split("&")[0].split("=")[1]
child_item = {"title": title, "extra": extra, "id": meta_id, "count": child_count}
if int(child_count) < count and "collectionMode=0" not in extra:
items_to_remove.append(child_item)
elif int(child_count) >= count and "collectionMode=0" in extra:
items_to_show.append(child_item)
if len(items_to_remove):
pprint("Items to hide:\n")
pprint("title, count")
for remove in items_to_remove:
pprint("{}, {}".format(remove["title"], remove["count"]))
if len(items_to_show):
pprint("\n\nItems to show:\n")
pprint("title, count")
for show in items_to_show:
pprint("{}, {}".format(show["title"], show["count"]))
if len(items_to_remove) or len(items_to_show):
confirmation = input("About to show/hide the following collections. Press y to continue (y/n)")
if confirmation == "y":
try:
c.execute("DROP index 'index_title_sort_naturalsort'")
c.execute("DELETE from schema_migrations where version='20180501000000'")
except sqlite3.OperationalError:
pprint("Nothing to see here...")
conn.commit()
pprint("Updating collections to show.")
do_clean = True
try:
for remove in items_to_remove:
extra = remove["extra"] + "&pr%3AcollectionMode=0"
meta_id = remove["id"]
c.execute("UPDATE metadata_items SET extra_data=? WHERE id=?", (extra, meta_id))
for show in items_to_show:
extra = show["extra"].replace("&pr%3AcollectionMode=0", "")
meta_id = show["id"]
c.execute("UPDATE metadata_items SET extra_data=? WHERE id=?", (extra, meta_id))
except sqlite3.OperationalError as msg:
pprint("Error executing query, try repairing your DB.")
conn.commit()
pprint("Cleanup completed.\n")
pprint("Please restart Plex to rebuild the 'natural sort' table.\n")
else:
pprint("Cleanup canceled.")
else:
pprint("No collections found with less than {} items.".format(count))
conn.close()
else:
pprint("DB Path not found!")
input(" PRESS [RETURN] TO CONTINUE.")
def clear():
os.system('cls' if os.name == 'nt' else 'clear')
def pprint(str_input):
print(" " + str_input)
main_menu()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment