Last active
August 7, 2019 07:31
-
-
Save dzil123/0eed50967931bcd44d6c4c70004752a4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import AsyncGenerator, Dict, List, Tuple | |
import cv2 | |
import uvicorn | |
from starlette.applications import Starlette | |
from starlette.responses import HTMLResponse | |
# ----------------------------------------------------------------------------- | |
# Reusable ASGI framework | |
# Takes care of waiting to start sending | |
# Nice way to be peacefully notified when the client has left | |
# Usage: ``` | |
# async with ASGILifespan(receive) as lifespan: | |
# # do your thing, and periodically, do | |
# if lifespan.end: | |
# # cleanup tasks | |
# return | |
# ``` | |
class ASGILifespan: | |
def __init__(self, receive): | |
self._receive = receive | |
self._task = None | |
@staticmethod | |
def is_msg_start(message): | |
return (message["type"] == "http.request") and (not message["more_body"]) | |
@staticmethod | |
def is_msg_end(message): | |
return message["type"] == "http.disconnect" | |
# Blocks until it is time to start the response | |
# Private, internal use only | |
async def _task_start(self): | |
while True: | |
message = await self._receive() | |
if self.is_msg_start(message) or self.is_msg_end(message): | |
return | |
# Blocks until it is time to end the response | |
# Private, internal use only | |
async def _task_end(self): | |
while True: | |
message = await self._receive() | |
if self.is_msg_end(message): | |
return | |
# Blocks until it is time to end the response | |
# Why would you use this, though? | |
async def wait_end(self): | |
if self.end: | |
return | |
await self._task | |
# Returns True if it is time to end the response | |
@property | |
def end(self): | |
if self._task is None: | |
return True # Invalid state | |
return self._task.done() | |
# Blocks until it is time to start the response | |
async def __aenter__(self): | |
await self._task_start() | |
if self._task is None: | |
self._task = asyncio.ensure_future(self._task_end()) | |
return self | |
async def __aexit__(self, *exc_info): | |
if self._task is not None: | |
self._task.cancel() | |
self._task = None | |
# Takes care of all headers, preamble, and postamble | |
# Usage: ``` | |
# async with ASGIApplication(receive, send) as app: | |
# # do your thing, and periodically, do | |
# if app.end: | |
# # cleanup tasks | |
# return | |
# ``` | |
class ASGIApplication(ASGILifespan): | |
def __init__(self, receive, send, *, status=200, headers={}): | |
self._send = send | |
self._status = status | |
self._headers = headers | |
super().__init__(receive) | |
@staticmethod | |
def _encode_bytes(val): | |
return val.encode("latin-1") | |
@classmethod | |
def _convert_headers(cls, headers={}): | |
return [ | |
(cls._encode_bytes(k), cls._encode_bytes(v)) for k, v in headers.items() | |
] | |
async def send(self, data): | |
await self._send( | |
{"type": "http.response.body", "body": data, "more_body": True} | |
) | |
async def __aenter__(self): | |
await super().__aenter__() | |
await self._send( | |
{ | |
"type": "http.response.start", | |
"status": self._status, | |
"headers": self._convert_headers(self._headers), | |
} | |
) | |
return self | |
async def __aexit__(self, *exc_info): | |
await self._send({"type": "http.response.body"}) | |
return await super().__aexit__(*exc_info) | |
# Takes care of streaming with multipart/x-mixed-replace | |
# Usage: ``` | |
# async with ASGIStreamer(receive, send) as app: | |
# # do your thing, and periodically, do | |
# if app.end: | |
# # cleanup tasks | |
# return | |
# ``` | |
class ASGIStreamer(ASGIApplication): | |
def __init__(self, receive, send, *, boundary="frame", status=200, headers={}): | |
self._boundary = self._encode_bytes(f"\r\n--{boundary}\r\n") | |
headers["Content-Type"] = f"multipart/x-mixed-replace; boundary={boundary}" | |
headers["Connection"] = "close" | |
super().__init__(receive, send, status=status, headers=headers) | |
async def send(self, data): | |
await super().send(self._boundary + data) | |
# ----------------------------------------------------------------------------- | |
# An ASGI application that streams mjpeg from a jpg iterable | |
class MjpegResponse: | |
HEADERS = ASGIApplication._encode_bytes("Content-Type: image/jpeg\r\n\r\n") | |
def __init__(self, src): | |
self.src = src | |
async def __call__(self, scope, receive, send): | |
async with ASGIStreamer(receive, send) as app: | |
for img in self.src: | |
if app.end: | |
return | |
await app.send(self.HEADERS + img) | |
# ----------------------------------------------------------------------------- | |
# OpenCV wrappers | |
def mat_to_jpg(mat): | |
return cv2.imencode(".jpg", mat)[1].tobytes() | |
def camera_to_jpg(cam): | |
yield from map(mat_to_jpg, cam) | |
# Context Manager and iterable | |
# Usage: ``` | |
# with Camera(0) as cam: | |
# # cam is now an iterable of mats | |
# ``` | |
class Camera: | |
def __init__(self, *args, **kwargs): | |
self.args = args | |
self.kwargs = kwargs | |
self.cam = None | |
def __iter__(self): | |
return self | |
def __next__(self): | |
return self.cam.read()[1] | |
def __enter__(self): | |
if self.cam is None: | |
self.cam = cv2.VideoCapture(*self.args, **self.kwargs) | |
return self | |
def __exit__(self, *exc_info): | |
if self.cam is None: | |
return | |
self.cam.release() | |
# ----------------------------------------------------------------------------- | |
# Application | |
class WebServer: | |
IMAGE_URL = "/image.mjpeg" | |
def __init__(self, cam): | |
self.cam = camera_to_jpg(cam) | |
self.app = Starlette() | |
self.app.debug = True | |
self.app.route(self.IMAGE_URL)(self.image) | |
self.app.route("/")(self.index) | |
def image(self, request): | |
return MjpegResponse(self.cam) | |
def index(self, request): | |
return HTMLResponse( | |
f"""<html><title>Hello</title><body><h1>Hello</h1><br/><img src="{self.IMAGE_URL}"/></body></html>""" | |
) | |
def desktop_demo(): | |
try: | |
with Camera(0) as cam: | |
for img in cam: | |
cv2.imshow("Asdf", img) | |
if cv2.waitKey(1) & 0xFF == ord("q"): | |
break | |
finally: | |
cv2.destroyAllWindows() | |
def web_demo(): | |
with Camera(0) as cam: | |
server = WebServer(cam) | |
uvicorn.run(server.app) | |
# ----------------------------------------------------------------------------- | |
def main(): | |
web_demo() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment