|
from typing import AsyncGenerator, Dict |
|
|
|
from fastapi import FastAPI, Request, Response |
|
from fastapi.encoders import jsonable_encoder |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from starlette.responses import StreamingResponse |
|
|
|
import asyncio |
|
|
|
from nats.aio.client import Client as NATS |
|
from nats.aio.client import Msg |
|
|
|
|
|
#from prisma import Client |
|
|
|
|
|
from pydantic import BaseModel |
|
|
|
|
|
#class Progress(BaseModel): |
|
# frame = str |
|
# fps = str |
|
# stream_0_0_q = str |
|
# bitrate= str |
|
# total_size = str |
|
# out_time_ms= str |
|
# out_time = str |
|
# dup_frames = str |
|
# drop_frames = str |
|
# speed = str |
|
# progress = str |
|
|
|
|
|
|
|
app = FastAPI() |
|
#prisma = Client(auto_register=True) |
|
nc = NATS() |
|
|
|
|
|
|
|
origins = [ |
|
"http://localhost", |
|
"http://localhost:8000", |
|
"http://localhost:3000", |
|
] |
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=origins, |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
|
|
@app.on_event("startup") |
|
async def startup() -> None: |
|
#await prisma.connect() |
|
await nc.connect("nats://localhost:4222") |
|
|
|
|
|
@app.on_event("shutdown") |
|
async def shutdown() -> None: |
|
#if prisma.is_connected(): |
|
# await prisma.disconnect() |
|
if nc.is_connected(): |
|
await nc.close() |
|
|
|
|
|
|
|
|
|
### https://gist.github.com/herberthamaral/f1ec6e2faef904ed2d244a28faa55808 |
|
async def subscription_to_generator(nats: NATS, topic: str) -> AsyncGenerator[Msg, Msg]: |
|
queue: asyncio.Queue[Msg] = asyncio.Queue() |
|
|
|
async def subscription_callback(msg: Msg) -> None: |
|
await queue.put(msg) |
|
await nats.subscribe(topic, "workers", cb=subscription_callback) |
|
|
|
while True: |
|
try: |
|
item = await asyncio.wait_for(queue.get(), timeout=60) |
|
queue.task_done() |
|
yield item |
|
except asyncio.TimeoutError: |
|
await nats.close() |
|
yield None |
|
|
|
|
|
async def event_stream(request: Request, endcode_id: int) -> AsyncGenerator[str, str]: |
|
messages = subscription_to_generator(nc, f'encode') |
|
|
|
async for msg in messages: |
|
if msg is None or await request.is_disconnected(): |
|
return |
|
msg_data = msg.data.decode() |
|
data_str = f'data: {msg_data}\n\n' |
|
yield data_str |
|
|
|
|
|
|
|
|
|
@app.get('/stream') |
|
async def stream(request: Request) -> StreamingResponse: |
|
return StreamingResponse(event_stream(request, "encode"), media_type='text/event-stream') |