Dump an HLS live stream to stdout. Code taken from fc2-live-dl.
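For example, the output can be piped straight into ffmpeg to remux the stream as it downloads (a minimal sketch; the filename hls_dump.py and the playlist URL are illustrative, not part of the gist):

    python3 hls_dump.py 'https://example.com/live/playlist.m3u8' --threads 4 | ffmpeg -i - -c copy output.ts

The script refuses to write raw video to a terminal unless --force-output is passed, so piping into ffmpeg (or redirecting to a file) is the intended way to run it.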
import argparse
import aiohttp
import asyncio
import time
import logging
import sys
import http.cookies
import urllib.parse


class HLSDownloader:
    """Polls a live HLS playlist and downloads its fragments in order."""

    def __init__(self, session, url, threads):
        self._session = session
        self._url = url
        self._threads = threads
        self._logger = logging.Logger("hls")
        self._frag_urls = asyncio.PriorityQueue(100)
        self._frag_data = asyncio.PriorityQueue(100)
        self._download_task = None

    async def __aenter__(self):
        self._loop = asyncio.get_running_loop()
        self._logger.debug("init")
        return self

    async def __aexit__(self, *err):
        self._logger.debug("exit %s", err)
        if self._download_task is not None:
            self._download_task.cancel()
            await self._download_task

    def absolutify(self, url):
        baseurl = (
            self._url
            if self._url[-1] == "/"
            else ("/".join(self._url.split("/")[:-1]) + "/")
        )
        parsed = urllib.parse.urlparse(self._url)
        host = parsed.scheme + "://" + parsed.netloc

        if url[0] == "/":
            return host + url
        elif url.startswith("http://") or url.startswith("https://"):
            return url
        else:
            return baseurl + url

    async def _get_fragment_urls(self):
        async with self._session.get(self._url) as resp:
            if resp.status == 403:
                raise Exception("Got a 403")
            elif resp.status == 404:
                return []

            playlist = await resp.text()
            return [
                self.absolutify(line.strip())
                for line in playlist.split("\n")
                if len(line) > 0 and not line[0] == "#"
            ]

    async def _fill_queue(self):
        # Poll the playlist and queue fragment URLs that have not been seen yet
        last_fragment_timestamp = time.time()
        last_fragment = None
        frag_idx = 0
        while True:
            try:
                frags = await self._get_fragment_urls()

                new_idx = 0
                try:
                    new_idx = 1 + frags.index(last_fragment)
                except ValueError:
                    pass

                n_new = len(frags) - new_idx
                if n_new > 0:
                    last_fragment_timestamp = time.time()
                    self._logger.debug("Found %d new fragments", n_new)

                for frag in frags[new_idx:]:
                    last_fragment = frag
                    await self._frag_urls.put((frag_idx, (frag, 0)))
                    frag_idx += 1

                if time.time() - last_fragment_timestamp > 30:
                    self._logger.debug("Timeout receiving new segments")
                    return

                await asyncio.sleep(1)
            except Exception as ex:
                self._logger.error("Error fetching new segments: %s", ex)
                return

    async def _download_worker(self, wid):
        # Pull fragment URLs off the queue, download them, and retry failures up to 5 times
        try:
            while True:
                i, (url, tries) = await self._frag_urls.get()
                self._logger.debug("[%s] Downloading fragment %d", wid, i)
                try:
                    async with self._session.get(url) as resp:
                        if resp.status > 299:
                            self._logger.error(
                                "[%s] Fragment %d errored: %s", wid, i, resp.status
                            )
                            if tries < 5:
                                self._logger.debug("[%s] Retrying fragment %d", wid, i)
                                await self._frag_urls.put((i, (url, tries + 1)))
                            else:
                                self._logger.error(
                                    "[%s] Gave up on fragment %d after %d tries",
                                    wid,
                                    i,
                                    tries,
                                )
                                await self._frag_data.put((i, b""))
                        else:
                            await self._frag_data.put((i, await resp.read()))
                except Exception as ex:
                    self._logger.error("[%s] Unhandled exception: %s", wid, ex)
        except asyncio.CancelledError:
            self._logger.debug("[%s] worker cancelled", wid)

    async def _download(self):
        tasks = []
        try:
            if self._threads > 1:
                self._logger.info("Downloading with %d threads", self._threads)
                if self._threads > 8:
                    self._logger.warning("Using more than 8 threads is not recommended")

            tasks = [
                asyncio.create_task(self._download_worker(i))
                for i in range(self._threads)
            ]

            self._logger.debug("Starting queue worker")
            await self._fill_queue()
            self._logger.debug("Queue finished")

            for task in tasks:
                task.cancel()
                await task
            self._logger.debug("Workers quit")
        except asyncio.CancelledError:
            self._logger.debug("_download cancelled")
            for task in tasks:
                task.cancel()
                await task

    async def _read(self, index):
        # Wait for the fragment with the requested index, re-queueing any that arrive out of order
        while True:
            p, frag = await self._frag_data.get()
            if p == index:
                return frag
            await self._frag_data.put((p, frag))
            await asyncio.sleep(0.1)

    async def read(self):
        try:
            if self._download_task is None:
                self._download_task = asyncio.create_task(self._download())

            index = 0
            while True:
                yield await self._read(index)
                index += 1
        except asyncio.CancelledError:
            self._logger.debug("read cancelled")
            if self._download_task is not None:
                self._download_task.cancel()
                await self._download_task


def parse_cookies_file(cookies_file):
    cookies = http.cookies.SimpleCookie()
    with open(cookies_file, "r") as cf:
        for line in cf:
            try:
                domain, _flag, path, secure, _expiration, name, value = [
                    t.strip() for t in line.split("\t")
                ]
                cookies[name] = value
                cookies[name]["domain"] = domain.replace("#HttpOnly_", "")
                cookies[name]["path"] = path
                cookies[name]["secure"] = secure
                cookies[name]["httponly"] = domain.startswith("#HttpOnly_")
            except Exception:
                pass
    return cookies


def sizeof_fmt(num, suffix="B"):
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"


async def main():
    logger = logging.Logger("main")
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="HLS playlist URL")
    parser.add_argument(
        "--threads",
        type=int,
        default=1,
        help="The size of the thread pool used to download segments. Default is 1.",
    )
    parser.add_argument("--cookies", help="Path to a cookies file.")
    parser.add_argument(
        "--force-output",
        action="store_true",
        help="Dump video into stdout even when attached to a tty.",
    )
    args = parser.parse_args()

    # Check tty
    if sys.stdout.isatty() and not args.force_output:
        logger.error("This program will dump raw video data into stdout.")
        logger.error("Please pipe the output to ffmpeg or other suitable programs.")
        logger.error("To ignore this warning, pass --force-output.")
        return

    # Load cookies
    cookie_jar = aiohttp.CookieJar()
    cookies_file = args.cookies
    if cookies_file is not None:
        logger.info("Loading cookies from %s", cookies_file)
        cookies = parse_cookies_file(cookies_file)
        cookie_jar.update_cookies(cookies)

    # Create a new session
    session = aiohttp.ClientSession(cookie_jar=cookie_jar)

    # Set up HLS downloader
    async with HLSDownloader(session, args.url, args.threads) as hls:
        n_frags = 0
        total_size = 0
        async for frag in hls.read():
            n_frags += 1
            total_size += len(frag)
            sys.stdout.buffer.write(frag)
            sys.stderr.write(
                "Downloaded %d fragments, %s\r" % (n_frags, sizeof_fmt(total_size))
            )

    # Close the HTTP session before exiting
    await session.close()
    logger.info("Done")


if __name__ == "__main__":
    asyncio.run(main())
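A note on --cookies: parse_cookies_file expects a Netscape-format cookies.txt file, i.e. one tab-separated line per cookie with the fields domain, flag, path, secure, expiration, name, and value, which is the format exported by common cookies.txt browser extensions; lines that do not split into those seven fields are silently skipped, and a leading "#HttpOnly_" on the domain marks the cookie as HTTP-only. The line below is illustrative only (fields are tab-separated, values are made up):

    .example.com	TRUE	/	TRUE	0	session_id	abc123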