Last active
July 13, 2025 16:35
-
-
Save catnipsy/70343b208b7b918129cc8155d1b6be9f to your computer and use it in GitHub Desktop.
rclone parallel transfer with TUI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import curses | |
import subprocess | |
import os | |
from pathlib import Path | |
import select | |
def get_latest_entries(directory, limit=10):
    """Return the most recently modified files and directories in *directory*.

    Only the immediate children of *directory* are considered. Entries that
    are neither regular files nor directories (after following symlinks) are
    ignored, as are entries that disappear or become unreadable while the
    listing is being built.

    Args:
        directory: Directory to scan (``str`` or ``Path``).
        limit: Maximum number of entries to return; ``None`` returns all.

    Returns:
        A list of ``Path`` objects ordered by modification time, newest first.

    Raises:
        OSError: If *directory* itself cannot be listed.
    """
    dated = []
    for entry in Path(directory).iterdir():
        try:
            if not (entry.is_file() or entry.is_dir()):
                continue
            # Capture the mtime once, here, so the sort below never touches
            # the filesystem again (an entry removed mid-sort would otherwise
            # raise from inside the sort key).
            mtime = entry.stat().st_mtime
        except OSError:
            # The entry vanished or is not accessible; skip it rather than
            # abort the whole listing.
            continue
        dated.append((mtime, entry))

    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in dated[:limit]]
def tui_select(stdscr, options):
    """Let the user tick entries from *options* in a scrolling checklist.

    Keys: UP/``k`` and DOWN/``j`` move the highlight (wrapping around at the
    ends), SPACE toggles the highlighted entry, ENTER confirms the selection.

    Args:
        stdscr: The curses window to draw on (as supplied by
            ``curses.wrapper``).
        options: Sequence of objects exposing a ``name`` attribute.

    Returns:
        The ticked options, in their original order. An empty list is
        returned straight away when *options* is empty.
    """
    if not options:
        # Nothing to choose from; this also avoids a modulo-by-zero in the
        # navigation arithmetic below.
        return []

    try:
        curses.curs_set(0)
    except curses.error:
        # Some terminals cannot hide the cursor; that is purely cosmetic.
        pass

    header_rows = 2  # title line plus one blank spacer line

    def put(row, col, text, attr=curses.A_NORMAL):
        """Draw *text* clipped to the window; never raise on overflow."""
        height, width = stdscr.getmaxyx()
        room = width - col - 1
        if row >= height or room <= 0:
            return
        try:
            stdscr.addstr(row, col, text[:room], attr)
        except curses.error:
            pass

    total = len(options)
    selected = [False] * total
    current = 0
    top = 0  # index of the first option currently visible

    while True:
        height, _ = stdscr.getmaxyx()
        visible = max(1, height - header_rows)

        # Scroll the viewport just far enough to keep the highlight visible.
        if current < top:
            top = current
        elif current >= top + visible:
            top = current - visible + 1

        stdscr.erase()
        put(
            0,
            0,
            "Select files/dirs with SPACE. Press ENTER to rclone copy.",
            curses.A_BOLD,
        )
        for row, idx in enumerate(range(top, min(total, top + visible))):
            prefix = "[x]" if selected[idx] else "[ ]"
            attr = curses.A_REVERSE if idx == current else curses.A_NORMAL
            put(row + header_rows, 2, f"{prefix} {options[idx].name}", attr)

        key = stdscr.getch()
        if key in (curses.KEY_UP, ord("k")):
            current = (current - 1) % total
        elif key in (curses.KEY_DOWN, ord("j")):
            current = (current + 1) % total
        elif key == ord(" "):
            selected[current] = not selected[current]
        elif key in (10, 13, curses.KEY_ENTER):
            break

    return [option for option, ticked in zip(options, selected) if ticked]
def run_rclone_tui(stdscr, selected, dest_remote="jf-onedrive:movies"):
    """Run one ``rclone copy`` per selected entry in parallel and show live status.

    Each entry gets a two-line pane: a title bar with the entry name and the
    most recent non-blank line the corresponding rclone process printed
    (stdout and stderr combined). Once every process has exited, a prompt is
    shown and the function waits for a key press.

    Args:
        stdscr: The curses window to draw on (as supplied by
            ``curses.wrapper``).
        selected: Sequence of ``Path``-like objects to copy. An empty
            sequence returns immediately without touching the screen.
        dest_remote: rclone destination prefix; each entry is copied to
            ``{dest_remote}/{entry.name}``.

    Raises:
        OSError: If an rclone process cannot be started (for example the
            executable is missing). Processes already started are terminated
            before the error propagates.
    """
    if not selected:
        # Nothing to transfer; also avoids dividing the screen by zero panes.
        return

    try:
        curses.curs_set(0)
    except curses.error:
        # Some terminals cannot hide the cursor; that is purely cosmetic.
        pass
    stdscr.nodelay(True)

    count = len(selected)
    names = [item.name for item in selected]
    last_lines = ["Waiting..."] * count
    tails = [""] * count  # unterminated output carried over between reads
    processes = []
    watching = {}  # pipe fd -> pane index, for pipes that have not hit EOF

    def put(row, col, text):
        """Draw *text* clipped to the window; never raise on overflow."""
        height, width = stdscr.getmaxyx()
        if not (0 <= row < height and 0 <= col < width):
            return
        try:
            stdscr.addstr(row, col, text[: width - col])
        except curses.error:
            # curses reports an error after writing the bottom-right cell.
            pass

    def absorb(idx, data):
        """Fold a chunk of process output into pane *idx*'s status line."""
        text = tails[idx] + data.decode("utf-8", errors="replace")
        # Progress output may use either carriage returns or newlines.
        segments = text.replace("\r", "\n").split("\n")
        tails[idx] = segments[-1]
        for segment in reversed(segments):
            segment = segment.strip()
            if segment:
                last_lines[idx] = segment
                break

    def render():
        """Redraw every pane from the current state."""
        height, width = stdscr.getmaxyx()
        # A pane needs two rows; use a third as spacing when there is room.
        pane_height = max(2, height // count)
        stdscr.erase()
        for idx, name in enumerate(names):
            top = idx * pane_height
            title = f" {name[:30]} ".ljust(width - 2, "-")
            put(top, 0, "+" + title + "+")
            status = last_lines[idx]
            code = processes[idx].poll()
            if code:
                # Surface failures instead of leaving a stale progress line.
                status = f"[exit code {code}] {status}"
            put(top + 1, 1, status[: max(0, width - 2)])
        stdscr.refresh()

    try:
        for idx, item in enumerate(selected):
            # NOTE(review): for a single-file source, rclone treats the
            # destination as a directory, so the file would land at
            # {dest_remote}/{name}/{name} - confirm this is intended.
            dest_path = f"{dest_remote}/{item.name}"
            cmd = [
                "rclone", "copy", "-P",
                str(item),
                dest_path,
                "--ignore-existing",
                "--transfers=8",
                "--checkers=8",
                "--multi-thread-streams=8",
                "--multi-thread-cutoff=5M",
                "--drive-chunk-size=64M",
                "--fast-list",
                "--stats=1s",
                "--stats-one-line",
            ]
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,  # keep children off the TUI's tty
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=0,  # raw pipe; it is read with os.read() below
            )
            processes.append(proc)
            fd = proc.stdout.fileno()
            os.set_blocking(fd, False)
            watching[fd] = idx

        while watching or any(proc.poll() is None for proc in processes):
            # With no pipes left to watch this simply sleeps for the timeout.
            readable, _, _ = select.select(list(watching), [], [], 0.1)
            for fd in readable:
                try:
                    data = os.read(fd, 65536)
                except BlockingIOError:
                    continue
                if data:
                    absorb(watching[fd], data)
                else:
                    # EOF: stop polling this pipe so select() does not spin.
                    del watching[fd]
            render()

        # Final frame now that every exit code is known.
        render()
        height, _ = stdscr.getmaxyx()
        put(height - 1, 0, "All transfers finished. Press any key to exit.")
        # Discard keys typed during the transfer so the prompt really waits.
        curses.flushinp()
        stdscr.nodelay(False)
        stdscr.getch()
    finally:
        # Do not leave transfers running if the UI is interrupted or fails.
        for proc in processes:
            if proc.poll() is None:
                proc.terminate()
        for proc in processes:
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            if proc.stdout is not None:
                proc.stdout.close()
def main(source_dir="/hdd/media/movies", limit=20):
    """Pick recent entries from *source_dir* interactively and upload them.

    Lists the newest entries of *source_dir*, lets the user tick some of them
    in a curses checklist, then runs the parallel rclone transfer view for the
    ticked entries. A short message is printed and the function returns early
    when the directory is invalid, empty, or nothing was selected.

    Args:
        source_dir: Directory whose newest entries are offered for upload.
        limit: Maximum number of entries offered in the checklist.
    """
    source_path = Path(source_dir)
    if not source_path.is_dir():
        print(f"{source_path} is not a valid directory.")
        return

    entries = get_latest_entries(source_path, limit=limit)
    if not entries:
        print("No entries found.")
        return

    # Each phase gets its own curses session so the terminal is restored
    # (and the early-exit messages are visible) between them.
    selected = curses.wrapper(tui_select, entries)
    if not selected:
        print("No selection made.")
        return

    curses.wrapper(run_rclone_tui, selected)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment