Created
September 9, 2025 14:37
-
-
Save cbonesana/49e92a2722f33732a528f0c61772401d to your computer and use it in GitHub Desktop.
FastAPI streaming proxy example with support for streaming responses.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Execute as a normal script, or subdivided using uvicorn: | |
Together:
    python streams.py
GENERATOR:
    uvicorn streams:app_gen --host localhost --port 8000
PROXY:
    uvicorn streams:app_proxy --host localhost --port 7000
CLIENT:
    CLIENT_ONLY=1 python streams.py
""" | |
from typing import AsyncGenerator, Any | |
from fastapi import FastAPI | |
from fastapi.responses import StreamingResponse | |
from fastapi.requests import Request | |
from httpx import AsyncClient | |
from pydantic import BaseModel | |
import asyncio | |
import logging | |
import os | |
import uvicorn | |
# Root logging configuration: timestamp, right-aligned level name and logger
# name, then the message.
_LOG_FORMAT = "%(asctime)s %(levelname)7s %(name)10s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Integer flag read from the environment (defaults to 0). The script entry
# point compares it against 1 to decide whether to run only the client.
CLIENT_ONLY: int = int(os.getenv("CLIENT_ONLY", "0"))
class InData(BaseModel):
    """Request body accepted by the generator's ``/stream`` endpoint.

    ``start``, ``end`` and ``step`` are handed to ``range()`` by the
    generator; ``sleep`` is the delay passed to ``asyncio.sleep`` after each
    emitted chunk.
    """

    # Bounds of the range to stream (``end`` is exclusive, as in ``range``).
    start: int
    end: int
    # Increment between consecutive values.
    step: int = 1
    # Pause after each chunk, in seconds.
    sleep: float = 1.0
class RangeChunk(BaseModel):
    """One element of the streamed response produced by the generator."""

    # Value taken from the requested range.
    i: int
    # Zero-based position of this chunk within the stream.
    n: int
# Port of the generator app; the proxy forwards requests to it.
PORT_GENERATOR: int = 8000
# Port of the proxy app; the client sends its requests here.
PORT_PROXY: int = 7000
app_gen = FastAPI()


@app_gen.post("/stream")
async def gen_stream(in_data: InData, req: Request) -> StreamingResponse:
    """Stream one JSON-encoded ``RangeChunk`` per value of the requested range.

    Args:
        in_data: Range bounds, step and per-chunk delay.
        req: Incoming request; only its headers are read, for logging.

    Returns:
        A ``StreamingResponse`` with media type ``application/json`` whose
        body is the sequence of encoded chunks.
    """
    log = logging.getLogger("generator")
    log.info(f"<- {dict(req.headers)}")
    log.info(f"<- {in_data}")

    async def range_stream(data: InData) -> AsyncGenerator[bytes, Any]:
        # enumerate() supplies the running chunk index alongside each value.
        values = range(data.start, data.end, data.step)
        for position, value in enumerate(values):
            chunk = RangeChunk(i=value, n=position)
            log.info(f"-> {chunk}")
            yield chunk.model_dump_json().encode()
            # Pause after every chunk so the streaming is observable.
            await asyncio.sleep(data.sleep)

    body_iterator = range_stream(in_data)
    return StreamingResponse(body_iterator, media_type="application/json")
app_proxy = FastAPI()


@app_proxy.post("/stream")
async def proxy_stream(req: Request) -> StreamingResponse:
    """Forward the request to the generator and relay its streamed response.

    The request body is read in full and re-sent to the generator's
    ``/stream`` endpoint together with the incoming headers, minus ``Host``.
    Chunks from the upstream response are yielded to the caller as they
    arrive.

    Args:
        req: Incoming request whose body and headers are forwarded.

    Returns:
        A ``StreamingResponse`` that relays the raw upstream chunks.
    """
    LOGGER = logging.getLogger("proxy")
    body = await req.body()
    LOGGER.info(f"<- {body}")

    # Drop the caller's Host header before forwarding. ``lower`` must be
    # *called*: comparing the bound method itself to "host" is always
    # unequal, which let the Host header through unfiltered.
    headers = {k: v for k, v in req.headers.items() if k.lower() != "host"}
    LOGGER.info(f"<- {headers}")

    async def response_stream() -> AsyncGenerator[bytes, None]:
        # The HTTP client and upstream response stay open for as long as the
        # generator is being consumed; both are closed when it exits.
        async with AsyncClient() as client:
            async with client.stream(
                "POST",
                f"http://localhost:{PORT_GENERATOR}/stream",
                content=body,
                headers=headers,
                timeout=None,
            ) as r:
                LOGGER.info(f"-> {r.headers}")
                async for chunk in r.aiter_raw():
                    # Stop relaying once the downstream caller has gone away.
                    if await req.is_disconnected():
                        LOGGER.warning("xx response stream closed")
                        break
                    LOGGER.info(f"-> {chunk}")
                    yield chunk

    return StreamingResponse(
        response_stream(),
    )
async def run(app: FastAPI, port: int) -> None:
    """Serve ``app`` with uvicorn on ``localhost:port`` inside the running loop.

    Args:
        app: The FastAPI application to serve.
        port: TCP port to listen on.
    """
    # uvicorn's own log level is "error"; the example's loggers are separate.
    server = uvicorn.Server(
        uvicorn.Config(app, host="localhost", port=port, log_level="error")
    )
    await server.serve()
async def client() -> None:
    """POST a sample range request to the proxy and log every streamed chunk.

    Raises:
        httpx.HTTPStatusError: If the proxy answers with an error status
            (raised by ``raise_for_status``).
    """
    log = logging.getLogger("client")
    payload = InData(start=1, end=10, step=2, sleep=0.2)
    log.info(f"-> {payload}")

    target_url = f"http://localhost:{PORT_PROXY}/stream"
    async with AsyncClient() as http:
        # timeout=None: no client-side timeout is applied to this request.
        async with http.stream(
            "POST",
            target_url,
            content=payload.model_dump_json(),
            timeout=None,
        ) as resp:
            resp.raise_for_status()
            log.info(f"<- {resp.status_code}")
            log.info(f"<- {resp.headers}")
            async for piece in resp.aiter_bytes():
                log.info(f"<- {piece}")
async def main() -> None:
    """Start generator and proxy, run the client once, then await the servers."""
    server_tasks = (
        asyncio.create_task(run(app_gen, PORT_GENERATOR)),
        asyncio.create_task(run(app_proxy, PORT_PROXY)),
    )

    # Fixed pause before the client's first request, so that the server tasks
    # have had time to run.
    await asyncio.sleep(2.0)

    await asyncio.create_task(client())

    # Awaited in creation order: generator first, then proxy.
    for server_task in server_tasks:
        await server_task
if __name__ == "__main__":
    # CLIENT_ONLY == 1 runs just the client; otherwise main() is run.
    entrypoint = client if CLIENT_ONLY == 1 else main
    try:
        # asyncio.run creates the event loop and closes it on exit; the
        # previous hand-made loop was never closed.
        asyncio.run(entrypoint())
    except Exception:
        # Still best-effort (no re-raise), but record the failure instead of
        # silently discarding it.
        logging.getLogger("main").exception("streaming example failed")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment