Created
November 26, 2018 15:48
-
-
Save ekimekim/6d2c80adc09bb6c5f038028e8a648777 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Taken from streamlink | |
import re | |
from binascii import unhexlify | |
from collections import namedtuple | |
from itertools import starmap | |
from urlparse import urljoin, urlparse | |
# Plain record types, one per HLS tag that carries structured data.

# EXT-X-BYTERANGE: sub-range length and its starting offset.
ByteRange = namedtuple("ByteRange", "range offset")

# EXT-X-KEY: encryption parameters attached to the segments that follow.
Key = namedtuple("Key", "method uri iv key_format key_format_versions")

# EXT-X-MAP: initialization section location.
Map = namedtuple("Map", "uri byterange")

# EXT-X-MEDIA: an alternative rendition listed in a master playlist.
Media = namedtuple(
    "Media",
    "uri type group_id language name default "
    "autoselect forced characteristics",
)

# EXT-X-START: preferred playback start point.
Start = namedtuple("Start", "time_offset precise")

# EXT-X-STREAM-INF: attributes of a variant stream.
StreamInfo = namedtuple(
    "StreamInfo",
    "bandwidth program_id codecs resolution "
    "audio video subtitles",
)

# EXT-X-I-FRAME-STREAM-INF: attributes of an I-frame-only variant stream.
IFrameStreamInfo = namedtuple(
    "IFrameStreamInfo",
    "bandwidth program_id "
    "codecs resolution video",
)

# Records that do not map onto a single tag.
Playlist = namedtuple("Playlist", "uri stream_info media is_iframe")
Resolution = namedtuple("Resolution", "width height")
Segment = namedtuple(
    "Segment",
    "uri duration title key discontinuity "
    "byterange date map",
)
class M3U8(object):
    """Holds everything extracted from one parsed M3U8 playlist.

    A fresh instance starts out empty: both flags are False, every
    single-valued field is None, and the three collections are new
    empty lists.
    """

    def __init__(self):
        # Collections filled in while parsing.
        self.segments = []
        self.playlists = []
        self.media = []

        # Single-valued fields; None means "not set".
        self.version = None
        self.start = None
        self.target_duration = None
        self.playlist_type = None
        self.media_sequence = None
        self.iframes_only = None
        self.discontinuity_sequence = None
        self.allow_cache = None

        # Flags.
        self.is_master = False
        self.is_endlist = False
class M3U8Parser(object):
    """Line-oriented parser that turns M3U8 playlist text into an M3U8 object.

    Call parse() with the playlist text.  While parsing, scratch data that
    carries over between lines is kept in ``self.state`` and the result being
    built is kept in ``self.m3u8``.  When *base_uri* is given, URIs without a
    scheme are joined onto it.
    """

    _extinf_re = re.compile(r"(?P<duration>\d+(\.\d+)?)(,(?P<title>.+))?")
    _attr_re = re.compile(r"([A-Z\-]+)=(\d+\.\d+|0x[0-9A-z]+|\d+x\d+|\d+|\"(.+?)\"|[0-9A-z\-]+)")
    _range_re = re.compile(r"(?P<range>\d+)(@(?P<offset>.+))?")
    _tag_re = re.compile(r"#(?P<tag>[\w-]+)(:(?P<value>.+))?")
    _res_re = re.compile(r"(\d+)x(\d+)")

    def __init__(self, base_uri=None):
        self.base_uri = base_uri

    def create_stream_info(self, streaminf, cls=None):
        """Build a StreamInfo from an attribute dict.

        An IFrameStreamInfo is built instead when *cls* is IFrameStreamInfo.
        BANDWIDTH is converted to float, RESOLUTION to a Resolution and
        CODECS to a list (empty when absent).
        """
        program_id = streaminf.get("PROGRAM-ID")

        bandwidth = streaminf.get("BANDWIDTH")
        if bandwidth:
            bandwidth = float(bandwidth)

        resolution = streaminf.get("RESOLUTION")
        if resolution:
            resolution = self.parse_resolution(resolution)

        codecs = streaminf.get("CODECS")
        codecs = codecs.split(",") if codecs else []

        if cls == IFrameStreamInfo:
            return IFrameStreamInfo(bandwidth, program_id, codecs, resolution,
                                    streaminf.get("VIDEO"))
        return StreamInfo(bandwidth, program_id, codecs, resolution,
                          streaminf.get("AUDIO"), streaminf.get("VIDEO"),
                          streaminf.get("SUBTITLES"))

    def split_tag(self, line):
        """Split a tag line into (tag, value).

        The value is stripped, and is "" when the tag has none.  Returns
        (None, None) when the line does not look like a tag.
        """
        match = self._tag_re.match(line)
        if match:
            return match.group("tag"), (match.group("value") or "").strip()
        return None, None

    def parse_attributes(self, value):
        """Parse an attribute list into a dict; quoted values lose their quotes."""
        return {
            key: quoted or raw
            for key, raw, quoted in self._attr_re.findall(value)
        }

    def parse_bool(self, value):
        """Return True only for the literal string "YES"."""
        return value == "YES"

    def parse_byterange(self, value):
        """Parse "<n>[@<o>]" into a ByteRange (offset defaults to 0).

        Returns None when *value* does not match.
        """
        match = self._range_re.match(value)
        if match:
            return ByteRange(int(match.group("range")),
                             int(match.group("offset") or 0))

    def parse_extinf(self, value):
        """Parse an EXTINF value into (duration, title); (0, None) if malformed."""
        match = self._extinf_re.match(value)
        if match:
            return float(match.group("duration")), match.group("title")
        return (0, None)

    def parse_hex(self, value):
        """Convert a "0x"-prefixed hex string into raw bytes."""
        value = value[2:]  # drop the two-character prefix
        if len(value) % 2:
            # unhexlify needs an even number of digits.
            value = "0" + value
        return unhexlify(value)

    def parse_resolution(self, value):
        """Parse "<w>x<h>" into a Resolution; (0, 0) if malformed."""
        match = self._res_re.match(value)
        if match:
            width, height = int(match.group(1)), int(match.group(2))
        else:
            width, height = 0, 0
        return Resolution(width, height)

    def parse_tag(self, line, transform=None):
        """Return the value part of a tag line, passed through *transform* if given."""
        _, value = self.split_tag(line)
        if transform:
            value = transform(value)
        return value

    def parse_line(self, line):
        """Consume one non-empty playlist line, updating self.state / self.m3u8.

        Tags are recognised by prefix, so a tag whose name is a prefix of
        another tag's name must be tested after the longer one.
        """
        if not line.startswith("#"):
            # A URI line: it completes whichever entry the preceding tags opened.
            if self.state.pop("expect_segment", None):
                byterange = self.state.pop("byterange", None)
                extinf = self.state.pop("extinf", (0, None))
                date = self.state.pop("date", None)
                # Map and key are not popped: they stay in effect for later segments.
                map_ = self.state.get("map")
                key = self.state.get("key")

                segment = Segment(self.uri(line), extinf[0],
                                  extinf[1], key,
                                  self.state.pop("discontinuity", False),
                                  byterange, date, map_)
                self.m3u8.segments.append(segment)
            elif self.state.pop("expect_playlist", None):
                streaminf = self.state.pop("streaminf", {})
                stream_info = self.create_stream_info(streaminf)
                playlist = Playlist(self.uri(line), stream_info, [], False)
                self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXTINF"):
            self.state["expect_segment"] = True
            self.state["extinf"] = self.parse_tag(line, self.parse_extinf)
        elif line.startswith("#EXT-X-BYTERANGE"):
            self.state["expect_segment"] = True
            self.state["byterange"] = self.parse_tag(line, self.parse_byterange)
        elif line.startswith("#EXT-X-TARGETDURATION"):
            self.m3u8.target_duration = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-MEDIA-SEQUENCE"):
            self.m3u8.media_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-KEY"):
            attr = self.parse_tag(line, self.parse_attributes)
            iv = attr.get("IV")
            if iv:
                iv = self.parse_hex(iv)
            self.state["key"] = Key(attr.get("METHOD"),
                                    self.uri(attr.get("URI")),
                                    iv, attr.get("KEYFORMAT"),
                                    attr.get("KEYFORMATVERSIONS"))
        elif line.startswith("#EXT-X-PROGRAM-DATE-TIME"):
            self.state["date"] = self.parse_tag(line)
        elif line.startswith("#EXT-X-ALLOW-CACHE"):
            self.m3u8.allow_cache = self.parse_tag(line, self.parse_bool)
        elif line.startswith("#EXT-X-STREAM-INF"):
            self.state["streaminf"] = self.parse_tag(line, self.parse_attributes)
            self.state["expect_playlist"] = True
        elif line.startswith("#EXT-X-PLAYLIST-TYPE"):
            self.m3u8.playlist_type = self.parse_tag(line)
        elif line.startswith("#EXT-X-ENDLIST"):
            self.m3u8.is_endlist = True
        elif line.startswith("#EXT-X-MEDIA"):
            attr = self.parse_tag(line, self.parse_attributes)
            media = Media(self.uri(attr.get("URI")), attr.get("TYPE"),
                          attr.get("GROUP-ID"), attr.get("LANGUAGE"),
                          attr.get("NAME"),
                          self.parse_bool(attr.get("DEFAULT")),
                          self.parse_bool(attr.get("AUTOSELECT")),
                          self.parse_bool(attr.get("FORCED")),
                          attr.get("CHARACTERISTICS"))
            self.m3u8.media.append(media)
        elif line.startswith("#EXT-X-DISCONTINUITY-SEQUENCE"):
            # Must be tested before "#EXT-X-DISCONTINUITY", which is a prefix
            # of this tag and would otherwise swallow it.
            self.m3u8.discontinuity_sequence = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-DISCONTINUITY"):
            self.state["discontinuity"] = True
            self.state["map"] = None
        elif line.startswith("#EXT-X-I-FRAMES-ONLY"):
            self.m3u8.iframes_only = True
        elif line.startswith("#EXT-X-MAP"):
            attr = self.parse_tag(line, self.parse_attributes)
            byterange = self.parse_byterange(attr.get("BYTERANGE", ""))
            # Resolve against base_uri like every other URI this parser emits.
            self.state["map"] = Map(self.uri(attr.get("URI")), byterange)
        elif line.startswith("#EXT-X-I-FRAME-STREAM-INF"):
            attr = self.parse_tag(line, self.parse_attributes)
            streaminf = self.state.pop("streaminf", attr)
            stream_info = self.create_stream_info(streaminf, IFrameStreamInfo)
            playlist = Playlist(self.uri(attr.get("URI")), stream_info, [], True)
            self.m3u8.playlists.append(playlist)
        elif line.startswith("#EXT-X-VERSION"):
            self.m3u8.version = self.parse_tag(line, int)
        elif line.startswith("#EXT-X-START"):
            attr = self.parse_tag(line, self.parse_attributes)
            start = Start(attr.get("TIME-OFFSET"),
                          self.parse_bool(attr.get("PRECISE", "NO")))
            self.m3u8.start = start

    def parse(self, data):
        """Parse playlist text *data* and return the resulting M3U8 object.

        Empty lines are ignored.  Input with no lines at all yields an empty
        M3U8.  Raises ValueError when the first non-empty line does not start
        with "#EXTM3U".
        """
        self.state = {}
        self.m3u8 = M3U8()

        lines = iter(filter(bool, data.splitlines()))
        try:
            header = next(lines)
        except StopIteration:
            return self.m3u8
        if not header.startswith("#EXTM3U"):
            raise ValueError("Missing #EXTM3U header")

        for line in lines:
            self.parse_line(line)

        # Associate Media entries with each Playlist by matching group ids.
        for playlist in self.m3u8.playlists:
            for media_type in ("audio", "video", "subtitles"):
                group_id = getattr(playlist.stream_info, media_type, None)
                if group_id:
                    playlist.media.extend(
                        media for media in self.m3u8.media
                        if media.group_id == group_id
                    )

        self.m3u8.is_master = bool(self.m3u8.playlists)
        return self.m3u8

    def uri(self, uri):
        """Return *uri* as is if it has a scheme, else joined onto base_uri.

        Falsy input, or a relative URI when no base_uri is set, is returned
        unchanged.
        """
        if uri and urlparse(uri).scheme:
            return uri
        if self.base_uri and uri:
            return urljoin(self.base_uri, uri)
        return uri
def load(data, base_uri=None, parser=M3U8Parser):
    """Parse a string of M3U8 playlist data and return the result.

    *base_uri*, when given, is handed to the parser as the base that
    relative URIs are joined onto; without it relative URIs are left as is.

    *parser* may be an M3U8Parser subclass to use instead of the default.
    """
    instance = parser(base_uri)
    return instance.parse(data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
import gevent.monkey | |
except ImportError: | |
pass | |
else: | |
gevent.monkey.patch_all() | |
import base64 | |
import hashlib | |
import os | |
import random | |
import urlparse | |
from datetime import datetime, timedelta | |
from itertools import groupby, count | |
import dateutil.parser | |
import requests | |
import hls_playlist | |
def restructure_master_playlist(playlist):
    """Read master playlist and create structure as follows:
        {
            variant name: (variant, {
                media name: media,
                ...
            }),
            ...
        }
    The main goal here is just to associate everything with a unique name.

    Variant names are prefixed with the variant's position in
    playlist.playlists; media names are prefixed with the media type and a
    running index within that type.
    """
    output = {}
    for variant_num, variant in enumerate(playlist.playlists):
        variant_name = "({}) {}".format(variant_num, get_variant_name(variant))
        media_names = {}
        # Keep one running counter per media type. itertools.groupby only
        # groups adjacent items, so on an unsorted list it would restart the
        # numbering whenever a type re-appears and could hand out the same
        # name twice, silently overwriting an entry.
        next_index = {}
        for media in variant.media:
            media_num = next_index.get(media.type, 0)
            next_index[media.type] = media_num + 1
            media_name = "({} {}) {}".format(
                media.type, media_num, media.name or "<unknown>"
            )
            media_names[media_name] = media
        output[variant_name] = variant, media_names
    return output
def get_media_playlists(playlist):
    """Flatten a restructured master playlist into a list of (name, uri).

    Variants are visited in name order; each variant with a URI is listed
    first, followed by those of its media entries that have a URI, named
    "<variant name> / <media name>".
    """
    found = []
    for variant_name in sorted(playlist):
        variant, medias = playlist[variant_name]
        if variant.uri:
            found.append((variant_name, variant.uri))
        for media_name in sorted(medias):
            media_uri = medias[media_name].uri
            if not media_uri:
                continue
            found.append(("{} / {}".format(variant_name, media_name), media_uri))
    return found
def get_variant_name(variant):
    """For a given media variant in a master playlist, pick a name.

    The name is the distinct, non-empty names of the variant's VIDEO media
    joined with "/", or "<unknown>" when there are none.
    """
    video_names = set(
        media.name
        for media in variant.media
        if media.type == "VIDEO" and media.name
    )
    if not video_names:
        return "<unknown>"
    # Sort so the same variant always gets the same name; iteration order of
    # a set is arbitrary.
    return "/".join(sorted(video_names))
def format_default(value, default, fn):
    """Return fn(value) for a truthy value, and default for a falsy one.

    A small helper for formatting values that may be missing.
    """
    return fn(value) if value else default
def format_uri(uri):
    """Edits down crazy long URIs to something eyeballable.

    Any individual path part longer than THRESHOLD will be hashed and the hash given
    inside curly brackets. If it contains a dot and is the final part, the extension is left alone.

    Example: https://example.com/foo/something-really-long/bar/some-really-big-name.m3u8
    Output: https://example.com/foo/{1a6Udfzz}/bar/{Bx56jaHy}.m3u8
    """
    THRESHOLD = 40  # 40 is big enough for uuids, anything longer is dumb
    scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
    parts = path.split('/')
    last = len(parts) - 1
    for i, part in enumerate(parts):
        ext = ''
        if i == last:
            part, ext = os.path.splitext(part)
        if len(part) > THRESHOLD:
            # Hash bytes explicitly: relying on implicit text-to-bytes
            # conversion fails on non-ASCII text.
            raw = part if isinstance(part, bytes) else part.encode("utf-8")
            token = base64.urlsafe_b64encode(hashlib.sha256(raw).digest()[:6])
            if not isinstance(token, str):
                # b64 output is bytes where bytes and str differ.
                token = token.decode("ascii")
            part = "{%s}" % token
        parts[i] = ''.join((part, ext))
    return urlparse.urlunsplit((scheme, netloc, '/'.join(parts), query, fragment))
def format_master_playlist(playlist):
    """Render a restructured master playlist as human-readable text.

    One line per variant (in name order), each followed by one tab-indented
    line per media entry of that variant (also in name order).
    """
    out = []
    for variant_name, (variant, medias) in sorted(playlist.items()):
        info = variant.stream_info
        resolution = format_default(
            info.resolution, "NO RES", "{0.width}x{0.height}".format,
        )
        bitrate = format_default(
            info.bandwidth, "NO BITRATE", lambda br: "{:.2f}".format(br/1024.),
        )
        out.append("{name}: {iframe}{res} {bitrate}kb/s {uri}".format(
            name=variant_name,
            iframe="IFRAME " if variant.is_iframe else "",
            res=resolution,
            bitrate=bitrate,
            uri=format_default(variant.uri, "NO URI", format_uri),
        ))

        for media_name, media in sorted(medias.items()):
            # Names of the flags that are set, then the language if any.
            flags = [
                flag for flag in ("default", "forced", "autoselect")
                if getattr(media, flag)
            ]
            if media.language:
                flags.append(media.language)
            described = " ".join(flags) if flags else "NO ATTRS"
            out.append("\t{name}: {attrs} {uri}".format(
                name=media_name,
                attrs=described,
                uri=format_default(media.uri, "NO URI", format_uri),
            ))
    return "\n".join(out)
def format_media_playlist(name, uri, start, end, playlist):
    """Return a human-readable media playlist digest.

    As media playlist contents vary with time, the time range in which the GET occurred
    is provided: *start* and *end* are the datetimes just before and just after the fetch.

    The first line identifies the playlist, the second summarises its header
    tags, and each following line describes one segment together with its
    discontinuity sequence number, media sequence number and timestamp.
    """
    attrs = [attr for attr in ("iframes_only", "is_endlist") if getattr(playlist, attr)]
    attrs = " ".join(attrs) if attrs else "no attrs"

    # A missing target duration cannot go through a float format spec.
    if playlist.target_duration is None:
        target = "<unknown>"
    else:
        target = "{:.3f}s".format(playlist.target_duration)

    lines = [
        "{name}: {uri} at {start}/{range:.3f}s".format(
            name=name, uri=format_uri(uri),
            start=start.isoformat(), range=(end-start).total_seconds(),
        ),
        "\tv{p.version}, target {target}, {attrs}, sequences {p.discontinuity_sequence}/{p.media_sequence}, start {p.start}".format(
            p=playlist, target=target, attrs=attrs,
        ),
    ]

    dis_seq = playlist.discontinuity_sequence or 0
    media_seq = playlist.media_sequence or 0
    timestamp = None
    for segment in playlist.segments:
        if segment.discontinuity:
            # The discontinuity tag precedes this segment, so it already counts
            # towards this segment's discontinuity sequence number.
            dis_seq += 1
        if segment.date:
            # An explicit date overrides whatever was extrapolated so far.
            timestamp = dateutil.parser.parse(segment.date)
        lines.append(
            "\t{s.title} {dis_seq}/{media_seq} {timestamp}/{s.duration:.3f}s {dis}{byterange}{uri}".format(
                s=segment,
                dis_seq=dis_seq, media_seq=media_seq,
                timestamp="<unknown time>" if timestamp is None else timestamp.isoformat(),
                dis="DISCONTINUITY " if segment.discontinuity else "",
                byterange="" if segment.byterange is None else "{} of ".format(segment.byterange),
                uri=format_uri(segment.uri),
            )
        )
        if segment.duration is None:
            # duration unknown, so can't get next timestamp from prev timestamp
            timestamp = None
        elif timestamp is not None:
            timestamp += timedelta(seconds=segment.duration)
        media_seq += 1
    return "\n".join(lines)
def get_master_playlist(channel):
    """Fetch the master playlist of *channel* from twitch.

    Two requests are made: one for an access token, then one for the
    playlist itself using that token.  The parsed playlist is returned in
    the shape produced by restructure_master_playlist().  An HTTP error
    status on either request raises via raise_for_status().
    """
    token_url = "https://api.twitch.tv/api/channels/{}/access_token.json".format(channel)
    token_headers = {
        'Accept': 'application/vnd.twitchtv.v3+json',
        'Client-ID': 'pwkzresl8kj2rdj6g7bvxl9ys1wly3j',
    }
    token_resp = requests.get(token_url, params={'as3': 't'}, headers=token_headers)
    token_resp.raise_for_status()  # getting access token
    token = token_resp.json()

    playlist_url = "https://usher.ttvnw.net/api/channel/hls/{}.m3u8".format(channel)
    playlist_params = {
        # Taken from streamlink. Unsure what's needed and what changing things can do.
        "player": "twitchweb",
        "p": random.randrange(1000000),
        "type": "any",
        "allow_source": "true",
        "allow_audio_only": "true",
        "allow_spectre": "false",
        "fast_bread": "True",
        "sig": token["sig"],
        "token": token["token"],
        # Also observed in the wild but not used in streamlink:
        # "playlist_include_framerate": "true"
        # "reassignments_supported": "true"
    }
    playlist_resp = requests.get(playlist_url, params=playlist_params)
    playlist_resp.raise_for_status()  # getting master playlist

    master = hls_playlist.load(playlist_resp.text, base_uri=playlist_resp.url)
    return restructure_master_playlist(master)
def get_media_playlist(uri):
    """GET *uri* and return its body parsed as an HLS playlist.

    Relative URIs inside the playlist are resolved against the response URL.
    An HTTP error status raises via raise_for_status().
    """
    response = requests.get(uri)
    response.raise_for_status()  # getting media playlist
    body, final_url = response.text, response.url
    return hls_playlist.load(body, base_uri=final_url)
def get_segment_hash(uri):
    """GET *uri* and return a short fingerprint of the response body.

    The fingerprint is the first 30 bytes of the body's SHA-256 digest,
    base64-encoded with "-" and "_" as the two alternative characters.
    An HTTP error status raises via raise_for_status().
    """
    response = requests.get(uri)
    response.raise_for_status()  # getting segment
    digest = hashlib.sha256(response.content).digest()
    return base64.b64encode(digest[:30], "-_")
def main(channel, all=False, content=False):
    """Print a digest of a channel's master playlist and its media playlists.

    By default only the variant with the highest bandwidth is fetched; with
    *all*, every variant and media playlist that has a URI is fetched.
    With *content*, every listed segment is also downloaded and a hash of
    its body is printed (concurrently when gevent can be imported).
    """
    playlist = get_master_playlist(channel)
    print(format_master_playlist(playlist))

    if all:
        medias = get_media_playlists(playlist)
    else:
        # Items are (name, (variant, medias)); pick the highest bandwidth variant.
        name, (variant, _) = max(
            playlist.items(),
            key=lambda item: item[1][0].stream_info.bandwidth,
        )
        medias = [(name, variant.uri)]

    segments = {}
    for name, uri in medias:
        start = datetime.utcnow()
        media_playlist = get_media_playlist(uri)
        end = datetime.utcnow()
        # A playlist without a media sequence tag leaves the attribute as
        # None; count from 0 in that case, as format_media_playlist does.
        first_seq = media_playlist.media_sequence or 0
        for seq, segment in zip(count(first_seq), media_playlist.segments):
            segments["{} #{}".format(name, seq)] = segment.uri
        print(format_media_playlist(name, uri, start, end, media_playlist))

    if not content:
        return

    try:
        import gevent
    except ImportError:
        # No gevent: fetch the segments one after another.
        for name, uri in sorted(segments.items()):
            print("{}: {}".format(name, get_segment_hash(uri)))
    else:
        # Start every download first, then print results in name order.
        workers = {name: gevent.spawn(get_segment_hash, uri) for name, uri in segments.items()}
        for name, worker in sorted(workers.items()):
            print("{}: {}".format(name, worker.get()))
if __name__ == '__main__': | |
import argh | |
argh.dispatch_command(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment