@hizkifw · Last active June 12, 2022 10:20
Dump HLS live stream into stdout. Code taken from fc2-live-dl.
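
A typical invocation pipes the raw stream into ffmpeg. The script filename (hls.py) and the playlist URL below are assumptions for illustration:

    python3 hls.py 'https://example.com/live/playlist.m3u8' --threads 4 | ffmpeg -i - -c copy output.ts
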
import argparse
import aiohttp
import asyncio
import time
import logging
import sys
import http.cookies
import urllib.parse


class HLSDownloader:
    def __init__(self, session, url, threads):
        self._session = session
        self._url = url
        self._threads = threads
        self._logger = logging.getLogger("hls")
        self._frag_urls = asyncio.PriorityQueue(100)
        self._frag_data = asyncio.PriorityQueue(100)
        self._download_task = None

    async def __aenter__(self):
        self._loop = asyncio.get_running_loop()
        self._logger.debug("init")
        return self

    async def __aexit__(self, *err):
        self._logger.debug("exit %s", err)
        if self._download_task is not None:
            self._download_task.cancel()
            await self._download_task
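
    # Resolve a playlist entry against the playlist URL: absolute URLs pass
    # through, host-relative paths join the scheme+host, and everything else
    # joins the playlist's directory.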
    def absolutify(self, url):
        baseurl = (
            self._url
            if self._url[-1] == "/"
            else ("/".join(self._url.split("/")[:-1]) + "/")
        )
        parsed = urllib.parse.urlparse(self._url)
        host = parsed.scheme + "://" + parsed.netloc

        if url[0] == "/":
            return host + url
        elif url.startswith("http://") or url.startswith("https://"):
            return url
        else:
            return baseurl + url
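
    # Fetch the media playlist and return the absolute URLs of its segments
    # (every non-empty, non-comment line).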
    async def _get_fragment_urls(self):
        async with self._session.get(self._url) as resp:
            if resp.status == 403:
                raise Exception("Got a 403")
            elif resp.status == 404:
                return []

            playlist = await resp.text()
            return [
                self.absolutify(line.strip())
                for line in playlist.split("\n")
                if len(line) > 0 and not line[0] == "#"
            ]
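
    # Poll the playlist once a second and enqueue segments that appeared since
    # the last poll. Gives up after 30 seconds without a new segment, which
    # typically means the live stream has ended.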
    async def _fill_queue(self):
        last_fragment_timestamp = time.time()
        last_fragment = None
        frag_idx = 0
        while True:
            try:
                frags = await self._get_fragment_urls()

                # Skip everything up to and including the last segment we saw
                new_idx = 0
                try:
                    new_idx = 1 + frags.index(last_fragment)
                except ValueError:
                    pass

                n_new = len(frags) - new_idx
                if n_new > 0:
                    last_fragment_timestamp = time.time()
                    self._logger.debug("Found %d new fragments", n_new)

                for frag in frags[new_idx:]:
                    last_fragment = frag
                    await self._frag_urls.put((frag_idx, (frag, 0)))
                    frag_idx += 1

                if time.time() - last_fragment_timestamp > 30:
                    self._logger.debug("Timeout receiving new segments")
                    return

                await asyncio.sleep(1)
            except Exception as ex:
                self._logger.error("Error fetching new segments: %s", ex)
                return
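
    # Worker coroutine: pull (index, url) pairs off the queue, download each
    # segment, and push its bytes into the data queue. Failed segments are
    # retried up to 5 times, then replaced with an empty payload so the
    # consumer is not blocked forever.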
    async def _download_worker(self, wid):
        try:
            while True:
                i, (url, tries) = await self._frag_urls.get()
                self._logger.debug("[%s] Downloading fragment %d", wid, i)
                try:
                    async with self._session.get(url) as resp:
                        if resp.status > 299:
                            self._logger.error(
                                "[%s] Fragment %d errored: %s", wid, i, resp.status
                            )
                            if tries < 5:
                                self._logger.debug("[%s] Retrying fragment %d", wid, i)
                                await self._frag_urls.put((i, (url, tries + 1)))
                            else:
                                self._logger.error(
                                    "[%s] Gave up on fragment %d after %d tries",
                                    wid,
                                    i,
                                    tries,
                                )
                                await self._frag_data.put((i, b""))
                        else:
                            await self._frag_data.put((i, await resp.read()))
                except Exception as ex:
                    self._logger.error("[%s] Unhandled exception: %s", wid, ex)
        except asyncio.CancelledError:
            self._logger.debug("[%s] worker cancelled", wid)
    async def _download(self):
        tasks = []
        try:
            if self._threads > 1:
                self._logger.info("Downloading with %d threads", self._threads)
                if self._threads > 8:
                    self._logger.warning(
                        "Using more than 8 threads is not recommended"
                    )

            tasks = [
                asyncio.create_task(self._download_worker(i))
                for i in range(self._threads)
            ]

            self._logger.debug("Starting queue worker")
            await self._fill_queue()
            self._logger.debug("Queue finished")
            for task in tasks:
                task.cancel()
                await task
            self._logger.debug("Workers quit")
        except asyncio.CancelledError:
            self._logger.debug("_download cancelled")
            for task in tasks:
                task.cancel()
                await task
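
    # Wait for the fragment with the given index, re-queueing out-of-order
    # fragments until it arrives.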
    async def _read(self, index):
        while True:
            p, frag = await self._frag_data.get()
            if p == index:
                return frag

            await self._frag_data.put((p, frag))
            await asyncio.sleep(0.1)
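
    # Async generator yielding fragments in order; starts the download
    # pipeline on first use.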
    async def read(self):
        try:
            if self._download_task is None:
                self._download_task = asyncio.create_task(self._download())

            index = 0
            while True:
                yield await self._read(index)
                index += 1
        except asyncio.CancelledError:
            self._logger.debug("read cancelled")
            if self._download_task is not None:
                self._download_task.cancel()
                await self._download_task
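

# Parse a Netscape-format cookies.txt file into a SimpleCookie, skipping
# lines that don't have the expected seven tab-separated fields.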
def parse_cookies_file(cookies_file):
    cookies = http.cookies.SimpleCookie()
    with open(cookies_file, "r") as cf:
        for line in cf:
            try:
                domain, _flag, path, secure, _expiration, name, value = [
                    t.strip() for t in line.split("\t")
                ]
                cookies[name] = value
                cookies[name]["domain"] = domain.replace("#HttpOnly_", "")
                cookies[name]["path"] = path
                cookies[name]["secure"] = secure
                cookies[name]["httponly"] = domain.startswith("#HttpOnly_")
            except Exception:
                # Comment or malformed line; skip it
                pass
    return cookies
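

# Format a byte count as a human-readable string, e.g. 1536 -> "1.5KiB"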
def sizeof_fmt(num, suffix="B"):
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"


async def main():
    # Log to stderr so stdout stays clean for the video data
    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
    logger = logging.getLogger("main")

    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="HLS playlist URL")
    parser.add_argument(
        "--threads",
        type=int,
        default=1,
        help="The number of concurrent download tasks. Default is 1.",
    )
    parser.add_argument("--cookies", help="Path to a cookies file.")
    parser.add_argument(
        "--force-output",
        action="store_true",
        help="Dump video into stdout even when attached to a tty.",
    )
    args = parser.parse_args()

    # Check tty
    if sys.stdout.isatty() and not args.force_output:
        logger.error("This program will dump raw video data into stdout.")
        logger.error("Please pipe the output to ffmpeg or other suitable programs.")
        logger.error("To ignore this warning, pass --force-output.")
        return

    # Load cookies
    cookie_jar = aiohttp.CookieJar()
    cookies_file = args.cookies
    if cookies_file is not None:
        logger.info("Loading cookies from %s", cookies_file)
        cookies = parse_cookies_file(cookies_file)
        cookie_jar.update_cookies(cookies)

    # Create a new session
    async with aiohttp.ClientSession(cookie_jar=cookie_jar) as session:
        # Set up HLS downloader
        async with HLSDownloader(session, args.url, args.threads) as hls:
            n_frags = 0
            total_size = 0
            async for frag in hls.read():
                n_frags += 1
                total_size += len(frag)
                sys.stdout.buffer.write(frag)
                sys.stderr.write(
                    "Downloaded %d fragments, %s\r" % (n_frags, sizeof_fmt(total_size))
                )
    logger.info("Done")


if __name__ == "__main__":
    asyncio.run(main())