
@lennier1
Created April 22, 2025 20:04
Download available segments of partially corrupted Twitch videos
#!/usr/bin/env python3
"""
Twitch highlight mirror – resilient, resumable, and verbose-friendly.
* Reads a plain-text file (one .m3u8 URL per line) passed on the command line.
* Creates a `downloads/<playlist-name>` directory for each list.
* Saves the original `.m3u8` file beside its segments for reference.
* Downloads every `.ts` segment.
* **HTTP 403 → skip immediately** (segment really is gone).
* **Timeouts / transient errors → retry up to 3×** with exponential back-off.
* **Playlist fetch is now retried** the same way, so occasional time-outs won't
  abort the whole video.
* **Resumable** – existing non-empty files are kept; empty or partial files are
  re-downloaded automatically.
* **Progress meter** – every 50 segments prints plain-text stats:
  downloaded / skipped-403 / errors.
Usage
-----
python download_twitch_m3u8.py urls.txt
Dependencies
------------
pip install requests
"""
from __future__ import annotations
import concurrent.futures
import pathlib
import sys
import threading
import time
import urllib.parse
from typing import Iterable
import requests
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
MAX_RETRIES = 3 # attempts per network operation on errors / time-outs
BACKOFF_BASE = 1 # seconds – grows 1, 2, 4 …
SEG_CHUNK = 1 << 20 # 1 MiB
THREADS = 8 # parallel downloads per playlist
REQ_TIMEOUT = 15 # seconds for each HTTP request
PROGRESS_EVERY = 50 # how many segments between progress prints
# A single MPEG-TS packet is 188 bytes; anything smaller is certainly corrupt.
MIN_TS_SIZE = 188
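# (Transport-stream packets start with a 0x47 sync byte every 188 bytes, so a
# file shorter than one packet cannot hold any decodable data.)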
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def retry_sleep(attempt: int) -> None:
    time.sleep(BACKOFF_BASE * (2 ** (attempt - 1)))

def iter_playlists(url_list_path: pathlib.Path) -> Iterable[str]:
"""Yield one cleaned URL per non-blank line in *url_list_path*."""
with url_list_path.open("r", encoding="utf-8-sig") as fp:
for raw in fp:
url = raw.strip().lstrip("\ufeff")
if url:
yield url
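# For example (hypothetical contents), a urls.txt of
#   "https://example.com/a.m3u8\n\n  https://example.com/b.m3u8\n"
# yields the two trimmed URLs and skips the blank line.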
# ---------------------------------------------------------------------------
# Core downloader
# ---------------------------------------------------------------------------
def download_playlist(m3u8_url: str, out_root: pathlib.Path, session: requests.Session) -> None:
    playlist_name = pathlib.Path(urllib.parse.urlparse(m3u8_url).path).stem
    target_dir = out_root / playlist_name
    target_dir.mkdir(parents=True, exist_ok=True)
    print(f"\n=== {playlist_name} ===")
    # ---------------- Playlist fetch with retry ---------------------------
    resp = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = session.get(m3u8_url, timeout=REQ_TIMEOUT)
            resp.raise_for_status()
            break  # success
        except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as exc:
            if attempt == MAX_RETRIES:
                print(f"Cannot download playlist {m3u8_url}: {exc}")
                return
            retry_sleep(attempt)
    assert resp is not None  # for type checkers
    # Save a local copy of the playlist for later. Parse the URL first so a
    # query string is not baked into the filename.
    playlist_filename = pathlib.Path(urllib.parse.urlparse(m3u8_url).path).name
    (target_dir / playlist_filename).write_text(resp.text, encoding="utf-8")
    segments = [ln.strip() for ln in resp.text.splitlines() if ln and not ln.startswith("#")]
    if not segments:
        print("No segments found – maybe a variant playlist?")
        return
    total_segments = len(segments)
    print(f"Listed {total_segments} segments. Beginning download…")
    base_url = m3u8_url.rsplit("/", 1)[0] + "/"
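    # Segment lines in a media playlist are usually relative names such as
    # "1234.ts"; urljoin() below resolves them against the playlist's directory.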
    # Thread-safe counters and lock for progress updates
    counts = {
        "done": 0,   # successfully downloaded or already present
        "gone": 0,   # 403
        "error": 0,  # other permanent failures
    }
    lock = threading.Lock()

    def maybe_report_progress() -> None:
        # Caller must already hold *lock*.
        processed = counts["done"] + counts["gone"] + counts["error"]
        if processed % PROGRESS_EVERY == 0 or processed == total_segments:
            print(
                f"Progress: downloaded {counts['done']} / skipped-403 {counts['gone']} / "
                f"errors {counts['error']} ({processed}/{total_segments})"
            )
    def grab_segment(name: str) -> None:
        seg_path = target_dir / name
        # Already have a good copy
        if seg_path.exists() and seg_path.stat().st_size >= MIN_TS_SIZE:
            with lock:
                counts["done"] += 1
                maybe_report_progress()
            return
        if seg_path.exists():
            seg_path.unlink(missing_ok=True)  # empty / too-short file: re-fetch
        seg_url = urllib.parse.urljoin(base_url, name)
        outcome = "error"  # pessimistic default
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                r = session.get(seg_url, stream=True, timeout=REQ_TIMEOUT)
                if r.status_code == 200:
                    with open(seg_path, "wb") as f:
                        for chunk in r.iter_content(chunk_size=SEG_CHUNK):
                            if chunk:
                                f.write(chunk)
                    if seg_path.stat().st_size < MIN_TS_SIZE:
                        raise IOError("truncated")
                    outcome = "done"
                    break
                if r.status_code == 403:
                    outcome = "gone"  # segment really is gone: don't retry
                    break
                raise requests.HTTPError(f"HTTP {r.status_code}")
            except (requests.Timeout, requests.ConnectionError, requests.HTTPError, IOError):
                if attempt == MAX_RETRIES:
                    outcome = "error"
                else:
                    retry_sleep(attempt)
                    continue
            break
        with lock:
            counts[outcome] += 1
            maybe_report_progress()
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as pool:
        # Drain the iterator so unexpected worker exceptions are re-raised here.
        list(pool.map(grab_segment, segments))
    print(
        f"Finished {playlist_name}. downloaded {counts['done']} / skipped-403 {counts['gone']} / "
        f"errors {counts['error']} – saved to {target_dir}"
    )
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python download_twitch_m3u8.py <urls.txt>")
        sys.exit(1)
    url_file = pathlib.Path(sys.argv[1]).expanduser()
    if not url_file.exists():
        print(f"URL list {url_file} not found.")
        sys.exit(1)
    out_root = pathlib.Path("downloads")
    out_root.mkdir(exist_ok=True)
    with requests.Session() as sess:
        for url in iter_playlists(url_file):
            download_playlist(url, out_root, sess)