Skip to content

Instantly share code, notes, and snippets.

@pmrowla
Last active January 24, 2019 07:30
Show Gist options
  • Save pmrowla/842c696b32fa0387e302be290066cc28 to your computer and use it in GitHub Desktop.
Save pmrowla/842c696b32fa0387e302be290066cc28 to your computer and use it in GitHub Desktop.
segmentBase with stream=True
import itertools
import logging
import datetime
import os.path
import requests
from streamlink import StreamError, PluginError
from streamlink.compat import urlparse, urlunparse
from streamlink.stream.http import valid_args, normalize_key
from streamlink.stream.stream import Stream
from streamlink.stream.dash_manifest import MPD, sleeper, sleep_until, utc, freeze_timeline
from streamlink.stream.ffmpegmux import FFMPEGMuxer
from streamlink.stream.segmented import SegmentedStreamReader, SegmentedStreamWorker, SegmentedStreamWriter
from streamlink.utils import parse_xml
from streamlink.utils.l10n import Language
log = logging.getLogger(__name__)
class DASHStreamWriter(SegmentedStreamWriter):
    """
    Segment writer for DASH streams.

    Downloads each queued segment over HTTP and copies its payload into the
    reader's buffer.
    """

    def __init__(self, reader, *args, **kwargs):
        """
        :param reader: the reader this writer belongs to; its session options
                       supply the ``dash-segment-*`` retry, thread and timeout values
        """
        options = reader.stream.session.options
        kwargs["retries"] = options.get("dash-segment-attempts")
        kwargs["threads"] = options.get("dash-segment-threads")
        kwargs["timeout"] = options.get("dash-segment-timeout")
        SegmentedStreamWriter.__init__(self, reader, *args, **kwargs)

    def fetch(self, segment, retries=None):
        """
        Request a single segment, waiting until it becomes available if needed.

        :param segment: the segment to download (url, range, available_at, stream)
        :param retries: remaining attempts; nothing is fetched when this is falsy
        :return: the HTTP response, or None if the writer is closed or the
                 retries are exhausted
        """
        if self.closed or not retries:
            return

        try:
            headers = {}
            now = datetime.datetime.now(tz=utc)
            if segment.available_at > now:
                time_to_wait = (segment.available_at - now).total_seconds()
                fname = os.path.basename(urlparse(segment.url).path)
                log.debug("Waiting for segment: {fname} ({wait:.01f}s)".format(fname=fname, wait=time_to_wait))
                sleep_until(segment.available_at)

            if segment.range:
                start, length = segment.range
                # a falsy length means "until the end of the resource" (open-ended range)
                end = start + length - 1 if length else ""
                headers["Range"] = "bytes={0}-{1}".format(start, end)

            return self.session.http.get(segment.url,
                                         stream=segment.stream,
                                         timeout=self.timeout,
                                         exception=StreamError,
                                         headers=headers)
        except StreamError as err:
            # formatted eagerly, like every other log call in this module
            log.error("Failed to open segment {0}: {1}".format(segment.url, err))
            return self.fetch(segment, retries - 1)

    def write(self, segment, res, chunk_size=8192):
        """
        Copy the response body into the reader's buffer.

        The response is always closed, including when the download is aborted
        because the writer was closed or when reading raises, so that a
        streamed connection is not left open.

        :param segment: the segment being written (used for logging only)
        :param res: the HTTP response returned by :meth:`fetch`
        :param chunk_size: number of bytes to read per iteration
        """
        try:
            for chunk in res.iter_content(chunk_size):
                if self.closed:
                    log.warning("Download of segment: {} aborted".format(segment.url))
                    return
                self.reader.buffer.write(chunk)

            log.debug("Download of segment: {} complete".format(segment.url))
        finally:
            res.close()
class DASHStreamWorker(SegmentedStreamWorker):
    """
    Worker that produces the segments of one representation and, for dynamic
    manifests, keeps reloading the manifest to discover new segments.
    """

    def __init__(self, *args, **kwargs):
        SegmentedStreamWorker.__init__(self, *args, **kwargs)
        self.mpd = self.stream.mpd
        self.period = self.stream.period

    @staticmethod
    def get_representation(mpd, representation_id, mime_type):
        """
        Find a representation in the first period of the manifest.

        :param mpd: parsed manifest to search
        :param representation_id: value of Representation@id to match
        :param mime_type: value of the representation's mimeType to match
        :return: the matching representation, or None if there is none
        """
        for aset in mpd.periods[0].adaptationSets:
            for rep in aset.representations:
                if rep.id == representation_id and rep.mimeType == mime_type:
                    return rep

    def iter_segments(self):
        """
        Yield the segments of the selected representation.

        Static manifests are iterated once. Dynamic manifests are reloaded
        after each pass; when a reload brings nothing new the wait between
        passes grows by 30% per attempt, capped at ten times the refresh
        interval, and is reset once new segments appear.
        """
        init = True
        back_off_factor = 1
        while not self.closed:
            # find the representation by ID
            representation = self.get_representation(self.mpd, self.reader.representation_id, self.reader.mime_type)
            refresh_wait = max(self.mpd.minimumUpdatePeriod.total_seconds(),
                               self.mpd.periods[0].duration.total_seconds()) or 5
            with sleeper(refresh_wait * back_off_factor):
                if representation:
                    for segment in representation.segments(init=init, http=self.session.http):
                        if self.closed:
                            break
                        yield segment

                    if self.mpd.type == "dynamic":
                        if not self.reload():
                            # min(), not max(): the factor must grow gradually up to the cap
                            back_off_factor = min(back_off_factor * 1.3, 10.0)
                        else:
                            back_off_factor = 1
                    else:
                        return
                    init = False

    def reload(self):
        """
        Re-download the manifest and adopt it if it offers new segments.

        :return: True if the manifest contained new segments and was adopted,
                 False if not, None if the worker is closed
        """
        if self.closed:
            return

        self.reader.buffer.wait_free()
        log.debug("Reloading manifest ({0}:{1})".format(self.reader.representation_id, self.reader.mime_type))
        res = self.session.http.get(self.mpd.url, exception=StreamError)

        new_mpd = MPD(self.session.http.xml(res, ignore_ns=True),
                      base_url=self.mpd.base_url,
                      url=self.mpd.url,
                      timelines=self.mpd.timelines)

        new_rep = self.get_representation(new_mpd, self.reader.representation_id, self.reader.mime_type)
        if new_rep is None:
            # the representation is missing from the refreshed manifest: keep the current one
            return False

        # probing for a first segment must not advance the shared timeline state
        with freeze_timeline(new_mpd):
            changed = len(list(itertools.islice(new_rep.segments(), 1))) > 0

        if changed:
            self.mpd = new_mpd

        return changed
class DASHStreamReader(SegmentedStreamReader):
    """Reader for a single DASH representation, identified by its id and MIME type."""

    __worker__ = DASHStreamWorker
    __writer__ = DASHStreamWriter

    def __init__(self, stream, representation_id, mime_type, *args, **kwargs):
        """
        :param stream: the DASHStream this reader belongs to
        :param representation_id: id of the representation to read
        :param mime_type: MIME type of the representation to read
        """
        SegmentedStreamReader.__init__(self, stream, *args, **kwargs)
        self.representation_id = representation_id
        self.mime_type = mime_type
        message = "Opening DASH reader for: {0} ({1})".format(self.representation_id, self.mime_type)
        log.debug(message)
class DASHStream(Stream):
    """
    A playable DASH stream made of an optional video and an optional audio
    representation taken from one parsed manifest.
    """

    __shortname__ = "dash"

    def __init__(self,
                 session,
                 mpd,
                 video_representation=None,
                 audio_representation=None,
                 period=0,
                 **args):
        """
        :param session: Streamlink session instance
        :param mpd: parsed manifest
        :param video_representation: video representation to play, if any
        :param audio_representation: audio representation to play, if any
        :param period: period value stored on the stream
        :param args: extra request arguments, kept for :meth:`__json__`
        """
        super(DASHStream, self).__init__(session)
        self.mpd = mpd
        self.video_representation = video_representation
        self.audio_representation = audio_representation
        self.period = period
        self.args = args

    def __json__(self):
        """Describe the stream as a dict holding its type, manifest URL and request headers."""
        req = requests.Request(method="GET", url=self.mpd.url, **valid_args(self.args))
        req = req.prepare()

        headers = dict(map(normalize_key, req.headers.items()))
        return dict(type=type(self).shortname(), url=req.url, headers=headers)

    @classmethod
    def parse_manifest(cls, session, url_or_manifest, **args):
        """
        Attempt to parse a DASH manifest file and return its streams

        :param session: Streamlink session instance
        :param url_or_manifest: URL of the manifest file or an XML manifest string
        :param args: extra arguments passed to the HTTP request and to each stream
        :return: a dict of name -> DASHStream instances
        :raises PluginError: if any adaptation set declares content protection
        """
        ret = {}

        if url_or_manifest.startswith('<?xml'):
            mpd = MPD(parse_xml(url_or_manifest, ignore_ns=True))
            # there is no URL for an inline manifest; this label is used in error messages
            source = "DASH manifest"
        else:
            res = session.http.get(url_or_manifest, **args)
            url = res.url

            urlp = list(urlparse(url))
            urlp[2], _ = urlp[2].rsplit("/", 1)

            mpd = MPD(session.http.xml(res, ignore_ns=True), base_url=urlunparse(urlp), url=url)
            source = url

        video, audio = [], []

        # Search for suitable video and audio representations
        for aset in mpd.periods[0].adaptationSets:
            if aset.contentProtection:
                raise PluginError("{} is protected by DRM".format(source))
            for rep in aset.representations:
                if rep.mimeType.startswith("video"):
                    video.append(rep)
                elif rep.mimeType.startswith("audio"):
                    audio.append(rep)

        # a None placeholder keeps the video/audio product below non-empty
        if not video:
            video = [None]

        if not audio:
            audio = [None]

        locale = session.localization
        locale_lang = locale.language
        lang = None
        available_languages = set()

        # if the locale is explicitly set, prefer that language over others
        for aud in audio:
            if aud and aud.lang:
                available_languages.add(aud.lang)
                try:
                    if locale.explicit and aud.lang and Language.get(aud.lang) == locale_lang:
                        lang = aud.lang
                except LookupError:
                    continue

        if not lang:
            # filter by the first language that appears
            lang = audio[0] and audio[0].lang

        log.debug("Available languages for DASH audio streams: {0} (using: {1})".format(", ".join(available_languages) or "NONE", lang or "n/a"))

        # if the language is given by the stream, filter out other languages that do not match
        if len(available_languages) > 1:
            audio = list(filter(lambda a: a.lang is None or a.lang == lang, audio))

        for vid, aud in itertools.product(video, audio):
            stream = DASHStream(session, mpd, vid, aud, **args)
            stream_name = []

            if vid:
                stream_name.append("{:0.0f}{}".format(vid.height or vid.bandwidth_rounded, "p" if vid.height else "k"))
            if audio and len(audio) > 1:
                stream_name.append("a{:0.0f}k".format(aud.bandwidth))
            ret['+'.join(stream_name)] = stream
        return ret

    def open(self):
        """
        Open readers for the selected representations.

        :return: the muxed stream when both video and audio are present,
                 otherwise the single opened reader (None if neither is set)
        """
        if self.video_representation:
            video = DASHStreamReader(self, self.video_representation.id, self.video_representation.mimeType)
            video.open()

        if self.audio_representation:
            audio = DASHStreamReader(self, self.audio_representation.id, self.audio_representation.mimeType)
            audio.open()

        if self.video_representation and self.audio_representation:
            return FFMPEGMuxer(self.session, video, audio, copyts=True).open()
        elif self.video_representation:
            return video
        elif self.audio_representation:
            return audio

    def to_url(self):
        """Return the URL of the manifest."""
        return self.mpd.url
from __future__ import unicode_literals
import copy
import logging
import datetime
import re
import time
from collections import defaultdict, namedtuple
from itertools import repeat, count
import math
from isodate import parse_datetime, parse_duration, Duration
from contextlib import contextmanager
from streamlink.compat import urlparse, urljoin, urlunparse, izip, urlsplit, urlunsplit
from pymp4.exceptions import BoxNotFound
from pymp4.parser import MP4
from pymp4.util import BoxUtil
try:
    # the standard library provides a ready-made UTC tzinfo where available
    utc = datetime.timezone.utc
except AttributeError:
    class UTC(datetime.tzinfo):
        """Fallback fixed-offset UTC tzinfo for interpreters without datetime.timezone."""

        def utcoffset(self, dt):
            return datetime.timedelta(0)

        def tzname(self, dt):
            return "UTC"

        def dst(self, dt):
            return datetime.timedelta(0)

    utc = UTC()

log = logging.getLogger(__name__)

# timezone-aware Unix epoch, used as the "always available" timestamp
epoch_start = datetime.datetime(1970, 1, 1, tzinfo=utc)
class Segment(object):
    """
    Description of one downloadable piece of a representation.

    :param url: URL to request
    :param duration: duration value supplied by the caller
    :param init: whether this is an initialization segment
    :param content: whether the segment carries media content
    :param available_at: time from which the segment can be requested
    :param range: optional ``(start, length)`` byte range within ``url``
    :param stream: whether the HTTP response should be streamed
    """

    def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None, stream=False):
        self.url, self.duration = url, duration
        self.init, self.content = init, content
        self.available_at = available_at
        self.range = range
        self.stream = stream
def datetime_to_seconds(dt):
    """
    Convert a datetime to seconds elapsed since the Unix epoch.

    :param dt: datetime comparable with ``epoch_start``
    :return: seconds since the epoch as a float
    """
    elapsed = dt - epoch_start
    return elapsed.total_seconds()
def count_dt(firstval=None, step=datetime.timedelta(seconds=1)):
    """
    Count upwards from a datetime in fixed steps, like ``itertools.count``.

    :param firstval: first value to produce; defaults to the current UTC time
                     at the moment of the call (not at import time)
    :param step: timedelta added between consecutive values
    :return: an endless iterator of datetimes
    """
    if firstval is None:
        firstval = datetime.datetime.now(tz=utc)
    return (firstval + step * n for n in count())
@contextmanager
def freeze_timeline(mpd):
    """
    Context manager that restores ``mpd.timelines`` when the block exits.

    A shallow copy of the timelines is taken on entry and assigned back on
    exit, even if the block raises, so changes made inside the block are
    discarded.

    :param mpd: object with a ``timelines`` attribute
    """
    timelines = copy.copy(mpd.timelines)
    try:
        yield
    finally:
        mpd.timelines = timelines
@contextmanager
def sleeper(duration):
    """
    Context manager that makes the enclosed block last at least ``duration`` seconds.

    After the block finishes, the time it took is subtracted from ``duration``
    and any remainder is slept.

    :param duration: minimum time in seconds
    """
    started = time.time()
    yield
    elapsed = time.time() - started
    remaining = duration - elapsed
    if remaining > 0:
        time.sleep(remaining)
def sleep_until(walltime):
    """
    Block until the given wall-clock time; return immediately if it has passed.

    :param walltime: timezone-aware datetime to wait for
    """
    remaining = (walltime - datetime.datetime.now(tz=utc)).total_seconds()
    if remaining > 0:
        time.sleep(remaining)
class MPDParsers(object):
    """Converters from raw MPD attribute strings to Python values."""

    @staticmethod
    def bool_str(v):
        """Return True if the string is ``true`` (case-insensitive)."""
        return v.lower() == "true"

    @staticmethod
    def type(type_):
        """
        Validate MPD@type.

        :raises MPDParsingError: if the value is neither ``static`` nor ``dynamic``
        """
        if type_ not in (u"static", u"dynamic"):
            raise MPDParsingError("@type must be static or dynamic")
        return type_

    @staticmethod
    def duration(duration):
        """Parse a duration string with ``isodate.parse_duration``."""
        return parse_duration(duration)

    @staticmethod
    def datetime(dt):
        """Parse a datetime string and set its tzinfo to UTC."""
        return parse_datetime(dt).replace(tzinfo=utc)

    @staticmethod
    def segment_template(url_template):
        """
        Turn a ``$Identifier%format$`` URL template into a formatter.

        Each ``$Name$`` becomes ``{Name}`` and each ``$Name%05d$`` becomes
        ``{Name:05d}``.

        :param url_template: template string from the manifest
        :return: the bound ``str.format`` of the converted template; call it
                 with the identifiers as keyword arguments
        """
        end = 0
        res = ""
        for m in re.compile(r"(.*?)\$(\w+)(?:%([\w.]+))?\$").finditer(url_template):
            _, end = m.span()
            res += "{0}{{{1}{2}}}".format(m.group(1),
                                          m.group(2),
                                          (":" + m.group(3)) if m.group(3) else "")

        # append whatever follows the last placeholder
        return (res + url_template[end:]).format

    @staticmethod
    def frame_rate(frame_rate):
        """Parse a frame rate given either as a number or as a ``a/b`` fraction."""
        if "/" in frame_rate:
            a, b = frame_rate.split("/")
            return float(a) / float(b)
        else:
            return float(frame_rate)

    @staticmethod
    def timedelta(timescale=1):
        """
        Build a parser that converts a tick count into a timedelta.

        :param timescale: number of ticks per second
        :return: function mapping a tick value to a timedelta of whole seconds
        """
        def _timedelta(seconds):
            return datetime.timedelta(seconds=int(float(seconds) / float(timescale)))

        return _timedelta

    @staticmethod
    def range(range_spec):
        """
        Parse a ``first-last`` byte range.

        :param range_spec: string such as ``"0-499"`` or ``"500-"``
        :return: tuple ``(start, length)``; ``length`` is None for an
                 open-ended range
        :raises MPDParsingError: if the string does not have exactly one ``-``
        """
        r = range_spec.split("-")
        if len(r) != 2:
            raise MPDParsingError("invalid byte-range-spec")

        start = int(r[0])
        if not r[1]:
            return start, None

        # explicit branch instead of ``and/or`` so that an end byte of 0 is not treated as missing
        end = int(r[1])
        return start, (end - start) + 1
class MPDParsingError(Exception):
    """Raised when the manifest does not have the structure or values the parser requires."""
class MPDNode(object):
    """
    Base class for every parsed manifest element.

    Wraps an XML node, remembers its root and parent wrappers, and offers
    helpers for reading attributes and building child wrappers.
    """

    __tag__ = None

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        """
        :param node: XML element being wrapped
        :param root: wrapper of the document root
        :param parent: wrapper of the parent element
        :param kwargs: ``base_url`` is read from here, other keys are ignored
        :raises MPDParsingError: if the class declares a tag and the node's tag differs
        """
        self.node = node
        self.root = root
        self.parent = parent
        self._base_url = kwargs.get(u"base_url")
        # names of every attribute requested through attr(), used by __str__
        self.attributes = set()

        expected = self.__tag__
        if expected and self.node.tag.lower() != expected.lower():
            raise MPDParsingError("root tag did not match the expected tag: {}".format(self.__tag__))

    @property
    def attrib(self):
        return self.node.attrib

    @property
    def text(self):
        return self.node.text

    def __str__(self):
        rendered = " ".join("@{}={}".format(name, getattr(self, name)) for name in self.attributes)
        return "<{tag} {attrs}>".format(tag=self.__tag__, attrs=rendered)

    def attr(self, key, default=None, parser=None, required=False, inherited=False):
        """
        Read an XML attribute of this node.

        :param key: attribute name
        :param default: value returned when the attribute cannot be resolved
        :param parser: optional callable applied to the raw string
        :param required: raise instead of returning ``default`` when unresolved
        :param inherited: fall back to a truthy attribute of the same name on the parent
        :raises MPDParsingError: if the attribute is required but unresolved
        """
        self.attributes.add(key)

        if key in self.attrib:
            raw = self.attrib.get(key)
            return parser(raw) if parser and callable(parser) else raw

        if inherited and self.parent and hasattr(self.parent, key) and getattr(self.parent, key):
            return getattr(self.parent, key)

        if required:
            raise MPDParsingError("could not find required attribute {tag}@{attr} ".format(attr=key, tag=self.__tag__))
        return default

    def children(self, cls, minimum=0, maximum=None):
        """
        Wrap all direct child elements that carry the tag of ``cls``.

        :param cls: MPDNode subclass to instantiate for each child
        :param minimum: smallest acceptable number of children
        :param maximum: largest acceptable number of children, if truthy
        :return: list of ``cls`` instances in document order
        :raises MPDParsingError: if the number of children is out of bounds
        """
        found = self.node.findall(cls.__tag__)
        total = len(found)
        if total < minimum or (maximum and total > maximum):
            raise MPDParsingError("expected to find {}/{} required [{}..{})".format(
                self.__tag__, cls.__tag__, minimum, maximum or "unbound"))

        return [cls(child, root=self.root, parent=self, i=index, base_url=self.base_url)
                for index, child in enumerate(found)]

    def only_child(self, cls, minimum=0):
        """Return the single child of type ``cls``, or None if there is none."""
        matches = self.children(cls, minimum=minimum, maximum=1)
        if len(matches):
            return matches[0]
        return None

    def walk_back(self, cls=None, f=lambda x: x):
        """
        Iterate over the ancestors of this node, nearest first.

        :param cls: if given, only ancestors with the same tag as ``cls`` are produced
        :param f: function applied to each ancestor before it is yielded
        """
        ancestor = self.parent
        while ancestor:
            if cls is None or cls.__tag__ == ancestor.__tag__:
                yield f(ancestor)
            ancestor = ancestor.parent

    def walk_back_get_attr(self, attr):
        """Return ``attr`` from the nearest ancestor that has it, or None."""
        for ancestor in self.walk_back():
            if hasattr(ancestor, attr):
                return getattr(ancestor, attr)
        return None

    @property
    def base_url(self):
        """Base URL of this node: the inherited one joined with its first BaseURL child, if any."""
        resolved = self._base_url
        if hasattr(self, "baseURLs") and len(self.baseURLs):
            resolved = BaseURL.join(resolved, self.baseURLs[0].url)
        return resolved
class MPD(MPDNode):
    """
    Represents the MPD as a whole

    Should validate the XML input and provide methods to get segment URLs for each Period, AdaptationSet and
    Representation.
    """
    __tag__ = u"MPD"

    def __init__(self, node, root=None, parent=None, url=None, *args, **kwargs):
        """
        :param node: the MPD root XML element
        :param url: URL the manifest was loaded from, if any
        :param kwargs: ``base_url`` and ``timelines`` are read from here
        """
        # top level has no parent
        super(MPD, self).__init__(node, root=self, *args, **kwargs)

        # parser attributes
        self.url = url
        # highest timeline value seen so far per key; -1 means nothing seen yet
        self.timelines = defaultdict(lambda: -1)
        self.timelines.update(kwargs.pop("timelines", {}))

        self.id = self.attr(u"id")
        self.profiles = self.attr(u"profiles", required=True)
        self.type = self.attr(u"type", default=u"static", parser=MPDParsers.type)
        self.minimumUpdatePeriod = self.attr(u"minimumUpdatePeriod", parser=MPDParsers.duration, default=Duration())
        self.minBufferTime = self.attr(u"minBufferTime", parser=MPDParsers.duration, required=True)
        self.timeShiftBufferDepth = self.attr(u"timeShiftBufferDepth", parser=MPDParsers.duration)

        # these two attributes are mandatory only for dynamic manifests
        is_dynamic = self.type == "dynamic"
        self.availabilityStartTime = self.attr(u"availabilityStartTime", parser=MPDParsers.datetime,
                                               default=datetime.datetime.fromtimestamp(0, utc),  # earliest date
                                               required=is_dynamic)
        self.publishTime = self.attr(u"publishTime", parser=MPDParsers.datetime, required=is_dynamic)
        self.availabilityEndTime = self.attr(u"availabilityEndTime", parser=MPDParsers.datetime)
        self.mediaPresentationDuration = self.attr(u"mediaPresentationDuration", parser=MPDParsers.duration)
        self.suggestedPresentationDelay = self.attr(u"suggestedPresentationDelay", parser=MPDParsers.duration)

        # parse children
        locations = self.children(Location)
        self.location = locations[0] if locations else None
        if self.location:
            # a Location element replaces the manifest URL and the base URL derived from it
            self.url = self.location.text
            urlp = list(urlparse(self.url))
            if urlp[2]:
                urlp[2], _ = urlp[2].rsplit("/", 1)
            self._base_url = urlunparse(urlp)

        self.baseURLs = self.children(BaseURL)
        self.periods = self.children(Period, minimum=1)
        self.programInformation = self.children(ProgramInformation)
class ProgramInformation(MPDNode):
    """Wrapper for ``ProgramInformation`` elements; no attributes are parsed."""

    __tag__ = "ProgramInformation"
