Created
February 3, 2019 05:06
-
-
Save hunterjm/c499b9bffb71ab53ce1976b757179c9f to your computer and use it in GitHub Desktop.
Very dirty POC for HLS streams in Home Assistant
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Support for HLS camera streaming. | |
For more details about this platform, please refer to the documentation at | |
https://home-assistant.io/components/camera.hls/ | |
""" | |
import asyncio | |
from contextlib import suppress | |
import logging | |
import io | |
import threading | |
import math | |
import datetime | |
import attr | |
from aiohttp import web | |
import async_timeout | |
import voluptuous as vol | |
from homeassistant.const import CONF_SOURCE, CONF_NAME, EVENT_HOMEASSISTANT_STOP | |
from homeassistant.components.camera import Camera, CameraView, PLATFORM_SCHEMA, DOMAIN | |
import homeassistant.helpers.config_validation as cv | |
from homeassistant.helpers.aiohttp_client import ( | |
async_aiohttp_proxy_stream) | |
_LOGGER = logging.getLogger(__name__) | |
REQUIREMENTS = ['av==6.1.2', 'pillow==5.3.0'] | |
DEFAULT_NAME = 'HLS' | |
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | |
vol.Required(CONF_SOURCE): cv.string, | |
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, | |
}) | |
HLS_STREAM_URL = '/api/hls/{0}?token={1}' | |
async def async_setup_platform(hass, config, async_add_entities, | |
discovery_info=None): | |
"""Set up a HLS camera.""" | |
camera = HlsCamera(hass, config) | |
async_add_entities([camera]) | |
hass.http.register_view(CameraHlsManifestView(hass.data[DOMAIN], hass.config.api.base_url)) | |
hass.http.register_view(CameraHlsSegmentView(hass.data[DOMAIN])) | |
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, | |
lambda event: camera.stop()) | |
class HlsCamera(Camera): | |
"""An implementation of a HLS camera.""" | |
def __init__(self, hass, config): | |
"""Initialize a HLS camera.""" | |
super().__init__() | |
self._name = config.get(CONF_NAME) | |
self._source = config.get(CONF_SOURCE) | |
# Setup stream | |
options = {} | |
if self._source[:4] == 'rtsp': | |
options['rtsp_transport'] = 'tcp' | |
self._streamer = HlsStreamer(hass.loop, self._source, options=options) | |
@property | |
def supported_features(self): | |
"""Flag supported features.""" | |
return 2 | |
def stop(self): | |
self._streamer.stop() | |
async def async_camera_image(self): | |
"""Return a still image response from the camera.""" | |
img = await self._streamer.get_image() | |
result = io.BytesIO() | |
img.save(result, format='JPEG') | |
return result.getvalue() | |
@property | |
def name(self): | |
"""Return the name of this camera.""" | |
return self._name | |
class CameraHlsManifestView(CameraView): | |
"""Camera view to serve a M3U8 stream.""" | |
url = '/api/hls/{entity_id}' | |
name = 'api:camera:hls:manifest' | |
def __init__(self, component, base_url): | |
"""Initialize a basic camera view.""" | |
super().__init__(component) | |
self.base_url = base_url | |
async def handle(self, request, camera): | |
renderer = M3U8Renderer('http://localhost:8123', camera) | |
filename = '{}.m3u8'.format(camera.entity_id) | |
headers = { | |
'Access-Control-Allow-Origin': '*', | |
# 'Content-Type': 'application/vnd.apple.mpegurl', | |
'Content-Type': 'application/x-mpegURL', | |
'Content-Disposition': "inline; filename=\"{}\"".format(filename), | |
} | |
return web.Response(body=renderer.render(camera._streamer, datetime.datetime.utcnow()).encode("utf-8"), headers=headers) | |
class CameraHlsSegmentView(CameraView): | |
"""Camera view to serve a MPEG2TS segment.""" | |
url = '/api/hls/{entity_id}/segment/{sequence}' | |
name = 'api:camera:hls:segment' | |
async def get(self, request, entity_id, sequence): | |
return await super().get(request, entity_id) | |
async def handle(self, request, camera): | |
sequence = int(request.match_info['sequence']) | |
segment = camera._streamer.get_segment(sequence) | |
if not segment: | |
return web.HTTPNotFound() | |
headers = { | |
'Access-Control-Allow-Origin': '*', | |
'Accept-Ranges': 'bytes', | |
'Content-Type': 'video/mp2t', | |
'Content-Length': str(segment.segment.getbuffer().nbytes), | |
'Content-Disposition': "inline; filename=\"{}{}.ts\"".format(camera.entity_id, sequence), | |
} | |
return web.Response(body=segment.segment.getvalue(), headers=headers) | |
@attr.s | |
class HlsSegment: | |
"""Represent a MPEG2TS segment.""" | |
sequence = attr.ib(type=int) | |
segment = attr.ib(type=io.BytesIO) | |
duration = attr.ib(type=float) | |
def hls_worker(loop, streamer, quit_event): | |
# Make an output stream using the input as a template. This copies the stream | |
# setup from one to the other. | |
import av | |
in_stream = streamer.container.streams.video[0] | |
segment = io.BytesIO() | |
output = av.open(segment, mode='w', format='mpegts') | |
out_stream = output.add_stream(template=in_stream) | |
first_packet = True | |
sequence = 1 | |
while not quit_event.is_set(): | |
packet = next(streamer.container.demux(in_stream)) | |
# We need to skip the "flushing" packets that `demux` generates. | |
if packet.dts is None: | |
continue | |
# Mux on every keyframe | |
if packet.stream.type == 'video' and packet.is_keyframe and not first_packet: | |
segment_duration = (packet.pts * packet.time_base) / sequence | |
output.close() | |
asyncio.run_coroutine_threadsafe(streamer._put_segment(HlsSegment( | |
sequence, segment, segment_duration | |
)), loop) | |
segment = io.BytesIO() | |
output = av.open(segment, mode='w', format='mpegts') | |
out_stream = output.add_stream(template=in_stream) | |
sequence += 1 | |
# First packet tends to have a weird dts/pts | |
if first_packet: | |
packet.dts = 0 | |
packet.pts = 0 | |
first_packet = False | |
# We need to assign the packet to the new stream. | |
packet.stream = out_stream | |
output.mux(packet) | |
# Save packet so we can get image | |
if packet.stream.type == 'video' and packet.is_keyframe: | |
asyncio.run_coroutine_threadsafe(streamer._put_packet(bytes(packet)), loop) | |
class HlsStreamer: | |
""" | |
A media source that reads audio and/or video from a live stream. | |
Examples: | |
.. code-block:: python | |
# Open a video file. | |
player = HlsStreamer('/path/to/some.mp4') | |
# Open an HTTP stream. | |
player = HlsStreamer( | |
'http://download.tsi.telecom-paristech.fr/' | |
'gpac/dataset/dash/uhd/mux_sources/hevcds_720p30_2M.mp4') | |
# Open an RTSP stream. | |
player = HlsStreamer( | |
'rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov', options={ | |
'rtsp_transport': 'tcp', | |
}) | |
# Open webcam on Linux. | |
player = HlsStreamer('/dev/video0', format='v4l2', options={ | |
'video_size': '640x480' | |
}) | |
# Open webcam on OS X. | |
player = HlsStreamer('default:none', format='avfoundation', options={ | |
'video_size': '640x480' | |
}) | |
:param: file: The path to a file, or a file-like object. | |
:param: format: The format to use, defaults to autodect. | |
:param: options: Additional options to pass to FFmpeg. | |
""" | |
def __init__(self, loop, file, format=None, options={}): | |
import av | |
print("Opening File: %s" % file) | |
self.__loop = loop | |
self.__file = file | |
self.__container = av.open(file=file, format=format, mode='r', options=options) | |
self.__thread = None | |
self.__thread_quit = None | |
self.__cursor = None | |
self.__packet_cursor = None | |
self.__packet = None | |
self.__event = asyncio.Event() | |
self.__segments = [] | |
print("Container Format %s" % self.__container.format.name) | |
self.__video_codec = None | |
for stream in self.__container.streams: | |
print(stream) | |
if stream.type == 'video' and not self.__video_codec: | |
self.__video_codec = stream.codec_context | |
self._start() | |
@property | |
def container(self): | |
return self.__container | |
@property | |
def segments(self): | |
return [s.sequence for s in self.__segments] | |
@property | |
def target_duration(self): | |
durations = [s.duration for s in self.__segments] | |
return round(sum(durations) / len(self.__segments)) or 1 | |
async def get_image(self): | |
import av | |
if self.__cursor is None or self.__cursor >= self.__packet_cursor: | |
await self.__event.wait() | |
self.__cursor = self.__packet_cursor | |
packet = av.Packet(self.__packet) | |
frames = self.__video_codec.decode(packet) | |
if not len(frames): | |
return None | |
return frames[0].to_image() | |
def get_segment(self, sequence=None): | |
if not sequence: | |
return self.__segments | |
for segment in self.__segments: | |
if segment.sequence == sequence: | |
return segment | |
return None | |
async def _put_packet(self, packet): | |
if packet is None: | |
self.stop() | |
if self.__packet_cursor is None: | |
self.__packet_cursor = 0 | |
else: | |
self.__packet_cursor += 1 | |
self.__packet = packet | |
self.__event.set() | |
self.__event.clear() | |
async def _put_segment(self, segment): | |
self.__segments.append(segment) | |
if len(self.__segments) > 3: | |
self.__segments = self.__segments[-3:] | |
def _start(self): | |
if self.__thread is None: | |
self.__thread_quit = threading.Event() | |
self.__thread = threading.Thread( | |
name='hls-streamer', | |
target=hls_worker, | |
args=( | |
self.__loop, self, self.__thread_quit)) | |
self.__thread.start() | |
def stop(self): | |
if self.__thread is not None: | |
self.__thread_quit.set() | |
self.__thread.join() | |
self.__thread = None | |
class M3U8Renderer(object): | |
def __init__(self, root, camera, version=3, lookback=4): | |
self.root = root | |
self.camera = camera | |
self.target_duration = camera._streamer.target_duration | |
self.version = version | |
self.lookback = lookback | |
def render_format_identifier(self): | |
return ["#EXTM3U"] | |
def render_preamble(self): | |
return [ | |
"#EXT-X-VERSION:{}".format(self.version), | |
"#EXT-X-TARGETDURATION:{}".format(self.target_duration), | |
] | |
def render_segment(self, segment): | |
return [ | |
"#EXTINF:{:.04},".format(float(segment.duration)), | |
"/api/hls/{}/segment/{}?token={}".format( | |
self.camera.entity_id, | |
segment.sequence, self.camera.access_tokens[-1]), | |
] | |
def render_playlist(self, track, start_time): | |
segments = track.segments | |
if not segments: | |
return [] | |
rv = ["#EXT-X-MEDIA-SEQUENCE:{}".format(segments[0])] | |
for sequence in segments: | |
segment = track.get_segment(sequence) | |
rv.extend(self.render_segment(segment)) | |
return rv | |
def render(self, track, start_time): | |
lines = ( | |
self.render_format_identifier() + | |
self.render_preamble() + | |
self.render_playlist(track, start_time) | |
) | |
return "\n".join(lines) + "\n" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How to use?