Skip to content

Instantly share code, notes, and snippets.

@warmonkey
Last active August 23, 2025 07:14
Show Gist options
  • Save warmonkey/704f08f22e86f5b04ac733c848dc5f07 to your computer and use it in GitHub Desktop.
Save warmonkey/704f08f22e86f5b04ac733c848dc5f07 to your computer and use it in GitHub Desktop.
A minimal GUI for pytubefix with a/v merging support (call ffmpeg if video only)
#!/usr/bin/env python3
import sys
import re
import os
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from typing import List, Optional
from PyQt5.QtCore import Qt, QThread, pyqtSignal
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLineEdit, QPushButton, QTextEdit, QTableWidget, QTableWidgetItem,
QMessageBox, QLabel, QFileDialog
)
# ======= SETTINGS =======
DEFAULT_DOWNLOAD_DIR = None # None => ask the first time
FFMPEG_BIN = shutil.which("ffmpeg") or "ffmpeg" # use PATH or a fixed path string
# ========================
try:
import pytubefix
from pytubefix import YouTube
except Exception as e:
app = QApplication(sys.argv)
QMessageBox.critical(None, "pytubefix not available",
"Could not import the 'pytubefix' library.\n\n"
f"Import error:\n{e}\n\n"
"Install it with:\n pip install pytubefix")
sys.exit(1)
STREAM_RE = re.compile(r"<Stream:\s*(.*?)>")
@dataclass
class StreamRow:
itag: str = ""
type: str = ""
mime_type: str = ""
res: str = ""
fps: str = ""
vcodec: str = ""
acodec: str = ""
abr: str = ""
progressive: str = ""
def parse_stream_repr(repr_line: str) -> Optional[StreamRow]:
m = STREAM_RE.search(repr_line)
if not m:
return None
body = m.group(1).strip()
tokens = re.findall(r'(\w+)=(".*?"|\S+)', body)
d = {}
for k, v in tokens:
if len(v) >= 2 and v[0] == '"' and v[-1] == '"':
v = v[1:-1]
d[k] = v
return StreamRow(
itag=d.get("itag", ""),
type=d.get("type", ""),
mime_type=d.get("mime_type", ""),
res=d.get("res", ""),
fps=d.get("fps", ""),
vcodec=d.get("vcodec", ""),
acodec=d.get("acodec", ""),
abr=d.get("abr", ""),
progressive=d.get("progressive", "")
)
def safe_filename(title: str) -> str:
# conservative sanitizer for file names across platforms
bad = r'<>:"/\|?*'
name = ''.join('_' if c in bad else c for c in title)
# trim whitespace and collapse runs of spaces/underscores
name = re.sub(r'[\s_]+', ' ', name).strip()
return name or "video"
# ---------------- Workers (QThread) ----------------
class InfoWorker(QThread):
progress = pyqtSignal(str)
result_streams = pyqtSignal(list)
error = pyqtSignal(str)
def __init__(self, url: str):
super().__init__()
self.url = url
def run(self):
try:
self.progress.emit(f"[Info] Initializing YouTube object...\n")
yt = YouTube(self.url)
self.progress.emit(f"[Info] Title: {yt.title}\n")
streams = yt.streams
rows: List[StreamRow] = []
self.progress.emit(f"[Info] Found {len(streams)} streams\n\n")
for s in streams:
line = str(s)
self.progress.emit(line + "\n")
parsed = parse_stream_repr(line)
if parsed:
rows.append(parsed)
self.result_streams.emit(rows)
except Exception as e:
self.error.emit(f"[Info] Error: {e}\n")
class DownloadWorker(QThread):
progress = pyqtSignal(str)
finished_ok = pyqtSignal(str) # final output file path
error = pyqtSignal(str)
def __init__(self, url: str, itag: str, outdir: str):
super().__init__()
self.url = url
self.itag = itag
self.outdir = outdir
def run(self):
try:
if not shutil.which(FFMPEG_BIN):
self.error.emit("[Download] ffmpeg not found on PATH. Please install ffmpeg.")
return
yt = YouTube(self.url, on_progress_callback=self._on_progress)
title = safe_filename(yt.title)
target_mp4 = os.path.join(self.outdir, f"{title}.mp4")
stream = yt.streams.get_by_itag(int(self.itag))
if not stream:
self.error.emit("[Download] Selected itag not available.")
return
if getattr(stream, "is_progressive", False):
self.progress.emit("[Download] Progressive stream selected (video+audio). Downloading...\n")
out_path = stream.download(output_path=self.outdir, filename=f"{title}.mp4")
self.progress.emit(f"[Download] Saved to: {out_path}\n")
self.finished_ok.emit(out_path)
return
# Non-progressive: download selected VIDEO stream + best AUDIO-only stream
self.progress.emit("[Download] Adaptive stream selected. Downloading video and best audio...\n")
# Pick best audio-only by abr
audio_stream = yt.streams.filter(only_audio=True).order_by("abr").last()
if not audio_stream:
self.error.emit("[Download] Could not find an audio-only stream.")
return
tmpdir = tempfile.mkdtemp(prefix="pytubefix-")
video_tmp = os.path.join(tmpdir, f"{title}.video.{stream.subtype or 'mp4'}")
audio_tmp = os.path.join(tmpdir, f"{title}.audio.{audio_stream.subtype or 'm4a'}")
# Download both (keeping original extensions)
self.progress.emit(f"[Download] Video itag={stream.itag}, res={getattr(stream, 'resolution', '')}\n")
v_path = stream.download(output_path=tmpdir, filename=os.path.basename(video_tmp))
self.progress.emit(f"[Download] Video saved: {v_path}\n")
self.progress.emit(f"[Download] Audio itag={audio_stream.itag}, abr={getattr(audio_stream, 'abr', '')}\n")
a_path = audio_stream.download(output_path=tmpdir, filename=os.path.basename(audio_tmp))
self.progress.emit(f"[Download] Audio saved: {a_path}\n")
# Merge with ffmpeg (copy video, AAC for audio)
self.progress.emit("[ffmpeg] Merging audio + video...\n")
cmd = [
FFMPEG_BIN,
"-y",
"-i", v_path,
"-i", a_path,
"-c:v", "copy",
"-c:a", "aac",
"-movflags", "+faststart",
target_mp4
]
self._run_ffmpeg(cmd)
self.progress.emit(f"[ffmpeg] Merged file: {target_mp4}\n")
self.finished_ok.emit(target_mp4)
# Cleanup temp files
try:
os.remove(v_path)
os.remove(a_path)
os.rmdir(tmpdir)
except Exception:
pass
except Exception as e:
self.error.emit(f"[Download] Error: {e}\n")
def _on_progress(self, stream, chunk, bytes_remaining):
try:
filesize = getattr(stream, "filesize", None)
if not filesize:
return
done = filesize - bytes_remaining
pct = 100.0 * done / filesize if filesize else 0
self.progress.emit(f"[Download] {pct:5.1f}% ({done}/{filesize} bytes) {stream.type}\n")
except Exception:
pass
def _run_ffmpeg(self, cmd: List[str]):
# stream stderr to the UI for progress
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, encoding='utf-8')
assert proc.stderr is not None
for line in proc.stderr:
if line.strip():
self.progress.emit("[ffmpeg] " + line)
proc.wait()
if proc.returncode != 0:
raise RuntimeError(f"ffmpeg exited with code {proc.returncode}")
# ------------------- Main Window -------------------
class MainWindow(QMainWindow):
def __init__(self, pytube_version: str):
super().__init__()
self.setWindowTitle(f"pytubefix Qt GUI (pytubefix {pytube_version})")
self.resize(1000, 700)
# Widgets
self.url_edit = QLineEdit()
self.url_edit.setPlaceholderText("Enter video URL...")
self.get_info_btn = QPushButton("Get Info")
self.download_btn = QPushButton("Download")
self.download_btn.setEnabled(False)
self.pick_dir_btn = QPushButton("Set Download Folder")
top_row = QHBoxLayout()
top_row.addWidget(QLabel("URL:"))
top_row.addWidget(self.url_edit)
top_row.addWidget(self.get_info_btn)
top_row.addWidget(self.download_btn)
top_row.addWidget(self.pick_dir_btn)
self.table = QTableWidget(0, 9)
self.table.setHorizontalHeaderLabels([
"itag", "type", "mime_type", "res", "fps",
"vcodec", "acodec", "abr", "progressive"
])
self.table.setSelectionBehavior(self.table.SelectRows)
self.table.setSelectionMode(self.table.SingleSelection)
self.table.setEditTriggers(self.table.NoEditTriggers)
self.table.horizontalHeader().setStretchLastSection(True)
self.output = QTextEdit()
self.output.setReadOnly(True)
central = QWidget()
layout = QVBoxLayout(central)
layout.addLayout(top_row)
layout.addWidget(self.table, stretch=2)
layout.addWidget(QLabel("Console output:"))
layout.addWidget(self.output, stretch=3)
self.setCentralWidget(central)
# State
self.last_url = ""
self.parsed_rows: List[StreamRow] = []
self.download_dir = DEFAULT_DOWNLOAD_DIR
self.info_worker: Optional[InfoWorker] = None
self.dl_worker: Optional[DownloadWorker] = None
# Connect UI
self.get_info_btn.clicked.connect(self.on_get_info)
self.download_btn.clicked.connect(self.on_download)
self.table.itemSelectionChanged.connect(self.on_selection_changed)
self.pick_dir_btn.clicked.connect(self.on_pick_dir)
# ---------- UI helpers ----------
def append_output(self, text: str):
self.output.moveCursor(self.output.textCursor().End)
self.output.insertPlainText(text)
self.output.moveCursor(self.output.textCursor().End)
def clear_table(self):
self.table.setRowCount(0)
self.parsed_rows = []
def populate_table(self, rows: List[StreamRow]):
self.clear_table()
for r in rows:
row_idx = self.table.rowCount()
self.table.insertRow(row_idx)
def put(col, val):
item = QTableWidgetItem(val or "")
item.setData(Qt.UserRole, val or "")
self.table.setItem(row_idx, col, item)
put(0, r.itag)
put(1, r.type)
put(2, r.mime_type)
put(3, r.res)
put(4, r.fps)
put(5, r.vcodec)
put(6, r.acodec)
put(7, r.abr)
put(8, r.progressive)
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0)
def selected_itag(self) -> Optional[str]:
row = self.table.currentRow()
if row < 0:
return None
item = self.table.item(row, 0) # itag
return item.text().strip() if item else None
# ---------- Button handlers ----------
def on_get_info(self):
url = self.url_edit.text().strip()
if not url:
QMessageBox.warning(self, "Missing URL", "Please enter a video URL.")
return
self.last_url = url
self.get_info_btn.setEnabled(False)
self.download_btn.setEnabled(False)
self.clear_table()
self.output.clear()
self.append_output(f"Getting stream info via pytubefix for: {url}\n\n")
self.info_worker = InfoWorker(url)
self.info_worker.progress.connect(self.append_output)
self.info_worker.result_streams.connect(self._on_info_streams)
self.info_worker.error.connect(self._on_info_error)
self.info_worker.finished.connect(self._on_info_done)
self.info_worker.start()
def _on_info_streams(self, rows: List[StreamRow]):
self.parsed_rows = rows
self.populate_table(rows)
def _on_info_error(self, msg: str):
self.append_output(msg)
QMessageBox.critical(self, "Get Info Error", msg)
def _on_info_done(self):
self.get_info_btn.setEnabled(True)
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0)
self.info_worker = None
def on_pick_dir(self):
path = QFileDialog.getExistingDirectory(self, "Select download folder")
if path:
self.download_dir = path
self.append_output(f"[Settings] Download folder set to: {path}\n")
def on_download(self):
if not self.last_url:
QMessageBox.warning(self, "Missing URL", "Please enter a video URL and fetch info first.")
return
itag = self.selected_itag()
if not itag:
QMessageBox.warning(self, "No Selection", "Please select a stream (row) in the table.")
return
if not self.download_dir:
self.on_pick_dir()
if not self.download_dir:
QMessageBox.information(self, "Cancelled", "No download folder selected.")
return
self.get_info_btn.setEnabled(False)
self.download_btn.setEnabled(False)
self.append_output(f"\nStarting download (itag {itag})...\n\n")
self.dl_worker = DownloadWorker(self.last_url, itag, self.download_dir)
self.dl_worker.progress.connect(self.append_output)
self.dl_worker.finished_ok.connect(self._on_download_ok)
self.dl_worker.error.connect(self._on_download_error)
self.dl_worker.finished.connect(self._on_download_done)
self.dl_worker.start()
def _on_download_ok(self, filepath: str):
QMessageBox.information(self, "Download complete", f"Saved:\n{filepath}")
def _on_download_error(self, msg: str):
self.append_output(msg)
QMessageBox.critical(self, "Download Error", msg)
def _on_download_done(self):
self.get_info_btn.setEnabled(True)
self.download_btn.setEnabled(self.table.rowCount() > 0 and self.table.currentRow() >= 0)
self.dl_worker = None
def on_selection_changed(self):
has_selection = self.table.currentRow() >= 0
self.download_btn.setEnabled(has_selection and self.table.rowCount() > 0)
def main():
app = QApplication(sys.argv)
version = getattr(pytubefix, "__version__", "unknown")
w = MainWindow(version)
w.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment