Skip to content

Instantly share code, notes, and snippets.

@Stankye
Last active February 24, 2022 22:52
Show Gist options
  • Save Stankye/da4586e2d331f26017999c9ae2669ad6 to your computer and use it in GitHub Desktop.
Save Stankye/da4586e2d331f26017999c9ae2669ad6 to your computer and use it in GitHub Desktop.
ffmpeg -progress nats fastapi sse react toy

Encoding Progress Toy

docker run --name nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222

uvicorn main:app --reload

python progress.py

C:\path\ffmpeg\bin\ffmpeg.exe -i .\Big_Buck_Bunny_720_10s_30MB.mp4 out.mkv -progress tcp://localhost:8888

tl;dr: ffmpeg -progress -> python tcp nats pub -> nats -> fastapi sub -> server-sent events -> react frontend

from typing import AsyncGenerator, Dict
from fastapi import FastAPI, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.client import Msg
#from prisma import Client
from pydantic import BaseModel
#class Progress(BaseModel):
# frame = str
# fps = str
# stream_0_0_q = str
# bitrate= str
# total_size = str
# out_time_ms= str
# out_time = str
# dup_frames = str
# drop_frames = str
# speed = str
# progress = str
# FastAPI application and the module-level NATS client used by the handlers.
app = FastAPI()
#prisma = Client(auto_register=True)
nc = NATS()

# Origins accepted by the CORS middleware registered below.
origins = [
    "http://localhost",
    "http://localhost:8000",
    "http://localhost:3000",
]

# Accept credentialed cross-origin requests from the origins above,
# with any HTTP method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
async def startup() -> None:
    """Connect the module-level NATS client when the application starts."""
    #await prisma.connect()
    nats_url = "nats://localhost:4222"
    await nc.connect(nats_url)
@app.on_event("shutdown")
async def shutdown() -> None:
    """Close the module-level NATS client when the application stops.

    ``is_connected`` is a property on the nats.py client, not a method, so it
    is read without parentheses; calling it would raise ``TypeError`` because
    the property value is a plain ``bool``.
    """
    #if prisma.is_connected():
    #    await prisma.disconnect()
    if nc.is_connected:
        await nc.close()
### https://gist.github.com/herberthamaral/f1ec6e2faef904ed2d244a28faa55808
async def subscription_to_generator(nats: NATS, topic: str) -> AsyncGenerator[Msg, Msg]:
    """Expose a NATS subscription on ``topic`` as an async generator.

    Messages delivered to the subscription are buffered in a queue and yielded
    one by one.  If no message arrives for 60 seconds the generator yields a
    single ``None`` sentinel and then finishes.

    The subscription is removed when the generator finishes or is closed.  The
    client passed in is left open: it is supplied by the caller and may be
    shared, so closing it here would break every other subscriber.

    Args:
        nats: A connected NATS client.
        topic: Subject to subscribe to.

    Yields:
        Each received message, then ``None`` once after an idle timeout.
    """
    queue: asyncio.Queue[Msg] = asyncio.Queue()

    async def subscription_callback(msg: Msg) -> None:
        await queue.put(msg)

    # NOTE(review): "workers" is a queue group, so NATS delivers each message
    # to only one member of the group -- confirm this is intended when several
    # consumers subscribe at once.
    # Assumes nats-py >= 2.0, where subscribe() returns a Subscription object.
    subscription = await nats.subscribe(topic, "workers", cb=subscription_callback)
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=60)
            except asyncio.TimeoutError:
                break
            queue.task_done()
            yield item
    finally:
        # unsubscribe() raises on a closed connection, and a closed connection
        # has no subscriptions left to remove anyway.
        if not nats.is_closed:
            await subscription.unsubscribe()
    # Only reached after an idle timeout; tells the consumer to stop.
    yield None
async def event_stream(request: Request, endcode_id: int) -> AsyncGenerator[str, str]:
    """Yield messages from the ``encode`` subject as server-sent-event frames.

    Each message payload is decoded and wrapped as ``data: <payload>\\n\\n``.
    The stream ends when the message source yields ``None`` or when the client
    is found to be disconnected (checked each time a message arrives).

    Args:
        request: The incoming request, used to detect client disconnects.
        endcode_id: Currently unused; the subject name is fixed to ``encode``.

    Yields:
        One SSE ``data:`` frame per received message.
    """
    messages = subscription_to_generator(nc, 'encode')
    try:
        async for msg in messages:
            if msg is None or await request.is_disconnected():
                return
            msg_data = msg.data.decode()
            yield f'data: {msg_data}\n\n'
    finally:
        # Close the source generator explicitly so it is finalised now rather
        # than whenever the garbage collector gets to it.
        await messages.aclose()
@app.get('/stream')
async def stream(request: Request) -> StreamingResponse:
    """Return the event stream as a ``text/event-stream`` streaming response."""
    events = event_stream(request, "encode")
    return StreamingResponse(events, media_type='text/event-stream')
import asyncio
import json
import re
import nats
# Captures (key, value) from a ``key=value`` pair, allowing whitespace around
# the ``=``.  Written as a raw string so ``\S`` and ``\s`` are passed to the
# regex engine instead of being treated as invalid string escapes.
pattern = re.compile(r'(\S+)\s*=\s*(\S+)')
async def handle_progress(reader, writer):
    """Forward ffmpeg ``-progress`` output from one TCP client to NATS.

    Lines are collected until one whose key is ``progress`` arrives; ffmpeg
    documents that key as the last one in every progress report.  The collected
    ``key=value`` pairs are then published as one JSON object on the
    ``encode`` subject.  Lines left over when the peer disconnects mid-report
    are discarded.

    Args:
        reader: Stream reader for the client connection.
        writer: Stream writer for the client connection; closed on exit.
    """
    nc = await nats.connect("nats://localhost:4222")
    lines = []
    try:
        while True:
            try:
                line = await reader.readuntil(separator=b'\n')
            except asyncio.IncompleteReadError:
                # Peer closed the connection before a full line arrived.
                break
            text = line.decode()
            lines.append(text)
            if text.startswith("progress="):
                tosend = dict(pattern.findall("".join(lines)))
                await nc.publish("encode", json.dumps(tosend).encode())
                lines.clear()
    finally:
        # One NATS connection is opened per TCP client, so release it here
        # together with the client socket.
        await nc.close()
        writer.close()
async def main():
    """Start the TCP listener on 127.0.0.1:8888 and serve forever."""
    progress_server = await asyncio.start_server(handle_progress, '127.0.0.1', 8888)

    # Report every socket address the server is bound to.
    bound = [str(sock.getsockname()) for sock in progress_server.sockets]
    addrs = ', '.join(bound)
    print(f'Serving on {addrs}')

    async with progress_server:
        await progress_server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())
import { useState, useEffect } from 'react'
function App() {
const [data, setData] = useState([])
useEffect(() => {
const sse = new EventSource('http://localhost:8000/stream', { withCredentials: false });
function getRealtimeData(data) {
console.log(data)
setData(data)
}
sse.onmessage = e => getRealtimeData(JSON.parse(e.data)); sse.onerror = () => {
sse.close();
}
return () => {
sse.close();
};
}, []);
return (
<div className="App">
<p>{data.fps}</p>
<p>{data.frame}</p>
<p>{data.bitrate}</p>
<p>{data.out_time}</p>
<p>{data.progress}</p>
</div>
)
}
export default App
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment