Skip to content

Instantly share code, notes, and snippets.

@ekimekim
Created November 26, 2018 15:48
Show Gist options
  • Save ekimekim/6d2c80adc09bb6c5f038028e8a648777 to your computer and use it in GitHub Desktop.
Save ekimekim/6d2c80adc09bb6c5f038028e8a648777 to your computer and use it in GitHub Desktop.
# Taken from streamlink
import re
from binascii import unhexlify
from collections import namedtuple
from itertools import starmap
from urlparse import urljoin, urlparse
# Plain record types produced by the parser. Each one is named after the
# playlist tag noted above it; the last three have no tag of their own.

# EXT-X-BYTERANGE
ByteRange = namedtuple("ByteRange", ["range", "offset"])

# EXT-X-KEY
Key = namedtuple(
    "Key",
    ["method", "uri", "iv", "key_format", "key_format_versions"],
)

# EXT-X-MAP
Map = namedtuple("Map", ["uri", "byterange"])

# EXT-X-MEDIA
Media = namedtuple(
    "Media",
    [
        "uri", "type", "group_id", "language", "name", "default",
        "autoselect", "forced", "characteristics",
    ],
)

# EXT-X-START
Start = namedtuple("Start", ["time_offset", "precise"])

# EXT-X-STREAM-INF
StreamInfo = namedtuple(
    "StreamInfo",
    [
        "bandwidth", "program_id", "codecs", "resolution",
        "audio", "video", "subtitles",
    ],
)

# EXT-X-I-FRAME-STREAM-INF
IFrameStreamInfo = namedtuple(
    "IFrameStreamInfo",
    ["bandwidth", "program_id", "codecs", "resolution", "video"],
)

# A variant entry of a master playlist.
Playlist = namedtuple("Playlist", ["uri", "stream_info", "media", "is_iframe"])

# Width/height pair parsed from a RESOLUTION attribute.
Resolution = namedtuple("Resolution", ["width", "height"])

# One media segment of a media playlist.
Segment = namedtuple(
    "Segment",
    [
        "uri", "duration", "title", "key", "discontinuity",
        "byterange", "date", "map",
    ],
)
class M3U8(object):
    """Container for everything read out of one M3U8 playlist.

    A fresh instance describes an empty playlist: both flags are False,
    every scalar property is None (meaning "not seen") and the three
    collections are empty lists.
    """

    def __init__(self):
        # Flags.
        for flag in ("is_endlist", "is_master"):
            setattr(self, flag, False)

        # Scalar playlist properties; None until a value is assigned.
        for prop in (
            "allow_cache",
            "discontinuity_sequence",
            "iframes_only",
            "media_sequence",
            "playlist_type",
            "target_duration",
            "start",
            "version",
        ):
            setattr(self, prop, None)

        # Collections, each instance getting its own fresh lists.
        for collection in ("media", "playlists", "segments"):
            setattr(self, collection, [])
class M3U8Parser(object):
    """Parse the text of an M3U8 (HLS) playlist into an ``M3U8`` object.

    Parsing is line based. Tag lines (those starting with ``#``) either
    set a property on the playlist being built or record pending
    information in ``self.state``; URI lines consume that pending state
    and become ``Segment`` or ``Playlist`` entries.

    ``self.state`` and ``self.m3u8`` only exist once ``parse`` has been
    called.
    """

    _extinf_re = re.compile(r"(?P<duration>\d+(\.\d+)?)(,(?P<title>.+))?")
    _attr_re = re.compile(r"([A-Z\-]+)=(\d+\.\d+|0x[0-9A-z]+|\d+x\d+|\d+|\"(.+?)\"|[0-9A-z\-]+)")
    _range_re = re.compile(r"(?P<range>\d+)(@(?P<offset>.+))?")
    _tag_re = re.compile(r"#(?P<tag>[\w-]+)(:(?P<value>.+))?")
    _res_re = re.compile(r"(\d+)x(\d+)")

    def __init__(self, base_uri=None):
        """Create a parser.

        *base_uri*, if given, is what relative URIs found in the playlist
        are joined against (see ``uri``).
        """
        self.base_uri = base_uri

    def create_stream_info(self, streaminf, cls=None):
        """Build a stream info tuple from a dict of tag attributes.

        Returns an ``IFrameStreamInfo`` when *cls* is ``IFrameStreamInfo``
        and a ``StreamInfo`` otherwise. BANDWIDTH is converted to float,
        RESOLUTION to a ``Resolution`` and CODECS to a list (empty when
        the attribute is absent).
        """
        program_id = streaminf.get("PROGRAM-ID")

        bandwidth = streaminf.get("BANDWIDTH")
        if bandwidth:
            bandwidth = float(bandwidth)

        resolution = streaminf.get("RESOLUTION")
        if resolution:
            resolution = self.parse_resolution(resolution)

        codecs = streaminf.get("CODECS")
        if codecs:
            codecs = codecs.split(",")
        else:
            codecs = []

        if cls == IFrameStreamInfo:
            return IFrameStreamInfo(bandwidth, program_id, codecs, resolution,
                                    streaminf.get("VIDEO"))
        else:
            return StreamInfo(bandwidth, program_id, codecs, resolution,
                              streaminf.get("AUDIO"), streaminf.get("VIDEO"),
                              streaminf.get("SUBTITLES"))

    def split_tag(self, line):
        """Split a tag line into ``(tag, value)``.

        The value is stripped and is ``""`` for tags without one. Returns
        ``(None, None)`` if the line does not look like a tag.
        """
        match = self._tag_re.match(line)
        if match:
            return match.group("tag"), (match.group("value") or "").strip()
        return None, None

    def parse_attributes(self, value):
        """Parse an attribute list (``KEY=value,KEY="value"``) into a dict.

        Quoted values are stored without their quotes.
        """
        # Each match is (key, raw value, contents of quotes or "").
        return {
            key: quoted or raw
            for key, raw, quoted in self._attr_re.findall(value)
        }

    def parse_bool(self, value):
        """Return True only for the literal string ``"YES"``."""
        return value == "YES"

    def parse_byterange(self, value):
        """Parse ``<length>[@<offset>]`` into a ``ByteRange``.

        The offset defaults to 0. Returns None when *value* does not start
        with a length.
        """
        match = self._range_re.match(value)
        if match:
            return ByteRange(int(match.group("range")),
                             int(match.group("offset") or 0))

    def parse_extinf(self, value):
        """Parse an EXTINF value into ``(duration, title)``.

        The title is None when absent; an unparseable value yields
        ``(0, None)``.
        """
        match = self._extinf_re.match(value)
        if match:
            return float(match.group("duration")), match.group("title")
        return (0, None)

    def parse_hex(self, value):
        """Decode a ``0x``-prefixed hex string into its raw bytes."""
        value = value[2:]
        # unhexlify needs an even number of digits, so left-pad with a zero.
        if len(value) % 2:
            value = "0" + value
        return unhexlify(value)

    def parse_resolution(self, value):
        """Parse ``<width>x<height>`` into a ``Resolution``.

        Unparseable input yields ``Resolution(0, 0)``.
        """
        match = self._res_re.match(value)
        if match:
            width, height = int(match.group(1)), int(match.group(2))
        else:
            width, height = 0, 0
        return Resolution(width, height)

    def parse_tag(self, line, transform=None):
        """Return the value of a tag line, passed through *transform* if given."""
        _, value = self.split_tag(line)
        if transform:
            value = transform(value)
        return value

    def parse_line(self, line):
        """Process one non-empty playlist line, updating parser state.

        Tags are recognised by prefix, so whenever one tag name is a
        prefix of another the longer name must be tested first.
        """
        if not line.startswith("#"):
            # A URI line: it completes whichever entry the preceding tags
            # announced. URI lines that nothing announced are ignored.
            if self.state.pop("expect_segment", None):
                byterange = self.state.pop("byterange", None)
                extinf = self.state.pop("extinf", (0, None))
                date = self.state.pop("date", None)
                # Map and key are not popped: they keep applying to the
                # following segments until replaced.
                map_ = self.state.get("map")
                key = self.state.get("key")
                segment = Segment(self.uri(line), extinf[0],
                                  extinf[1], key,
                                  self.state.pop("discontinuity", False),
                                  byterange, date, map_)
                self.m3u8.segments.append(segment)
            elif self.state.pop("expect_playlist", None):
                streaminf = self.state.pop("streaminf", {})
                stream_info = self.create_stream_info(streaminf)
                playlist = Playlist(self.uri(line), stream_info, [], False)
                self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXTINF"):
            self.state["expect_segment"] = True
            self.state["extinf"] = self.parse_tag(line, self.parse_extinf)
        elif line.startswith("#EXT-X-BYTERANGE"):
            self.state["expect_segment"] = True
            self.state["byterange"] = self.parse_tag(line, self.parse_byterange)
        elif line.startswith("#EXT-X-TARGETDURATION"):
            self.m3u8.target_duration = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-MEDIA-SEQUENCE"):
            # Must be tested before #EXT-X-MEDIA, which is a prefix of it.
            self.m3u8.media_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-KEY"):
            attr = self.parse_tag(line, self.parse_attributes)
            iv = attr.get("IV")
            if iv:
                iv = self.parse_hex(iv)
            self.state["key"] = Key(attr.get("METHOD"),
                                    self.uri(attr.get("URI")),
                                    iv, attr.get("KEYFORMAT"),
                                    attr.get("KEYFORMATVERSIONS"))
        elif line.startswith("#EXT-X-PROGRAM-DATE-TIME"):
            self.state["date"] = self.parse_tag(line)
        elif line.startswith("#EXT-X-ALLOW-CACHE"):
            self.m3u8.allow_cache = self.parse_tag(line, self.parse_bool)
        elif line.startswith("#EXT-X-STREAM-INF"):
            self.state["streaminf"] = self.parse_tag(line, self.parse_attributes)
            self.state["expect_playlist"] = True
        elif line.startswith("#EXT-X-PLAYLIST-TYPE"):
            self.m3u8.playlist_type = self.parse_tag(line)
        elif line.startswith("#EXT-X-ENDLIST"):
            self.m3u8.is_endlist = True
        elif line.startswith("#EXT-X-MEDIA"):
            attr = self.parse_tag(line, self.parse_attributes)
            media = Media(self.uri(attr.get("URI")), attr.get("TYPE"),
                          attr.get("GROUP-ID"), attr.get("LANGUAGE"),
                          attr.get("NAME"),
                          self.parse_bool(attr.get("DEFAULT")),
                          self.parse_bool(attr.get("AUTOSELECT")),
                          self.parse_bool(attr.get("FORCED")),
                          attr.get("CHARACTERISTICS"))
            self.m3u8.media.append(media)
        elif line.startswith("#EXT-X-DISCONTINUITY-SEQUENCE"):
            # Must be tested before #EXT-X-DISCONTINUITY, which is a prefix
            # of it; otherwise this tag would be mistaken for a
            # discontinuity marker and the sequence number never read.
            self.m3u8.discontinuity_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-DISCONTINUITY"):
            self.state["discontinuity"] = True
            # A discontinuity ends the current map's applicability.
            self.state["map"] = None
        elif line.startswith("#EXT-X-I-FRAMES-ONLY"):
            self.m3u8.iframes_only = True
        elif line.startswith("#EXT-X-MAP"):
            attr = self.parse_tag(line, self.parse_attributes)
            byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
            # Resolved against base_uri like every other URI in the playlist.
            self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
        elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
            attr = self.parse_tag(line, self.parse_attributes)
            streaminf = self.state.pop("streaminf", attr)
            stream_info = self.create_stream_info(streaminf, IFrameStreamInfo)
            playlist = Playlist(self.uri(attr.get("URI")), stream_info, [], True)
            self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXT-X-VERSION"):
            self.m3u8.version = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-START"):
            attr = self.parse_tag(line, self.parse_attributes)
            start = Start(attr.get("TIME-OFFSET"),
                          self.parse_bool(attr.get("PRECISE", "NO")))
            self.m3u8.start = start

    def parse(self, data):
        """Parse playlist text *data* and return the resulting ``M3U8``.

        Empty lines are skipped. Input with no lines at all yields an
        empty ``M3U8``.

        Raises ValueError if the first non-empty line is not the
        ``#EXTM3U`` header.
        """
        self.state = {}
        self.m3u8 = M3U8()

        lines = iter(filter(bool, data.splitlines()))
        try:
            line = next(lines)
        except StopIteration:
            return self.m3u8
        else:
            if not line.startswith("#EXTM3U"):
                raise ValueError("Missing #EXTM3U header")

        for line in lines:
            self.parse_line(line)

        # Associate Media entries with each Playlist
        for playlist in self.m3u8.playlists:
            for media_type in ("audio", "video", "subtitles"):
                # IFrameStreamInfo lacks audio/subtitles, hence the default.
                group_id = getattr(playlist.stream_info, media_type, None)
                if group_id:
                    for media in self.m3u8.media:
                        if media.group_id == group_id:
                            playlist.media.append(media)

        self.m3u8.is_master = bool(self.m3u8.playlists)
        return self.m3u8

    def uri(self, uri):
        """Resolve *uri* against ``base_uri`` when it is relative.

        URIs that already have a scheme, empty/None URIs, and any URI when
        no ``base_uri`` is set are returned unchanged.
        """
        if uri and urlparse(uri).scheme:
            return uri
        elif self.base_uri and uri:
            return urljoin(self.base_uri, uri)
        else:
            return uri
def load(data, base_uri=None, parser=M3U8Parser):
    """Parse M3U8 playlist text and return whatever the parser produces.

    *base_uri*, when given, is the URI that relative URIs in the playlist
    are joined against; without it relative URIs are left as they are.

    *parser* may be an M3U8Parser subclass to use instead of the default.
    """
    playlist_parser = parser(base_uri)
    return playlist_parser.parse(data)
try:
import gevent.monkey
except ImportError:
pass
else:
gevent.monkey.patch_all()
import base64
import hashlib
import os
import random
import urlparse
from datetime import datetime, timedelta
from itertools import groupby, count
import dateutil.parser
import requests
import hls_playlist
def restructure_master_playlist(playlist):
    """Read master playlist and create structure as follows:
        {
            variant name: (variant, {
                media name: media,
                ...
            }),
            ...
        }
    The main goal here is just to associate everything with a unique name.

    Variant names are prefixed with the variant's position in the playlist.
    Media names are prefixed with the media type and a counter that runs
    per type within the variant.
    """
    output = {}
    for variant_num, variant in enumerate(playlist.playlists):
        variant_name = "({}) {}".format(variant_num, get_variant_name(variant))
        media_names = {}
        # Number media per type with explicit counters rather than
        # itertools.groupby: groupby only merges *adjacent* items, so a type
        # showing up in two separate runs would restart at 0 and could
        # produce a duplicate name that silently replaces an earlier entry.
        seen_per_type = {}
        for media in variant.media:
            media_num = seen_per_type.get(media.type, 0)
            seen_per_type[media.type] = media_num + 1
            media_name = "({} {}) {}".format(media.type, media_num, media.name or "<unknown>")
            media_names[media_name] = media
        output[variant_name] = variant, media_names
    return output
def get_media_playlists(playlist):
    """Flatten a restructured master playlist into [(name, uri), ...].

    Variants are visited in name order, each followed by its medias in
    name order (named "<variant name> / <media name>"). Entries without
    a uri are left out.
    """
    found = []
    for variant_name in sorted(playlist):
        variant, medias = playlist[variant_name]
        if variant.uri:
            found.append((variant_name, variant.uri))
        found.extend(
            ("{} / {}".format(variant_name, media_name), medias[media_name].uri)
            for media_name in sorted(medias)
            if medias[media_name].uri
        )
    return found
def get_variant_name(variant):
    """For a given media variant in a master playlist, pick a name.

    The name is the distinct names of the variant's VIDEO media joined
    with "/", or "<unknown>" if it has no named VIDEO media. The names are
    sorted so that the same variant always gets the same name; joining
    the set directly would leave the order unspecified.
    """
    video_names = set(
        media.name
        for media in variant.media
        if media.type == "VIDEO" and media.name
    )
    if not video_names:
        return "<unknown>"
    return "/".join(sorted(video_names))
def format_default(value, default, fn):
    """Return fn(value) for a truthy value, and default for a falsy one.

    Helper for formatting values that may not be present.
    """
    return fn(value) if value else default
def format_uri(uri):
    """Edits down crazy long URIs to something eyeballable.
    Any individual path part longer than THRESHOLD will be hashed and the hash given
    inside curly bracket. If it contains a dot and is the final part, the extension is left alone.
    Example: https://example.com/foo/something-really-long/bar/some-really-big-name.m3u8
    Output: https://example.com/foo/{1a6Udfzz}/bar/{Bx56jaHy}.m3u8

    Only the path is shortened; scheme, host, query and fragment are
    passed through unchanged.
    """
    THRESHOLD = 40 # 40 is big enough for uuids, anything longer is dumb
    scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
    parts = path.split('/')
    last = len(parts) - 1
    for i, part in enumerate(parts):
        ext = ''
        if i == last:
            part, ext = os.path.splitext(part)
        if len(part) > THRESHOLD:
            # sha256 needs bytes. Text is encoded explicitly as UTF-8 so
            # that non-ASCII path parts hash instead of raising.
            raw = part if isinstance(part, bytes) else part.encode("utf-8")
            short_hash = base64.urlsafe_b64encode(hashlib.sha256(raw).digest()[:6])
            # The encoded hash is pure ASCII; str() keeps it a native string.
            part = "{%s}" % str(short_hash.decode("ascii"))
        parts[i] = ''.join((part, ext))
    return urlparse.urlunsplit((scheme, netloc, '/'.join(parts), query, fragment))
def format_master_playlist(playlist):
    """Render a restructured master playlist as human-readable text.

    Each variant gets one line (name, IFRAME marker, resolution, bitrate,
    uri), followed by one tab-indented line per associated media (name,
    attributes, uri). Variants and medias are listed in name order.
    """
    def describe_bitrate(bandwidth):
        return "{:.2f}".format(bandwidth/1024.)

    digest = []
    for variant_name in sorted(playlist):
        variant, medias = playlist[variant_name]
        info = variant.stream_info
        resolution = format_default(info.resolution, "NO RES", "{0.width}x{0.height}".format)
        bitrate = format_default(info.bandwidth, "NO BITRATE", describe_bitrate)
        variant_uri = format_default(variant.uri, "NO URI", format_uri)
        digest.append("{name}: {iframe}{res} {bitrate}kb/s {uri}".format(
            name=variant_name,
            iframe="IFRAME " if variant.is_iframe else "",
            res=resolution,
            bitrate=bitrate,
            uri=variant_uri,
        ))
        for media_name in sorted(medias):
            media = medias[media_name]
            # Flags that are set, then the language if there is one.
            tags = []
            for flag in ("default", "forced", "autoselect"):
                if getattr(media, flag):
                    tags.append(flag)
            if media.language:
                tags.append(media.language)
            described = " ".join(tags) if tags else "NO ATTRS"
            digest.append("\t{name}: {attrs} {uri}".format(
                name=media_name,
                attrs=described,
                uri=format_default(media.uri, "NO URI", format_uri),
            ))
    return "\n".join(digest)
def format_media_playlist(name, uri, start, end, playlist):
    """Return a human-readable media playlist digest.
    As media playlist contents vary with time, the time range in which the GET occurred
    is provided.

    name and uri identify the playlist; start and end are the datetimes
    just before and after the fetch; playlist is the parsed media playlist.

    The output is a header line, a line of playlist-level properties and
    one tab-indented line per segment. Each segment line carries its
    discontinuity and media sequence numbers and, when one can be derived
    from the segment dates and durations, its timestamp.
    """
    def format_seconds(value):
        # A missing duration is reported as such instead of crashing the
        # whole digest on the float format.
        if value is None:
            return "<unknown>"
        return "{:.3f}s".format(value)

    attrs = [attr for attr in ("iframes_only", "is_endlist") if getattr(playlist, attr)]
    attrs = " ".join(attrs) if attrs else "no attrs"
    lines = [
        "{name}: {uri} at {start}/{range:.3f}s".format(
            name=name, uri=format_uri(uri),
            start=start.isoformat(), range=(end-start).total_seconds(),
        ),
        "\tv{p.version}, target {target}, {attrs}, sequences {p.discontinuity_sequence}/{p.media_sequence}, start {p.start}".format(
            p=playlist, attrs=attrs, target=format_seconds(playlist.target_duration),
        ),
    ]
    # Both sequence numbers count from 0 when the playlist does not set them.
    dis_seq = playlist.discontinuity_sequence or 0
    media_seq = playlist.media_sequence or 0
    timestamp = None
    for segment in playlist.segments:
        # The discontinuity marker precedes the segment it is attached to,
        # so that segment already belongs to the next discontinuity sequence.
        if segment.discontinuity:
            dis_seq += 1
        # An explicit date on the segment overrides the running timestamp.
        if segment.date:
            timestamp = dateutil.parser.parse(segment.date)
        lines.append(
            "\t{s.title} {dis_seq}/{media_seq} {timestamp}/{duration} {dis}{byterange}{uri}".format(
                s=segment,
                dis_seq=dis_seq, media_seq=media_seq,
                timestamp="<unknown time>" if timestamp is None else timestamp.isoformat(),
                duration=format_seconds(segment.duration),
                dis="DISCONTINUITY " if segment.discontinuity else "",
                byterange="" if segment.byterange is None else "{} of ".format(segment.byterange),
                uri=format_uri(segment.uri),
            )
        )
        if segment.duration is None:
            # duration unknown, so can't get next timestamp from prev timestamp
            timestamp = None
        elif timestamp is not None:
            timestamp += timedelta(seconds=segment.duration)
        media_seq += 1
    return "\n".join(lines)
def get_master_playlist(channel):
    """Fetch the master playlist for a twitch channel.

    Two requests are made: one for the channel's access token, then one
    for the master playlist itself using that token. The playlist is
    parsed and returned in the restructured form produced by
    restructure_master_playlist. An HTTP error status on either request
    raises via raise_for_status().
    """
    token_url = "https://api.twitch.tv/api/channels/{}/access_token.json".format(channel)
    token_headers = {
        'Accept': 'application/vnd.twitchtv.v3+json',
        'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
    }
    token_resp = requests.get(token_url, params={'as3': 't'}, headers=token_headers)
    token_resp.raise_for_status() # getting access token
    token = token_resp.json()

    playlist_url = "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(channel)
    playlist_params = {
        # Taken from streamlink. Unsure what's needed and what changing things can do.
        "player": "twitchweb",
        "p": random.randrange(1000000),
        "type": "any",
        "allow_source": "true",
        "allow_audio_only": "true",
        "allow_spectre": "false",
        "fast_bread": "True",
        "sig": token["sig"],
        "token": token["token"],
        # Also observed in the wild but not used in streamlink:
        # "playlist_include_framerate": "true"
        # "reassignments_supported": "true"
    }
    playlist_resp = requests.get(playlist_url, params=playlist_params)
    playlist_resp.raise_for_status() # getting master playlist

    # Relative URIs resolve against the final response URL.
    parsed = hls_playlist.load(playlist_resp.text, base_uri=playlist_resp.url)
    return restructure_master_playlist(parsed)
def get_media_playlist(uri):
    """Fetch the media playlist at uri and return it parsed.

    Relative URIs inside the playlist are resolved against the final
    response URL. An HTTP error status raises via raise_for_status().
    """
    response = requests.get(uri)
    response.raise_for_status() # getting media playlist
    text, final_url = response.text, response.url
    return hls_playlist.load(text, base_uri=final_url)
def get_segment_hash(uri):
    """Download the segment at uri and return a short hash of its content.

    The hash is the first 30 bytes of the content's SHA-256 digest,
    base64-encoded with the URL-safe alphabet ("-" and "_"). An HTTP
    error status raises via raise_for_status().
    """
    response = requests.get(uri)
    response.raise_for_status() # getting segment
    truncated = hashlib.sha256(response.content).digest()[:30]
    return base64.urlsafe_b64encode(truncated)
def main(channel, all=False, content=False):
    """Print a digest of a twitch channel's HLS playlists.

    The master playlist digest is always printed, followed by a media
    playlist digest. With all set, every media playlist referenced by the
    master is fetched; otherwise only the variant with the highest
    bandwidth. With content set, every listed segment is also downloaded
    and a hash of its content printed (concurrently when gevent is
    importable).
    """
    playlist = get_master_playlist(channel)
    print(format_master_playlist(playlist))
    print("")

    if all:
        medias = get_media_playlists(playlist)
    else:
        # Items are (name, (variant, medias)); pick the highest bandwidth.
        name, (variant, _) = max(
            playlist.items(),
            key=lambda item: item[1][0].stream_info.bandwidth,
        )
        medias = [(name, variant.uri)]

    segments = {}
    for name, uri in medias:
        start = datetime.utcnow()
        media_playlist = get_media_playlist(uri)
        end = datetime.utcnow()
        # media_sequence is None when the playlist has no
        # EXT-X-MEDIA-SEQUENCE tag; numbering then starts at 0, matching
        # format_media_playlist.
        first_seq = media_playlist.media_sequence or 0
        for seq, segment in enumerate(media_playlist.segments, first_seq):
            segments["{} #{}".format(name, seq)] = segment.uri
        print(format_media_playlist(name, uri, start, end, media_playlist))

    if not content:
        return

    print("")
    try:
        import gevent
    except ImportError:
        # No gevent: hash the segments one after another.
        for name, uri in sorted(segments.items()):
            print("{}: {}".format(name, get_segment_hash(uri)))
    else:
        # Start every download first, then print results in name order.
        workers = {name: gevent.spawn(get_segment_hash, uri) for name, uri in segments.items()}
        for name, worker in sorted(workers.items()):
            print("{}: {}".format(name, worker.get()))
if __name__ == "__main__":
    # Script entry point: hand main() to argh to run as a command.
    from argh import dispatch_command
    dispatch_command(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment