Skip to content

Instantly share code, notes, and snippets.

@apenney
Created February 3, 2025 20:22
Show Gist options
  • Select an option

  • Save apenney/9c21096ba707e9acfaa64fa5d4025fb4 to your computer and use it in GitHub Desktop.

Select an option

Save apenney/9c21096ba707e9acfaa64fa5d4025fb4 to your computer and use it in GitHub Desktop.
python torrent_symlink_matcher.py /path/to/file.torrent /path/to/DirectoryA /path/to/DirectoryB
How It Works
Torrent Parsing:
The script uses bencodepy to decode the torrent file. It then extracts the file entries (for multi‐file torrents, it also records the parent folder found in info["name"]).
Directory Scanning:
It recursively walks through DirectoryA (using os.walk), filtering for video files by extension. For each file it finds, it grabs its size and uses a regular expression (S\d{1,4}E\d{1,4}) to look for a season/episode code in the filename.
Matching Process:
It first attempts to auto-match each torrent entry with a found file by comparing file sizes (and, when available, season/episode codes). If there are multiple candidates (or none), it prompts you on the command line to choose the correct match.
Symlink Creation:
Finally, for every matched pair, the script creates a symlink in DirectoryB. If the torrent file originally had a parent folder, that folder is created inside DirectoryB and the symlink’s name is taken from the torrent metadata—ensuring that when you add the torrent to a client, the file names and folder structure match what the torrent expects.
Feel free to modify the matching logic or extend the user‐interface as needed. Enjoy!
#!/usr/bin/env python3
"""
torrent_symlink_matcher.py
This script accepts three command-line arguments:
1. The path to a torrent file.
2. A directory (DirectoryA) to recursively search for video files.
3. A target directory (DirectoryB) where symlinks will be created.
It:
- Parses the torrent file (using bencodepy) to extract file metadata
(filenames, sizes, etc.) and determines if a top-level parent folder exists.
- Recursively scans DirectoryA for video files (extensions: mkv, mp4, ts, avi, m4v)
and extracts season/episode codes (matching the regex S\d{1,4}E\d{1,4}).
- Attempts to match the torrent file entries with found video files by comparing file
sizes and season/episode codes.
- If multiple or no candidates exist for a given torrent file, it asks the user (via
a simple CLI) to manually choose a match (or skip it).
- For every match, it creates a symlink in DirectoryB (creating any parent folder if needed)
so that the symlink’s filename corresponds to the original torrent file entry.
Dependencies:
- bencodepy (install with: pip install bencodepy)
"""
import os
import sys
import argparse
import re
import bencodepy
# Allowed video file extensions (lowercase)
VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.ts', '.avi', '.m4v'}
def extract_season_episode(text):
"""
Search for a season/episode code in the given text.
Returns the matched string (uppercased) or None.
"""
match = re.search(r'(S\d{1,4}E\d{1,4})', text, re.IGNORECASE)
if match:
return match.group(1).upper()
return None
def parse_torrent(torrent_path):
"""
Opens and decodes the torrent file.
Returns:
- A list of dictionaries, one per file in the torrent, each with:
'relative_path': the file path (as in the torrent)
'size': file size (in bytes)
'season_episode': season/episode code (if any)
- torrent_parent_folder: if the torrent is multi-file (i.e. has a top-level folder)
this is the decoded name; otherwise None.
"""
with open(torrent_path, 'rb') as f:
torrent_content = f.read()
torrent_dict = bencodepy.decode(torrent_content)
info = torrent_dict[b'info']
torrent_files = []
torrent_parent_folder = None
# Check for multi-file torrent (presence of the b'files' key)
if b'files' in info:
# In a multi-file torrent, info[b'name'] holds the suggested parent folder.
torrent_parent_folder = info[b'name'].decode('utf-8', errors='replace')
for file_entry in info[b'files']:
# Each file_entry has a b'path' (a list of path components) and b'length'
path_components = [comp.decode('utf-8', errors='replace') for comp in file_entry[b'path']]
relative_path = os.path.join(*path_components)
size = file_entry[b'length']
season_ep = extract_season_episode(relative_path)
torrent_files.append({
'relative_path': relative_path,
'size': size,
'season_episode': season_ep
})
else:
# Single file torrent – info[b'name'] is the file name.
name = info[b'name'].decode('utf-8', errors='replace')
size = info[b'length']
season_ep = extract_season_episode(name)
torrent_files.append({
'relative_path': name,
'size': size,
'season_episode': season_ep
})
return torrent_files, torrent_parent_folder
def scan_directory(directory):
"""
Recursively scans the given directory for video files.
For each found file, it records its full path, size, base filename,
and (if present) a season/episode code (by matching the regex).
Returns a list of dictionaries.
"""
found_files = []
for root, dirs, files in os.walk(directory):
for file in files:
ext = os.path.splitext(file)[1].lower()
if ext in VIDEO_EXTENSIONS:
file_path = os.path.join(root, file)
try:
size = os.path.getsize(file_path)
except Exception as e:
print(f"Error getting size for {file_path}: {e}")
continue
season_ep = extract_season_episode(file)
found_files.append({
'path': file_path,
'size': size,
'season_episode': season_ep,
'filename': file
})
return found_files
def find_candidates(torrent_file, found_files, matched_found):
"""
Given a torrent file entry and the list of found files (with indices not yet matched),
returns a list of candidate matches (as tuples of (found_file_index, found_file dict))
that have the same size and, if available, the same season/episode code.
"""
candidates = []
for idx, found in enumerate(found_files):
if idx in matched_found:
continue # Skip files already matched
if found['size'] != torrent_file['size']:
continue # Size must match exactly
# If a season/episode code is present in the torrent file, require a match.
if torrent_file['season_episode']:
if found['season_episode'] and torrent_file['season_episode'] == found['season_episode']:
candidates.append((idx, found))
else:
candidates.append((idx, found))
return candidates
def auto_match(torrent_files, found_files):
"""
Iterates over the torrent file entries and automatically matches them with found files,
if there is exactly one candidate (based on size and season/episode).
For ambiguous cases (more than one candidate) or no match, the user will be prompted later.
Returns a tuple:
- matches: a dict mapping torrent file index -> found file index.
- matched_found: a set of found file indices that have been matched.
"""
matches = {} # key: torrent_files index, value: found_files index
matched_found = set()
for t_idx, t_file in enumerate(torrent_files):
candidates = find_candidates(t_file, found_files, matched_found)
if len(candidates) == 1:
candidate_idx, candidate_found = candidates[0]
matches[t_idx] = candidate_idx
matched_found.add(candidate_idx)
print(f"Auto-matched torrent file '{t_file['relative_path']}' with found file '{candidate_found['filename']}'")
elif len(candidates) > 1:
print(f"\nMultiple candidates found for torrent file '{t_file['relative_path']}':")
for cand_idx, cand in candidates:
print(f" [{cand_idx}] {cand['filename']} (Size: {cand['size']}, Season/Episode: {cand['season_episode']})")
choice = input(f"Enter the index of the correct match for '{t_file['relative_path']}' (or press Enter to skip): ").strip()
if choice.isdigit():
chosen_idx = int(choice)
if any(chosen_idx == c[0] for c in candidates):
matches[t_idx] = chosen_idx
matched_found.add(chosen_idx)
else:
print(f"No automatic match found for torrent file '{t_file['relative_path']}'")
return matches, matched_found
def manual_match(torrent_files, found_files, matches, matched_found):
"""
For each torrent file that wasn’t matched automatically, this function
displays the torrent entry along with a list of candidate found files (filtered by size)
and asks the user to manually choose a match (or skip it).
"""
for t_idx, t_file in enumerate(torrent_files):
if t_idx in matches:
continue # Skip already matched entries
print(f"\nManual matching for torrent file: [{t_idx}] {t_file['relative_path']} "
f"(Size: {t_file['size']}, Season/Episode: {t_file['season_episode']})")
# Find candidate found files (here we filter by matching size)
candidates = []
for f_idx, f_file in enumerate(found_files):
if f_idx in matched_found:
continue
if f_file['size'] == t_file['size']:
candidates.append((f_idx, f_file))
if not candidates:
print(" No unmatched found files with matching size.")
continue
print("Candidates:")
for cand_idx, cand in candidates:
print(f" [{cand_idx}] {cand['filename']} (Size: {cand['size']}, Season/Episode: {cand['season_episode']})")
choice = input("Enter the index of the correct match, or 'n' if none: ").strip()
if choice.lower() == 'n':
print(" Marking as no match.")
elif choice.isdigit():
chosen_idx = int(choice)
if any(chosen_idx == c[0] for c in candidates):
matches[t_idx] = chosen_idx
matched_found.add(chosen_idx)
else:
print(" Invalid choice, skipping.")
else:
print(" Invalid input, skipping.")
return matches
def create_symlinks(matches, torrent_files, found_files, directoryB, torrent_parent_folder):
"""
For each matched pair, creates a symlink in DirectoryB.
The symlink’s filename is based on the torrent file entry (and includes the parent folder
if one was defined in the torrent).
"""
for t_idx, f_idx in matches.items():
t_file = torrent_files[t_idx]
found_file = found_files[f_idx]
# If the torrent had a parent folder, include it in the target path.
if torrent_parent_folder:
# os.path.dirname returns '' if there is no subdirectory in the torrent entry.
symlink_dir = os.path.join(directoryB, torrent_parent_folder, os.path.dirname(t_file['relative_path']))
else:
symlink_dir = os.path.join(directoryB, os.path.dirname(t_file['relative_path']))
os.makedirs(symlink_dir, exist_ok=True)
symlink_path = os.path.join(symlink_dir, os.path.basename(t_file['relative_path']))
try:
# Remove an existing symlink (or file) if present.
if os.path.lexists(symlink_path):
os.remove(symlink_path)
os.symlink(found_file['path'], symlink_path)
print(f"Created symlink: {symlink_path} -> {found_file['path']}")
except Exception as e:
print(f"Error creating symlink for '{t_file['relative_path']}': {e}")
def main():
parser = argparse.ArgumentParser(
description="Match torrent file entries to renamed video files and create symlinks for seeding."
)
parser.add_argument("torrent_file", help="Path to the torrent file")
parser.add_argument("directoryA", help="Directory to search for video files")
parser.add_argument("directoryB", help="Directory where symlinks will be created")
args = parser.parse_args()
torrent_file = args.torrent_file
directoryA = args.directoryA
directoryB = args.directoryB
# --- Parse the torrent file ---
try:
torrent_files, torrent_parent_folder = parse_torrent(torrent_file)
except Exception as e:
print(f"Error parsing torrent file: {e}")
sys.exit(1)
print(f"Parsed {len(torrent_files)} file entries from torrent.")
if torrent_parent_folder:
print(f"Torrent has parent folder: {torrent_parent_folder}")
# --- Scan DirectoryA for video files ---
found_files = scan_directory(directoryA)
print(f"Found {len(found_files)} video file(s) in '{directoryA}'.")
if not found_files:
print("No video files found. Exiting.")
sys.exit(1)
# --- Auto-match torrent entries to found files ---
matches, matched_found = auto_match(torrent_files, found_files)
# --- Manual matching for entries that weren't automatically paired ---
matches = manual_match(torrent_files, found_files, matches, matched_found)
# --- Report any torrent entries that remain unmatched ---
unmatched = [t_idx for t_idx in range(len(torrent_files)) if t_idx not in matches]
if unmatched:
print("\nThe following torrent files did not get a match and will be skipped:")
for t_idx in unmatched:
print(f" [{t_idx}] {torrent_files[t_idx]['relative_path']}")
else:
print("\nAll torrent files have been matched.")
# --- Create symlinks in DirectoryB ---
create_symlinks(matches, torrent_files, found_files, directoryB, torrent_parent_folder)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment