Skip to content

Instantly share code, notes, and snippets.

@catnipsy
Last active July 13, 2025 16:35
Show Gist options
  • Save catnipsy/70343b208b7b918129cc8155d1b6be9f to your computer and use it in GitHub Desktop.
Save catnipsy/70343b208b7b918129cc8155d1b6be9f to your computer and use it in GitHub Desktop.
rclone parallel transfer with TUI
#!/usr/bin/env python3
import curses
import subprocess
import os
from pathlib import Path
import select
def get_latest_entries(directory, limit=10):
    """Return the most recently modified entries directly inside *directory*.

    Only entries that resolve to a regular file or a directory are
    considered, so dangling symlinks and special files are ignored.

    Args:
        directory: Path (``str`` or ``Path``) of the directory to scan.
        limit: Maximum number of entries to return. It is applied as a
            slice bound, so ``None`` returns every entry.

    Returns:
        A list of ``Path`` objects ordered newest first by modification time.

    Raises:
        OSError: If *directory* itself cannot be listed.
    """
    dated = []
    for entry in Path(directory).iterdir():
        try:
            if entry.is_file() or entry.is_dir():
                # Read the mtime once, up front, instead of inside the sort
                # key: an entry removed while we scan is skipped rather than
                # aborting the whole listing.
                dated.append((entry.stat().st_mtime, entry))
        except OSError:
            continue
    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in dated[:limit]]
def tui_select(stdscr, options):
    """Let the user tick entries from *options* in a curses checklist.

    Keys: UP/``k`` and DOWN/``j`` move the highlight (wrapping around),
    SPACE toggles the highlighted entry, ENTER confirms.

    Args:
        stdscr: The curses window to draw on (as passed by ``curses.wrapper``).
        options: Sequence of objects with a ``name`` attribute (e.g. ``Path``).

    Returns:
        The chosen options, in their original order. An empty *options*
        sequence returns ``[]`` immediately without touching the screen.
    """
    if not options:
        # Nothing to choose from; also avoids ``% 0`` in the cursor movement.
        return []

    try:
        curses.curs_set(0)
    except curses.error:
        # Hiding the cursor is cosmetic; some terminals do not support it.
        pass

    header = "Select files/dirs with SPACE. Press ENTER to rclone copy."
    count = len(options)
    selected = [False] * count
    current = 0
    top = 0  # index of the first option currently shown

    while True:
        height, width = stdscr.getmaxyx()
        # Rows 0-1 are the header and a spacer; the rest is the list viewport.
        visible = max(1, height - 2)
        if current < top:
            top = current
        elif current >= top + visible:
            top = current - visible + 1

        rows = [(0, 0, header, curses.A_BOLD)]
        for row, idx in enumerate(range(top, min(top + visible, count))):
            mark = "[x]" if selected[idx] else "[ ]"
            attr = curses.A_REVERSE if idx == current else curses.A_NORMAL
            rows.append((row + 2, 2, f"{mark} {options[idx].name}", attr))

        stdscr.clear()
        for y, x, text, attr in rows:
            if y >= height or x >= width:
                continue
            try:
                # Clip to the window width so long names cannot overflow.
                stdscr.addstr(y, x, text[:width - x], attr)
            except curses.error:
                # Raised after filling the bottom-right cell; text is drawn.
                pass

        key = stdscr.getch()
        if key in (curses.KEY_UP, ord("k")):
            current = (current - 1) % count
        elif key in (curses.KEY_DOWN, ord("j")):
            current = (current + 1) % count
        elif key == ord(" "):
            selected[current] = not selected[current]
        elif key in (10, 13, curses.KEY_ENTER):  # LF, CR, keypad Enter
            break

    return [option for option, chosen in zip(options, selected) if chosen]
def run_rclone_tui(stdscr, selected, dest_remote="jf-onedrive:movies"):
    """Copy each selected path with its own ``rclone copy`` and show live status.

    One rclone process is started per entry; all run in parallel. The screen
    is divided into one pane per transfer, each showing the entry name and
    the most recent non-empty output line of its process (prefixed with
    ``[exit N]`` once the process has ended with a non-zero status). When
    every transfer has ended, a final message is shown and the function
    waits for a key press.

    Args:
        stdscr: The curses window to draw on (as passed by ``curses.wrapper``).
        selected: Sequence of ``Path`` objects to copy. If empty, the function
            returns immediately.
        dest_remote: rclone destination prefix; each entry is copied to
            ``<dest_remote>/<entry name>``.

    Returns:
        None.
    """
    if not selected:
        # Nothing to transfer; also avoids dividing the screen by zero panes.
        return

    def draw(y, x, text):
        """Write *text* at (y, x), clipped to the window; skip if off-screen."""
        height, width = stdscr.getmaxyx()
        if not (0 <= y < height and 0 <= x < width):
            return
        try:
            stdscr.addstr(y, x, text[:width - x])
        except curses.error:
            # Raised after filling the bottom-right cell; text is drawn.
            pass

    count = len(selected)
    names = [item.name for item in selected]
    last_lines = ["Waiting..."] * count
    pending = [b""] * count      # bytes received after the last line break
    processes = [None] * count   # stays None where rclone failed to start
    open_fds = {}                # pipe fd -> pane index, until EOF is seen

    def render():
        """Redraw every pane using the current window size."""
        height, width = stdscr.getmaxyx()
        pane_height = max(3, height // count)
        stdscr.erase()
        for idx, name in enumerate(names):
            y0 = idx * pane_height
            title = f" {name[:30]} ".ljust(width - 2, "-")
            draw(y0, 0, "+" + title + "+")
            # Prefer the line still being received so progress shows at once.
            partial = pending[idx].decode("utf-8", "replace").strip()
            status = partial or last_lines[idx]
            proc = processes[idx]
            code = proc.poll() if proc is not None else None
            if code:
                status = f"[exit {code}] {status}"
            draw(y0 + 1, 1, status[:max(0, width - 2)])
        stdscr.refresh()

    try:
        curses.curs_set(0)
    except curses.error:
        # Hiding the cursor is cosmetic; some terminals do not support it.
        pass
    stdscr.nodelay(True)

    try:
        for idx, item in enumerate(selected):
            cmd = [
                "rclone", "copy", "-P",
                str(item),
                f"{dest_remote}/{item.name}",
                "--ignore-existing",
                "--transfers=8",
                "--checkers=8",
                "--multi-thread-streams=8",
                "--multi-thread-cutoff=5M",
                "--drive-chunk-size=64M",
                "--fast-list",
                "--stats=1s",
                "--stats-one-line",
            ]
            try:
                # Binary, unbuffered pipe: it is read with os.read() below so
                # that "\r" progress updates are not rewritten by text-mode
                # newline translation.
                proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    bufsize=0,
                )
            except OSError as exc:
                # e.g. rclone is not installed; report it in the pane and
                # keep the transfers that did start.
                last_lines[idx] = f"Failed to start rclone: {exc}"
                continue
            fd = proc.stdout.fileno()
            os.set_blocking(fd, False)
            processes[idx] = proc
            open_fds[fd] = idx

        while True:
            if open_fds:
                readable, _, _ = select.select(list(open_fds), [], [], 0.1)
                for fd in readable:
                    idx = open_fds[fd]
                    try:
                        data = os.read(fd, 65536)
                    except BlockingIOError:
                        continue
                    if not data:
                        # EOF: stop watching this pipe, otherwise select()
                        # reports it ready forever and the loop busy-spins.
                        del open_fds[fd]
                        tail = pending[idx].decode("utf-8", "replace").strip()
                        if tail:
                            last_lines[idx] = tail
                        pending[idx] = b""
                        continue
                    pieces = (pending[idx] + data).splitlines(True)
                    pending[idx] = b""
                    if not pieces[-1].endswith((b"\r", b"\n")):
                        # Incomplete line: keep it until its terminator arrives.
                        pending[idx] = pieces.pop()
                    for piece in pieces:
                        text = piece.decode("utf-8", "replace").strip()
                        if text:
                            last_lines[idx] = text
            else:
                running = [
                    p for p in processes
                    if p is not None and p.poll() is None
                ]
                if not running:
                    break
                try:
                    running[0].wait(timeout=0.1)
                except subprocess.TimeoutExpired:
                    pass
            render()

        render()
        height, _ = stdscr.getmaxyx()
        draw(height - 1, 0, "All transfers finished. Press any key to exit.")
        stdscr.nodelay(False)
        stdscr.getch()
    finally:
        # Runs on normal exit and on errors / Ctrl-C: never leave rclone
        # children or their pipes behind.
        for proc in processes:
            if proc is None:
                continue
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
            proc.stdout.close()
def main(argv=None):
    """Pick recent entries from a directory and copy them with rclone.

    Lists the most recently modified entries of the source directory, lets
    the user select some of them in a curses checklist, then runs the
    parallel rclone transfer view for the selection.

    Args:
        argv: Optional list of command-line arguments; ``None`` means
            ``sys.argv[1:]``. Recognised arguments:
            ``source`` (optional positional, default ``/hdd/media/movies``),
            ``--dest`` (default ``jf-onedrive:movies``) and
            ``--limit`` (default ``20``). With no arguments the behaviour
            matches the previously hard-coded values.

    Returns:
        None. Problems (invalid directory, nothing found, nothing selected)
        are reported with a printed message.
    """
    import argparse  # local import: only the entry point needs it

    parser = argparse.ArgumentParser(
        description="Select recent files/dirs and copy them with parallel rclone transfers."
    )
    parser.add_argument(
        "source",
        nargs="?",
        default="/hdd/media/movies",
        help="directory to list entries from (default: %(default)s)",
    )
    parser.add_argument(
        "--dest",
        default="jf-onedrive:movies",
        help="rclone destination prefix (default: %(default)s)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=20,
        help="number of most recent entries to offer (default: %(default)s)",
    )
    args = parser.parse_args(argv)

    source_path = Path(args.source)
    if not source_path.is_dir():
        print(f"{source_path} is not a valid directory.")
        return

    entries = get_latest_entries(source_path, limit=args.limit)
    if not entries:
        print("No entries found.")
        return

    selected = curses.wrapper(tui_select, entries)
    if not selected:
        print("No selection made.")
        return

    curses.wrapper(run_rclone_tui, selected, args.dest)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment