Last active
April 7, 2019 13:38
-
-
Save barbolani/b26e9a3c090b2ce4c8e183d90022ce5b to your computer and use it in GitHub Desktop.
Script that parses an m3u file, downloads its components and sets everything up to assemble all components as a single mp4 file using ffmpeg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import re
import shlex
import sys
import urllib.request
from urllib.parse import urlparse, urlunparse, ParseResult, urljoin
class M3UFile:
    """Download helper for HLS playlists (m3u / m3u8).

    Parses a master playlist, downloads the segments of one video variant (and of the
    'aac' audio rendition when the playlist declares one) and writes the helper files
    needed to assemble everything into a single mp4 file with ffmpeg.
    """

    # Variant-stream tag. Captures the width and height of the RESOLUTION attribute
    # wherever it appears in the attribute list, including the first position.
    x_stream_inf = re.compile(r'#EXT-X-STREAM-INF:(?:.*,)?RESOLUTION=(\d+)x(\d+)')

    def parse_video_xstream_inf(self, lines):
        """Parses the EXT-X-STREAM-INF entries and returns them sorted, higher resolution first

        :param lines: list with the lines of the master playlist
        :return: a list of tuples (line, content_of_next_line, (width, height)) of the streams
        """
        streams = []
        # Pairing each line with its successor means the very last line is never treated
        # as a stream tag, since there would be no URI line after it.
        for line, next_line in zip(lines, lines[1:]):
            match = self.x_stream_inf.match(line)
            if match:
                width, height = (int(size) for size in match.groups())
                streams.append((line, next_line, (width, height)))
        # Pixel count decides the order; the sort is stable for equal resolutions.
        streams.sort(key=lambda stream: stream[2][0] * stream[2][1], reverse=True)
        return streams

    def parse_audio_xstream_inf(self, lines, audio_id):
        """Finds the first EXT-X-MEDIA audio entry that belongs to the given group

        :param lines: list with the lines of the master playlist
        :param audio_id: value of the GROUP-ID attribute to look for (matched literally)
        :return: a tuple (line, uri) for the first matching entry, or None if there is none
        """
        # audio_id is escaped so regex metacharacters in it are matched literally, and the
        # URI capture stops at the closing quote so trailing attributes are not swallowed.
        x_stream_regex = re.compile(
            r'#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="{}"(?:.*),URI="([^"]*)"'.format(re.escape(audio_id)))
        for line in lines:
            matches = x_stream_regex.match(line)
            if matches:
                return line, matches.group(1)
        return None

    def parse_chunk_list(self, chunk_list):
        """Parses a chunk list and returns the media entries it references

        Every line that follows an #EXTINF tag is considered a media entry, whatever the
        duration (or title) declared in the tag.

        :param chunk_list: list with the lines of a media playlist
        :return: a generator with the line following each #EXTINF tag
        """
        return (entry for tag, entry in zip(chunk_list, chunk_list[1:]) if tag.startswith('#EXTINF:'))

    def derive_url_from_base(self, base_url, fragment_url):
        """Resolves a URI found inside a playlist against the URL of that playlist

        The parameters and query string of base_url are discarded. An absolute
        fragment_url is returned as is; a relative one is joined to the base, keeping
        its own query string when it has one (no trailing '?' is added otherwise).

        :param base_url: URL of the playlist that contains fragment_url
        :param fragment_url: absolute or relative URI read from the playlist
        :return: the resolved URL
        """
        components = urlparse(base_url)
        clean_base = urlunparse(components._replace(params='', query=''))
        return urljoin(clean_base, fragment_url)

    def download_chunk_list(self, url, chosen_resolution_url, prefix, out_dir):
        """Downloads every segment listed in a media playlist into out_dir

        :param url: URL of the master playlist, used to resolve chosen_resolution_url
        :param chosen_resolution_url: URI of the media playlist as written in the master playlist
        :param prefix: currently unused; files are named '<index>.ts'
        :param out_dir: existing folder where the segments are written
        :return: list with the paths of the files written, in playlist order
        """
        chunk_url = self.derive_url_from_base(url, chosen_resolution_url)
        with urllib.request.urlopen(chunk_url) as chunk_list:
            # splitlines() also copes with CRLF line endings, which would otherwise leave
            # a stray '\r' at the end of every segment URI.
            file_lines = chunk_list.read().decode('utf8').splitlines()
        out_files = []
        for i, component_source in enumerate(self.parse_chunk_list(file_lines)):
            # Segment URIs are relative to the media playlist that lists them, not to the
            # master playlist.
            component_url = self.derive_url_from_base(chunk_url, component_source)
            out_file_name = os.path.join(out_dir, '{}.ts'.format(i))
            with urllib.request.urlopen(component_url) as component_content:
                with open(out_file_name, 'wb') as out_file:
                    print('Writing {} to {}'.format(i, out_file_name))
                    out_file.write(component_content.read())
            out_files.append(out_file_name)
        return out_files

    def download(self, chosen_res, out_dir, url):
        """Downloads one variant of a master playlist and prepares the ffmpeg helper files

        Writes 'ffmpeg_input.txt' (and 'ffmpeg_mux' when an 'aac' audio rendition exists) in
        the current working directory and prints the manual steps left to the user.

        :param chosen_res: index of the wanted variant, 0 being the highest resolution
        :param out_dir: folder used for the downloaded segments
        :param url: URL of the master playlist
        :raises IndexError: if chosen_res does not designate an available variant
        """
        with urllib.request.urlopen(url) as m3u_spec:
            file_lines = m3u_spec.read().decode('utf8').splitlines()
        xstream_sources = self.parse_video_xstream_inf(file_lines)
        print('Available resolutions')
        for source in xstream_sources:
            print(source[2])
        try:
            chosen_stream = xstream_sources[chosen_res]
        except IndexError:
            raise IndexError('No stream with index {} ({} streams with a resolution found)'.format(
                chosen_res, len(xstream_sources))) from None
        print('Your chosen resolution is')
        print(chosen_stream[2])
        print('Pres CTRL+C to stop, or ENTER key to proceed with download and setup')
        sys.stdin.read(1)

        video_folder = os.path.join(out_dir, 'video')
        os.makedirs(video_folder, mode=0o777, exist_ok=True)
        video_files = self.download_chunk_list(url, chosen_stream[1], 'video', video_folder)
        concat_folder = video_folder

        # We should really parse the ID of the audio stream and use it to locate the right audio
        # stream info. But we are too lazy for that and just assume that you'll always want the one
        # with the 'aac' id
        audio_components = self.parse_audio_xstream_inf(file_lines, 'aac')
        if audio_components:
            audio_folder = os.path.join(out_dir, 'audio')
            os.makedirs(audio_folder, mode=0o777, exist_ok=True)
            audio_files = self.download_chunk_list(url, audio_components[1], 'audio', audio_folder)
            mux_folder = os.path.join(out_dir, 'muxed')
            os.makedirs(mux_folder, mode=0o777, exist_ok=True)
            # One ffmpeg command per video/audio segment pair. Paths are shell-quoted because
            # the file is meant to be executed as a shell script.
            mux_commands = [
                'ffmpeg -i {} -i {} -c copy {}'.format(
                    shlex.quote(v_f),
                    shlex.quote(a_f),
                    shlex.quote(os.path.join(mux_folder, os.path.basename(v_f))))
                for v_f, a_f in zip(video_files, audio_files)
            ]
            with open('ffmpeg_mux', 'wt') as cmd_file:
                cmd_file.write('\n'.join(mux_commands))
            print('First do a chmod +x ffmpeg_mux and run that file')
            # The concat step must read the muxed segments rather than the video-only ones.
            concat_folder = mux_folder

        with open('ffmpeg_input.txt', 'wt') as cmd_file:
            cmd_file.write('\n'.join(
                "file '{}'".format(os.path.join(concat_folder, os.path.basename(v_f)))
                for v_f in video_files))
        # Too lazy to bother automate the following steps but at least these are the instructions
        print('Then run ffmpeg -f concat -i ffmpeg_input.txt -c copy <Output File>.mp4')
        print('Once everything is ok you can delete the {} folder and the ffmpeg_mux and ffmpeg_input files'.format(out_dir))
if __name__ == '__main__':
    # Crude and imperfect script to retrieve all components from a m3u file and prepare to
    # assemble them in a single mp4 file.
    print('Arguments: index of content in playlist, folder used for temporary files, playlist (m3u) URL')
    if len(sys.argv) < 4:
        # Stop with a readable message instead of an IndexError traceback.
        sys.exit('Expected 3 arguments but got {}'.format(len(sys.argv) - 1))
    # Deliberately bound at module level under this exact name, in case other code in this
    # file refers to the global `m3u_file`.
    m3u_file = M3UFile()
    m3u_file.download(int(sys.argv[1]), sys.argv[2], sys.argv[3])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment