Skip to content

Instantly share code, notes, and snippets.

@hunterjm
Created February 3, 2019 05:06
Show Gist options
  • Save hunterjm/c499b9bffb71ab53ce1976b757179c9f to your computer and use it in GitHub Desktop.
Save hunterjm/c499b9bffb71ab53ce1976b757179c9f to your computer and use it in GitHub Desktop.
Very dirty POC for HLS streams in Home Assistant
"""
Support for HLS camera streaming.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/camera.hls/
"""
import asyncio
from contextlib import suppress
import logging
import io
import threading
import math
import datetime
import attr
from aiohttp import web
import async_timeout
import voluptuous as vol
from homeassistant.const import CONF_SOURCE, CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.components.camera import Camera, CameraView, PLATFORM_SCHEMA, DOMAIN
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.aiohttp_client import (
async_aiohttp_proxy_stream)
_LOGGER = logging.getLogger(__name__)
REQUIREMENTS = ['av==6.1.2', 'pillow==5.3.0']
DEFAULT_NAME = 'HLS'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_SOURCE): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})
HLS_STREAM_URL = '/api/hls/{0}?token={1}'
async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None):
"""Set up a HLS camera."""
camera = HlsCamera(hass, config)
async_add_entities([camera])
hass.http.register_view(CameraHlsManifestView(hass.data[DOMAIN], hass.config.api.base_url))
hass.http.register_view(CameraHlsSegmentView(hass.data[DOMAIN]))
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP,
lambda event: camera.stop())
class HlsCamera(Camera):
"""An implementation of a HLS camera."""
def __init__(self, hass, config):
"""Initialize a HLS camera."""
super().__init__()
self._name = config.get(CONF_NAME)
self._source = config.get(CONF_SOURCE)
# Setup stream
options = {}
if self._source[:4] == 'rtsp':
options['rtsp_transport'] = 'tcp'
self._streamer = HlsStreamer(hass.loop, self._source, options=options)
@property
def supported_features(self):
"""Flag supported features."""
return 2
def stop(self):
self._streamer.stop()
async def async_camera_image(self):
"""Return a still image response from the camera."""
img = await self._streamer.get_image()
result = io.BytesIO()
img.save(result, format='JPEG')
return result.getvalue()
@property
def name(self):
"""Return the name of this camera."""
return self._name
class CameraHlsManifestView(CameraView):
"""Camera view to serve a M3U8 stream."""
url = '/api/hls/{entity_id}'
name = 'api:camera:hls:manifest'
def __init__(self, component, base_url):
"""Initialize a basic camera view."""
super().__init__(component)
self.base_url = base_url
async def handle(self, request, camera):
renderer = M3U8Renderer('http://localhost:8123', camera)
filename = '{}.m3u8'.format(camera.entity_id)
headers = {
'Access-Control-Allow-Origin': '*',
# 'Content-Type': 'application/vnd.apple.mpegurl',
'Content-Type': 'application/x-mpegURL',
'Content-Disposition': "inline; filename=\"{}\"".format(filename),
}
return web.Response(body=renderer.render(camera._streamer, datetime.datetime.utcnow()).encode("utf-8"), headers=headers)
class CameraHlsSegmentView(CameraView):
"""Camera view to serve a MPEG2TS segment."""
url = '/api/hls/{entity_id}/segment/{sequence}'
name = 'api:camera:hls:segment'
async def get(self, request, entity_id, sequence):
return await super().get(request, entity_id)
async def handle(self, request, camera):
sequence = int(request.match_info['sequence'])
segment = camera._streamer.get_segment(sequence)
if not segment:
return web.HTTPNotFound()
headers = {
'Access-Control-Allow-Origin': '*',
'Accept-Ranges': 'bytes',
'Content-Type': 'video/mp2t',
'Content-Length': str(segment.segment.getbuffer().nbytes),
'Content-Disposition': "inline; filename=\"{}{}.ts\"".format(camera.entity_id, sequence),
}
return web.Response(body=segment.segment.getvalue(), headers=headers)
@attr.s
class HlsSegment:
"""Represent a MPEG2TS segment."""
sequence = attr.ib(type=int)
segment = attr.ib(type=io.BytesIO)
duration = attr.ib(type=float)
def hls_worker(loop, streamer, quit_event):
# Make an output stream using the input as a template. This copies the stream
# setup from one to the other.
import av
in_stream = streamer.container.streams.video[0]
segment = io.BytesIO()
output = av.open(segment, mode='w', format='mpegts')
out_stream = output.add_stream(template=in_stream)
first_packet = True
sequence = 1
while not quit_event.is_set():
packet = next(streamer.container.demux(in_stream))
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
# Mux on every keyframe
if packet.stream.type == 'video' and packet.is_keyframe and not first_packet:
segment_duration = (packet.pts * packet.time_base) / sequence
output.close()
asyncio.run_coroutine_threadsafe(streamer._put_segment(HlsSegment(
sequence, segment, segment_duration
)), loop)
segment = io.BytesIO()
output = av.open(segment, mode='w', format='mpegts')
out_stream = output.add_stream(template=in_stream)
sequence += 1
# First packet tends to have a weird dts/pts
if first_packet:
packet.dts = 0
packet.pts = 0
first_packet = False
# We need to assign the packet to the new stream.
packet.stream = out_stream
output.mux(packet)
# Save packet so we can get image
if packet.stream.type == 'video' and packet.is_keyframe:
asyncio.run_coroutine_threadsafe(streamer._put_packet(bytes(packet)), loop)
class HlsStreamer:
"""
A media source that reads audio and/or video from a live stream.
Examples:
.. code-block:: python
# Open a video file.
player = HlsStreamer('/path/to/some.mp4')
# Open an HTTP stream.
player = HlsStreamer(
'http://download.tsi.telecom-paristech.fr/'
'gpac/dataset/dash/uhd/mux_sources/hevcds_720p30_2M.mp4')
# Open an RTSP stream.
player = HlsStreamer(
'rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov', options={
'rtsp_transport': 'tcp',
})
# Open webcam on Linux.
player = HlsStreamer('/dev/video0', format='v4l2', options={
'video_size': '640x480'
})
# Open webcam on OS X.
player = HlsStreamer('default:none', format='avfoundation', options={
'video_size': '640x480'
})
:param: file: The path to a file, or a file-like object.
:param: format: The format to use, defaults to autodect.
:param: options: Additional options to pass to FFmpeg.
"""
def __init__(self, loop, file, format=None, options={}):
import av
print("Opening File: %s" % file)
self.__loop = loop
self.__file = file
self.__container = av.open(file=file, format=format, mode='r', options=options)
self.__thread = None
self.__thread_quit = None
self.__cursor = None
self.__packet_cursor = None
self.__packet = None
self.__event = asyncio.Event()
self.__segments = []
print("Container Format %s" % self.__container.format.name)
self.__video_codec = None
for stream in self.__container.streams:
print(stream)
if stream.type == 'video' and not self.__video_codec:
self.__video_codec = stream.codec_context
self._start()
@property
def container(self):
return self.__container
@property
def segments(self):
return [s.sequence for s in self.__segments]
@property
def target_duration(self):
durations = [s.duration for s in self.__segments]
return round(sum(durations) / len(self.__segments)) or 1
async def get_image(self):
import av
if self.__cursor is None or self.__cursor >= self.__packet_cursor:
await self.__event.wait()
self.__cursor = self.__packet_cursor
packet = av.Packet(self.__packet)
frames = self.__video_codec.decode(packet)
if not len(frames):
return None
return frames[0].to_image()
def get_segment(self, sequence=None):
if not sequence:
return self.__segments
for segment in self.__segments:
if segment.sequence == sequence:
return segment
return None
async def _put_packet(self, packet):
if packet is None:
self.stop()
if self.__packet_cursor is None:
self.__packet_cursor = 0
else:
self.__packet_cursor += 1
self.__packet = packet
self.__event.set()
self.__event.clear()
async def _put_segment(self, segment):
self.__segments.append(segment)
if len(self.__segments) > 3:
self.__segments = self.__segments[-3:]
def _start(self):
if self.__thread is None:
self.__thread_quit = threading.Event()
self.__thread = threading.Thread(
name='hls-streamer',
target=hls_worker,
args=(
self.__loop, self, self.__thread_quit))
self.__thread.start()
def stop(self):
if self.__thread is not None:
self.__thread_quit.set()
self.__thread.join()
self.__thread = None
class M3U8Renderer(object):
def __init__(self, root, camera, version=3, lookback=4):
self.root = root
self.camera = camera
self.target_duration = camera._streamer.target_duration
self.version = version
self.lookback = lookback
def render_format_identifier(self):
return ["#EXTM3U"]
def render_preamble(self):
return [
"#EXT-X-VERSION:{}".format(self.version),
"#EXT-X-TARGETDURATION:{}".format(self.target_duration),
]
def render_segment(self, segment):
return [
"#EXTINF:{:.04},".format(float(segment.duration)),
"/api/hls/{}/segment/{}?token={}".format(
self.camera.entity_id,
segment.sequence, self.camera.access_tokens[-1]),
]
def render_playlist(self, track, start_time):
segments = track.segments
if not segments:
return []
rv = ["#EXT-X-MEDIA-SEQUENCE:{}".format(segments[0])]
for sequence in segments:
segment = track.get_segment(sequence)
rv.extend(self.render_segment(segment))
return rv
def render(self, track, start_time):
lines = (
self.render_format_identifier() +
self.render_preamble() +
self.render_playlist(track, start_time)
)
return "\n".join(lines) + "\n"
@sdavides
Copy link

How to use?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment