Last active
January 24, 2019 07:30
-
-
Save pmrowla/842c696b32fa0387e302be290066cc28 to your computer and use it in GitHub Desktop.
segmentBase with stream=True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import logging | |
import datetime | |
import os.path | |
import requests | |
from streamlink import StreamError, PluginError | |
from streamlink.compat import urlparse, urlunparse | |
from streamlink.stream.http import valid_args, normalize_key | |
from streamlink.stream.stream import Stream | |
from streamlink.stream.dash_manifest import MPD, sleeper, sleep_until, utc, freeze_timeline | |
from streamlink.stream.ffmpegmux import FFMPEGMuxer | |
from streamlink.stream.segmented import SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter | |
from streamlink.utils import parse_xml | |
from streamlink.utils.l10n import Language | |
log = logging.getLogger(__name__) | |
class DASHStreamWriter(SegmentedStreamWriter):
    """Writer that downloads DASH segments and feeds them into the reader's buffer."""

    def __init__(self, reader, *args, **kwargs):
        # Pull retry/thread/timeout settings from the session's dash-segment-* options.
        options = reader.stream.session.options
        kwargs["retries"] = options.get("dash-segment-attempts")
        kwargs["threads"] = options.get("dash-segment-threads")
        kwargs["timeout"] = options.get("dash-segment-timeout")
        SegmentedStreamWriter.__init__(self, reader, *args, **kwargs)

    def fetch(self, segment, retries=None):
        """Download one segment, waiting until it becomes available and honoring
        its optional byte range; retries recursively on StreamError.

        :param segment: Segment to download
        :param retries: remaining retry attempts; returns None when exhausted
        """
        if self.closed or not retries:
            return
        try:
            headers = {}
            now = datetime.datetime.now(tz=utc)
            if segment.available_at > now:
                time_to_wait = (segment.available_at - now).total_seconds()
                fname = os.path.basename(urlparse(segment.url).path)
                log.debug("Waiting for segment: {fname} ({wait:.01f}s)".format(fname=fname, wait=time_to_wait))
                sleep_until(segment.available_at)
            if segment.range:
                start, length = segment.range
                if length:
                    end = start + length - 1
                else:
                    # open-ended range: "bytes=<start>-"
                    end = ""
                headers["Range"] = "bytes={0}-{1}".format(start, end)
            return self.session.http.get(segment.url,
                                         stream=segment.stream,
                                         timeout=self.timeout,
                                         exception=StreamError,
                                         headers=headers)
        except StreamError as err:
            # BUG FIX: logging uses %-style lazy substitution, so the original
            # str.format-style placeholders were never interpolated; format the
            # message explicitly before logging.
            log.error("Failed to open segment {0}: {1}".format(segment.url, err))
            return self.fetch(segment, retries - 1)

    def write(self, segment, res, chunk_size=8192):
        """Stream the response body into the reader's buffer, aborting when closed."""
        try:
            for chunk in res.iter_content(chunk_size):
                if self.closed:
                    log.warning("Download of segment: {} aborted".format(segment.url))
                    return
                self.reader.buffer.write(chunk)
            log.debug("Download of segment: {} complete".format(segment.url))
        finally:
            # BUG FIX: always release the connection, even on an aborted
            # download (the original leaked the response on early return).
            res.close()
class DASHStreamWorker(SegmentedStreamWorker):
    """Worker that yields the segments of one representation, periodically
    reloading the manifest for dynamic (live) streams."""

    def __init__(self, *args, **kwargs):
        SegmentedStreamWorker.__init__(self, *args, **kwargs)
        self.mpd = self.stream.mpd
        self.period = self.stream.period

    @staticmethod
    def get_representation(mpd, representation_id, mime_type):
        """Find a representation by id and mimeType in the first period of ``mpd``."""
        for aset in mpd.periods[0].adaptationSets:
            for rep in aset.representations:
                if rep.id == representation_id and rep.mimeType == mime_type:
                    return rep

    def iter_segments(self):
        """Yield segments, reloading the manifest between passes for dynamic streams."""
        init = True
        back_off_factor = 1
        while not self.closed:
            # find the representation by ID
            representation = self.get_representation(self.mpd, self.reader.representation_id, self.reader.mime_type)
            refresh_wait = max(self.mpd.minimumUpdatePeriod.total_seconds(),
                               self.mpd.periods[0].duration.total_seconds()) or 5
            # make each pass take at least refresh_wait * back_off_factor seconds
            with sleeper(refresh_wait * back_off_factor):
                if representation:
                    for segment in representation.segments(init=init, http=self.session.http):
                        if self.closed:
                            break
                        yield segment
                if self.mpd.type == "dynamic":
                    if not self.reload():
                        # BUG FIX: was max(), which jumped straight to the 10x
                        # cap on the very first failed reload; grow gradually
                        # (x1.3 per failure) and cap at 10x instead.
                        back_off_factor = min(back_off_factor * 1.3, 10.0)
                    else:
                        back_off_factor = 1
                else:
                    # static manifests never change; one pass is enough
                    return
            init = False

    def reload(self):
        """Refetch and reparse the manifest; return True when new segments appeared."""
        if self.closed:
            return
        self.reader.buffer.wait_free()
        log.debug("Reloading manifest ({0}:{1})".format(self.reader.representation_id, self.reader.mime_type))
        res = self.session.http.get(self.mpd.url, exception=StreamError)
        new_mpd = MPD(self.session.http.xml(res, ignore_ns=True),
                      base_url=self.mpd.base_url,
                      url=self.mpd.url,
                      timelines=self.mpd.timelines)
        new_rep = self.get_representation(new_mpd, self.reader.representation_id, self.reader.mime_type)
        # peek one segment without advancing the shared timeline state
        with freeze_timeline(new_mpd):
            changed = len(list(itertools.islice(new_rep.segments(), 1))) > 0
        if changed:
            self.mpd = new_mpd
        return changed
class DASHStreamReader(SegmentedStreamReader):
    """Reader for a single DASH representation (one video or one audio track)."""
    __worker__ = DASHStreamWorker
    __writer__ = DASHStreamWriter

    def __init__(self, stream, representation_id, mime_type, *args, **kwargs):
        SegmentedStreamReader.__init__(self, stream, *args, **kwargs)
        # mime_type + representation_id identify the representation the worker plays
        self.mime_type = mime_type
        self.representation_id = representation_id
        log.debug("Opening DASH reader for: {0} ({1})".format(self.representation_id, self.mime_type))
class DASHStream(Stream):
    """A DASH stream built from an optional video and an optional audio
    representation; when both are present they are muxed with ffmpeg on open()."""
    __shortname__ = "dash"

    def __init__(self,
                 session,
                 mpd,
                 video_representation=None,
                 audio_representation=None,
                 period=0,
                 **args):
        """
        :param session: Streamlink session instance
        :param mpd: parsed MPD manifest
        :param video_representation: video Representation to play, or None
        :param audio_representation: audio Representation to play, or None
        :param period: index of the Period to play
        :param args: extra keyword arguments passed through to HTTP requests
        """
        super(DASHStream, self).__init__(session)
        self.mpd = mpd
        self.video_representation = video_representation
        self.audio_representation = audio_representation
        self.period = period
        self.args = args

    def __json__(self):
        """Serialize the stream as its manifest URL plus the prepared request headers."""
        req = requests.Request(method="GET", url=self.mpd.url, **valid_args(self.args))
        req = req.prepare()
        headers = dict(map(normalize_key, req.headers.items()))
        return dict(type=type(self).shortname(), url=req.url, headers=headers)

    @classmethod
    def parse_manifest(cls, session, url_or_manifest, **args):
        """
        Attempt to parse a DASH manifest file and return its streams
        :param session: Streamlink session instance
        :param url_or_manifest: URL of the manifest file or an XML manifest string
        :return: a dict of name -> DASHStream instances
        """
        ret = {}
        if url_or_manifest.startswith('<?xml'):
            # BUG FIX: `url` was undefined in this branch, so a DRM-protected
            # manifest passed inline raised NameError instead of PluginError.
            url = None
            mpd = MPD(parse_xml(url_or_manifest, ignore_ns=True))
        else:
            res = session.http.get(url_or_manifest, **args)
            url = res.url
            # base URL is the manifest URL with its last path component removed
            urlp = list(urlparse(url))
            urlp[2], _ = urlp[2].rsplit("/", 1)
            mpd = MPD(session.http.xml(res, ignore_ns=True), base_url=urlunparse(urlp), url=url)
        video, audio = [], []
        # Search for suitable video and audio representations
        for aset in mpd.periods[0].adaptationSets:
            if aset.contentProtection:
                raise PluginError("{} is protected by DRM".format(url or "manifest"))
            for rep in aset.representations:
                if rep.mimeType.startswith("video"):
                    video.append(rep)
                elif rep.mimeType.startswith("audio"):
                    audio.append(rep)
        # audio-only / video-only presentations are still playable
        if not video:
            video = [None]
        if not audio:
            audio = [None]
        locale = session.localization
        locale_lang = locale.language
        lang = None
        available_languages = set()
        # if the locale is explicitly set, prefer that language over others
        for aud in audio:
            if aud and aud.lang:
                available_languages.add(aud.lang)
                try:
                    if locale.explicit and aud.lang and Language.get(aud.lang) == locale_lang:
                        lang = aud.lang
                except LookupError:
                    continue
        if not lang:
            # filter by the first language that appears
            lang = audio[0] and audio[0].lang
        log.debug("Available languages for DASH audio streams: {0} (using: {1})".format(", ".join(available_languages) or "NONE", lang or "n/a"))
        # if the language is given by the stream, filter out other languages that do not match
        if len(available_languages) > 1:
            audio = list(filter(lambda a: a.lang is None or a.lang == lang, audio))
        for vid, aud in itertools.product(video, audio):
            stream = DASHStream(session, mpd, vid, aud, **args)
            stream_name = []
            if vid:
                # name by height ("720p") when known, else by rounded bandwidth ("1500k")
                stream_name.append("{:0.0f}{}".format(vid.height or vid.bandwidth_rounded, "p" if vid.height else "k"))
            if audio and len(audio) > 1:
                stream_name.append("a{:0.0f}k".format(aud.bandwidth))
            ret['+'.join(stream_name)] = stream
        return ret

    def open(self):
        """Open the stream; mux audio+video with ffmpeg when both are present."""
        if self.video_representation:
            video = DASHStreamReader(self, self.video_representation.id, self.video_representation.mimeType)
            video.open()
        if self.audio_representation:
            audio = DASHStreamReader(self, self.audio_representation.id, self.audio_representation.mimeType)
            audio.open()
        if self.video_representation and self.audio_representation:
            return FFMPEGMuxer(self.session, video, audio, copyts=True).open()
        elif self.video_representation:
            return video
        elif self.audio_representation:
            return audio

    def to_url(self):
        """Return the manifest URL this stream was built from."""
        return self.mpd.url
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals | |
import copy | |
import logging | |
import datetime | |
import re | |
import time | |
from collections import defaultdict, namedtuple | |
from itertools import repeat, count | |
import math | |
from isodate import parse_datetime, parse_duration, Duration | |
from contextlib import contextmanager | |
from streamlink.compat import urlparse, urljoin, urlunparse, izip, urlsplit, urlunsplit | |
from pymp4.exceptions import BoxNotFound | |
from pymp4.parser import MP4 | |
from pymp4.util import BoxUtil | |
# Python 3 ships datetime.timezone.utc; fall back to a minimal hand-rolled
# UTC tzinfo on Python 2.
if hasattr(datetime, "timezone"):
    utc = datetime.timezone.utc
else:
    class UTC(datetime.tzinfo):
        """Fixed UTC tzinfo (offset 0, no DST) for Python 2."""
        def utcoffset(self, dt):
            return datetime.timedelta(0)

        def tzname(self, dt):
            return "UTC"

        def dst(self, dt):
            return datetime.timedelta(0)

    utc = UTC()

log = logging.getLogger(__name__)
# reference point for "seconds since the Unix epoch" conversions
epoch_start = datetime.datetime(1970, 1, 1, tzinfo=utc)
class Segment(object):
    """One downloadable piece of a stream.

    Describes either an initialization segment (``init=True``) or a media
    segment, together with the time it becomes available, an optional
    ``(start, length)`` byte range, and whether it should be fetched in
    streaming mode.
    """

    def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None, stream=False):
        self.url, self.duration = url, duration
        self.init, self.content = init, content
        self.available_at = available_at
        self.range, self.stream = range, stream
def datetime_to_seconds(dt):
    """Convert an aware datetime to seconds since the Unix epoch."""
    delta = dt - epoch_start
    return delta.total_seconds()
def count_dt(firstval=None, step=datetime.timedelta(seconds=1)):
    """Yield datetimes starting at ``firstval`` and increasing by ``step``.

    BUG FIX: the previous default ``firstval=datetime.datetime.now(tz=utc)``
    was evaluated once at import time, so every call without an explicit start
    counted from module-load time instead of "now". Defaulting to None and
    resolving it per call fixes that; explicit callers are unaffected.

    :param firstval: first datetime to yield (default: current UTC time)
    :param step: timedelta added on each subsequent yield
    """
    x = datetime.datetime.now(tz=utc) if firstval is None else firstval
    while True:
        yield x
        x += step
@contextmanager
def freeze_timeline(mpd):
    """Snapshot ``mpd.timelines`` on entry and restore it when the with-block
    exits normally, so segment peeking does not advance shared timeline state."""
    snapshot = copy.copy(mpd.timelines)
    yield
    mpd.timelines = snapshot
@contextmanager
def sleeper(duration):
    """Make the with-block take at least ``duration`` seconds, sleeping for
    whatever time remains after the body finishes."""
    started = time.time()
    yield
    remaining = duration - (time.time() - started)
    if remaining > 0:
        time.sleep(remaining)
def sleep_until(walltime):
    """Block until the given aware wall-clock time has passed; no-op when the
    time is already in the past."""
    remaining = (walltime - datetime.datetime.now(tz=utc)).total_seconds()
    if remaining > 0:
        time.sleep(remaining)
class MPDParsers(object):
    """Small parsers for MPD attribute values, used via ``attr(..., parser=...)``."""

    @staticmethod
    def bool_str(v):
        # XML booleans are the literal strings "true"/"false"
        return v.lower() == "true"

    @staticmethod
    def type(type_):
        # the MPD@type attribute only permits these two values
        if type_ in (u"static", u"dynamic"):
            return type_
        raise MPDParsingError("@type must be static or dynamic")

    @staticmethod
    def duration(duration):
        return parse_duration(duration)

    @staticmethod
    def datetime(dt):
        return parse_datetime(dt).replace(tzinfo=utc)

    @staticmethod
    def segment_template(url_template):
        """Compile a DASH ``$identifier%fmt$`` URL template into a
        ``str.format``-bound callable."""
        pattern = re.compile(r"(.*?)\$(\w+)(?:%([\w.]+))?\$")
        pieces = []
        consumed = 0
        for match in pattern.finditer(url_template):
            prefix, ident, fmt = match.group(1), match.group(2), match.group(3)
            _, consumed = match.span()
            pieces.append("{0}{{{1}{2}}}".format(prefix,
                                                ident,
                                                (":" + fmt) if fmt else ""))
        pieces.append(url_template[consumed:])
        return "".join(pieces).format

    @staticmethod
    def frame_rate(frame_rate):
        # frame rates may be a plain number or a "num/den" ratio
        if "/" not in frame_rate:
            return float(frame_rate)
        num, den = frame_rate.split("/")
        return float(num) / float(den)

    @staticmethod
    def timedelta(timescale=1):
        # returns a parser that converts timescale units to a timedelta
        def _timedelta(seconds):
            return datetime.timedelta(seconds=int(float(seconds) / float(timescale)))
        return _timedelta

    @staticmethod
    def range(range_spec):
        """Parse a "start-end" byte range into a ``(start, length)`` tuple;
        length is None for an open-ended range."""
        parts = range_spec.split("-")
        if len(parts) != 2:
            raise MPDParsingError("invalid byte-range-spec")
        start = int(parts[0])
        end = int(parts[1]) if parts[1] else 0
        length = (end - start) + 1 if end else None
        return start, length
class MPDParsingError(Exception):
    """Raised when an MPD document or one of its attribute values cannot be parsed."""
class MPDNode(object):
    """Base class for all nodes of the parsed MPD tree.

    Wraps an ElementTree element and provides helpers to read (optionally
    parsed or parent-inherited) attributes and to construct typed child nodes.
    """
    __tag__ = None  # expected XML tag name; validated in __init__ when set

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        self.node = node      # the underlying ElementTree element
        self.root = root      # the MPD root node
        self.parent = parent  # the enclosing MPDNode (None for the root)
        self._base_url = kwargs.get(u"base_url")
        # names of attributes read via attr(); used by __str__ for debugging
        self.attributes = set([])
        if self.__tag__ and self.node.tag.lower() != self.__tag__.lower():
            raise MPDParsingError("root tag did not match the expected tag: {}".format(self.__tag__))

    @property
    def attrib(self):
        # raw XML attribute dict of the wrapped element
        return self.node.attrib

    @property
    def text(self):
        # raw text content of the wrapped element
        return self.node.text

    def __str__(self):
        return "<{tag} {attrs}>".format(
            tag=self.__tag__,
            attrs=" ".join("@{}={}".format(attr, getattr(self, attr)) for attr in self.attributes)
        )

    def attr(self, key, default=None, parser=None, required=False, inherited=False):
        """Read XML attribute ``key``.

        :param default: value returned when the attribute is absent
        :param parser: callable applied to the raw string value when present
        :param required: raise MPDParsingError when absent (after inheritance)
        :param inherited: fall back to the parent node's already-parsed
            attribute of the same name (note: the parser is NOT re-applied
            to inherited values — the parent stored the parsed form)
        """
        self.attributes.add(key)
        if key in self.attrib:
            value = self.attrib.get(key)
            if parser and callable(parser):
                return parser(value)
            else:
                return value
        elif inherited:
            if self.parent and hasattr(self.parent, key) and getattr(self.parent, key):
                return getattr(self.parent, key)
        if required:
            raise MPDParsingError("could not find required attribute {tag}@{attr} ".format(attr=key, tag=self.__tag__))
        else:
            return default

    def children(self, cls, minimum=0, maximum=None):
        """Build all direct children with ``cls.__tag__`` as ``cls`` instances,
        enforcing the [minimum..maximum] cardinality."""
        children = self.node.findall(cls.__tag__)
        if len(children) < minimum or (maximum and len(children) > maximum):
            raise MPDParsingError("expected to find {}/{} required [{}..{})".format(
                self.__tag__, cls.__tag__, minimum, maximum or "unbound"))
        # each child receives its index (i) and this node's resolved base_url
        return list(map(lambda x: cls(x[1], root=self.root, parent=self, i=x[0], base_url=self.base_url),
                        enumerate(children)))

    def only_child(self, cls, minimum=0):
        """Return the single child of type ``cls``, or None when absent."""
        children = self.children(cls, minimum=minimum, maximum=1)
        return children[0] if len(children) else None

    def walk_back(self, cls=None, f=lambda x: x):
        """Yield ancestors (nearest first), optionally filtered by tag and mapped by ``f``."""
        node = self.parent
        while node:
            if cls is None or cls.__tag__ == node.__tag__:
                yield f(node)
            node = node.parent

    def walk_back_get_attr(self, attr):
        """Return the nearest ancestor's value for ``attr``, or None."""
        parent_attrs = [getattr(n, attr) for n in self.walk_back() if hasattr(n, attr)]
        return parent_attrs[0] if len(parent_attrs) else None

    @property
    def base_url(self):
        # inherited base URL, overridden by this node's first BaseURL child (if any)
        base_url = self._base_url
        if hasattr(self, "baseURLs") and len(self.baseURLs):
            base_url = BaseURL.join(base_url, self.baseURLs[0].url)
        return base_url
class MPD(MPDNode):
    """
    Represents the MPD as a whole
    Should validate the XML input and provide methods to get segment URLs for each Period, AdaptationSet and
    Representation.
    """
    __tag__ = u"MPD"

    def __init__(self, node, root=None, parent=None, url=None, *args, **kwargs):
        # top level has no parent
        super(MPD, self).__init__(node, root=self, *args, **kwargs)
        # parser attributes
        self.url = url
        # per-representation high-water marks of seen timeline times; -1 means
        # "nothing seen yet". May be seeded from a previous MPD on reload.
        self.timelines = defaultdict(lambda: -1)
        self.timelines.update(kwargs.pop("timelines", {}))
        self.id = self.attr(u"id")
        self.profiles = self.attr(u"profiles", required=True)
        self.type = self.attr(u"type", default=u"static", parser=MPDParsers.type)
        self.minimumUpdatePeriod = self.attr(u"minimumUpdatePeriod", parser=MPDParsers.duration, default=Duration())
        self.minBufferTime = self.attr(u"minBufferTime", parser=MPDParsers.duration, required=True)
        self.timeShiftBufferDepth = self.attr(u"timeShiftBufferDepth", parser=MPDParsers.duration)
        # dynamic (live) manifests must carry availabilityStartTime/publishTime
        self.availabilityStartTime = self.attr(u"availabilityStartTime", parser=MPDParsers.datetime,
                                               default=datetime.datetime.fromtimestamp(0, utc),  # earliest date
                                               required=self.type == "dynamic")
        self.publishTime = self.attr(u"publishTime", parser=MPDParsers.datetime, required=self.type == "dynamic")
        self.availabilityEndTime = self.attr(u"availabilityEndTime", parser=MPDParsers.datetime)
        self.mediaPresentationDuration = self.attr(u"mediaPresentationDuration", parser=MPDParsers.duration)
        self.suggestedPresentationDelay = self.attr(u"suggestedPresentationDelay", parser=MPDParsers.duration)
        # parse children
        location = self.children(Location)
        self.location = location[0] if location else None
        if self.location:
            # a Location element redirects the manifest URL; recompute the
            # base URL from it by dropping the last path component
            self.url = self.location.text
            urlp = list(urlparse(self.url))
            if urlp[2]:
                urlp[2], _ = urlp[2].rsplit("/", 1)
            self._base_url = urlunparse(urlp)
        self.baseURLs = self.children(BaseURL)
        self.periods = self.children(Period, minimum=1)
        self.programInformation = self.children(ProgramInformation)
class ProgramInformation(MPDNode):
    """A ProgramInformation element; parsed for structure only, no attributes read."""
    __tag__ = "ProgramInformation"
class BaseURL(MPDNode):
    """A BaseURL element whose text is a URL that descendant nodes resolve against."""
    __tag__ = "BaseURL"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(BaseURL, self).__init__(node, root, parent, *args, **kwargs)
        self.url = self.node.text.strip()

    @property
    def is_absolute(self):
        # truthy (the scheme string) when the URL carries a scheme
        return urlparse(self.url).scheme

    @staticmethod
    def join(url, other):
        """Resolve ``other`` against ``url``; an absolute ``other`` wins outright."""
        if urlparse(other).scheme:
            return other
        if not url:
            return other
        scheme, netloc, path, query, fragment = urlsplit(url)
        # force the base path to be treated as a directory when joining
        if not path.endswith("/"):
            path += "/"
        return urljoin(urlunsplit((scheme, netloc, path, query, fragment)), other)
class Location(MPDNode):
    """A Location element: an alternative URL from which the MPD should be fetched."""
    __tag__ = "Location"
class Period(MPDNode):
    """A Period: one time slice of the presentation, containing the adaptation
    sets (and through them the representations) playable during that slice."""
    __tag__ = u"Period"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(Period, self).__init__(node, root, parent, *args, **kwargs)
        self.i = kwargs.get(u"i", 0)  # index of this Period within the MPD
        self.id = self.attr(u"id")
        self.bitstreamSwitching = self.attr(u"bitstreamSwitching", parser=MPDParsers.bool_str)
        self.duration = self.attr(u"duration", default=Duration(), parser=MPDParsers.duration)
        self.start = self.attr(u"start", default=Duration(), parser=MPDParsers.duration)
        # NOTE(review): with default=Duration() above, self.start is never
        # None, so this branch is dead; kept for parity with upstream intent
        # (first Period of a static MPD starts at 0).
        if self.start is None and self.i == 0 and self.root.type == "static":
            self.start = 0
        # TODO: Early Access Periods
        self.baseURLs = self.children(BaseURL)
        self.segmentBase = self.only_child(SegmentBase)
        self.adaptationSets = self.children(AdaptationSet, minimum=1)
        self.segmentList = self.only_child(SegmentList)
        self.segmentTemplate = self.only_child(SegmentTemplate)
        # BUG FIX: attribute was misspelled "sssetIdentifier"; the old name is
        # kept as an alias for backward compatibility.
        self.assetIdentifier = self.only_child(AssetIdentifier)
        self.sssetIdentifier = self.assetIdentifier
        self.eventStream = self.children(EventStream)
        self.subset = self.children(Subset)
class SegmentBase(MPDNode):
    """SegmentBase addressing: a single media file whose segments are described
    by an embedded sidx (segment index) box."""
    __tag__ = "SegmentBase"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentBase, self).__init__(node, root, parent, *args, **kwargs)
        self.timescale = self.attr("timescale", parser=int)
        self.presentation_time_offset = self.attr("presentationTimeOffset")
        self.index_range = self.attr("indexRange", parser=MPDParsers.range)  # (start, length)
        self.index_range_exact = self.attr("indexRangeExact", parser=MPDParsers.bool_str, default=False)
        self.availability_time_offset = self.attr("availabilityTimeOffset", parser=int)
        self.availability_time_complete = self.attr("availabilityTimeComplete", parser=MPDParsers.bool_str)
        self.initialization = self.only_child(Initialization)
        self.representation_index = self.only_child(RepresentationIndex)
        self.period = list(self.walk_back(Period))[0]
        self.sidx = None        # parsed sidx box; populated by init_sidx()
        self.sidx_offset = 0

    def init_sidx(self, http):
        """Locate and parse the media file's sidx box.

        Looks for a sidx in the initialization (or index) byte range first,
        then reads the bytes that follow in growing chunks until a complete
        MP4 box parses. Leaves ``self.sidx`` as None when none is found.
        """
        # Look for sidx existence in the initialization range, and the first
        # box following the initialization range
        if self.initialization:
            offset, length = self.initialization.range
        else:
            offset, length = self.index_range
        headers = {"Range": "bytes={0}-{1}".format(offset, offset + length - 1)}
        res = http.get(self.base_url, headers=headers)
        # BUG FIX: pre-initialize sidx; if MP4.parse() yields no boxes the
        # loop body never runs and `if not sidx` below raised UnboundLocalError.
        sidx = None
        for box in MP4.parse(res.content):
            try:
                sidx = BoxUtil.first(box, b'sidx')
            except BoxNotFound:
                sidx = None
            if sidx:
                break
        if not sidx:
            # not in the init range: accumulate 1500-byte chunks after it
            # until a complete box can be parsed, then search that
            data = []
            offset += length
            length = 1500
            mp4 = None
            while not mp4:
                headers = {"Range": "bytes={0}-{1}".format(offset, offset + length - 1)}
                res = http.get(self.base_url, headers=headers)
                data.append(res.content)
                mp4 = MP4.parse(b''.join(data))
                offset += length
            for box in mp4:
                try:
                    sidx = BoxUtil.first(box, b'sidx')
                except BoxNotFound:
                    sidx = None
                if sidx:
                    break
        self.sidx = sidx

    def segments(self, **kwargs):
        """Yield an init segment followed by the media byte ranges from the sidx.

        :param http: HTTP client used to fetch and parse the sidx (optional)
        """
        http = kwargs.pop("http", None)
        if http:
            self.init_sidx(http)
        if self.sidx:
            # Segmented MP4: everything up to the end of the sidx is init data
            yield Segment(self.base_url, 0, init=True, content=False, range=(0, self.sidx.end))
            offset = self.sidx.end + self.sidx.first_offset
            for ref in self.sidx.references:
                length = ref.referenced_size
                duration = ref.segment_duration / self.sidx.timescale
                yield Segment(self.base_url, duration, init=False, content=True, range=(offset, length))
                offset += length
        else:
            # Non-segmented MP4: one whole-file segment
            yield Segment(self.base_url, 0, init=True, content=True)
class AssetIdentifier(MPDNode):
    """An AssetIdentifier element; parsed for structure only, no attributes read."""
    __tag__ = "AssetIdentifier"
class Subset(MPDNode):
    """A Subset element; parsed for structure only, no attributes read."""
    __tag__ = "Subset"
class EventStream(MPDNode):
    """An EventStream element; parsed for structure only, no attributes read."""
    __tag__ = "EventStream"
class Initialization(MPDNode):
    """An Initialization element: where to find the init segment (URL and/or byte range)."""
    __tag__ = "Initialization"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(Initialization, self).__init__(node, root, parent, *args, **kwargs)
        self.source_url = self.attr("sourceURL")  # possibly-relative URL of the init segment
        self.range = self.attr("range", parser=MPDParsers.range)  # (start, length) byte range
class SegmentURL(MPDNode):
    """A SegmentURL element: one entry of a SegmentList."""
    __tag__ = "SegmentURL"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentURL, self).__init__(node, root, parent, *args, **kwargs)
        self.media = self.attr("media")  # possibly-relative URL of the media segment
        self.media_range = self.attr("mediaRange", parser=MPDParsers.range)  # (start, length)
class SegmentList(MPDNode):
    """A SegmentList: an explicit list of media segment URLs, optionally
    preceded by an initialization segment."""
    __tag__ = "SegmentList"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentList, self).__init__(node, root, parent, *args, **kwargs)
        self.presentation_time_offset = self.attr("presentationTimeOffset")
        # BUG FIX: @timescale defaults to 1 per the DASH spec; without the
        # default, a SegmentList carrying @duration but no @timescale raised
        # a TypeError in the division below (float(None)).
        self.timescale = self.attr("timescale", parser=int, default=1)
        self.duration = self.attr("duration", parser=int)
        self.start_number = self.attr("startNumber", parser=int, default=1)
        if self.duration:
            self.duration_seconds = self.duration / float(self.timescale)
        else:
            self.duration_seconds = None
        self.initialization = self.only_child(Initialization)
        self.segment_urls = self.children(SegmentURL, minimum=1)

    @property
    def segments(self):
        """Yield the init segment (when present) followed by each media segment."""
        if self.initialization:
            yield Segment(self.make_url(self.initialization.source_url), 0, init=True, content=False)
        # (the enumerate counter of the original was never used)
        for segment_url in self.segment_urls:
            yield Segment(self.make_url(segment_url.media), self.duration_seconds, range=segment_url.media_range)

    def make_url(self, url):
        """Resolve ``url`` against this node's base URL."""
        return BaseURL.join(self.base_url, url)
class AdaptationSet(MPDNode):
    """An AdaptationSet: a group of interchangeable representations of one
    media component (e.g. all video bitrates, or one audio language)."""
    __tag__ = u"AdaptationSet"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(AdaptationSet, self).__init__(node, root, parent, *args, **kwargs)
        self.id = self.attr(u"id")
        self.group = self.attr(u"group")
        # mimeType and lang may be inherited by child Representations
        self.mimeType = self.attr(u"mimeType")
        self.lang = self.attr(u"lang")
        self.contentType = self.attr(u"contentType")
        self.par = self.attr(u"par")
        self.minBandwidth = self.attr(u"minBandwidth")
        self.maxBandwidth = self.attr(u"maxBandwidth")
        self.minWidth = self.attr(u"minWidth", parser=int)
        self.maxWidth = self.attr(u"maxWidth", parser=int)
        self.minHeight = self.attr(u"minHeight", parser=int)
        self.maxHeight = self.attr(u"maxHeight", parser=int)
        self.minFrameRate = self.attr(u"minFrameRate", parser=MPDParsers.frame_rate)
        self.maxFrameRate = self.attr(u"maxFrameRate", parser=MPDParsers.frame_rate)
        self.segmentAlignment = self.attr(u"segmentAlignment", default=False, parser=MPDParsers.bool_str)
        self.bitstreamSwitching = self.attr(u"bitstreamSwitching", parser=MPDParsers.bool_str)
        self.subsegmentAlignment = self.attr(u"subsegmentAlignment", default=False, parser=MPDParsers.bool_str)
        self.subsegmentStartsWithSAP = self.attr(u"subsegmentStartsWithSAP", default=0, parser=int)
        self.baseURLs = self.children(BaseURL)
        self.segmentTemplate = self.only_child(SegmentTemplate)
        self.representations = self.children(Representation, minimum=1)
        # non-empty contentProtection means the content is DRM protected
        self.contentProtection = self.children(ContentProtection)
class SegmentTemplate(MPDNode):
    """A SegmentTemplate: segment URLs generated from a $identifier$ template,
    driven either by a SegmentTimeline child or by computed segment numbers."""
    __tag__ = "SegmentTemplate"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentTemplate, self).__init__(node, root, parent, *args, **kwargs)
        # a SegmentTemplate on an ancestor (e.g. the Period) supplies defaults
        self.defaultSegmentTemplate = self.walk_back_get_attr('segmentTemplate')
        self.initialization = self.attr(u"initialization", parser=MPDParsers.segment_template)
        self.media = self.attr(u"media", parser=MPDParsers.segment_template)
        self.duration = self.attr(u"duration", parser=int,
                                  default=self.defaultSegmentTemplate.duration if self.defaultSegmentTemplate else None)
        self.timescale = self.attr(u"timescale", parser=int,
                                   default=self.defaultSegmentTemplate.timescale if self.defaultSegmentTemplate else 1)
        self.startNumber = self.attr(u"startNumber", parser=int,
                                     default=self.defaultSegmentTemplate.startNumber if self.defaultSegmentTemplate else 1)
        self.presentationTimeOffset = self.attr(u"presentationTimeOffset", parser=MPDParsers.timedelta(self.timescale))
        if self.duration:
            self.duration_seconds = self.duration / float(self.timescale)
        else:
            self.duration_seconds = None
        self.period = list(self.walk_back(Period))[0]
        # children
        self.segmentTimeline = self.only_child(SegmentTimeline)

    def segments(self, **kwargs):
        """Yield the init segment (once, when ``init=True``) followed by media segments."""
        if kwargs.pop("init", True):
            init_url = self.format_initialization(**kwargs)
            if init_url:
                yield Segment(init_url, 0, True, False)
        for media_url, available_at in self.format_media(**kwargs):
            yield Segment(media_url, self.duration_seconds, False, True, available_at)

    def make_url(self, url):
        """
        Join the URL with the base URL, unless it's an absolute URL
        :param url: maybe relative URL
        :return: joined URL
        """
        return BaseURL.join(self.base_url, url)

    def format_initialization(self, **kwargs):
        # expand the initialization template, if one was given
        if self.initialization:
            return self.make_url(self.initialization(**kwargs))

    def segment_numbers(self):
        """
        yield the segment number and when it will be available
        There are two cases for segment number generation, static and dynamic.
        In the case of static stream, the segment number starts at the startNumber and counts
        up to the number of segments that are represented by the periods duration.
        In the case of dynamic streams, the segments should appear at the specified time
        in the simplest case the segment number is based on the time since the availabilityStartTime
        :return:
        """
        log.debug("Generating segment numbers for {0} playlist (id={1})".format(self.root.type, self.parent.id))
        if self.root.type == u"static":
            # static: every segment is available immediately
            available_iter = repeat(epoch_start)
            duration = self.period.duration.seconds or self.root.mediaPresentationDuration.seconds
            if duration:
                number_iter = range(self.startNumber, int(duration / self.duration_seconds) + 1)
            else:
                # unknown duration: count forever
                number_iter = count(self.startNumber)
        else:
            now = datetime.datetime.now(utc)
            if self.presentationTimeOffset:
                since_start = (now - self.presentationTimeOffset) - self.root.availabilityStartTime
                available_start_date = self.root.availabilityStartTime + self.presentationTimeOffset + since_start
                available_start = available_start_date
            else:
                since_start = now - self.root.availabilityStartTime
                available_start = now
            # if there is no delay, use a delay of 3 seconds
            suggested_delay = datetime.timedelta(seconds=(self.root.suggestedPresentationDelay.total_seconds()
                                                          if self.root.suggestedPresentationDelay
                                                          else 3))
            # the number of the segment that is available at NOW - SUGGESTED_DELAY - BUFFER_TIME
            number_iter = count(self.startNumber +
                                int((since_start - suggested_delay - self.root.minBufferTime).total_seconds() /
                                    self.duration_seconds))
            # the time the segment number is available at NOW
            available_iter = count_dt(available_start,
                                      step=datetime.timedelta(seconds=self.duration_seconds))
        for number, available_at in izip(number_iter, available_iter):
            yield number, available_at

    def format_media(self, **kwargs):
        """Yield ``(url, available_at)`` pairs for each media segment, from the
        SegmentTimeline when present, otherwise from computed segment numbers."""
        if self.segmentTimeline:
            if self.parent.id is None:
                # workaround for invalid `self.root.timelines[self.parent.id]`
                # creates a timeline for every mimeType instead of one for both
                self.parent.id = self.parent.mimeType
            log.debug("Generating segment timeline for {0} playlist (id={1}))".format(self.root.type, self.parent.id))
            if self.root.type == "dynamic":
                # if there is no delay, use a delay of 3 seconds
                suggested_delay = datetime.timedelta(seconds=(self.root.suggestedPresentationDelay.total_seconds()
                                                              if self.root.suggestedPresentationDelay
                                                              else 3))
                publish_time = self.root.publishTime or epoch_start
                # transform the time line in to a segment list
                timeline = []
                available_at = publish_time
                for segment, n in reversed(list(zip(self.segmentTimeline.segments, count(self.startNumber)))):
                    # the last segment in the timeline is the most recent
                    # so, work backwards and calculate when each of the segments was
                    # available, based on the durations relative to the publish time
                    url = self.make_url(self.media(Time=segment.t, Number=n, **kwargs))
                    duration = datetime.timedelta(seconds=segment.d / self.timescale)
                    # once the suggested_delay is reach stop
                    if self.root.timelines[self.parent.id] == -1 and publish_time - available_at >= suggested_delay:
                        break
                    timeline.append((url, available_at, segment.t))
                    available_at -= duration  # walk backwards in time
                # return the segments in chronological order
                for url, available_at, t in reversed(timeline):
                    # only yield segments newer than the high-water mark, so a
                    # reloaded manifest does not replay already-seen segments
                    if t > self.root.timelines[self.parent.id]:
                        self.root.timelines[self.parent.id] = t
                        yield (url, available_at)
            else:
                # static timeline: everything is available now
                for segment, n in zip(self.segmentTimeline.segments, count(self.startNumber)):
                    yield (self.make_url(self.media(Time=segment.t, Number=n, **kwargs)),
                           datetime.datetime.now(tz=utc))
        else:
            for number, available_at in self.segment_numbers():
                yield (self.make_url(self.media(Number=number, **kwargs)),
                       available_at)
class Representation(MPDNode):
    """A Representation: one concrete encoding (bitrate/resolution/language)
    of its parent adaptation set."""
    __tag__ = u"Representation"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(Representation, self).__init__(node, root, parent, *args, **kwargs)
        self.id = self.attr(u"id", required=True)
        # bandwidth is stored in kbit/s (manifest value is bit/s)
        self.bandwidth = self.attr(u"bandwidth", parser=lambda b: float(b) / 1000.0, required=True)
        self.mimeType = self.attr(u"mimeType", required=True, inherited=True)
        self.codecs = self.attr(u"codecs")
        self.startWithSAP = self.attr(u"startWithSAP")
        # video
        self.width = self.attr(u"width", parser=int)
        self.height = self.attr(u"height", parser=int)
        self.frameRate = self.attr(u"frameRate", parser=MPDParsers.frame_rate)
        # audio
        self.audioSamplingRate = self.attr(u"audioSamplingRate", parser=int)
        self.numChannels = self.attr(u"numChannels", parser=int)
        # subtitle
        self.lang = self.attr(u"lang", inherited=True)
        self.baseURLs = self.children(BaseURL)
        self.subRepresentation = self.children(SubRepresentation)
        self.segmentBase = self.only_child(SegmentBase)
        self.segmentList = self.children(SegmentList)
        self.segmentTemplate = self.only_child(SegmentTemplate)

    @property
    def bandwidth_rounded(self):
        """Bandwidth rounded to two significant figures, used for stream naming."""
        return round(self.bandwidth, 1 - int(math.log10(self.bandwidth)))

    def segments(self, **kwargs):
        """
        Segments are yielded when they are available
        Segments appear on a time line, for dynamic content they are only available at a certain time
        and sometimes for a limited time. For static content they are all available at the same time.
        :param kwargs: extra args to pass to the segment template
        :return: yields Segments
        """
        # segment addressing may be defined here or inherited from an ancestor
        segmentBase = self.segmentBase or self.walk_back_get_attr("segmentBase")
        segmentLists = self.segmentList or self.walk_back_get_attr("segmentList")
        segmentTemplate = self.segmentTemplate or self.walk_back_get_attr("segmentTemplate")
        if segmentTemplate:
            # BUG FIX: the original branched on segment.init but yielded the
            # segment identically in both branches; collapsed to a single yield.
            for segment in segmentTemplate.segments(RepresentationID=self.id,
                                                    Bandwidth=int(self.bandwidth * 1000),
                                                    **kwargs):
                yield segment
        elif segmentLists:
            for segmentList in segmentLists:
                for segment in segmentList.segments:
                    yield segment
        elif segmentBase:
            # SegmentBase addressing: yield the init/index byte range, then
            # stream the remainder of the file starting right after it.
            if segmentBase.initialization:
                offset, length = segmentBase.initialization.range
            else:
                offset, length = segmentBase.index_range
            yield Segment(self.base_url, 0, init=True, content=False, range=(offset, length))
            yield Segment(self.base_url, 0, init=False, content=True, range=(offset + length, 0), stream=True)
        else:
            # no addressing information: the base URL is one whole-file segment
            yield Segment(self.base_url, 0, True, True)
class SubRepresentation(MPDNode):
    # Placeholder node for <SubRepresentation> elements; parsed but no
    # attributes are extracted here.
    __tag__ = "SubRepresentation"
class RepresentationIndex(MPDNode):
    # <RepresentationIndex> element: locates the segment index for a representation.
    __tag__ = "RepresentationIndex"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(RepresentationIndex, self).__init__(node, root, parent, *args, **kwargs)
        # optional URL of the index data (may be None)
        self.source_url = self.attr("sourceURL")
        # byte-range attribute, parsed via MPDParsers.range
        self.range = self.attr("range", parser=MPDParsers.range)
class SegmentTimeline(MPDNode):
    __tag__ = "SegmentTimeline"

    # (t, d) pair: segment start time and duration, in timescale units
    TimelineSegment = namedtuple("TimelineSegment", "t d")

    def __init__(self, node, *args, **kwargs):
        super(SegmentTimeline, self).__init__(node, *args, **kwargs)
        # timescale is not declared on <SegmentTimeline> itself; it is
        # inherited from an ancestor node
        self.timescale = self.walk_back_get_attr("timescale")
        self.timeline_segments = self.children(_TimelineSegment)

    @property
    def segments(self):
        """Expand the child <S> elements (including repeats) into (t, d) segments."""
        t = 0
        for tsegment in self.timeline_segments:
            # an explicit @t on the first segment seeds the running time
            if t == 0 and tsegment.t is not None:
                t = tsegment.t
            # @r means the segment repeats r additional times; the repeat
            # index itself is unused, so name it "_"
            for _ in range(tsegment.r + 1):
                yield self.TimelineSegment(t, tsegment.d)
                t += tsegment.d
class _TimelineSegment(MPDNode):
    # One <S> element inside a <SegmentTimeline>.
    __tag__ = "S"

    def __init__(self, node, *args, **kwargs):
        super(_TimelineSegment, self).__init__(node, *args, **kwargs)
        self.t = self.attr("t", parser=int)  # start time (optional; may be None)
        self.d = self.attr("d", parser=int)  # duration
        self.r = self.attr("r", parser=int, default=0)  # additional repeat count
class ContentProtection(MPDNode):
    # <ContentProtection> descriptor element (DRM/protection scheme information).
    __tag__ = "ContentProtection"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(ContentProtection, self).__init__(node, root, parent, *args, **kwargs)
        # URI identifying the protection scheme
        self.schemeIdUri = self.attr(u"schemeIdUri")
        self.value = self.attr(u"value")
        # default key ID for the protected content
        self.default_KID = self.attr(u"default_KID")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/streamlink/stream/dash.py b/src/streamlink/stream/dash.py | |
index 3885e9ba..3b15f6c9 100644 | |
--- a/src/streamlink/stream/dash.py | |
+++ b/src/streamlink/stream/dash.py | |
@@ -47,6 +47,7 @@ class DASHStreamWriter(SegmentedStreamWriter): | |
headers["Range"] = "bytes={0}-{1}".format(start, end) | |
return self.session.http.get(segment.url, | |
+ stream=segment.stream, | |
timeout=self.timeout, | |
exception=StreamError, | |
headers=headers) | |
@@ -63,6 +64,7 @@ class DASHStreamWriter(SegmentedStreamWriter): | |
return | |
log.debug("Download of segment: {} complete".format(segment.url)) | |
+ res.close() | |
class DASHStreamWorker(SegmentedStreamWorker): | |
diff --git a/src/streamlink/stream/dash_manifest.py b/src/streamlink/stream/dash_manifest.py | |
index 01185df2..0caf64c1 100644 | |
--- a/src/streamlink/stream/dash_manifest.py | |
+++ b/src/streamlink/stream/dash_manifest.py | |
@@ -38,13 +38,14 @@ epoch_start = datetime.datetime(1970, 1, 1, tzinfo=utc) | |
class Segment(object): | |
- def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None): | |
+ def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None, stream=False): | |
self.url = url | |
self.duration = duration | |
self.init = init | |
self.content = content | |
self.available_at = available_at | |
self.range = range | |
+ self.stream = stream | |
def datetime_to_seconds(dt): | |
@@ -692,8 +693,14 @@ class Representation(MPDNode): | |
for segment in segmentList.segments: | |
yield segment | |
elif segmentBase: | |
- for segment in segmentBase.segments(**kwargs): | |
- yield segment | |
+ # for segment in segmentBase.segments(**kwargs): | |
+ # yield segment | |
+ if segmentBase.initialization: | |
+ offset, length = segmentBase.initialization.range | |
+ else: | |
+ offset, length = segmentBase.index_range | |
+ yield Segment(self.base_url, 0, init=True, content=False, range=(offset, length)) | |
+ yield Segment(self.base_url, 0, init=False, content=True, range=(offset + length, 0), stream=True) | |
else: | |
yield Segment(self.base_url, 0, True, True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment