Download available segments of partially corrupted Twitch videos
#!/usr/bin/env python3
"""
Twitch highlight mirror – resilient, resumable, and verbose-friendly.

* Reads a plain-text file (one .m3u8 URL per line) passed on the command line.
* Creates a `downloads/<playlist-name>` directory for each list.
* Saves the original `.m3u8` file beside its segments for reference.
* Downloads every `.ts` segment.
* **HTTP 403 → skip immediately** (the segment really is gone).
* **Timeouts / transient errors → retry up to 3×** with exponential back-off.
* **Playlist fetch is retried** the same way, so an occasional timeout won't
  abort the whole video.
* **Resumable** – existing non-empty files are kept; empty or partial files are
  re-downloaded automatically.
* **Progress meter** – every 50 segments prints plain-text stats:
  downloaded / skipped-403 / errors.

Usage
-----
python download_twitch_m3u8.py urls.txt

Dependencies
------------
pip install requests
"""
from __future__ import annotations

import concurrent.futures
import pathlib
import sys
import threading
import time
import urllib.parse
from typing import Iterable

import requests
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
MAX_RETRIES = 3  # attempts per network operation on errors / timeouts
BACKOFF_BASE = 1  # seconds – grows 1, 2, 4 …
SEG_CHUNK = 1 << 20  # 1 MiB
THREADS = 8  # parallel downloads per playlist
REQ_TIMEOUT = 15  # seconds for each HTTP request
PROGRESS_EVERY = 50  # how many segments between progress prints

# A single MPEG-TS packet is 188 bytes; anything smaller is certainly corrupt.
MIN_TS_SIZE = 188
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def retry_sleep(attempt: int) -> None:
    """Sleep before retry *attempt* with exponential back-off (1 s, 2 s, 4 s …)."""
    time.sleep(BACKOFF_BASE * (2 ** (attempt - 1)))


def iter_playlists(url_list_path: pathlib.Path) -> Iterable[str]:
    """Yield one cleaned URL per non-blank line in *url_list_path*."""
    with url_list_path.open("r", encoding="utf-8-sig") as fp:
        for raw in fp:
            # utf-8-sig already strips a leading BOM; the lstrip is belt and braces.
            url = raw.strip().lstrip("\ufeff")
            if url:
                yield url
# ---------------------------------------------------------------------------
# Core downloader
# ---------------------------------------------------------------------------
def download_playlist(m3u8_url: str, out_root: pathlib.Path, session: requests.Session) -> None:
    playlist_name = pathlib.Path(urllib.parse.urlparse(m3u8_url).path).stem
    target_dir = out_root / playlist_name
    target_dir.mkdir(parents=True, exist_ok=True)
    print(f"\n=== {playlist_name} ===")

    # ---------------- Playlist fetch with retry ---------------------------
    resp = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = session.get(m3u8_url, timeout=REQ_TIMEOUT)
            resp.raise_for_status()
            break  # success
        except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as exc:
            if attempt == MAX_RETRIES:
                print(f"Cannot download playlist {m3u8_url!s}: {exc}")
                return
            retry_sleep(attempt)
    assert resp is not None  # for type checkers

    # Save a local copy of the playlist for later.
    # (urlparse strips any query string from the file name.)
    playlist_filename = pathlib.Path(urllib.parse.urlparse(m3u8_url).path).name
    (target_dir / playlist_filename).write_text(resp.text, encoding="utf-8")

    segments = [ln.strip() for ln in resp.text.splitlines() if ln and not ln.startswith("#")]
    if not segments:
        print("No segments found – maybe a variant playlist?")
        return

    total_segments = len(segments)
    print(f"Listed {total_segments} segments. Beginning download…")
    base_url = m3u8_url.rsplit("/", 1)[0] + "/"

    # Thread-safe counters and lock for progress updates
    counts = {
        "done": 0,  # successfully downloaded or already present
        "gone": 0,  # 403
        "error": 0,  # other permanent failures
    }
    lock = threading.Lock()

    def maybe_report_progress() -> None:
        processed = counts["done"] + counts["gone"] + counts["error"]
        if processed % PROGRESS_EVERY == 0 or processed == total_segments:
            print(
                f"Progress: downloaded {counts['done']} / skipped-403 {counts['gone']} / "
                f"errors {counts['error']} ({processed}/{total_segments})"
            )

    def grab_segment(name: str) -> None:
        seg_path = target_dir / name
        # Already have a good copy
        if seg_path.exists() and seg_path.stat().st_size >= MIN_TS_SIZE:
            with lock:
                counts["done"] += 1
                maybe_report_progress()
            return
        if seg_path.exists():
            seg_path.unlink(missing_ok=True)  # drop empty / truncated leftovers

        seg_url = urllib.parse.urljoin(base_url, name)
        outcome = "error"  # pessimistic default
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                # Context manager ensures the streamed connection is released.
                with session.get(seg_url, stream=True, timeout=REQ_TIMEOUT) as r:
                    if r.status_code == 200:
                        with open(seg_path, "wb") as f:
                            for chunk in r.iter_content(chunk_size=SEG_CHUNK):
                                if chunk:
                                    f.write(chunk)
                        if seg_path.stat().st_size < MIN_TS_SIZE:
                            raise IOError("truncated")
                        outcome = "done"
                        break
                    if r.status_code == 403:
                        outcome = "gone"
                        break
                    raise requests.HTTPError(f"HTTP {r.status_code}")
            except (requests.Timeout, requests.ConnectionError, requests.HTTPError, IOError):
                if attempt == MAX_RETRIES:
                    outcome = "error"
                else:
                    retry_sleep(attempt)
                    continue
            break

        with lock:
            counts[outcome] += 1
            maybe_report_progress()

    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as pool:
        # Consume the iterator so any unexpected worker exception surfaces here.
        list(pool.map(grab_segment, segments))

    print(
        f"Finished {playlist_name}. downloaded {counts['done']} / skipped-403 {counts['gone']} / "
        f"errors {counts['error']} – saved to {target_dir}"
    )
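# Example console output (values illustrative, matching the print formats above):
#   === highlight-111111111 ===
#   Listed 1234 segments. Beginning download…
#   Progress: downloaded 50 / skipped-403 0 / errors 0 (50/1234)
#   ...
#   Finished highlight-111111111. downloaded 1200 / skipped-403 30 / errors 4 – saved to downloads/highlight-111111111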
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python download_twitch_m3u8.py <urls.txt>")
        sys.exit(1)

    url_file = pathlib.Path(sys.argv[1]).expanduser()
    if not url_file.exists():
        print(f"URL list {url_file} not found.")
        sys.exit(1)

    out_root = pathlib.Path("downloads")
    out_root.mkdir(exist_ok=True)

    with requests.Session() as sess:
        for url in iter_playlists(url_file):
            download_playlist(url, out_root, sess)
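
After a run, a minimal companion sketch like the one below (not part of the gist) can report which listed segments are still absent or truncated on disk. It assumes the layout the script creates – the `.m3u8` saved inside `downloads/<playlist-name>` beside its segments – and takes the saved playlist path as its only argument; the helper's file name is hypothetical.

#!/usr/bin/env python3
"""check_missing.py – list segments absent or truncated on disk (hypothetical helper)."""
import pathlib
import sys

MIN_TS_SIZE = 188  # same "one MPEG-TS packet" threshold the downloader uses

playlist = pathlib.Path(sys.argv[1])  # e.g. downloads/highlight-111111111/highlight-111111111.m3u8
seg_dir = playlist.parent
names = [
    ln.strip()
    for ln in playlist.read_text(encoding="utf-8").splitlines()
    if ln.strip() and not ln.startswith("#")
]
missing = [
    n for n in names
    if not (seg_dir / n).exists() or (seg_dir / n).stat().st_size < MIN_TS_SIZE
]
print(f"{len(missing)}/{len(names)} segments missing or truncated")
for n in missing:
    print(n)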