Skip to content

Instantly share code, notes, and snippets.

@barbolani
Last active April 7, 2019 13:38
Show Gist options
  • Save barbolani/b26e9a3c090b2ce4c8e183d90022ce5b to your computer and use it in GitHub Desktop.
Save barbolani/b26e9a3c090b2ce4c8e183d90022ce5b to your computer and use it in GitHub Desktop.
Script that parses an m3u file, downloads its components and sets everything up to assemble all components as a single mp4 file using ffmpeg
import os
import re
import sys
import urllib.request
from urllib.parse import urlparse, urlunparse, ParseResult, urljoin
class M3UFile:
    """Download the pieces of an HLS (m3u) playlist and prepare ffmpeg inputs.

    The entry point is :meth:`download`; the remaining methods are parsing and
    URL helpers that can also be used on their own.
    """

    # Matches a variant-stream tag and captures the RESOLUTION width and height.
    # RESOLUTION may be the first attribute or follow other attributes.
    x_stream_inf = re.compile(r'#EXT-X-STREAM-INF:(?:.*,)?RESOLUTION=(\d+)x(\d+)')

    def _read_playlist_lines(self, url):
        """Fetch *url* and return its UTF-8 decoded body as a list of lines.

        Trailing carriage returns are removed so CRLF playlists parse the same
        way as LF ones.
        """
        with urllib.request.urlopen(url) as response:
            text = response.read().decode('utf8')
        return [line.rstrip('\r') for line in text.split('\n')]

    def parse_video_xstream_inf(self, lines):
        """Parses the EXT-X-STREAM-INF entries and returns them sorted, higher resolution first

        :param lines: list of playlist lines
        :return: a list of tuples (line, content_of_next_line, (width, height)) of the streams
        """
        streams = []
        # Pair every line with its successor: the line after the tag is
        # treated as the location of that stream.
        for tag_line, next_line in zip(lines, lines[1:]):
            match = self.x_stream_inf.match(tag_line)
            if match:
                size = tuple(int(value) for value in match.groups())
                streams.append((tag_line, next_line, size))
        return sorted(streams, key=lambda stream: stream[2][0] * stream[2][1], reverse=True)

    def parse_audio_xstream_inf(self, lines, audio_id):
        """Find the audio EXT-X-MEDIA entry whose GROUP-ID equals *audio_id*.

        :param lines: list of playlist lines
        :param audio_id: the GROUP-ID value to look for (matched literally)
        :return: a tuple (line, uri) for the first matching line, or None
        """
        # audio_id is escaped so regex metacharacters in it are taken literally;
        # [^"]* stops at the closing quote of URI even when more quoted
        # attributes follow on the same line.
        x_stream_regex = re.compile(
            '#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="{}"(?:.*),URI="([^"]*)"'.format(re.escape(audio_id)))
        for line in lines:
            matches = x_stream_regex.match(line)
            if matches:
                return line, matches.group(1)
        return None

    def parse_chunk_list(self, chunk_list):
        """Parses a chunk list and yields the location of every media segment

        A segment is the first non-blank, non-tag line following an #EXTINF
        tag, whatever the duration stated in that tag.

        :param chunk_list: iterable of playlist lines
        :return: a generator of segment locations, in playlist order
        """
        awaiting_uri = False
        for raw_line in chunk_list:
            line = raw_line.strip()
            if line.startswith('#EXTINF:'):
                awaiting_uri = True
            elif awaiting_uri and line and not line.startswith('#'):
                awaiting_uri = False
                yield line

    def derive_url_from_base(self, base_url, fragment_url):
        """Resolve *fragment_url* against *base_url*.

        The parameters and query string of *base_url* are discarded before
        resolving; the query string of *fragment_url* is kept. An absolute
        *fragment_url* is returned as is.

        :param base_url: URL of the playlist that referenced *fragment_url*
        :param fragment_url: relative or absolute location found in the playlist
        :return: the resolved URL
        """
        components = urlparse(base_url)
        stripped_base = urlunparse(ParseResult(components.scheme,
                                               components.netloc,
                                               components.path,
                                               '',  # No parameters
                                               '',  # No query string
                                               components.fragment))
        return urljoin(stripped_base, fragment_url)

    def download_chunk_list(self, url, chosen_resolution_url, prefix, out_dir):
        """Download every segment listed in a chunk list into *out_dir*.

        :param url: URL of the master playlist, used as the base for relative locations
        :param chosen_resolution_url: location of the chunk list, as found in the master playlist
        :param prefix: currently unused; kept so existing callers keep working
        :param out_dir: existing folder that receives the files 0.ts, 1.ts, ...
        :return: list of the written file names, in playlist order
        """
        out_files = []
        chunk_url = self.derive_url_from_base(url, chosen_resolution_url)
        file_lines = self._read_playlist_lines(chunk_url)
        for i, component_source in enumerate(self.parse_chunk_list(file_lines)):
            component_url = self.derive_url_from_base(url, component_source)
            out_file_name = os.path.join(out_dir, '{}.ts'.format(i))
            with urllib.request.urlopen(component_url) as component_content:
                out_files.append(out_file_name)
                with open(out_file_name, 'wb') as out_file:
                    print('Writing {} to {}'.format(i, out_file_name))
                    out_file.write(component_content.read())
        return out_files

    def download(self, chosen_res, out_dir, url):
        """Download the chosen video stream (and the 'aac' audio stream, if any).

        Writes the segments below *out_dir*, writes the helper files
        'ffmpeg_mux' (only when audio is found) and 'ffmpeg_input.txt' in the
        current directory, and prints the remaining manual ffmpeg steps.

        :param chosen_res: index into the available streams, sorted higher resolution first
        :param out_dir: folder used for the downloaded files
        :param url: URL of the master playlist
        :raises IndexError: if *chosen_res* does not designate an available stream
        """
        file_lines = self._read_playlist_lines(url)
        xstream_sources = self.parse_video_xstream_inf(file_lines)
        print('Available resolutions')
        for source in xstream_sources:
            print(source[2])
        # Same exception type plain indexing would raise, with a usable message.
        if not -len(xstream_sources) <= chosen_res < len(xstream_sources):
            raise IndexError('resolution index {} is out of range, {} stream(s) available'.format(
                chosen_res, len(xstream_sources)))
        print('Your chosen resolution is')
        chosen_stream = xstream_sources[chosen_res]
        print(chosen_stream[2])
        print('Pres CTRL+C to stop, or ENTER key to proceed with download and setup')
        sys.stdin.read(1)

        video_folder = os.path.join(out_dir, 'video')
        os.makedirs(video_folder, mode=0o777, exist_ok=True)
        video_files = self.download_chunk_list(url, chosen_stream[1], 'video', video_folder)
        concat_folder = video_folder

        # We should really parse the ID of the audio stream and use it to locate the right audio
        # stream info. But we are too lazy for that and just assume that you'll always want the one
        # with the 'aac' id
        audio_components = self.parse_audio_xstream_inf(file_lines, 'aac')
        if audio_components:
            audio_folder = os.path.join(out_dir, 'audio')
            os.makedirs(audio_folder, mode=0o777, exist_ok=True)
            audio_files = self.download_chunk_list(url, audio_components[1], 'audio', audio_folder)
            mux_folder = os.path.join(out_dir, 'muxed')
            os.makedirs(mux_folder, mode=0o777, exist_ok=True)
            # One ffmpeg command per video/audio segment pair; each output keeps
            # the video segment's file name inside the muxed folder.
            mux_commands = (
                'ffmpeg -i {} -i {} -c copy {}'.format(
                    v_f, a_f, os.path.join(mux_folder, os.path.basename(v_f)))
                for v_f, a_f in zip(video_files, audio_files))
            with open('ffmpeg_mux', 'wt') as cmd_file:
                cmd_file.write('\n'.join(mux_commands))
            print('First do a chmod +x ffmpeg_mux and run that file')
            # The concat step must read the muxed segments, not the video-only ones.
            concat_folder = mux_folder

        with open('ffmpeg_input.txt', 'wt') as cmd_file:
            cmd_file.write('\n'.join(
                "file '{}'".format(os.path.join(concat_folder, os.path.basename(v_f)))
                for v_f in video_files))
        # Too lazy to bother automate the following steps but at least these are the instructions
        print('Then run ffmpeg -f concat -i ffmpeg_input.txt -c copy <Output File>.mp4')
        print('Once everything is ok you can delete the {} folder and the ffmpeg_mux and ffmpeg_input files'.format(out_dir))
if __name__ == '__main__':
    # Crude and imperfect script to retrieve all components from a m3u file and prepare to assemble
    # in a single mp4 file
    print('Arguments: index of content in playlist, folder used for temporary files, playlist (m3u) URL')
    # Three positional arguments are required; stop with a non-zero status
    # instead of an IndexError traceback when any of them is missing.
    if len(sys.argv) < 4:
        sys.exit(1)
    # Bound at module level under this exact name so any code that looks up the
    # global `m3u_file` keeps working.
    m3u_file = M3UFile()
    m3u_file.download(int(sys.argv[1]), sys.argv[2], sys.argv[3])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment