Last active
June 24, 2022 18:56
-
-
Save thevickypedia/22b8c19453d638d2aa1a5712964a4d50 to your computer and use it in GitHub Desktop.
Stream videos using FastAPI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
pip install fastapi uvicorn aiofiles jinja2 | |
uvicorn video:app --reload | |
""" | |
import mimetypes | |
import logging | |
import os | |
import pathlib | |
from fastapi import FastAPI, Request, Response, Header | |
from fastapi.templating import Jinja2Templates | |
# ASGI application object; the route handlers below register themselves on it.
app = FastAPI()
# Templates are looked up in the process's current working directory.
templates = Jinja2Templates(directory=os.getcwd())
# Number of bytes served per request when the client's range has no end (1 MiB).
CHUNK_SIZE = 1024 * 1024
# Video file to stream; a relative path, so it resolves against the working directory.
video_path = pathlib.Path("video.mp4")
# Module-level scratch state: "agent" holds the ``sec-ch-ua`` header of the most
# recent ``/`` request and is read back by the ``/video`` handler.
BROWSER = {}
class VideoFilter(logging.Filter):
    """Logging filter that drops records mentioning the ``/video`` endpoint.

    See Also:
        - Subclasses ``logging.Filter`` and overrides ``filter(record)``.
        - ``filter(record)`` returns True to keep a record and False to discard it,
          so every record that does not mention ``/video`` is still logged.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Decide whether a log record should be emitted.

        Args:
            record: ``LogRecord`` created for the event being logged.

        Returns:
            bool:
            False when the formatted message contains ``/video``, True otherwise.
        """
        message = record.getMessage()
        return "/video" not in message
# Attach the filter to the "uvicorn.access" logger so that records mentioning
# ``/video`` are discarded while all other access records are kept.
logging.getLogger("uvicorn.access").addFilter(VideoFilter())
@app.get("/") | |
async def read_root(request: Request) -> templates.TemplateResponse: | |
"""Reads the root request to render HTMl page. | |
Args: | |
request: Request class. | |
Returns: | |
templates.TemplateResponse: | |
Template response. | |
""" | |
BROWSER["agent"] = request.headers.get("sec-ch-ua") | |
return templates.TemplateResponse("index.htm", context={"request": request}) | |
@app.get("/video") | |
async def video_endpoint(range: str = Header(None)) -> Response: | |
"""Opens the video file to stream the content. | |
Args: | |
range: Header information. | |
Returns: | |
Response: | |
Response class. | |
""" | |
start, end = range.replace("bytes=", "").split("-") | |
start = int(start) | |
end = int(end) if end else start + CHUNK_SIZE | |
with open(video_path, "rb") as video: | |
video.seek(start) | |
if BROWSER.get("agent") and "chrome" in BROWSER["agent"].lower(): | |
data = video.read() | |
else: | |
data = video.read(end - start) | |
file_size = str(video_path.stat().st_size) | |
headers = { | |
'Content-Range': f'bytes {str(start)}-{str(end)}/{file_size}', | |
'Accept-Ranges': 'bytes' | |
} | |
return Response(content=data, status_code=206, headers=headers, | |
media_type=mimetypes.guess_type(video_path, strict=True)[0] or "video/mp4") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment