Last active
August 23, 2025 07:14
-
-
Save warmonkey/704f08f22e86f5b04ac733c848dc5f07 to your computer and use it in GitHub Desktop.
A minimal GUI for pytubefix with a/v merging support (call ffmpeg if video only)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import re | |
import os | |
import shutil | |
import subprocess | |
import tempfile | |
from dataclasses import dataclass | |
from typing import List, Optional | |
from PyQt5.QtCore import Qt, QThread, pyqtSignal | |
from PyQt5.QtWidgets import ( | |
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, | |
QLineEdit, QPushButton, QTextEdit, QTableWidget, QTableWidgetItem, | |
QMessageBox, QLabel, QFileDialog | |
) | |
# ======= SETTINGS ======= | |
DEFAULT_DOWNLOAD_DIR = None # None => ask the first time | |
FFMPEG_BIN = shutil.which("ffmpeg") or "ffmpeg" # use PATH or a fixed path string | |
# ======================== | |
try: | |
import pytubefix | |
from pytubefix import YouTube | |
except Exception as e: | |
app = QApplication(sys.argv) | |
QMessageBox.critical(None, "pytubefix not available", | |
"Could not import the 'pytubefix' library.\n\n" | |
f"Import error:\n{e}\n\n" | |
"Install it with:\n pip install pytubefix") | |
sys.exit(1) | |
STREAM_RE = re.compile(r"<Stream:\s*(.*?)>") | |
@dataclass | |
class StreamRow: | |
itag: str = "" | |
type: str = "" | |
mime_type: str = "" | |
res: str = "" | |
fps: str = "" | |
vcodec: str = "" | |
acodec: str = "" | |
abr: str = "" | |
progressive: str = "" | |
def parse_stream_repr(repr_line: str) -> Optional[StreamRow]: | |
m = STREAM_RE.search(repr_line) | |
if not m: | |
return None | |
body = m.group(1).strip() | |
tokens = re.findall(r'(\w+)=(".*?"|\S+)', body) | |
d = {} | |
for k, v in tokens: | |
if len(v) >= 2 and v[0] == '"' and v[-1] == '"': | |
v = v[1:-1] | |
d[k] = v | |
return StreamRow( | |
itag=d.get("itag", ""), | |
type=d.get("type", ""), | |
mime_type=d.get("mime_type", ""), | |
res=d.get("res", ""), | |
fps=d.get("fps", ""), | |
vcodec=d.get("vcodec", ""), | |
acodec=d.get("acodec", ""), | |
abr=d.get("abr", ""), | |
progressive=d.get("progressive", "") | |
) | |
def safe_filename(title: str) -> str: | |
# conservative sanitizer for file names across platforms | |
bad = r'<>:"/\|?*' | |
name = ''.join('_' if c in bad else c for c in title) | |
# trim whitespace and collapse runs of spaces/underscores | |
name = re.sub(r'[\s_]+', ' ', name).strip() | |
return name or "video" | |
# ---------------- Workers (QThread) ---------------- | |
class InfoWorker(QThread): | |
progress = pyqtSignal(str) | |
result_streams = pyqtSignal(list) | |
error = pyqtSignal(str) | |
def __init__(self, url: str): | |
super().__init__() | |
self.url = url | |
def run(self): | |
try: | |
self.progress.emit(f"[Info] Initializing YouTube object...\n") | |
yt = YouTube(self.url) | |
self.progress.emit(f"[Info] Title: {yt.title}\n") | |
streams = yt.streams | |
rows: List[StreamRow] = [] | |
self.progress.emit(f"[Info] Found {len(streams)} streams\n\n") | |
for s in streams: | |
line = str(s) | |
self.progress.emit(line + "\n") | |
parsed = parse_stream_repr(line) | |
if parsed: | |
rows.append(parsed) | |
self.result_streams.emit(rows) | |
except Exception as e: | |
self.error.emit(f"[Info] Error: {e}\n") | |
class DownloadWorker(QThread): | |
progress = pyqtSignal(str) | |
finished_ok = pyqtSignal(str) # final output file path | |
error = pyqtSignal(str) | |
def __init__(self, url: str, itag: str, outdir: str): | |
super().__init__() | |
self.url = url | |
self.itag = itag | |
self.outdir = outdir | |
def run(self): | |
try: | |
if not shutil.which(FFMPEG_BIN): | |
self.error.emit("[Download] ffmpeg not found on PATH. Please install ffmpeg.") | |
return | |
yt = YouTube(self.url, on_progress_callback=self._on_progress) | |
title = safe_filename(yt.title) | |
target_mp4 = os.path.join(self.outdir, f"{title}.mp4") | |
stream = yt.streams.get_by_itag(int(self.itag)) | |
if not stream: | |
self.error.emit("[Download] Selected itag not available.") | |
return | |
if getattr(stream, "is_progressive", False): | |
self.progress.emit("[Download] Progressive stream selected (video+audio). Downloading...\n") | |
out_path = stream.download(output_path=self.outdir, filename=f"{title}.mp4") | |
self.progress.emit(f"[Download] Saved to: {out_path}\n") | |
self.finished_ok.emit(out_path) | |
return | |
# Non-progressive: download selected VIDEO stream + best AUDIO-only stream | |
self.progress.emit("[Download] Adaptive stream selected. Downloading video and best audio...\n") | |
# Pick best audio-only by abr | |
audio_stream = yt.streams.filter(only_audio=True).order_by("abr").last() | |
if not audio_stream: | |
self.error.emit("[Download] Could not find an audio-only stream.") | |
return | |
tmpdir = tempfile.mkdtemp(prefix="pytubefix-") | |
video_tmp = os.path.join(tmpdir, f"{title}.video.{stream.subtype or 'mp4'}") | |
audio_tmp = os.path.join(tmpdir, f"{title}.audio.{audio_stream.subtype or 'm4a'}") | |
# Download both (keeping original extensions) | |
self.progress.emit(f"[Download] Video itag={stream.itag}, res={getattr(stream, 'resolution', '')}\n") | |
v_path = stream.download(output_path=tmpdir, filename=os.path.basename(video_tmp)) | |
self.progress.emit(f"[Download] Video saved: {v_path}\n") | |
self.progress.emit(f"[Download] Audio itag={audio_stream.itag}, abr={getattr(audio_stream, 'abr', '')}\n") | |
a_path = audio_stream.download(output_path=tmpdir, filename=os.path.basename(audio_tmp)) | |
self.progress.emit(f"[Download] Audio saved: {a_path}\n") | |
# Merge with ffmpeg (copy video, AAC for audio) | |
self.progress.emit("[ffmpeg] Merging audio + video...\n") | |
cmd = [ | |
FFMPEG_BIN, | |
"-y", | |
"-i", v_path, | |
"-i", a_path, | |
"-c:v", "copy", | |
"-c:a", "aac", | |
"-movflags", "+faststart", | |
target_mp4 | |
] | |
self._run_ffmpeg(cmd) | |
self.progress.emit(f"[ffmpeg] Merged file: {target_mp4}\n") | |
self.finished_ok.emit(target_mp4) | |
# Cleanup temp files | |
try: | |
os.remove(v_path) | |
os.remove(a_path) | |
os.rmdir(tmpdir) | |
except Exception: | |
pass | |
except Exception as e: | |
self.error.emit(f"[Download] Error: {e}\n") | |
def _on_progress(self, stream, chunk, bytes_remaining): | |
try: | |
filesize = getattr(stream, "filesize", None) | |
if not filesize: | |
return | |
done = filesize - bytes_remaining | |
pct = 100.0 * done / filesize if filesize else 0 | |
self.progress.emit(f"[Download] {pct:5.1f}% ({done}/{filesize} bytes) {stream.type}\n") | |
except Exception: | |
pass | |
def _run_ffmpeg(self, cmd: List[str]): | |
# stream stderr to the UI for progress | |
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, encoding='utf-8') | |
assert proc.stderr is not None | |
for line in proc.stderr: | |
if line.strip(): | |
self.progress.emit("[ffmpeg] " + line) | |
proc.wait() | |
if proc.returncode != 0: | |
raise RuntimeError(f"ffmpeg exited with code {proc.returncode}") | |
# ------------------- Main Window ------------------- | |
class MainWindow(QMainWindow): | |
def __init__(self, pytube_version: str): | |
super().__init__() | |
self.setWindowTitle(f"pytubefix Qt GUI (pytubefix {pytube_version})") | |
self.resize(1000, 700) | |
# Widgets | |
self.url_edit = QLineEdit() | |
self.url_edit.setPlaceholderText("Enter video URL...") | |
self.get_info_btn = QPushButton("Get Info") | |
self.download_btn = QPushButton("Download") | |
self.download_btn.setEnabled(False) | |
self.pick_dir_btn = QPushButton("Set Download Folder") | |
top_row = QHBoxLayout() | |
top_row.addWidget(QLabel("URL:")) | |
top_row.addWidget(self.url_edit) | |
top_row.addWidget(self.get_info_btn) | |
top_row.addWidget(self.download_btn) | |
top_row.addWidget(self.pick_dir_btn) | |
self.table = QTableWidget(0, 9) | |
self.table.setHorizontalHeaderLabels([ | |
"itag", "type", "mime_type", "res", "fps", | |
"vcodec", "acodec", "abr", "progressive" | |
]) | |
self.table.setSelectionBehavior(self.table.SelectRows) | |
self.table.setSelectionMode(self.table.SingleSelection) | |
self.table.setEditTriggers(self.table.NoEditTriggers) | |
self.table.horizontalHeader().setStretchLastSection(True) | |
self.output = QTextEdit() | |
self.output.setReadOnly(True) | |
central = QWidget() | |
layout = QVBoxLayout(central) | |
layout.addLayout(top_row) | |
layout.addWidget(self.table, stretch=2) | |
layout.addWidget(QLabel("Console output:")) | |
layout.addWidget(self.output, stretch=3) | |
self.setCentralWidget(central) | |
# State | |
self.last_url = "" | |
self.parsed_rows: List[StreamRow] = [] | |
self.download_dir = DEFAULT_DOWNLOAD_DIR | |
self.info_worker: Optional[InfoWorker] = None | |
self.dl_worker: Optional[DownloadWorker] = None | |
# Connect UI | |
self.get_info_btn.clicked.connect(self.on_get_info) | |
self.download_btn.clicked.connect(self.on_download) | |
self.table.itemSelectionChanged.connect(self.on_selection_changed) | |
self.pick_dir_btn.clicked.connect(self.on_pick_dir) | |
# ---------- UI helpers ---------- | |
def append_output(self, text: str): | |
self.output.moveCursor(self.output.textCursor().End) | |
self.output.insertPlainText(text) | |
self.output.moveCursor(self.output.textCursor().End) | |
def clear_table(self): | |
self.table.setRowCount(0) | |
self.parsed_rows = [] | |
def populate_table(self, rows: List[StreamRow]): | |
self.clear_table() | |
for r in rows: | |
row_idx = self.table.rowCount() | |
self.table.insertRow(row_idx) | |
def put(col, val): | |
item = QTableWidgetItem(val or "") | |
item.setData(Qt.UserRole, val or "") | |
self.table.setItem(row_idx, col, item) | |
put(0, r.itag) | |
put(1, r.type) | |
put(2, r.mime_type) | |
put(3, r.res) | |
put(4, r.fps) | |
put(5, r.vcodec) | |
put(6, r.acodec) | |
put(7, r.abr) | |
put(8, r.progressive) | |
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0) | |
def selected_itag(self) -> Optional[str]: | |
row = self.table.currentRow() | |
if row < 0: | |
return None | |
item = self.table.item(row, 0) # itag | |
return item.text().strip() if item else None | |
# ---------- Button handlers ---------- | |
def on_get_info(self): | |
url = self.url_edit.text().strip() | |
if not url: | |
QMessageBox.warning(self, "Missing URL", "Please enter a video URL.") | |
return | |
self.last_url = url | |
self.get_info_btn.setEnabled(False) | |
self.download_btn.setEnabled(False) | |
self.clear_table() | |
self.output.clear() | |
self.append_output(f"Getting stream info via pytubefix for: {url}\n\n") | |
self.info_worker = InfoWorker(url) | |
self.info_worker.progress.connect(self.append_output) | |
self.info_worker.result_streams.connect(self._on_info_streams) | |
self.info_worker.error.connect(self._on_info_error) | |
self.info_worker.finished.connect(self._on_info_done) | |
self.info_worker.start() | |
def _on_info_streams(self, rows: List[StreamRow]): | |
self.parsed_rows = rows | |
self.populate_table(rows) | |
def _on_info_error(self, msg: str): | |
self.append_output(msg) | |
QMessageBox.critical(self, "Get Info Error", msg) | |
def _on_info_done(self): | |
self.get_info_btn.setEnabled(True) | |
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0) | |
self.info_worker = None | |
def on_pick_dir(self): | |
path = QFileDialog.getExistingDirectory(self, "Select download folder") | |
if path: | |
self.download_dir = path | |
self.append_output(f"[Settings] Download folder set to: {path}\n") | |
def on_download(self): | |
if not self.last_url: | |
QMessageBox.warning(self, "Missing URL", "Please enter a video URL and fetch info first.") | |
return | |
itag = self.selected_itag() | |
if not itag: | |
QMessageBox.warning(self, "No Selection", "Please select a stream (row) in the table.") | |
return | |
if not self.download_dir: | |
self.on_pick_dir() | |
if not self.download_dir: | |
QMessageBox.information(self, "Cancelled", "No download folder selected.") | |
return | |
self.get_info_btn.setEnabled(False) | |
self.download_btn.setEnabled(False) | |
self.append_output(f"\nStarting download (itag {itag})...\n\n") | |
self.dl_worker = DownloadWorker(self.last_url, itag, self.download_dir) | |
self.dl_worker.progress.connect(self.append_output) | |
self.dl_worker.finished_ok.connect(self._on_download_ok) | |
self.dl_worker.error.connect(self._on_download_error) | |
self.dl_worker.finished.connect(self._on_download_done) | |
self.dl_worker.start() | |
def _on_download_ok(self, filepath: str): | |
QMessageBox.information(self, "Download complete", f"Saved:\n{filepath}") | |
def _on_download_error(self, msg: str): | |
self.append_output(msg) | |
QMessageBox.critical(self, "Download Error", msg) | |
def _on_download_done(self): | |
self.get_info_btn.setEnabled(True) | |
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0) | |
self.dl_worker = None | |
def on_selection_changed(self): | |
has_selection = self.table.currentRow() >= 0 | |
self.download_btn.setEnabled(has_selection and self.table.rowCount() > 0) | |
def main(): | |
app = QApplication(sys.argv) | |
version = getattr(pytubefix, "__version__", "unknown") | |
w = MainWindow(version) | |
w.show() | |
sys.exit(app.exec_()) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment