Script for hardlinking usenet imports in Sonarr into a separate folder with their original download names
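Configuration is passed entirely through environment variables (summarized in the comment block at the top of the script). A typical first run might look like the line below; the token and target path are placeholder values, and DRY_RUN=true makes the script log what it would do without touching the filesystem:

SONARR_TOKEN=abc123 HARDLINK_TARGET=/mnt/storage/xseed DRY_RUN=true ./xseed-sonarr.py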
#!/usr/bin/env python3
import os
import pathlib

import requests
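# Environment variables read by this script:
#   SONARR_TOKEN      - Sonarr API key (required)
#   HARDLINK_TARGET   - directory where the hardlinks are created (required)
#   SONARR_URL        - defaults to http://localhost:8989
#   USENET_DIRECTORY  - defaults to /data/usenet
#   TORRENT_DIRECTORY - if set, imports already present there are skipped
#   DRY_RUN           - "true" to log actions without touching the filesystem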
SONARR_TOKEN = os.environ.get("SONARR_TOKEN")
# directory where hardlinks will be created for renaming purposes
HARDLINK_TARGET = os.environ.get("HARDLINK_TARGET")
# DRY_RUN=true to test without touching the filesystem
DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"
USENET_DIRECTORY = os.environ.get("USENET_DIRECTORY", None)
TORRENT_DIRECTORY = os.environ.get("TORRENT_DIRECTORY", None)
# folder mappings inside the cross-seed container as keys and the actual folders on the host as values
DOCKER_MAPPINGS = {
    # must include a trailing slash
    "/data/": "/mnt/storage/",
}
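# For example, with the mapping above, unmap_path("/data/usenet/tv/episode.mkv")
# (a hypothetical path) would return "/mnt/storage/usenet/tv/episode.mkv".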
if USENET_DIRECTORY is None:
    USENET_DIRECTORY = "/data/usenet"
    print("USENET_DIRECTORY is not set, using default: /data/usenet")
USENET_PATHS = [
    f"{USENET_DIRECTORY}/movies/",
    f"{USENET_DIRECTORY}/tv/",
    f"{USENET_DIRECTORY}/",
]
sonarr_url = (
    f'{os.environ.get("SONARR_URL", "http://localhost:8989").rstrip("/")}/api/v3'
)
def unmap_path(path: str) -> str:
    for key, value in DOCKER_MAPPINGS.items():
        path = path.replace(key, value)
    return path
def get_total_records(api_key: str) -> int:
    response = requests.get(
        f"{sonarr_url}/history",
        headers={"X-Api-Key": api_key},
        params={
            "page": 0,
            "pageSize": 1,
            "sortKey": "date",
            "sortDirection": "descending",
        },
        timeout=10,
    ).json()
    return response["totalRecords"]
def strip_media_extension(path: str) -> str:
    # str.rstrip() strips a set of characters, not a suffix, so use removesuffix
    for extension in (".mkv", ".mp4"):
        path = path.removesuffix(extension)
    return path
def make_episode_identifier(data) -> str:
    return f"{data['seriesId']}-{data['episodeId']}"
def process_page(
    api_key: str,
    page: int,
    grab_count: int,
    series: dict[str, tuple[pathlib.Path, pathlib.Path]],
) -> None:
    response = requests.get(
        f"{sonarr_url}/history",
        headers={"X-Api-Key": api_key},
        params={
            "page": page,
            "pageSize": grab_count,
            "sortKey": "date",
            "sortDirection": "ascending",
            # "eventType": 3,
        },
        timeout=10,
    )
    # keep a copy of the raw response on disk for debugging
    with open("history.json", "w", encoding="utf8") as file:
        file.write(response.text)
    history = response.json()
    hardlink_folder = pathlib.Path(HARDLINK_TARGET)
    hardlink_folder.mkdir(exist_ok=True)
    for record in history["records"]:
        if record["eventType"] == "episodeFileDeleted":
            # the file was deleted after import, so drop any pending hardlink for it
            series.pop(make_episode_identifier(record), None)
        if record["eventType"] != "downloadFolderImported":
            continue
        data = record["data"]
        original_path = pathlib.Path(unmap_path(data["droppedPath"]))
        if not any(str(original_path).startswith(path) for path in USENET_PATHS):
            # only usenet paths are supported
            # print(f"[Debug] Path is not a usenet directory: {original_path.name}")
            continue
        import_path = unmap_path(data["importedPath"])
        data_path = pathlib.Path(import_path)
        if not data_path.exists():
            print(f"[Warning] Data path does not exist: {data_path}")
            continue
        direct_path_fragment = str(original_path)
        for path in USENET_PATHS:
            # str.lstrip() strips a set of characters, not a prefix, so use removeprefix
            direct_path_fragment = direct_path_fragment.removeprefix(path)
        if direct_path_fragment.endswith(".rar"):
            continue
        direct_path = pathlib.Path(hardlink_folder, direct_path_fragment)
        if TORRENT_DIRECTORY is not None:
            torrent_dirs = [
                pathlib.Path(TORRENT_DIRECTORY, direct_path.parent.name),
                pathlib.Path(TORRENT_DIRECTORY, direct_path.name),
            ]
            if any(directory.exists() for directory in torrent_dirs):
                print(
                    f"[Warning] The imported path is also in the torrent directory: {direct_path}"
                )
                continue
        if strip_media_extension(direct_path.parent.name) == strip_media_extension(
            direct_path.name
        ):
            print(
                f"[Warning] Parent and child directory are the same, this is probably a single episode: {direct_path}"
            )
            continue
        if not DRY_RUN and not direct_path.exists():
            # assuming sabnzbd always creates a parent folder for the download
            direct_path.parent.mkdir(exist_ok=True, parents=True)
        series[make_episode_identifier(record)] = (direct_path, data_path)
        print(f"[Info] Found previously imported: {direct_path.name}")
def shows(api_key, grab_count=100):
    total_records = get_total_records(api_key)
    series: dict[str, tuple[pathlib.Path, pathlib.Path]] = {}
    # round up so the final partial page is not skipped
    page_count = (total_records + grab_count - 1) // grab_count
    for page in range(page_count):
        print(f"[Debug] Processing page: {page}")
        process_page(api_key, page, grab_count, series)
    if DRY_RUN:
        print("[Info] DRY_RUN is set, skipping hardlink creation")
        return
    print("[Info] Hardlinking series...")
    for direct_path, data_path in series.values():
        try:
            direct_path.hardlink_to(data_path)
            print(f"[Info] Hardlinked: {direct_path.name}")
        except FileExistsError:
            print(f"[Info] Hardlink already exists for: {direct_path.name}")
if __name__ == "__main__":
    print("[Debug] Starting xseed-sonarr.py...")
    if SONARR_TOKEN is None or HARDLINK_TARGET is None:
        raise SystemExit("[Error] SONARR_TOKEN and HARDLINK_TARGET must be set")
    shows(SONARR_TOKEN)