Created
February 3, 2025 20:22
-
-
Save apenney/9c21096ba707e9acfaa64fa5d4025fb4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| python torrent_symlink_matcher.py /path/to/file.torrent /path/to/DirectoryA /path/to/DirectoryB | |
| How It Works | |
| Torrent Parsing: | |
| The script uses bencodepy to decode the torrent file. It then extracts the file entries (for multi‐file torrents, it also records the parent folder found in info["name"]). | |
| Directory Scanning: | |
| It recursively walks through DirectoryA (using os.walk), filtering for video files by extension. For each file it finds, it grabs its size and uses a regular expression (S\d{1,4}E\d{1,4}) to look for a season/episode code in the filename. | |
| Matching Process: | |
| It first attempts to auto-match each torrent entry with a found file by comparing file sizes (and, when available, season/episode codes). If there are multiple candidates (or none), it prompts you on the command line to choose the correct match. | |
| Symlink Creation: | |
| Finally, for every matched pair, the script creates a symlink in DirectoryB. If the torrent file originally had a parent folder, that folder is created inside DirectoryB and the symlink’s name is taken from the torrent metadata—ensuring that when you add the torrent to a client, the file names and folder structure match what the torrent expects. | |
| Feel free to modify the matching logic or extend the user‐interface as needed. Enjoy! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| torrent_symlink_matcher.py | |
| This script accepts three command-line arguments: | |
| 1. The path to a torrent file. | |
| 2. A directory (DirectoryA) to recursively search for video files. | |
| 3. A target directory (DirectoryB) where symlinks will be created. | |
| It: | |
| - Parses the torrent file (using bencodepy) to extract file metadata | |
| (filenames, sizes, etc.) and determines if a top-level parent folder exists. | |
| - Recursively scans DirectoryA for video files (extensions: mkv, mp4, ts, avi, m4v) | |
| and extracts season/episode codes (matching the regex S\d{1,4}E\d{1,4}). | |
| - Attempts to match the torrent file entries with found video files by comparing file | |
| sizes and season/episode codes. | |
| - If multiple or no candidates exist for a given torrent file, it asks the user (via | |
| a simple CLI) to manually choose a match (or skip it). | |
| - For every match, it creates a symlink in DirectoryB (creating any parent folder if needed) | |
| so that the symlink’s filename corresponds to the original torrent file entry. | |
| Dependencies: | |
| - bencodepy (install with: pip install bencodepy) | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import re | |
| import bencodepy | |
| # Allowed video file extensions (lowercase) | |
| VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.ts', '.avi', '.m4v'} | |
| def extract_season_episode(text): | |
| """ | |
| Search for a season/episode code in the given text. | |
| Returns the matched string (uppercased) or None. | |
| """ | |
| match = re.search(r'(S\d{1,4}E\d{1,4})', text, re.IGNORECASE) | |
| if match: | |
| return match.group(1).upper() | |
| return None | |
| def parse_torrent(torrent_path): | |
| """ | |
| Opens and decodes the torrent file. | |
| Returns: | |
| - A list of dictionaries, one per file in the torrent, each with: | |
| 'relative_path': the file path (as in the torrent) | |
| 'size': file size (in bytes) | |
| 'season_episode': season/episode code (if any) | |
| - torrent_parent_folder: if the torrent is multi-file (i.e. has a top-level folder) | |
| this is the decoded name; otherwise None. | |
| """ | |
| with open(torrent_path, 'rb') as f: | |
| torrent_content = f.read() | |
| torrent_dict = bencodepy.decode(torrent_content) | |
| info = torrent_dict[b'info'] | |
| torrent_files = [] | |
| torrent_parent_folder = None | |
| # Check for multi-file torrent (presence of the b'files' key) | |
| if b'files' in info: | |
| # In a multi-file torrent, info[b'name'] holds the suggested parent folder. | |
| torrent_parent_folder = info[b'name'].decode('utf-8', errors='replace') | |
| for file_entry in info[b'files']: | |
| # Each file_entry has a b'path' (a list of path components) and b'length' | |
| path_components = [comp.decode('utf-8', errors='replace') for comp in file_entry[b'path']] | |
| relative_path = os.path.join(*path_components) | |
| size = file_entry[b'length'] | |
| season_ep = extract_season_episode(relative_path) | |
| torrent_files.append({ | |
| 'relative_path': relative_path, | |
| 'size': size, | |
| 'season_episode': season_ep | |
| }) | |
| else: | |
| # Single file torrent – info[b'name'] is the file name. | |
| name = info[b'name'].decode('utf-8', errors='replace') | |
| size = info[b'length'] | |
| season_ep = extract_season_episode(name) | |
| torrent_files.append({ | |
| 'relative_path': name, | |
| 'size': size, | |
| 'season_episode': season_ep | |
| }) | |
| return torrent_files, torrent_parent_folder | |
| def scan_directory(directory): | |
| """ | |
| Recursively scans the given directory for video files. | |
| For each found file, it records its full path, size, base filename, | |
| and (if present) a season/episode code (by matching the regex). | |
| Returns a list of dictionaries. | |
| """ | |
| found_files = [] | |
| for root, dirs, files in os.walk(directory): | |
| for file in files: | |
| ext = os.path.splitext(file)[1].lower() | |
| if ext in VIDEO_EXTENSIONS: | |
| file_path = os.path.join(root, file) | |
| try: | |
| size = os.path.getsize(file_path) | |
| except Exception as e: | |
| print(f"Error getting size for {file_path}: {e}") | |
| continue | |
| season_ep = extract_season_episode(file) | |
| found_files.append({ | |
| 'path': file_path, | |
| 'size': size, | |
| 'season_episode': season_ep, | |
| 'filename': file | |
| }) | |
| return found_files | |
| def find_candidates(torrent_file, found_files, matched_found): | |
| """ | |
| Given a torrent file entry and the list of found files (with indices not yet matched), | |
| returns a list of candidate matches (as tuples of (found_file_index, found_file dict)) | |
| that have the same size and, if available, the same season/episode code. | |
| """ | |
| candidates = [] | |
| for idx, found in enumerate(found_files): | |
| if idx in matched_found: | |
| continue # Skip files already matched | |
| if found['size'] != torrent_file['size']: | |
| continue # Size must match exactly | |
| # If a season/episode code is present in the torrent file, require a match. | |
| if torrent_file['season_episode']: | |
| if found['season_episode'] and torrent_file['season_episode'] == found['season_episode']: | |
| candidates.append((idx, found)) | |
| else: | |
| candidates.append((idx, found)) | |
| return candidates | |
| def auto_match(torrent_files, found_files): | |
| """ | |
| Iterates over the torrent file entries and automatically matches them with found files, | |
| if there is exactly one candidate (based on size and season/episode). | |
| For ambiguous cases (more than one candidate) or no match, the user will be prompted later. | |
| Returns a tuple: | |
| - matches: a dict mapping torrent file index -> found file index. | |
| - matched_found: a set of found file indices that have been matched. | |
| """ | |
| matches = {} # key: torrent_files index, value: found_files index | |
| matched_found = set() | |
| for t_idx, t_file in enumerate(torrent_files): | |
| candidates = find_candidates(t_file, found_files, matched_found) | |
| if len(candidates) == 1: | |
| candidate_idx, candidate_found = candidates[0] | |
| matches[t_idx] = candidate_idx | |
| matched_found.add(candidate_idx) | |
| print(f"Auto-matched torrent file '{t_file['relative_path']}' with found file '{candidate_found['filename']}'") | |
| elif len(candidates) > 1: | |
| print(f"\nMultiple candidates found for torrent file '{t_file['relative_path']}':") | |
| for cand_idx, cand in candidates: | |
| print(f" [{cand_idx}] {cand['filename']} (Size: {cand['size']}, Season/Episode: {cand['season_episode']})") | |
| choice = input(f"Enter the index of the correct match for '{t_file['relative_path']}' (or press Enter to skip): ").strip() | |
| if choice.isdigit(): | |
| chosen_idx = int(choice) | |
| if any(chosen_idx == c[0] for c in candidates): | |
| matches[t_idx] = chosen_idx | |
| matched_found.add(chosen_idx) | |
| else: | |
| print(f"No automatic match found for torrent file '{t_file['relative_path']}'") | |
| return matches, matched_found | |
| def manual_match(torrent_files, found_files, matches, matched_found): | |
| """ | |
| For each torrent file that wasn’t matched automatically, this function | |
| displays the torrent entry along with a list of candidate found files (filtered by size) | |
| and asks the user to manually choose a match (or skip it). | |
| """ | |
| for t_idx, t_file in enumerate(torrent_files): | |
| if t_idx in matches: | |
| continue # Skip already matched entries | |
| print(f"\nManual matching for torrent file: [{t_idx}] {t_file['relative_path']} " | |
| f"(Size: {t_file['size']}, Season/Episode: {t_file['season_episode']})") | |
| # Find candidate found files (here we filter by matching size) | |
| candidates = [] | |
| for f_idx, f_file in enumerate(found_files): | |
| if f_idx in matched_found: | |
| continue | |
| if f_file['size'] == t_file['size']: | |
| candidates.append((f_idx, f_file)) | |
| if not candidates: | |
| print(" No unmatched found files with matching size.") | |
| continue | |
| print("Candidates:") | |
| for cand_idx, cand in candidates: | |
| print(f" [{cand_idx}] {cand['filename']} (Size: {cand['size']}, Season/Episode: {cand['season_episode']})") | |
| choice = input("Enter the index of the correct match, or 'n' if none: ").strip() | |
| if choice.lower() == 'n': | |
| print(" Marking as no match.") | |
| elif choice.isdigit(): | |
| chosen_idx = int(choice) | |
| if any(chosen_idx == c[0] for c in candidates): | |
| matches[t_idx] = chosen_idx | |
| matched_found.add(chosen_idx) | |
| else: | |
| print(" Invalid choice, skipping.") | |
| else: | |
| print(" Invalid input, skipping.") | |
| return matches | |
| def create_symlinks(matches, torrent_files, found_files, directoryB, torrent_parent_folder): | |
| """ | |
| For each matched pair, creates a symlink in DirectoryB. | |
| The symlink’s filename is based on the torrent file entry (and includes the parent folder | |
| if one was defined in the torrent). | |
| """ | |
| for t_idx, f_idx in matches.items(): | |
| t_file = torrent_files[t_idx] | |
| found_file = found_files[f_idx] | |
| # If the torrent had a parent folder, include it in the target path. | |
| if torrent_parent_folder: | |
| # os.path.dirname returns '' if there is no subdirectory in the torrent entry. | |
| symlink_dir = os.path.join(directoryB, torrent_parent_folder, os.path.dirname(t_file['relative_path'])) | |
| else: | |
| symlink_dir = os.path.join(directoryB, os.path.dirname(t_file['relative_path'])) | |
| os.makedirs(symlink_dir, exist_ok=True) | |
| symlink_path = os.path.join(symlink_dir, os.path.basename(t_file['relative_path'])) | |
| try: | |
| # Remove an existing symlink (or file) if present. | |
| if os.path.lexists(symlink_path): | |
| os.remove(symlink_path) | |
| os.symlink(found_file['path'], symlink_path) | |
| print(f"Created symlink: {symlink_path} -> {found_file['path']}") | |
| except Exception as e: | |
| print(f"Error creating symlink for '{t_file['relative_path']}': {e}") | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Match torrent file entries to renamed video files and create symlinks for seeding." | |
| ) | |
| parser.add_argument("torrent_file", help="Path to the torrent file") | |
| parser.add_argument("directoryA", help="Directory to search for video files") | |
| parser.add_argument("directoryB", help="Directory where symlinks will be created") | |
| args = parser.parse_args() | |
| torrent_file = args.torrent_file | |
| directoryA = args.directoryA | |
| directoryB = args.directoryB | |
| # --- Parse the torrent file --- | |
| try: | |
| torrent_files, torrent_parent_folder = parse_torrent(torrent_file) | |
| except Exception as e: | |
| print(f"Error parsing torrent file: {e}") | |
| sys.exit(1) | |
| print(f"Parsed {len(torrent_files)} file entries from torrent.") | |
| if torrent_parent_folder: | |
| print(f"Torrent has parent folder: {torrent_parent_folder}") | |
| # --- Scan DirectoryA for video files --- | |
| found_files = scan_directory(directoryA) | |
| print(f"Found {len(found_files)} video file(s) in '{directoryA}'.") | |
| if not found_files: | |
| print("No video files found. Exiting.") | |
| sys.exit(1) | |
| # --- Auto-match torrent entries to found files --- | |
| matches, matched_found = auto_match(torrent_files, found_files) | |
| # --- Manual matching for entries that weren't automatically paired --- | |
| matches = manual_match(torrent_files, found_files, matches, matched_found) | |
| # --- Report any torrent entries that remain unmatched --- | |
| unmatched = [t_idx for t_idx in range(len(torrent_files)) if t_idx not in matches] | |
| if unmatched: | |
| print("\nThe following torrent files did not get a match and will be skipped:") | |
| for t_idx in unmatched: | |
| print(f" [{t_idx}] {torrent_files[t_idx]['relative_path']}") | |
| else: | |
| print("\nAll torrent files have been matched.") | |
| # --- Create symlinks in DirectoryB --- | |
| create_symlinks(matches, torrent_files, found_files, directoryB, torrent_parent_folder) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment