@anug7
Created January 29, 2024 07:11
Python HLS/TS streaming (m3u8) - Downloader
"""
Download script for HLS/TS (m3u8) video streams: downloads all segments and merges them into a single file
"""
from multiprocessing.pool import ThreadPool
import os
import subprocess
import sys
from threading import Lock
from typing import List
from progress.bar import Bar
import requests

def download_and_save_file(base_url: str, video_op: str, seg_name: str):
    """
    Downloads one segment of the video
    @param: base_url: absolute url for the video segments
    @param: video_op: output directory where segments are downloaded
    @param: seg_name: segment name
    """
    url = os.path.join(base_url, seg_name)
    op_path = os.path.join(video_op, seg_name)
    file_size = 0
    try:
        # print(f'Downloading: {url}')
        if os.path.exists(op_path):
            # Skip segments that were already downloaded and look complete
            file_size = os.path.getsize(op_path)
            if file_size > 40000:
                return op_path
        op = requests.request('GET', url)
        if b'Too many' in op.content:
            # Server is rate limiting ("Too many requests"); retry this segment
            print(f'Retrying....{seg_name}')
            return download_and_save_file(base_url, video_op, seg_name)
        if not os.path.exists(op_path):
            with open(op_path, 'wb') as fp:
                fp.write(op.content)
        else:
            # Replace a previously downloaded, smaller (likely partial) segment
            if file_size <= len(op.content):
                print(f'{seg_name} size updated: {file_size} -> {len(op.content)}')
                os.remove(op_path)
                with open(op_path, 'wb') as fp:
                    fp.write(op.content)
    except Exception as exp:
        print(exp)
        return download_and_save_file(base_url, video_op, seg_name)
    return op_path
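
# A quick illustration of how the paths above compose (all values are
# hypothetical): with base_url='https://example.com/stream', video_op='./tmp'
# and seg_name='seg-0-a.ts', the function fetches
# 'https://example.com/stream/seg-0-a.ts' and writes it to './tmp/seg-0-a.ts'.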

def find_segments(base_url: str):
    """
    Finds the video segment names listed in the m3u8 manifest
    @param: base_url: url of the video
    """
    exts = os.path.basename(base_url).split('.')
    if len(exts) > 1 and exts[-1] == 'm3u8':
        furl = base_url
    else:
        furl = os.path.join(base_url, 'video.m3u8')
    print(f'Seg url: {furl}')
    try:
        op = requests.request('GET', furl)
        if b'Too many' in op.content:
            # Rate limited while fetching the manifest; retry
            print('Retrying...Getting manifest')
            return find_segments(base_url)
        nlines = [line.strip() for line in op.content.decode(errors='ignore').split('\n')]
        seg_names = []
        # Only parse a finished (VOD) playlist, i.e. one ending with #EXT-X-ENDLIST
        if '#EXT-X-ENDLIST' in nlines:
            counts, idx = len(nlines), 0
            while idx < counts:
                if nlines[idx] == '#EXT-X-ENDLIST':
                    break
                elif '#EXTINF' in nlines[idx]:
                    # The segment file name is on the line after its #EXTINF tag
                    seg_names.append(nlines[idx + 1])
                    idx += 2
                else:
                    idx += 1
        return seg_names
    except Exception as exp:
        print(exp)
        return find_segments(base_url)
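
# For reference, the playlist parsed by find_segments is expected to look
# roughly like the following (segment names are hypothetical; the tags are
# standard HLS):
#
#   #EXTM3U
#   #EXT-X-VERSION:3
#   #EXT-X-TARGETDURATION:6
#   #EXTINF:6.0,
#   seg-0-a.ts
#   #EXTINF:6.0,
#   seg-1-a.ts
#   #EXT-X-ENDLIST
#
# Each segment file name sits on the line after its #EXTINF tag, and
# #EXT-X-ENDLIST marks a finished (VOD) playlist.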

def merge_files(files: List[str], op_file: str):
    """
    Merges the downloaded video segments into a single file.
    MPEG-TS segments can be concatenated byte-wise; the result is cleaned up
    with ffmpeg afterwards.
    """
    with open(op_file, 'wb') as ofp:
        for ff in files:
            with open(ff, 'rb') as fp:
                ofp.write(fp.read())

def cleaning_with_ffmpeg(orig_file: str, op_file: str):
    """
    Remuxes the merged file with ffmpeg (stream copy, no re-encoding) to fix
    up timestamps and the container
    """
    if os.path.exists(orig_file):
        subprocess.call(['ffmpeg', '-i', orig_file, '-c', 'copy', op_file])
    else:
        print(f"file {orig_file} doesn't exist")

if __name__ == '__main__':
    args = sys.argv
    if len(args) < 4:
        print('Usage: download_ts.py <url> <tmp> <op_path> <nthread>')
        sys.exit(-1)
    url = args[1]
    tmp_path = args[2]
    op_path = args[3]
    no_threads = 1
    if len(args) > 4:
        no_threads = int(args[4], 10)
    os.makedirs(tmp_path, exist_ok=True)
    seg_names = find_segments(url)
    cidx = 0
    clock = Lock()
    print(f'Spawning {no_threads} threads\n')
    pbar = Bar('Downloading...', max=len(seg_names))

    def fn(_sname):
        burl = os.path.dirname(url)
        nret = download_and_save_file(burl, tmp_path, _sname)
        global cidx
        with clock:
            cidx += 1
            pbar.next()
        return nret

    with ThreadPool(processes=no_threads) as pool:
        rets = pool.map(fn, seg_names)
    pbar.finish()
    # Restore playback order: assumes the segment file names carry their index
    # between the first two '-' characters, e.g. 'seg-3-xyz.ts'
    rets = sorted(rets, key=lambda x: int(x.split('-')[1]))
    tt_file = os.path.join(tmp_path, 'tmp.mp4')
    merge_files(rets, tt_file)
    cleaning_with_ffmpeg(tt_file, op_path)

anug7 commented Jan 29, 2024

Requires ffmpeg to clean up the timestamps.