class BaseURL(MPDNode):
    """Wrapper for ``BaseURL`` elements; exposes the element text as ``url``."""

    __tag__ = "BaseURL"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(BaseURL, self).__init__(node, root, parent, *args, **kwargs)

        # an empty element has no text node (text is None); treat it as an empty URL
        self.url = (self.node.text or "").strip()

    @property
    def is_absolute(self):
        """The URL scheme, which is an empty (falsy) string for relative URLs."""
        return urlparse(self.url).scheme

    @staticmethod
    def join(url, other):
        """
        Resolve ``other`` against ``url``.

        :param url: base URL, treated as a directory even without a trailing slash
        :param other: URL to resolve
        :return: ``other`` unchanged if it is absolute or if ``url`` is empty,
                 otherwise the joined URL
        """
        # if the other URL is an absolute url, then return that
        if urlparse(other).scheme:
            return other
        elif url:
            parts = list(urlsplit(url))
            # without the trailing slash urljoin would replace the last path component
            if not parts[2].endswith("/"):
                parts[2] += "/"
            url = urlunsplit(parts)
            return urljoin(url, other)
        else:
            return other
class Location(MPDNode):
    """Wrapper for ``Location`` elements; the element text is read through ``text``."""

    __tag__ = "Location"
class Period(MPDNode):
    """Wrapper for ``Period`` elements and their segment information and adaptation sets."""

    __tag__ = u"Period"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        """
        :param kwargs: ``i`` is the index of this period among its siblings
        :raises MPDParsingError: if the period has no AdaptationSet
        """
        super(Period, self).__init__(node, root, parent, *args, **kwargs)
        self.i = kwargs.get(u"i", 0)
        self.id = self.attr(u"id")
        self.bitstreamSwitching = self.attr(u"bitstreamSwitching", parser=MPDParsers.bool_str)
        self.duration = self.attr(u"duration", default=Duration(), parser=MPDParsers.duration)
        # start is either the parsed value or the empty Duration default, so it
        # is never None and needs no further fallback
        self.start = self.attr(u"start", default=Duration(), parser=MPDParsers.duration)

        # TODO: Early Access Periods

        self.baseURLs = self.children(BaseURL)
        self.segmentBase = self.only_child(SegmentBase)
        self.adaptationSets = self.children(AdaptationSet, minimum=1)
        self.segmentList = self.only_child(SegmentList)
        self.segmentTemplate = self.only_child(SegmentTemplate)
        self.assetIdentifier = self.only_child(AssetIdentifier)
        # misspelled name kept so existing attribute access keeps working
        self.sssetIdentifier = self.assetIdentifier
        self.eventStream = self.children(EventStream)
        self.subset = self.children(Subset)
class SegmentBase(MPDNode):
    """
    Wrapper for ``SegmentBase`` elements.

    Can locate the ``sidx`` box of the media file to split it into byte-range
    segments.
    """

    __tag__ = "SegmentBase"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentBase, self).__init__(node, root, parent, *args, **kwargs)

        self.timescale = self.attr("timescale", parser=int)
        self.presentation_time_offset = self.attr("presentationTimeOffset")
        self.index_range = self.attr("indexRange", parser=MPDParsers.range)
        self.index_range_exact = self.attr("indexRangeExact", parser=MPDParsers.bool_str, default=False)
        self.availability_time_offset = self.attr("availabilityTimeOffset", parser=int)
        self.availability_time_complete = self.attr("availabilityTimeComplete", parser=MPDParsers.bool_str)

        self.initialization = self.only_child(Initialization)
        self.representation_index = self.only_child(RepresentationIndex)

        self.period = list(self.walk_back(Period))[0]
        self.sidx = None
        self.sidx_offset = 0

    @staticmethod
    def _find_sidx(boxes):
        """Return the first truthy ``sidx`` box found in ``boxes``, or None."""
        for box in boxes:
            try:
                sidx = BoxUtil.first(box, b'sidx')
            except BoxNotFound:
                sidx = None
            if sidx:
                return sidx
        return None

    def init_sidx(self, http):
        """
        Download the start of the media file and store its ``sidx`` box in ``self.sidx``.

        :param http: HTTP session used for the ranged requests
        """
        # Look for sidx existence in the initialization range, and the first
        # box following the initialization range
        if self.initialization:
            offset, length = self.initialization.range
        else:
            offset, length = self.index_range
        headers = {"Range": "bytes={0}-{1}".format(offset, offset + length - 1)}
        res = http.get(self.base_url, headers=headers)

        # the helper returns None for an empty box list instead of leaving sidx unbound
        sidx = self._find_sidx(MP4.parse(res.content))

        if not sidx:
            data = []
            offset += length
            length = 1500
            mp4 = None
            # keep appending chunks until the accumulated data parses into boxes
            while not mp4:
                headers = {"Range": "bytes={0}-{1}".format(offset, offset + length - 1)}
                res = http.get(self.base_url, headers=headers)
                data.append(res.content)
                mp4 = MP4.parse(b''.join(data))
                offset += length
            sidx = self._find_sidx(mp4)

        self.sidx = sidx

    def segments(self, **kwargs):
        """
        Yield the segments described by the ``sidx`` box.

        :param kwargs: ``http`` (optional) is the session used to load the index
        :return: yields an init segment followed by one ranged segment per
                 reference, or a single whole-file segment when there is no index
        """
        http = kwargs.pop("http", None)
        if http:
            self.init_sidx(http)

        if self.sidx:
            # Segmented MP4
            yield Segment(self.base_url, 0, init=True, content=False, range=(0, self.sidx.end))
            offset = self.sidx.end + self.sidx.first_offset
            for ref in self.sidx.references:
                length = ref.referenced_size
                # float() keeps fractional seconds on interpreters with integer division
                duration = ref.segment_duration / float(self.sidx.timescale)
                yield Segment(self.base_url, duration, init=False, content=True, range=(offset, length))
                offset += length
        else:
            # Non-segmented MP4
            yield Segment(self.base_url, 0, init=True, content=True)
class AssetIdentifier(MPDNode):
    """Wrapper for ``AssetIdentifier`` elements; no attributes are parsed."""

    __tag__ = "AssetIdentifier"
class Subset(MPDNode):
    """Wrapper for ``Subset`` elements; no attributes are parsed."""

    __tag__ = "Subset"
class EventStream(MPDNode):
    """Wrapper for ``EventStream`` elements; no attributes are parsed."""

    __tag__ = "EventStream"
class Initialization(MPDNode):
    """Wrapper for ``Initialization`` elements."""

    __tag__ = "Initialization"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(Initialization, self).__init__(node, root, parent, *args, **kwargs)

        # raw value of @sourceURL, None when absent
        self.source_url = self.attr("sourceURL")
        # @range parsed by MPDParsers.range, None when absent
        self.range = self.attr("range", parser=MPDParsers.range)
class SegmentURL(MPDNode):
    """Wrapper for ``SegmentURL`` elements of a segment list."""

    __tag__ = "SegmentURL"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentURL, self).__init__(node, root, parent, *args, **kwargs)

        # raw value of @media, None when absent
        self.media = self.attr("media")
        # @mediaRange parsed by MPDParsers.range, None when absent
        self.media_range = self.attr("mediaRange", parser=MPDParsers.range)
class SegmentList(MPDNode):
    """Wrapper for ``SegmentList`` elements: an explicit list of segment URLs."""

    __tag__ = "SegmentList"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        """
        :raises MPDParsingError: if the list contains no SegmentURL
        """
        super(SegmentList, self).__init__(node, root, parent, *args, **kwargs)

        self.presentation_time_offset = self.attr("presentationTimeOffset")
        self.timescale = self.attr("timescale", parser=int)
        self.duration = self.attr("duration", parser=int)
        self.start_number = self.attr("startNumber", parser=int, default=1)

        if self.duration:
            # a missing @timescale is treated as 1 so that @duration alone does not raise
            self.duration_seconds = self.duration / float(self.timescale or 1)
        else:
            self.duration_seconds = None

        self.initialization = self.only_child(Initialization)
        self.segment_urls = self.children(SegmentURL, minimum=1)

    @property
    def segments(self):
        """Yield the initialization segment, if declared, followed by every listed media segment."""
        if self.initialization:
            yield Segment(self.make_url(self.initialization.source_url), 0, init=True, content=False)
        for segment_url in self.segment_urls:
            yield Segment(self.make_url(segment_url.media), self.duration_seconds, range=segment_url.media_range)

    def make_url(self, url):
        """Join ``url`` with this node's base URL, unless it is absolute."""
        return BaseURL.join(self.base_url, url)
class AdaptationSet(MPDNode):
    """Wrapper for ``AdaptationSet`` elements and the representations they contain."""

    __tag__ = u"AdaptationSet"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        """
        :raises MPDParsingError: if the set contains no Representation
        """
        super(AdaptationSet, self).__init__(node, root, parent, *args, **kwargs)

        # attributes kept as raw strings
        for plain in (u"id", u"group", u"mimeType", u"lang", u"contentType", u"par",
                      u"minBandwidth", u"maxBandwidth"):
            setattr(self, plain, self.attr(plain))

        # integer pixel bounds
        for dimension in (u"minWidth", u"maxWidth", u"minHeight", u"maxHeight"):
            setattr(self, dimension, self.attr(dimension, parser=int))

        # frame rates, given as a number or a fraction
        for rate in (u"minFrameRate", u"maxFrameRate"):
            setattr(self, rate, self.attr(rate, parser=MPDParsers.frame_rate))

        self.segmentAlignment = self.attr(u"segmentAlignment", default=False, parser=MPDParsers.bool_str)
        self.bitstreamSwitching = self.attr(u"bitstreamSwitching", parser=MPDParsers.bool_str)
        self.subsegmentAlignment = self.attr(u"subsegmentAlignment", default=False, parser=MPDParsers.bool_str)
        self.subsegmentStartsWithSAP = self.attr(u"subsegmentStartsWithSAP", default=0, parser=int)

        # children; segmentTemplate is set before the representations are built
        # so that they can find it while walking back through their ancestors
        self.baseURLs = self.children(BaseURL)
        self.segmentTemplate = self.only_child(SegmentTemplate)
        self.representations = self.children(Representation, minimum=1)
        self.contentProtection = self.children(ContentProtection)
class SegmentTemplate(MPDNode):
    """
    Wrapper for ``SegmentTemplate`` elements.

    Generates segment URLs from the ``initialization`` and ``media`` templates,
    either by segment number or from a SegmentTimeline child.
    """

    __tag__ = "SegmentTemplate"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(SegmentTemplate, self).__init__(node, root, parent, *args, **kwargs)
        # nearest ancestor template, used for the defaults of missing attributes
        self.defaultSegmentTemplate = self.walk_back_get_attr('segmentTemplate')
        inherited = self.defaultSegmentTemplate

        self.initialization = self.attr(u"initialization", parser=MPDParsers.segment_template)
        self.media = self.attr(u"media", parser=MPDParsers.segment_template)
        self.duration = self.attr(u"duration", parser=int,
                                  default=inherited.duration if inherited else None)
        self.timescale = self.attr(u"timescale", parser=int,
                                   default=inherited.timescale if inherited else 1)
        self.startNumber = self.attr(u"startNumber", parser=int,
                                     default=inherited.startNumber if inherited else 1)
        self.presentationTimeOffset = self.attr(u"presentationTimeOffset", parser=MPDParsers.timedelta(self.timescale))

        if self.duration:
            self.duration_seconds = self.duration / float(self.timescale)
        else:
            self.duration_seconds = None

        self.period = list(self.walk_back(Period))[0]

        # children
        self.segmentTimeline = self.only_child(SegmentTimeline)

    def segments(self, **kwargs):
        """
        Yield the initialization segment (optional) and then the media segments.

        :param kwargs: ``init`` (default True) controls whether the
                       initialization segment is produced; the remaining
                       values are substituted into the URL templates
        """
        if kwargs.pop("init", True):
            init_url = self.format_initialization(**kwargs)
            if init_url:
                yield Segment(init_url, 0, True, False)
        for media_url, available_at in self.format_media(**kwargs):
            yield Segment(media_url, self.duration_seconds, False, True, available_at)

    def make_url(self, url):
        """
        Join the URL with the base URL, unless it's an absolute URL
        :param url: maybe relative URL
        :return: joined URL
        """
        return BaseURL.join(self.base_url, url)

    def format_initialization(self, **kwargs):
        """Return the initialization URL, or None if no initialization template is set."""
        if self.initialization:
            return self.make_url(self.initialization(**kwargs))

    def segment_numbers(self):
        """
        yield the segment number and when it will be available
        There are two cases for segment number generation, static and dynamic.

        In the case of static stream, the segment number starts at the startNumber and counts
        up to the number of segments that are represented by the periods duration.

        In the case of dynamic streams, the segments should appear at the specified time
        in the simplest case the segment number is based on the time since the availabilityStartTime
        :return:
        """
        log.debug("Generating segment numbers for {0} playlist (id={1})".format(self.root.type, self.parent.id))
        if self.root.type == u"static":
            available_iter = repeat(epoch_start)
            # total_seconds() rather than .seconds, which drops whole days
            duration = self.period.duration.total_seconds()
            if not duration and self.root.mediaPresentationDuration:
                duration = self.root.mediaPresentationDuration.total_seconds()
            if duration:
                number_iter = range(self.startNumber, int(duration / self.duration_seconds) + 1)
            else:
                number_iter = count(self.startNumber)
        else:
            now = datetime.datetime.now(utc)
            if self.presentationTimeOffset:
                since_start = (now - self.presentationTimeOffset) - self.root.availabilityStartTime
                available_start_date = self.root.availabilityStartTime + self.presentationTimeOffset + since_start
                available_start = available_start_date
            else:
                since_start = now - self.root.availabilityStartTime
                available_start = now

            # if there is no delay, use a delay of 3 seconds
            suggested_delay = datetime.timedelta(seconds=(self.root.suggestedPresentationDelay.total_seconds()
                                                          if self.root.suggestedPresentationDelay
                                                          else 3))

            # the number of the segment that is available at NOW - SUGGESTED_DELAY - BUFFER_TIME
            number_iter = count(self.startNumber +
                                int((since_start - suggested_delay - self.root.minBufferTime).total_seconds() /
                                    self.duration_seconds))

            # the time the segment number is available at NOW
            available_iter = count_dt(available_start,
                                      step=datetime.timedelta(seconds=self.duration_seconds))

        for number, available_at in izip(number_iter, available_iter):
            yield number, available_at

    def format_media(self, **kwargs):
        """
        Yield ``(url, available_at)`` for every media segment.

        With a SegmentTimeline the ``Time`` and ``Number`` template values
        come from the timeline; for dynamic manifests only entries newer than
        the last one recorded in the root's ``timelines`` are produced.
        Without a timeline the segments are generated by number.

        :param kwargs: extra values substituted into the media template
        """
        if self.segmentTimeline:
            if self.parent.id is None:
                # workaround for invalid `self.root.timelines[self.parent.id]`
                # creates a timeline for every mimeType instead of one for both
                self.parent.id = self.parent.mimeType
            log.debug("Generating segment timeline for {0} playlist (id={1}))".format(self.root.type, self.parent.id))
            if self.root.type == "dynamic":
                # if there is no delay, use a delay of 3 seconds
                suggested_delay = datetime.timedelta(seconds=(self.root.suggestedPresentationDelay.total_seconds()
                                                              if self.root.suggestedPresentationDelay
                                                              else 3))
                publish_time = self.root.publishTime or epoch_start

                # transform the time line in to a segment list
                timeline = []
                available_at = publish_time
                for segment, n in reversed(list(zip(self.segmentTimeline.segments, count(self.startNumber)))):
                    # the last segment in the timeline is the most recent
                    # so, work backwards and calculate when each of the segments was
                    # available, based on the durations relative to the publish time
                    url = self.make_url(self.media(Time=segment.t, Number=n, **kwargs))
                    # float() keeps fractional seconds on interpreters with integer division
                    duration = datetime.timedelta(seconds=segment.d / float(self.timescale))

                    # once the suggested_delay is reach stop
                    if self.root.timelines[self.parent.id] == -1 and publish_time - available_at >= suggested_delay:
                        break

                    timeline.append((url, available_at, segment.t))
                    available_at -= duration  # walk backwards in time

                # return the segments in chronological order
                for url, available_at, t in reversed(timeline):
                    if t > self.root.timelines[self.parent.id]:
                        self.root.timelines[self.parent.id] = t
                        yield (url, available_at)

            else:
                for segment, n in zip(self.segmentTimeline.segments, count(self.startNumber)):
                    yield (self.make_url(self.media(Time=segment.t, Number=n, **kwargs)),
                           datetime.datetime.now(tz=utc))
        else:
            for number, available_at in self.segment_numbers():
                yield (self.make_url(self.media(Number=number, **kwargs)),
                       available_at)
class Representation(MPDNode):
    """Wrapper for ``Representation`` elements: one encoded version of the content."""

    __tag__ = u"Representation"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        """
        :raises MPDParsingError: if ``id``, ``bandwidth`` or ``mimeType`` cannot be resolved
        """
        super(Representation, self).__init__(node, root, parent, *args, **kwargs)
        self.id = self.attr(u"id", required=True)
        # stored in kbit/s (the attribute value divided by 1000)
        self.bandwidth = self.attr(u"bandwidth", parser=lambda b: float(b) / 1000.0, required=True)
        self.mimeType = self.attr(u"mimeType", required=True, inherited=True)

        self.codecs = self.attr(u"codecs")
        self.startWithSAP = self.attr(u"startWithSAP")

        # video
        self.width = self.attr(u"width", parser=int)
        self.height = self.attr(u"height", parser=int)
        self.frameRate = self.attr(u"frameRate", parser=MPDParsers.frame_rate)

        # audio
        self.audioSamplingRate = self.attr(u"audioSamplingRate", parser=int)
        self.numChannels = self.attr(u"numChannels", parser=int)

        # subtitle
        self.lang = self.attr(u"lang", inherited=True)

        self.baseURLs = self.children(BaseURL)
        self.subRepresentation = self.children(SubRepresentation)
        self.segmentBase = self.only_child(SegmentBase)
        self.segmentList = self.children(SegmentList)
        self.segmentTemplate = self.only_child(SegmentTemplate)

    @property
    def bandwidth_rounded(self):
        """The bandwidth rounded to two significant digits."""
        return round(self.bandwidth, 1 - int(math.log10(self.bandwidth)))

    def segments(self, **kwargs):
        """
        Segments are yielded when they are available

        Segments appear on a time line, for dynamic content they are only available at a certain time
        and sometimes for a limited time. For static content they are all available at the same time.

        :param kwargs: extra args to pass to the segment template
        :return: yields Segments
        """
        segmentBase = self.segmentBase or self.walk_back_get_attr("segmentBase")
        segmentLists = self.segmentList or self.walk_back_get_attr("segmentList")
        segmentTemplate = self.segmentTemplate or self.walk_back_get_attr("segmentTemplate")

        if segmentTemplate:
            for segment in segmentTemplate.segments(RepresentationID=self.id,
                                                    Bandwidth=int(self.bandwidth * 1000),
                                                    **kwargs):
                yield segment
        elif segmentLists:
            for segmentList in segmentLists:
                for segment in segmentList.segments:
                    yield segment
        elif segmentBase:
            # SegmentBase.segments() is deliberately not used here: the bytes of the
            # initialization (or index) range are fetched first and everything after
            # them is streamed as one open-ended request
            if segmentBase.initialization:
                init_range = segmentBase.initialization.range
            else:
                init_range = segmentBase.index_range

            if init_range is None or init_range[1] is None:
                # no usable bounded range to split on: fetch the file as a single segment
                yield Segment(self.base_url, 0, True, True)
            else:
                offset, length = init_range
                yield Segment(self.base_url, 0, init=True, content=False, range=(offset, length))
                yield Segment(self.base_url, 0, init=False, content=True, range=(offset + length, 0), stream=True)
        else:
            yield Segment(self.base_url, 0, True, True)
class SubRepresentation(MPDNode):
    """Wrapper for ``SubRepresentation`` elements; no attributes are parsed."""

    __tag__ = "SubRepresentation"
class RepresentationIndex(MPDNode):
    """Wrapper for ``RepresentationIndex`` elements."""

    __tag__ = "RepresentationIndex"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(RepresentationIndex, self).__init__(node, root, parent, *args, **kwargs)

        # raw value of @sourceURL, None when absent
        self.source_url = self.attr("sourceURL")
        # @range parsed by MPDParsers.range, None when absent
        self.range = self.attr("range", parser=MPDParsers.range)
class SegmentTimeline(MPDNode):
    """Wrapper for ``SegmentTimeline`` elements: explicit start times and durations of segments."""

    __tag__ = "SegmentTimeline"
    TimelineSegment = namedtuple("TimelineSegment", "t d")

    def __init__(self, node, *args, **kwargs):
        super(SegmentTimeline, self).__init__(node, *args, **kwargs)

        self.timescale = self.walk_back_get_attr("timescale")

        self.timeline_segments = self.children(_TimelineSegment)

    @property
    def segments(self):
        """
        Expand the ``S`` entries into individual ``TimelineSegment(t, d)`` tuples.

        Each entry is produced ``r + 1`` times. An entry with an explicit
        ``t`` restarts the running time at that value; otherwise the time
        continues from the end of the previous segment.
        """
        t = 0
        for tsegment in self.timeline_segments:
            # an explicit start time always applies, not only on the first
            # entry, so that gaps between entries are reflected in the times
            if tsegment.t is not None:
                t = tsegment.t
            for _ in range(tsegment.r + 1):
                yield self.TimelineSegment(t, tsegment.d)
                t += tsegment.d
class _TimelineSegment(MPDNode):
    """Wrapper for the ``S`` entries of a segment timeline."""

    __tag__ = "S"

    def __init__(self, node, *args, **kwargs):
        super(_TimelineSegment, self).__init__(node, *args, **kwargs)

        # @t as int, None when absent
        self.t = self.attr("t", parser=int)
        # @d as int, None when absent
        self.d = self.attr("d", parser=int)
        # @r as int, 0 when absent
        self.r = self.attr("r", parser=int, default=0)
class ContentProtection(MPDNode):
    """Wrapper for ``ContentProtection`` elements."""

    __tag__ = "ContentProtection"

    def __init__(self, node, root=None, parent=None, *args, **kwargs):
        super(ContentProtection, self).__init__(node, root, parent, *args, **kwargs)

        # all three are kept as raw attribute strings, None when absent
        self.schemeIdUri = self.attr(u"schemeIdUri")
        self.value = self.attr(u"value")
        self.default_KID = self.attr(u"default_KID")
diff --git a/src/streamlink/stream/dash.py b/src/streamlink/stream/dash.py
index 3885e9ba..3b15f6c9 100644
--- a/src/streamlink/stream/dash.py
+++ b/src/streamlink/stream/dash.py
@@ -47,6 +47,7 @@ class DASHStreamWriter(SegmentedStreamWriter):
headers["Range"] = "bytes={0}-{1}".format(start, end)
return self.session.http.get(segment.url,
+ stream=segment.stream,
timeout=self.timeout,
exception=StreamError,
headers=headers)
@@ -63,6 +64,7 @@ class DASHStreamWriter(SegmentedStreamWriter):
return
log.debug("Download of segment: {} complete".format(segment.url))
+ res.close()
class DASHStreamWorker(SegmentedStreamWorker):
diff --git a/src/streamlink/stream/dash_manifest.py b/src/streamlink/stream/dash_manifest.py
index 01185df2..0caf64c1 100644
--- a/src/streamlink/stream/dash_manifest.py
+++ b/src/streamlink/stream/dash_manifest.py
@@ -38,13 +38,14 @@ epoch_start = datetime.datetime(1970, 1, 1, tzinfo=utc)
class Segment(object):
- def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None):
+ def __init__(self, url, duration, init=False, content=True, available_at=epoch_start, range=None, stream=False):
self.url = url
self.duration = duration
self.init = init
self.content = content
self.available_at = available_at
self.range = range
+ self.stream = stream
def datetime_to_seconds(dt):
@@ -692,8 +693,14 @@ class Representation(MPDNode):
for segment in segmentList.segments:
yield segment
elif segmentBase:
- for segment in segmentBase.segments(**kwargs):
- yield segment
+ # for segment in segmentBase.segments(**kwargs):
+ # yield segment
+ if segmentBase.initialization:
+ offset, length = segmentBase.initialization.range
+ else:
+ offset, length = segmentBase.index_range
+ yield Segment(self.base_url, 0, init=True, content=False, range=(offset, length))
+ yield Segment(self.base_url, 0, init=False, content=True, range=(offset + length, 0), stream=True)
else:
yield Segment(self.base_url, 0, True, True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment