Last active
February 14, 2024 23:41
-
-
Save ajitid/1bbbf24e47669c5c571571ca3eb189e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from subprocess import check_output | |
from pprint import pprint | |
from plexapi.server import PlexServer | |
def process_movie_section(section, plex_media_paths, rclone_mount): | |
for directory in section.all(): | |
# in case of a movie, this will host a list of files | |
# which is actually just 1 file, mentioning the movie's location | |
if len(directory.locations) > 0 and directory.locations[0].startswith(rclone_mount): | |
plex_media_paths.add(os.path.dirname(directory.locations[0])) | |
def process_tv_show_section(section, plex_media_paths, rclone_mount): | |
for directory in section.all(): | |
# in case of a show, this will host a list of folders | |
for location in directory.locations: | |
if location.startswith(rclone_mount): | |
plex_media_paths.add(location) | |
def add_section_paths(section, plex_section_paths, rclone_mount): | |
for location in section.locations: | |
if location.startswith(rclone_mount): | |
plex_section_paths.add(location) | |
def main(): | |
# These 3 are the only actual args this script needs | |
baseurl = "http://plex:32400" | |
token = "X-Plex-Token" | |
rclone_mount = 'Z:' # ← windows, probably would be `/mnt/zurg` on linux and macos | |
# Sync rclone_mount with the remote (remote = zurg's WebDAV) first | |
check_output("rclone rc vfs/refresh recursive=true --fast-list", shell=True) # .decode() | |
plex = PlexServer(baseurl, token) | |
plex_media_paths = set() # will contain folders of all tv shows and movies | |
plex_section_paths = set() | |
for section in plex.library.sections(): | |
# A specific section can be retrieved using: section = plex.library.section('TV Shows') | |
if section.type == "show": | |
process_tv_show_section(section, plex_media_paths, rclone_mount) | |
add_section_paths(section, plex_section_paths, rclone_mount) | |
elif section.type == "movie": | |
process_movie_section(section, plex_media_paths, rclone_mount) | |
add_section_paths(section, plex_section_paths, rclone_mount) | |
rclone_media_paths = set() | |
for path in plex_section_paths: | |
for f in os.scandir(path): | |
if f.is_dir(): | |
rclone_media_paths.add(f.path) | |
missing_in_plex = rclone_media_paths.difference(plex_media_paths) | |
dead_links_in_plex = plex_media_paths.difference(rclone_media_paths) | |
directories_to_update = missing_in_plex.union(dead_links_in_plex) | |
for section in plex.library.sections(): | |
if section.type not in ["show", "movie"]: | |
continue | |
for directory in directories_to_update: | |
# It's fine if we give a movie path to TV Shows section, it'll just be ignored | |
try: | |
section.update(path=directory) | |
except Exception as e: | |
print(f"Error refreshing section {section.title}: {e}") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment