Skip to content

Instantly share code, notes, and snippets.

@BananaHemic
Created August 4, 2025 20:35
Show Gist options
  • Save BananaHemic/762099dbcaa2765b6aeb209c81056e11 to your computer and use it in GitHub Desktop.
Save BananaHemic/762099dbcaa2765b6aeb209c81056e11 to your computer and use it in GitHub Desktop.
Per Participant Live Video Python Sample
import asyncio
import httpx
from websockets.asyncio.server import serve
from fastapi import FastAPI, WebSocket, Request, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import uvicorn
import argparse
import av
import json
import time
import websockets
import base64
import cv2
import pygame
from OpenGL.GL import *
from OpenGL.GLUT import *
from dotenv import load_dotenv
load_dotenv()
RECALL_KEY = "YOUR_API_KEY"
RECALL_URL = "https://us-east-1.recall.ai/api/v1/"
MAX_WIDTH = 1280
MAX_HEIGHT = 720
pygame.init()
pygame.display.set_mode((MAX_WIDTH, MAX_HEIGHT), pygame.OPENGL | pygame.DOUBLEBUF)
pygame.display.set_caption("Recall Video Stream Player")
texture_id = glGenTextures(1)
glBindTexture(GL_TEXTURE_2D, texture_id)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_LINEAR)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_LINEAR)
glEnable(GL_TEXTURE_2D)
glMatrixMode(GL_PROJECTION)
glLoadIdentity()
glOrtho(0, MAX_WIDTH, 0, MAX_HEIGHT, -1, 1)
glMatrixMode(GL_MODELVIEW)
glLoadIdentity()
class H264StreamDecoder:
def __init__(self):
self.codec = av.codec.CodecContext.create('h264', 'r')
def decode(self, h264_bytes: bytes):
frames = []
try:
packets = self.codec.parse(h264_bytes)
for packet in packets:
frames.extend(self.codec.decode(packet))
for frame in frames:
print(f"Decoded frame: Size={frame.width}x{frame.height}")
except Exception as e:
print(f"Decoding error: {e}")
return frames
id_2_decoder = {}
bot_id = None
participant_id = None
app = FastAPI()
f = open("video.h264", "wb")
@app.post("/zoom-webhook")
async def handle_zoom_webhook(request: Request):
return {}
def handle_participant_video(data):
decoder = id_2_decoder.get(data["participant"]["id"])
if decoder is None:
print("No decoder for", data["participant"])
return
if data["participant"]["id"] != participant_id:
return
h264_bytes = base64.b64decode(data["buffer"])
f.write(h264_bytes)
#print("compressed frame len", len(h264_bytes))
frames = decoder.decode(h264_bytes)
for frame in frames:
if frame.is_corrupt:
print("ERROR: received corrupt frame")
continue
#img = frame.to_ndarray(format="bgr24")
#cv2.imshow(data["participant"]["name"], img)
img = frame.to_ndarray(format="rgb24")
# Upload to GPU as texture
glBindTexture(GL_TEXTURE_2D, texture_id)
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB, frame.width, frame.height, 0, GL_RGB, GL_UNSIGNED_BYTE, img)
# Clear screen and draw textured quad
glClear(GL_COLOR_BUFFER_BIT)
glBegin(GL_QUADS)
glTexCoord2f(0, 1); glVertex2f(0, 0)
glTexCoord2f(1, 1); glVertex2f(frame.width, 0)
glTexCoord2f(1, 0); glVertex2f(frame.width, frame.height)
glTexCoord2f(0, 0); glVertex2f(0, frame.height)
glEnd()
pygame.display.flip()
def handle_participant_join(data):
global participant_id
print(f"Participant join name {data['participant']['name']}, id {data['participant']['id']}")
participant_id = data["participant"]["id"]
id_2_decoder[participant_id] = H264StreamDecoder()
@app.websocket("/recall-websocket")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
print(f"WebSocket connected: {websocket.client.host}")
try:
while True:
txt = await websocket.receive_text()
d = json.loads(txt)
event = d["event"]
data = d["data"]
print(f"Received WebSocket message from {websocket.client.host}: {event}")
if event == "participant_events.join":
handle_participant_join(data["data"])
elif event == "video_separate_h264.data":
handle_participant_video(data["data"])
elif event == "participant_events.webcam_off":
print("webcam off for", data["data"]["participant"]["name"])
elif event == "participant_events.webcam_on":
print("webcam on for", data["data"]["participant"]["name"])
else:
print(f"Unhandled event {event}")
except WebSocketDisconnect:
print(f"WebSocket disconnected: {websocket.client.host}")
except Exception as e:
print(f"WebSocket error with {websocket.client.host}: {e}")
async def main(url: str):
#await start_server(url)
config = uvicorn.Config(app, host="0.0.0.0", port=8001, loop="asyncio", ws_max_size=128 * 1024 * 1024)
server = uvicorn.Server(config)
await server.serve()
async def start_server(url):
payload = {
"meeting_url": url,
"bot_name": "Brendan Meeting Notetaker",
"recording_config": {
"transcript": {
"metadata": { "additionalProp": "string" },
"provider": {
"meeting_captions": {}
},
"diarization": { "use_separate_streams_when_available": False }
},
"realtime_endpoints": [
{
"type": "websocket",
"url": "wss://mullet-deep-duly.ngrok-free.app/recall-websocket",
#"events": ["video_separate_png.data", "participant_events.join", "participant_events.leave", "participant_events.chat_message", "participant_events.webcam_on", "participant_events.webcam_off"]
"events": ["video_separate_h264.data", "participant_events.join", "participant_events.leave", "participant_events.chat_message", "participant_events.webcam_on", "participant_events.webcam_off"]
#"events": ["video_separate_h264.data", "video_separate_png.data", "participant_events.join", "participant_events.leave", "participant_events.chat_message", "participant_events.webcam_on", "participant_events.webcam_off"]
}
],
"retention": {
"type": "timed",
"hours": 12
},
"video_mixed_layout": "gallery_view_v2",
"start_recording_on": "participant_join",
"video_separate_h264": {},
#"video_separate_png": {},
"include_bot_in_recording": { "audio": False },
},
"chat": {
"on_bot_join": {
"send_to": "everyone",
"message": "string",
"pin": False
},
"on_participant_join": {
"message": "string",
"exclude_host": True
}
},
"automatic_leave": {
"waiting_room_timeout": 1200,
"noone_joined_timeout": 1200,
"everyone_left_timeout": {
"timeout": 2,
"activate_after": 40
},
"in_call_not_recording_timeout": 3600,
"in_call_recording_timeout": 90,
"recording_permission_denied_timeout": 60,
"silence_detection": {
"timeout": 3600,
"activate_after": 1200
},
"bot_detection": {
"using_participant_events": {
"timeout": 600,
"activate_after": 1200
},
"using_participant_names": {
"timeout": 60,
"activate_after": 900,
"matches": ["string"]
}
}
},
"variant": {
"zoom": "web",
"google_meet": "web",
"microsoft_teams": "web",
"webex": "web"
},
"zoom": {
},
"google_meet": {
},
"metadata": { "additionalProp": "string" }
}
headers = {
"accept": "application/json",
"content-type": "application/json",
"Authorization": RECALL_KEY
}
async with httpx.AsyncClient() as client:
url = RECALL_URL + "bot"
response = await client.post(url, json=payload, headers=headers)
print("Startup POST response:", response.status_code, response.text)
bot_id = response.json()["id"]
print("Bot ID:", bot_id)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="websocket")
parser.add_argument("url", help="Meeting URL")
args = parser.parse_args()
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If already running (e.g. Jupyter), schedule main()
loop.create_task(main(args.url))
else:
loop.run_until_complete(main(args.url))
except KeyboardInterrupt:
print("Shutting down...")
pygame.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment